[NEM] ''5 Feb 2007'': A message on [comp.lang.tcl] asked if there was an implementation of a message queue for Tcl's threads. A message queue is a [queue] data structure that supports inter-process communication (IPC). The idea is that a producer thread pushes messages onto the queue at one rate and then a consumer thread can remove them as it is ready to process them. The implementation below is synchronous, in that the queue has a fixed size. Any attempt to push a message onto a full queue or to read from an empty queue will cause the calling thread to suspend until the condition is satisfied. This allows for a simple means of coordinating workload between two threads. (In other words, the implementation below is equivalent to a simple ''bounded buffer''). More advanced implementations are possible, such as allowing the consumer to specify a ''pattern'' -- only messages which match the pattern are removed from the queue (similar to [Erlang]'s ''receive'' statement). ''A few minutes later'': V0.2 - fixed a couple of bugs, removed some unnecessary locks. ====== # mqueue.tcl -- # # Simple blocking message queue for Tcl threads. # # Copyright (c) 2007 Neil Madden. # # License: http://www.cs.nott.ac.uk/~nem/license.terms (Tcl-style). package require Tcl 8.4 package require Thread 2.6 package provide mqueue 0.2 namespace eval mqueue { namespace export create destroy push pop proc ::mqueue {subcommand args} { uplevel 1 [linsert $args 0 ::mqueue::$subcommand] } tsv::lock ::mqueue { if {![tsv::exists ::mqueue id]} { tsv::set ::mqueue id 0 } } proc lock {mutex script} { thread::mutex lock $mutex set rc [catch { uplevel 1 $script } ret] thread::mutex unlock $mutex return -code $rc $ret } proc create {{size 1}} { set id [tsv::incr ::mqueue id] set self "::mqueue$id" tsv::set $self mutex [thread::mutex create] tsv::set $self read [thread::cond create] tsv::set $self write [thread::cond create] tsv::set $self size $size tsv::set $self buffer [list] return $self } proc destroy queue { thread::cond destroy [tsv::get $queue read] thread::cond destroy [tsv::get $queue write] thread::mutex destroy [tsv::get $queue mutex] tsv::unset $queue } proc push {queue data} { lock [tsv::get $queue mutex] { while {[tsv::llength $queue buffer] >= [tsv::get $queue size]} { # Full already thread::cond wait [tsv::get $queue write] \ [tsv::get $queue mutex] } tsv::lappend $queue buffer $data thread::cond notify [tsv::get $queue read] } } proc pop queue { lock [tsv::get $queue mutex] { while {[tsv::llength $queue buffer] == 0} { # Empty thread::cond wait [tsv::get $queue read] \ [tsv::get $queue mutex] } set ret [tsv::lpop $queue buffer] thread::cond notify [tsv::get $queue write] } return $ret } } ====== This should be basically thread-safe, except that calling ''destroy'' while the queue is in use will likely result in an error. However, shared state concurrency is hard to do right so a review would be welcome, especially from someone who actually uses the thread package regularly (I don't use it very often). As an example, here is a simple producer/consumer scenario: ====== package require mqueue 0.1 set t [thread::create] thread::send $t { package require mqueue 0.1 proc produce queue { puts "Producer thread starting..." while 1 { puts "Looping" mqueue push $queue "Message: [incr i]" } } } set q [mqueue create 5] thread::send -async $t [list produce $q] while 1 { puts [mqueue pop $q]; after 200 } ====== <> Threads | Data Structure | Interprocess Communication