A Thread-Safe Message Queue

NEM 5 Feb 2007: A message on comp.lang.tcl asked if there was an implementation of a message queue for Tcl's threads. A message queue is a queue data structure used to pass messages between concurrently executing threads: a producer thread pushes messages onto the queue at its own rate, while a consumer thread removes them as it is ready to process them. The implementation below is synchronous, in that the queue has a fixed capacity: any attempt to push a message onto a full queue, or to pop one from an empty queue, suspends the calling thread until the operation can proceed. This gives a simple means of coordinating workload between two threads; in other words, the implementation below is a classic bounded buffer. More advanced implementations are possible, such as allowing the consumer to specify a pattern so that only messages matching the pattern are removed from the queue (similar to Erlang's receive statement; a rough sketch of this appears at the end of the page).

A few minutes later: V0.2 - fixed a couple of bugs, removed some unnecessary locks.
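
At a glance, the interface behaves like this (a quick usage sketch; the full implementation follows):

 set q [mqueue create 5]   ;# a queue holding at most 5 messages
 mqueue push $q "hello"    ;# blocks while the queue is full
 puts [mqueue pop $q]      ;# blocks while the queue is empty; prints "hello"
 mqueue destroy $q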

 # mqueue.tcl --
 #
 #       Simple blocking message queue for Tcl threads.
 #
 # Copyright (c) 2007 Neil Madden.
 #
 # License: http://www.cs.nott.ac.uk/~nem/license.terms (Tcl-style).
 
 package require Tcl     8.4
 package require Thread  2.6
 package provide mqueue  0.2
 
 namespace eval mqueue {
     namespace export create destroy push pop
     # A single [mqueue] command that dispatches to the procs below,
     # e.g. [mqueue push $q $msg] calls ::mqueue::push.
     proc ::mqueue {subcommand args} {
         uplevel 1 [linsert $args 0 ::mqueue::$subcommand]
     }
     # Initialise the shared id counter exactly once, even if several
     # threads load the package concurrently.
     tsv::lock ::mqueue {
         if {![tsv::exists ::mqueue id]} {
             tsv::set ::mqueue id 0
         }
     }
 
     # lock -- evaluate a script while holding a mutex. The mutex is
     # released again even if the script raises an error.
     proc lock {mutex script} {
         thread::mutex lock $mutex
         set rc [catch { uplevel 1 $script } ret]
         thread::mutex unlock $mutex
         return -code $rc $ret
     }
 
     # create -- allocate a new queue holding at most $size messages
     # (default 1) and return a handle for it.
     proc create {{size 1}} {
         set id [tsv::incr ::mqueue id]
         set self "::mqueue$id"
         tsv::set $self mutex  [thread::mutex create]
         tsv::set $self read   [thread::cond  create]
         tsv::set $self write  [thread::cond  create]
         tsv::set $self size   $size
         tsv::set $self buffer [list]
         return $self
     }
 
     # destroy -- free the synchronization primitives and the shared
     # array. Not safe while other threads are still using the queue.
     proc destroy queue {
         thread::cond  destroy [tsv::get $queue read]
         thread::cond  destroy [tsv::get $queue write]
         thread::mutex destroy [tsv::get $queue mutex]
         tsv::unset $queue
     }
 
     # push -- append a message to the queue, blocking while it is full.
     proc push {queue data} {
         lock [tsv::get $queue mutex] {
             while {[tsv::llength $queue buffer] >= [tsv::get $queue size]} {
                 # Queue is full: wait for a consumer to make room.
                 thread::cond wait [tsv::get $queue write] \
                     [tsv::get $queue mutex]
             }
             tsv::lappend $queue buffer $data
             thread::cond notify [tsv::get $queue read]
         }
     }
 
     # pop -- remove and return the oldest message, blocking while the
     # queue is empty.
     proc pop queue {
         lock [tsv::get $queue mutex] {
             while {[tsv::llength $queue buffer] == 0} {
                 # Queue is empty: wait for a producer to push something.
                 thread::cond wait [tsv::get $queue read] \
                     [tsv::get $queue mutex]
             }
             set ret [tsv::lpop $queue buffer]
             thread::cond notify [tsv::get $queue write]
         }
         return $ret
     }
 }

This should be basically thread-safe, except that calling destroy while the queue is in use will likely result in an error. However, shared-state concurrency is hard to get right, so a review would be welcome, especially from someone who uses the Thread package regularly (I don't use it very often).
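
One conventional way to avoid destroying a queue that is still in use is to agree on a sentinel message meaning "no more data", and have only the consumer call destroy once it has seen the sentinel. A minimal sketch, assuming a single producer (the sentinel value ::mqueue::EOF is just an arbitrary convention, not part of the package):

 # Producer side, when done:
 mqueue push $q ::mqueue::EOF

 # Consumer side:
 while {[set msg [mqueue pop $q]] ne "::mqueue::EOF"} {
     puts "got: $msg"
 }
 mqueue destroy $q   ;# safe now: the single producer has finished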

As an example, here is a simple producer/consumer scenario (the producer runs in a separate thread; the main thread consumes):

 package require mqueue 0.2
 set t [thread::create]
 thread::send $t {
     package require mqueue 0.2
     proc produce queue {
         puts "Producer thread starting..."
         set i 0   ;# [incr] on an unset variable is an error in Tcl 8.4
         while 1 {
             puts "Looping"
             mqueue push $queue "Message: [incr i]"
         }
     }
 }
 set q [mqueue create 5]
 thread::send -async $t [list produce $q]
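 # The consumer pops slowly on purpose: once the queue fills, the
 # producer blocks inside [mqueue push] until the consumer catches up.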
 while 1 { puts [mqueue pop $q]; after 200 }
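
Finally, here is a rough, untested sketch of the Erlang-style selective receive mentioned in the introduction: a pop that takes a glob pattern and removes only the first matching message, leaving the rest queued. The name popmatch is made up for this sketch, and it assumes tsv::lsearch defaults to the same glob matching as lsearch:

 # popmatch -- remove and return the first message matching a glob
 # pattern, blocking until one is available. (A sketch only, not part
 # of the package above.)
 proc ::mqueue::popmatch {queue pattern} {
     lock [tsv::get $queue mutex] {
         while {[set idx [tsv::lsearch $queue buffer $pattern]] < 0} {
             # No matching message yet: wait for a producer to push one.
             thread::cond wait [tsv::get $queue read] \
                 [tsv::get $queue mutex]
         }
         set ret [tsv::lpop $queue buffer $idx]
         thread::cond notify [tsv::get $queue write]
     }
     return $ret
 }

Because the ::mqueue dispatcher simply calls ::mqueue::$subcommand, this would be invoked as, e.g., [mqueue popmatch $q {error*}].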