Filament - A very lightweight thread package

TP Jan 8, 2006: Here is another tiny/toy thread/coroutine/generator package.

Filaments are defined with explicit init, run, and message receive code. Another explicit command starts the filaments, running until either a) all filaments have terminated, or b) all filaments are in a wait state (i.e., deadlocked). If filaments are deadlocked, one of three deadlock resolution types can be specified when the filaments are run:

  1. kill - terminate all deadlocked filaments.
  2. vwait - enter Tcl eventloop and wait on a variable, this allows filaments to be restarted with an external messages sent to one or more filaments.
  3. return - keeps filaments in the wait state, allowing an external message to be sent and later re-run.

Filaments run inside their own intepreter. Filaments can send messages to other filaments, sleep, wait for a messages, terminate themselves or others, or execute commands in the top level interpreter.

When creating filaments, an explicit list of variables is used to define which variables should be retained while a filament is not running. These simply become global variables in each filament's interpreter. Filaments that invoke wait, sleep, or kill themselves propogate a specific error message, which is caught by the scheduler.

Scheduling is pretty much done on a random basis, except that filaments that are in the wait state and have a message available for delivery are scheduled in the same iteration.

After Task Scheduler shares many of the same ideas, but uses after to schedule tasks.

 ############################################################################
 #
 # filament
 #
 # A very light weight non-preemptive thread/coroutine/generator package
 #
 # Similar to the real Tcl 'thread' package, each filament gets its own
 # interpreter, and communicates with other filaments by sending messages.
 # Unlike 'thread', messages are just strings rather than commands, and
 # explicit 'init', 'run', and 'recv' states have separate code bodies.
 # Filaments must be in a 'wait' state in order to recieve messages.
 #
 # Tom Poindexter, January 2006
 #

 package provide filament 0.1

 namespace eval ::filament {
     variable nextCnt 0
     variable status
     variable messages
     array set status {}
     array set messages {}
     variable msgQueue {}
 }

 ############################################################################
 #
 # ::filament::new
 #
 # Create a new filament.  The filament will begin the the 'run' state, unless
 # another state is set during initialization.  Each filament creates a
 # slave interp by the same name.
 #
 # name - name of the new filament, or "#auto" for a generated name
 # args - a list of "type-list code" pairs,
 #
 #    vars var-list    a list of variables used by the filament, any other
 #                     variables used in code are not retained.
 #
 #    init code        code to be executed on filament creation.
 #                     also define proc's for your run & recv code here.
 #                     the last command can be 'filament_wait' or
 #                     'filament_sleep' if a beginning state other than 'run'
 #                     is required.
 #
 #    run  code        code to be executed when a filament is run.  defaults to
 #                     'filament_wait' if no run code is defined.
 #
 #    recv code        code to be executed when a message is received, the
 #                     variable MESSAGE is set with a two element list of the
 #                     sending filament, and the message.  defaults to
 #                     'filament_wait' if no run code is defined.
 #
 #
 # A type-list can specify any of "init", "run", or "recv".  E.g., if the
 # code to execute for 'run' and 'recv' are the same, then type-list should
 # be a list of {run recv}.
 #
 # Any uncaught error generated by init, run, or recv will cause the
 # filament to be terminated.  The global variable 'FILAMENT' is set to the
 # name of the filament.
 #

 proc ::filament::new {name args} {
     variable nextCnt
     variable status
     variable messages

     if {$name eq "#auto"} {
         set name filament[incr nextCnt]
     }
     if {([llength $args] % 2) != 0} {
         error "'args' must contain list of 'type code' pairs"
     }
     if {[info exists status($name)]} {
         error "a filament with name \"$name\" already exists"
     }
     set vars ""
     set init ""
     set run  filament_wait
     set recv filament_wait
     set ok 0
     foreach {type codestr} $args {
         if {[lsearch $type vars] >= 0} {
             set vars $codestr
         }
         if {[lsearch $type init] >= 0} {
             set init $codestr
         }
         if {[lsearch $type run] >= 0} {
             set run $codestr
             set ok 1
         }
         if {[lsearch $type recv] >= 0} {
             set recv $codestr
             set ok 1
         }
     }
     if {! $ok} {
         error "neither 'run' or 'recv' code was defined"
     }
     regsub -all "#\[^\n\]*\n" $vars { } vars
     if {[catch {llength $vars}]} {
         error "vars not a valid list of variable names"
     }
     lappend vars FILAMENT MESSAGE
     set globalvars [concat [linsert $vars 0 global]]
     interp create $name
     interp eval $name [list set FILAMENT $name]
     interp eval $name [list set MESSAGE {}]
     set status($name) run
     set messages($name) ""
     set runproc "proc RUN TYPE \{\n \
                 $globalvars \n\
                 switch \$TYPE [list run $run recv $recv ]\}"
     interp eval $name $runproc
     interp alias $name filament_send {} ::filament::send $name
     interp alias $name filament_wait {} ::filament::wait $name
     interp alias $name filament_kill {} ::filament::kill $name
     interp alias $name filament_sleep {} ::filament::sleep $name
     interp alias $name filament_names {} ::filament::names $name
     interp alias $name filament_messages {} ::filament::messages $name
     interp alias $name filament_uplevel0 {} ::filament::uplevel0 $name
     if {[catch {interp eval $name $init} err]} {
         if {$err ne "FILAMENT_OK"} {
             catch {interp delete $name}
             catch {unset status($name)}
             catch {unset messages($name)}
             error "::filament::new: error during $name init:\n$err"
         }
     }
     return $name
 }

 ############################################################################
 #
 # ::filament::send
 #
 # Implements the 'filament_send' command
 #
 # Filaments can send messages to other filaments.  filament_send does not
 # cause the calling filament to yield.  The target filament(s) are
 # added to the msgQueue so that they are serviced before other running
 # filaments.  ::filament::send can also be used by the top level code
 # to prime initial messages or to re-prime filaments that become
 # deadlocked.  The filament receiving the message has it's 'recv' code
 # invoked, the global variable MESSAGE contains a list of two elements,
 # the id of the sending filament, and the message.
 #
 #     filament_send message ?id?
 #
 #     message - the message to send
 #
 #     id - the name of a filament to send.  if id is a null string,
 #          or not specified, then the message is sent to all other
 #          filaments, excluding self

 proc ::filament::send {this message {id ""}} {
     variable status
     variable messages
     variable msgQueue
     if {![string length $id]} {
         foreach name [array names status] {
             if {$name ne $this} {
                 lappend messages($name) [list $this $message]
                 lappend msgQueue $name
             }
         }
     } elseif {![info exists status($id)]} {
         error "filament $id does not exist"
     } else {
         lappend messages($id) [list $this $message]
         lappend msgQueue $id
     }
 }


 ############################################################################
 #
 # ::filament::wait
 #
 # Implements the 'filament_wait' command
 #
 #    filament_wait
 #
 # The calling filament waits until a message is received.  Upon receiving a
 # message, the 'recv' code is executed.  filament_wait will cause the
 # calling filament to yield.  The global variable MESSAGE receives a list of
 # the sending filament id and the message.

 proc ::filament::wait {this} {
     variable status
     set status($this) wait
     error FILAMENT_OK
 }


 ############################################################################
 #
 # ::filament::kill
 #
 # Implements the 'filament_kill' command
 #
 # Cause a filament to terminate.  If a filament is killing itself, the
 # calling filament will immediately be terminated.  If killing another
 # filament, the calling filament does not yield.
 #
 #    filament_kill ?id? ?when?
 #
 #    id - the id to terminate.  if not specified, then the calling filament
 #         is termiated.
 #
 #    when - "waitkill" or "kill".  "waitkill" terminates the filament after
 #         any pending messages have been delivered (default).
 #         "kill" terminates the filament immediately.

 proc ::filament::kill {this {id ""} {when waitkill}} {
     variable status
     if {![string length $id]} {
         set id $this
     }
     if {[string length $id] && ![info exists status($id)]} {
         error "filament $id does not exist"
     }
     # when - "kill" or "waitkill" until all messages to filament are delivered
     switch $when {
         kill     {set status($id) kill}
         waitkill {set status($id) waitkill}
         default {error "\"when\" argument \"$when\",  must be either \"wait\" or \"kill\""}
     }
     if {$this eq $id} {
         error FILAMENT_OK
     }
 }


 ############################################################################
 #
 # ::filament::sleep
 #
 # Implements the 'filament_sleep' command.
 #
 #    filament_sleep
 #
 # The calling filament will yield, and be later rescheduled to run.

 proc ::filament::sleep {this} {
     variable status
     set status($this) sleep
     error FILAMENT_OK
 }


 ############################################################################
 #
 # ::filament::messages
 #
 # Implements the 'filament_messages' command.
 #
 #    filament_messages ?id?
 #
 # Returns the number of messages waiting to be delivered.  An error is raised
 # if the filament does not exist.
 # If id is not specified, then the number of messages for the calling
 # filament is returned.

 proc ::filament::messages {this {id ""}} {
     variable messages
     if {![string length $id]} {
         set id $this
     }
     if {[info exists messages($id)]} {
         return [llength $messages($id)]
     } else {
         error "filament id \"$id\" does not exists"
     }
 }


 ############################################################################
 #
 # ::filament::names
 #
 # Implements the 'filament_names' command.
 #
 #    filament_names
 #
 # Returns a list of the names of all filaments and their status.  status
 # is one of: "run", "wait", "waitkill", "kill", "sleep"
 #

 proc ::filament::names {this} {
     variable status
     return [array get status]
 }


 ############################################################################
 #
 # ::filament::uplevel0
 #
 # Implements the 'filament_uplevel0' command.
 #
 #    filament_uplevel0 args
 #
 # Runs a command as uplevel #0, any value is returned.  This can be used
 # to set/read global variables, run top level procs, access shared files, etc.
 #

 proc ::filament::uplevel0 {this args} {
     return [uplevel #0 $args]
 }



 ############################################################################
 #
 # ::filament::doKill
 #
 # Kill helper - destroy the filament interpreter, cleanup status & messages
 #

 proc ::filament::doKill {name} {
     variable status
     variable messages
     catch {interp delete $name}
     catch {unset status($name)}
     catch {unset messages($name)}
 }


 ############################################################################
 #
 # ::filament::doRun
 #
 # Run helper, type is either 'run' or 'recv'.  If an interp throws the
 # error "FILAMENT_OK", then we'll assume that one of our own.
 # Any other error thrown by the interp will immediately terminate the
 # filament and invoke 'bgerror' to report the condition.

 proc ::filament::doRun {name type} {
     variable status
     if { [catch {interp eval $name RUN $type} err] } {
         if {$err eq "FILAMENT_OK"} {
             # check if this filament is still valid
             if {[info exists status($name)]} {
                 # yes, reschedule
             }
         } else {
             # some other error, kill this filament
             catch {doKill $name}
             catch {bgerror "\"::filament::run $name $type\", error:\n$err"}
         }
     } else {
         # returned without error
         return $err
     }
     return ""
 }


 ############################################################################
 #
 # ::filament::run
 #
 # Begin execution of filamants.
 #
 #   ::filament::run ?deadlock_resolution? ?waitvar?
 #
 # Execution will continue until:
 #
 #   a) no filaments remain, all have been terminated
 #
 #   b) all filaments are in a 'wait' state and no messages queued (deadlocked)
 #
 #   optional arguments:
 #
 #   deadlock_resolution -
 #              "kill" - cause all filaments to be terminated (default)
 #              "vwait" - enter Tcl event loop by executing "vwait $waitvar"
 #                       this allows ::filament::send to be used to send
 #                       a message to a filament, and run again
 #              "return" - return leaving filaments interps active.  it is
 #                       possible to send messages to filaments, and rerun
 #                       ::filament::run to restart.
 #
 #   waitvar - the name of the variable for deadlock_resolution method "vwait"
 #
 # ::filament::run returns a list of filaments that are deadlocked, if any
 #

 proc ::filament::run {{deadlock kill} {waitvar ""}} {
     variable status
     variable messages
     variable msgQueue
     if {[lsearch {kill vwait return} $deadlock] == -1} {
         error "\"deadlock\" argument must be \"kill\", \"vwait\", or \"return\""
     }
     if {$deadlock eq "vwait" && ![string length $waitvar]} {
         error "vwait specified, but \"waitvar\" is null"
     }
     while {1} {
         set ran 0
         # get names of all filaments, this list can be modified
         # during the loop by sending messages, lappending the targets
         set msgQueue [array names status]
         while {[llength $msgQueue]} {
             set name [lindex $msgQueue 0]
             set msgQueue [lrange $msgQueue 1 end]
             if {! [info exists status($name)]} {
                 continue
             } else {
                 set state $status($name)
             }
             switch $state {
                 run {
                     doRun $name run
                     set ran 1
                 }
                 waitkill -
                 wait {
                     set allmessages ""
                     catch {set allmessages $messages($name)}
                     set messages($name) ""
                     foreach msg $allmessages {
                         if {[info exists status($name)]} {
                             # default next state to 'run'. the actual 'recv'
                             # code in the filament may call filament_wait,
                             # filament_sleep, or filament_kill to change
                             # to a different state upon exit.
                             set status($name) run
                             interp eval $name [list set MESSAGE $msg]
                             doRun $name recv
                             set ran 1
                         }
                     }

                     if {[info exists status($name)]} {
                         # if waitkill, then kill the filament now
                         if {$state eq "waitkill" && \
                                         [llength $messages($name)]==0} {
                             doKill $name
                         } else {
                             catch {interp eval $name [list set MESSAGE {}]}
                         }
                     }
                 }
                 kill {
                     doKill $name
                 }
                 sleep {
                     set status($name) run
                     set ran 1
                 }
             }
         }

         # if nothing was run and filaments exist, then we must be deadlocked
         set len [llength [array names status]]
         if {! $ran && $len} {
             switch $deadlock {
                 kill {
                     foreach {name state} [array get status] {
                         doKill $name
                     }
                     break
                 }
                 vwait {
                     vwait $waitvar
                 }
                 return {
                     break
                 }
             }
         } elseif {! $len} {
             # all filaments have ended
             break
         }

     }
     # return the filament names and status
     return [array names status]
 }


Here are some simple examples to demostrate Filament.


 ####################################################################
 # an anonymous filament, produces consecutive integers, and
 # sends that result to two different consumers.  terminate after 25
 # integers have been produced  randomly, the
 # consumer threads are also killed, randomly with or without pending messages
 # delivered.
 #

 package require filament

 filament::new #auto vars {i} init {
     set i 0
 } run {
     incr i
     puts -nonewline "$FILAMENT running: "
     if {$i > 25} {
         set luck [expr rand()]
         if {$luck > .66} {
             puts "killing consumers, undelivered messages are lost!"
             catch {filament_kill "" kill}
         } elseif {$luck > .33}  {
             puts "killing consumers, pending messages to be delivered"
             filament_kill consumer_odd
             filament_kill consumer_even
             catch {filament_kill busy_bee}
         } else {
             puts "ignoring consumers"
             catch {filament_kill busy_bee}
         }
         puts "$FILAMENT killing self"
         filament_kill
     }
     if {$i % 2} {
         puts "produced $i, sending to consumer_odd"
         filament_send $i consumer_odd
     } else {
         puts "produced $i, sending to consumer_even"
         filament_send $i consumer_even
         catch {filament_send "hey! be quiet!" busy_bee}
     }

 }


 # consumer_odd - in running state, immediately wait for a message
 # to be delivered.


 filament::new consumer_odd run {
     puts "    consumer_odd is waiting"
     filament_wait
 } recv {
     foreach {from msg} $MESSAGE {break}
     puts "consumer_odd got message $msg from $from"
 }


 # consumer_even - in running state, do some trival work, but every
 # fifth invocation, wait for a message

 filament::new consumer_even vars {i} init {
     set i 0
 } run {
     puts -nonewline "    consumer_even is running: "
     if {[incr i] % 5 == 0} {
         puts "now waiting for messages"
         filament_wait
     }
     puts "going to sleep"
     filament_sleep
 } recv {
     foreach {from msg} $MESSAGE {break}
     puts "consumer_even got message $msg from $from"
 }


 # busy_bee - a filament that runs and discards any messages
 # randomly fail to test the error catching code

 filament::new busy_bee {run recv} {
     if {rand() < .05} {
         error "this will raise an error, causing busy_bee to terminate"
     }
     puts -nonewline "     buzzzzzzzz"
     if {[filament_messages]} {
         puts ":  has [filament_messages] messages waiting"
         filament_wait
     }
     puts ""
 }

 # report background errors
 proc bgerror {message} {
     puts "bgerror got: $message"
 }

 # start filaments

 puts "starting trivial filaments\n\n"
 set final [filament::run return]
 puts "\n\nfilaments left waiting in deadlock: $final"

 if {[llength $final]} {
     puts ""
     puts "sending an external message to all filaments"
     # note that the first argument (sending filament id) can be anything
     filament::send BIG_DADDY external_message
     filament::run kill
 }
 puts "trivial done"




Implement the Language Shootout [1 ] cheap-concurrency benchmark.

 #####################################################################
 # shootout cheap-concurrency
 # http://shootout.alioth.debian.org/debian/benchmark.php?test=message&lang=all

 package require filament

 # set argv 3000

 set N [lindex $argv 0]
 set Nthreads 500


 for {set i 1} {$i < $Nthreads} {incr i} {
     set next [expr {$i + 1}]
     filament::new t$i vars {next} init "set next t$next; filament_wait" recv {
         set x [lindex $MESSAGE 1]
         filament_send [expr {$x + 1}] $next
         filament_wait
     }
 }
 filament::new t$i init {filament_wait} recv {
     set x [lindex $MESSAGE 1]
     filament_uplevel0 set sum [expr {$x + 1}]
     filament_wait
 }


 proc cheap-concurrency {N} {
     global sum
     set sum 0
     for {set i 0} {$i < $N} {incr i} {
         filament::send "" $sum t1
         filament::run return
     }
     puts $sum
 }

 cheap-concurrency $N

A simple demo using vwait to restart deadlocked filaments:

 #####################################################################
 # using vwait to inject messages

 package require filament

 filament::new getmsg vars {i} init {set i 0; filament_wait} recv {
     puts "filament getmsg got $MESSAGE"
 } run {
     if {[incr i] > 5} {
         puts "getmsg: $i done"
         filament_kill
     } else {
         puts "getmsg: $i waiting"
         filament_wait
     }
 }

 set ::wait_is_over 0
 set after_code {
     filament::send mcp wakeup!
     set ::wait_is_over 1
     after 1000  [set ::after_code]
 }
 after 0 $after_code
 filament::run vwait ::wait_is_over



See also: