Before embarking on a project using Ruby threads, I was under the impression that it'd be smooth sailing, but that wasn't the case! There are a few gotchas.

This blog post assumes that you're already familiar with using threads in Ruby, and that you know the basic producer/consumer model.

I'm just going to launch into the code and create a basic working queue.

require "thread"
require "colorize"

# A minimal-ish demonstration of a ruby thread queue.
class ThreadPool

  EOQ = :end_of_queue # Marker for the end of the queue

  def initialize
    @queue = SizedQueue.new(100) # Thread-safe queue,
  end                            # blocks push after 100 items

What if your producer creates items quicker than the consumers can do work on them? Using SizedQueue nips this in the bud: once the queue holds 100 items, push blocks until a consumer pops one off.
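
To make the blocking behaviour concrete, here's a tiny sketch that's separate from the ThreadPool class. The capacity of 2 and the symbols pushed are made up for illustration. It uses the optional non_block argument to push, so a full queue raises ThreadError instead of hanging the script.

```ruby
require "thread"

queue = SizedQueue.new(2) # Capacity of 2, small on purpose for the demo
queue.push :a
queue.push :b             # The queue is now full

# A plain `queue.push :c` would block here until something is popped.
# Passing non_block = true makes push raise ThreadError instead.
full_error = begin
  queue.push(:c, true)
  nil
rescue ThreadError => e
  e
end

popped = queue.pop        # Popping frees up a slot...
queue.push :c             # ...so this push returns straight away

print "full queue raised: #{full_error.class}\n"
print "popped #{popped.inspect}, size is now #{queue.size}\n"
```

In the real pool you want the blocking version, since that is what slows the producer down to the pace of the consumers.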

That EOQ (aka 'sentinel') item will be explained shortly.

Now, let's open up all our threads! We're going to have one producer thread and five consumer threads. Keeping the consumer threads in an array makes it easy to join them all at the end.

  def run
    t1 = Thread.new { producer } # Start the producer in a thread

    t2 = Array.new(5) do |n| # Hold all the consumers in an array
      Thread.new { consumer(n) } # And start them in threads
    end

    t1.join # Wait for the producer to finish
    print "\n! PRODUCER EXITING, pushed #{EOQ.inspect}\n\n"

    t2.each(&:join) # Wait for all the consumers to finish
  end

Now for the producer! We're just going to loop 20 times, pushing a random number onto the queue each time, then push the EOQ constant to signal that there are no more items.


  def producer
    20.times do |i|                 # Just put on 20 items
      payload = (rand * 2).round(2) # Simulate a payload

      print "> pushing ##{i}: #{payload}\n".yellow
      @queue.push payload # Push it onto our queue of items
    end
    @queue.push EOQ       # Push our "EOQ" signal
  end

Now for the gotcha:

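Each consumer has to watch for the EOQ object at the end of the queue, but once it's popped it, it has to push it back on again so that the other consumers can see it too. Otherwise the first consumer to pop EOQ would exit and the rest would be left waiting on an empty queue.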

  # The consumer pops a payload and does some work, unless
  # it's the EOQ item pushed by the producer. Then we need to put
  # it back on so that the other consumers can see it too.
  def consumer(id)
    loop do                         # Always working
      if (payload = @queue.pop) == EOQ
        @queue.push EOQ
        break
      end
      work(payload, id)
    end
    print ">>CONSUMER #{id} EXITING\n"
  end

  def work(payload, id)
    sleep payload
    print ">>worker #{id} done: #{payload}\n".blue.bold
  end

end

ThreadPool.new.run if $PROGRAM_NAME == __FILE__
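
If you want to see the gotcha in isolation, here's a rough sketch that's separate from the ThreadPool class above. The helper name consumers_finished and its keyword arguments are my own invention for the demo. It starts a few consumers on a queue that only ever receives the sentinel, then counts how many of them exit within a short timeout, with and without pushing the sentinel back on.

```ruby
require "thread"

# Hypothetical helper: start `count` consumers, push one sentinel, and
# return how many consumers exit within `wait` seconds each.
def consumers_finished(count:, repush:, wait: 0.2, sentinel: :end_of_queue)
  queue = Queue.new

  threads = Array.new(count) do
    Thread.new do
      loop do
        item = queue.pop                # Blocks while the queue is empty
        if item == sentinel
          queue.push sentinel if repush # Let the next consumer see it
          break
        end
        # Real work on `item` would go here
      end
    end
  end

  queue.push sentinel                   # The "producer" is done

  # join(wait) returns nil if the thread is still running after `wait`
  finished = threads.count { |t| t.join(wait) }
  threads.each(&:kill)                  # Clean up any stuck consumers
  finished
end

print "without re-push: #{consumers_finished(count: 3, repush: false)} of 3 exited\n"
print "with re-push:    #{consumers_finished(count: 3, repush: true)} of 3 exited\n"
```

Without the re-push only the consumer that happens to pop the sentinel gets out; the others sit in pop until they're killed. With the re-push, the sentinel gets handed along and every consumer exits.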