TP Jan 8, 2006: Here is another tiny/toy thread/coroutine/generator package.
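Filaments are defined with explicit init, run, and message receive code. A separate command starts the filaments, which run until either a) all filaments have terminated, or b) all filaments are in a wait state with no messages queued (i.e., deadlocked). If the filaments are deadlocked, one of three deadlock resolution types, chosen when the filaments are run, applies: "kill" terminates all filaments (the default), "vwait" enters the Tcl event loop so that a message can be sent and the filaments run again, and "return" returns to the caller, leaving the filament interpreters active.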
Each filament runs inside its own interpreter. Filaments can send messages to other filaments, sleep, wait for a message, terminate themselves or others, or execute commands in the top level interpreter.
When creating filaments, an explicit list of variables is used to define which variables should be retained while a filament is not running. These simply become global variables in each filament's interpreter. Filaments that invoke wait or sleep, or that kill themselves, propagate a specific error message (FILAMENT_OK), which is caught by the scheduler.
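The yield-by-error idea can be tried on its own, without the package. The following is only a sketch: the names worker, yield_now, and step are hypothetical and do not appear in filament. What it shares with the package is the technique of aliasing a parent command into a child interpreter and raising a sentinel error to unwind back to the caller.

```tcl
# Sketch only: 'worker', 'yield_now' and 'step' are made-up names.

# Parent-side command that the child calls to give up control.
# Raising an error unwinds the child's call stack back to the parent.
proc yield_now {} {
    error FILAMENT_OK
}

interp create worker
interp alias worker yield_now {} yield_now

# State that must survive a yield lives in the child's global variables.
interp eval worker {
    set count 0
    proc step {} {
        global count
        incr count
        yield_now
        # never reached: the error above aborts the rest of the body
        set count -1
    }
}

# A scheduler in miniature: run the body three times, treating the
# sentinel as a normal yield and anything else as a real failure.
set yields 0
for {set i 0} {$i < 3} {incr i} {
    if {[catch {interp eval worker step} err]} {
        if {$err eq "FILAMENT_OK"} {
            incr yields
        } else {
            error "worker failed: $err"
        }
    }
}

set final [interp eval worker {set count}]
puts "yields=$yields count=$final"
interp delete worker
```

Because the error discards the rest of the body, a filament body is re-entered from the top on each run. Anything it needs to remember has to be in a retained (global) variable.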
Scheduling is pretty much done on a random basis, except that filaments that are in the wait state and have a message available for delivery are scheduled in the same iteration.
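The "same iteration" behaviour comes from a work queue that can grow while it is being consumed. The sketch below is not the package's code. The names state, inbox, queue, and send are hypothetical, and the queue is sorted only to make the trace repeatable. It shows a waiting task being revisited in the same pass because a later task sent it a message.

```tcl
# Sketch only: a work queue that receivers are appended to during a pass.
array set state {a wait b wait c run}
array set inbox {a {} b {} c {}}

proc send {to msg} {
    global inbox queue
    lappend inbox($to) $msg
    # Re-queue the receiver so it is visited again in this same pass.
    lappend queue $to
}

set order {}
set queue [lsort [array names state]]
while {[llength $queue]} {
    # pop the head of the queue
    set name [lindex $queue 0]
    set queue [lrange $queue 1 end]
    switch $state($name) {
        run {
            lappend order "$name:run"
            # the running task posts a message to the waiting task 'a'
            send a "hello from $name"
        }
        wait {
            # deliver whatever has arrived so far, then clear the inbox
            foreach msg $inbox($name) {
                lappend order "$name:recv"
            }
            set inbox($name) {}
        }
    }
}
puts $order
# prints: c:run a:recv
```

Task a is visited first with an empty inbox, then again after c has run, so the message is delivered without waiting for the next pass.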
After Task Scheduler shares many of the same ideas, but uses after to schedule tasks.
############################################################################
#
# filament
#
# A very light weight non-preemptive thread/coroutine/generator package
#
# Similar to the real Tcl 'thread' package, each filament gets its own
# interpreter, and communicates with other filaments by sending messages.
# Unlike 'thread', messages are just strings rather than commands, and
# explicit 'init', 'run', and 'recv' states have separate code bodies.
# Filaments must be in a 'wait' state in order to receive messages.
#
# Tom Poindexter, January 2006
#

package provide filament 0.1

namespace eval ::filament {
    variable nextCnt 0
    variable status
    variable messages
    array set status {}
    array set messages {}
    variable msgQueue {}
}

############################################################################
#
# ::filament::new
#
# Create a new filament.  The filament will begin in the 'run' state, unless
# another state is set during initialization.  Each filament creates a
# slave interp by the same name.
#
# name - name of the new filament, or "#auto" for a generated name
# args - a list of "type-list code" pairs,
#
#   vars var-list   a list of variables used by the filament, any other
#                   variables used in code are not retained.
#
#   init code       code to be executed on filament creation.
#                   also define proc's for your run & recv code here.
#                   the last command can be 'filament_wait' or
#                   'filament_sleep' if a beginning state other than 'run'
#                   is required.
#
#   run code        code to be executed when a filament is run.  defaults to
#                   'filament_wait' if no run code is defined.
#
#   recv code       code to be executed when a message is received, the
#                   variable MESSAGE is set with a two element list of the
#                   sending filament, and the message.  defaults to
#                   'filament_wait' if no recv code is defined.
#
#
# A type-list can specify any of "init", "run", or "recv".  E.g., if the
# code to execute for 'run' and 'recv' are the same, then type-list should
# be a list of {run recv}.
#
# Any uncaught error generated by init, run, or recv will cause the
# filament to be terminated.  The global variable 'FILAMENT' is set to the
# name of the filament.
#

proc ::filament::new {name args} {
    variable nextCnt
    variable status
    variable messages

    if {$name eq "#auto"} {
        set name filament[incr nextCnt]
    }
    if {([llength $args] % 2) != 0} {
        error "'args' must contain list of 'type code' pairs"
    }
    if {[info exists status($name)]} {
        error "a filament with name \"$name\" already exists"
    }

    set vars ""
    set init ""
    set run  filament_wait
    set recv filament_wait
    set ok 0
    foreach {type codestr} $args {
        if {[lsearch $type vars] >= 0} {
            set vars $codestr
        }
        if {[lsearch $type init] >= 0} {
            set init $codestr
        }
        if {[lsearch $type run] >= 0} {
            set run $codestr
            set ok 1
        }
        if {[lsearch $type recv] >= 0} {
            set recv $codestr
            set ok 1
        }
    }
    if {! $ok} {
        error "neither 'run' or 'recv' code was defined"
    }

    # strip comments from the vars list, then check that it is a valid list
    regsub -all "#\[^\n\]*\n" $vars { } vars
    if {[catch {llength $vars}]} {
        error "vars not a valid list of variable names"
    }
    lappend vars FILAMENT MESSAGE
    set globalvars [concat [linsert $vars 0 global]]

    interp create $name
    interp eval $name [list set FILAMENT $name]
    interp eval $name [list set MESSAGE {}]
    set status($name) run
    set messages($name) ""

    set runproc "proc RUN TYPE \{\n \
        $globalvars \n\
        switch \$TYPE [list run $run recv $recv ]\}"
    interp eval $name $runproc

    interp alias $name filament_send     {} ::filament::send     $name
    interp alias $name filament_wait     {} ::filament::wait     $name
    interp alias $name filament_kill     {} ::filament::kill     $name
    interp alias $name filament_sleep    {} ::filament::sleep    $name
    interp alias $name filament_names    {} ::filament::names    $name
    interp alias $name filament_messages {} ::filament::messages $name
    interp alias $name filament_uplevel0 {} ::filament::uplevel0 $name

    if {[catch {interp eval $name $init} err]} {
        if {$err ne "FILAMENT_OK"} {
            catch {interp delete $name}
            catch {unset status($name)}
            catch {unset messages($name)}
            error "::filament::new: error during $name init:\n$err"
        }
    }
    return $name
}

############################################################################
#
# ::filament::send
#
# Implements the 'filament_send' command
#
# Filaments can send messages to other filaments.  filament_send does not
# cause the calling filament to yield.  The target filament(s) are
# added to the msgQueue so that they are serviced before other running
# filaments.  ::filament::send can also be used by the top level code
# to prime initial messages or to re-prime filaments that become
# deadlocked.  The filament receiving the message has its 'recv' code
# invoked, the global variable MESSAGE contains a list of two elements,
# the id of the sending filament, and the message.
#
# filament_send message ?id?
#
# message - the message to send
#
# id      - the name of a filament to send.  if id is a null string,
#           or not specified, then the message is sent to all other
#           filaments, excluding self

proc ::filament::send {this message {id ""}} {
    variable status
    variable messages
    variable msgQueue

    if {![string length $id]} {
        foreach name [array names status] {
            if {$name ne $this} {
                lappend messages($name) [list $this $message]
                lappend msgQueue $name
            }
        }
    } elseif {![info exists status($id)]} {
        error "filament $id does not exist"
    } else {
        lappend messages($id) [list $this $message]
        lappend msgQueue $id
    }
}

############################################################################
#
# ::filament::wait
#
# Implements the 'filament_wait' command
#
# filament_wait
#
# The calling filament waits until a message is received.  Upon receiving a
# message, the 'recv' code is executed.  filament_wait will cause the
# calling filament to yield.  The global variable MESSAGE receives a list of
# the sending filament id and the message.

proc ::filament::wait {this} {
    variable status
    set status($this) wait
    error FILAMENT_OK
}

############################################################################
#
# ::filament::kill
#
# Implements the 'filament_kill' command
#
# Cause a filament to terminate.  If a filament is killing itself, the
# calling filament will immediately be terminated.  If killing another
# filament, the calling filament does not yield.
#
# filament_kill ?id? ?when?
#
# id   - the id to terminate.  if not specified, then the calling filament
#        is terminated.
#
# when - "waitkill" or "kill".  "waitkill" terminates the filament after
#        any pending messages have been delivered (default).
#        "kill" terminates the filament immediately.

proc ::filament::kill {this {id ""} {when waitkill}} {
    variable status

    if {![string length $id]} {
        set id $this
    }
    if {[string length $id] && ![info exists status($id)]} {
        error "filament $id does not exist"
    }

    # when - "kill" or "waitkill" until all messages to filament are delivered
    switch $when {
        kill     {set status($id) kill}
        waitkill {set status($id) waitkill}
        default  {error "\"when\" argument \"$when\", must be either \"waitkill\" or \"kill\""}
    }
    if {$this eq $id} {
        error FILAMENT_OK
    }
}

############################################################################
#
# ::filament::sleep
#
# Implements the 'filament_sleep' command.
#
# filament_sleep
#
# The calling filament will yield, and be later rescheduled to run.

proc ::filament::sleep {this} {
    variable status
    set status($this) sleep
    error FILAMENT_OK
}

############################################################################
#
# ::filament::messages
#
# Implements the 'filament_messages' command.
#
# filament_messages ?id?
#
# Returns the number of messages waiting to be delivered.  An error is raised
# if the filament does not exist.
# If id is not specified, then the number of messages for the calling
# filament is returned.

proc ::filament::messages {this {id ""}} {
    variable messages
    if {![string length $id]} {
        set id $this
    }
    if {[info exists messages($id)]} {
        return [llength $messages($id)]
    } else {
        error "filament id \"$id\" does not exist"
    }
}

############################################################################
#
# ::filament::names
#
# Implements the 'filament_names' command.
#
# filament_names
#
# Returns a list of the names of all filaments and their status.  status
# is one of: "run", "wait", "waitkill", "kill", "sleep"
#

proc ::filament::names {this} {
    variable status
    return [array get status]
}

############################################################################
#
# ::filament::uplevel0
#
# Implements the 'filament_uplevel0' command.
#
# filament_uplevel0 args
#
# Runs a command as uplevel #0, any value is returned.  This can be used
# to set/read global variables, run top level procs, access shared files, etc.
#

proc ::filament::uplevel0 {this args} {
    return [uplevel #0 $args]
}

############################################################################
#
# ::filament::doKill
#
# Kill helper - destroy the filament interpreter, cleanup status & messages
#

proc ::filament::doKill {name} {
    variable status
    variable messages
    catch {interp delete $name}
    catch {unset status($name)}
    catch {unset messages($name)}
}

############################################################################
#
# ::filament::doRun
#
# Run helper, type is either 'run' or 'recv'.  If an interp throws the
# error "FILAMENT_OK", then we'll assume that it is one of our own.
# Any other error thrown by the interp will immediately terminate the
# filament and invoke 'bgerror' to report the condition.

proc ::filament::doRun {name type} {
    variable status

    if { [catch {interp eval $name RUN $type} err] } {
        if {$err eq "FILAMENT_OK"} {
            # check if this filament is still valid
            if {[info exists status($name)]} {
                # yes, reschedule
            }
        } else {
            # some other error, kill this filament
            catch {doKill $name}
            catch {bgerror "\"::filament::run $name $type\", error:\n$err"}
        }
    } else {
        # returned without error
        return $err
    }
    return ""
}

############################################################################
#
# ::filament::run
#
# Begin execution of filaments.
#
# ::filament::run ?deadlock_resolution? ?waitvar?
#
# Execution will continue until:
#
#   a) no filaments remain, all have been terminated
#
#   b) all filaments are in a 'wait' state and no messages queued (deadlocked)
#
# optional arguments:
#
#   deadlock_resolution -
#       "kill"   - cause all filaments to be terminated (default)
#       "vwait"  - enter Tcl event loop by executing "vwait $waitvar"
#                  this allows ::filament::send to be used to send
#                  a message to a filament, and run again
#       "return" - return leaving filaments interps active.  it is
#                  possible to send messages to filaments, and rerun
#                  ::filament::run to restart.
#
#   waitvar - the name of the variable for deadlock_resolution method "vwait"
#
# ::filament::run returns a list of filaments that are deadlocked, if any
#

proc ::filament::run {{deadlock kill} {waitvar ""}} {
    variable status
    variable messages
    variable msgQueue

    if {[lsearch {kill vwait return} $deadlock] == -1} {
        error "\"deadlock\" argument must be \"kill\", \"vwait\", or \"return\""
    }
    if {$deadlock eq "vwait" && ![string length $waitvar]} {
        error "vwait specified, but \"waitvar\" is null"
    }

    while {1} {
        set ran 0

        # get names of all filaments, this list can be modified
        # during the loop by sending messages, lappending the targets
        set msgQueue [array names status]

        while {[llength $msgQueue]} {
            set name [lindex $msgQueue 0]
            set msgQueue [lrange $msgQueue 1 end]

            if {! [info exists status($name)]} {
                continue
            } else {
                set state $status($name)
            }

            switch $state {
                run {
                    doRun $name run
                    set ran 1
                }
                waitkill -
                wait {
                    set allmessages ""
                    catch {set allmessages $messages($name)}
                    set messages($name) ""
                    foreach msg $allmessages {
                        if {[info exists status($name)]} {
                            # default next state to 'run'.  the actual 'recv'
                            # code in the filament may call filament_wait,
                            # filament_sleep, or filament_kill to change
                            # to a different state upon exit.
                            set status($name) run
                            interp eval $name [list set MESSAGE $msg]
                            doRun $name recv
                            set ran 1
                        }
                    }
                    if {[info exists status($name)]} {
                        # if waitkill, then kill the filament now
                        if {$state eq "waitkill" && \
                                [llength $messages($name)]==0} {
                            doKill $name
                        } else {
                            catch {interp eval $name [list set MESSAGE {}]}
                        }
                    }
                }
                kill {
                    doKill $name
                }
                sleep {
                    set status($name) run
                    set ran 1
                }
            }
        }

        # if nothing was run and filaments exist, then we must be deadlocked
        set len [llength [array names status]]
        if {! $ran && $len} {
            switch $deadlock {
                kill {
                    foreach {name state} [array get status] {
                        doKill $name
                    }
                    break
                }
                vwait {
                    vwait $waitvar
                }
                return {
                    break
                }
            }
        } elseif {! $len} {
            # all filaments have ended
            break
        }
    }

    # return the names of any filaments that remain
    return [array names status]
}
Here are some simple examples to demonstrate Filament.
####################################################################
# an anonymous filament, produces consecutive integers, and
# sends that result to two different consumers.  terminate after 25
# integers have been produced.  the consumer threads are also killed,
# randomly with or without pending messages delivered.
#

package require filament

filament::new #auto vars {i} init {
    set i 0
} run {
    incr i
    puts -nonewline "$FILAMENT running: "
    if {$i > 25} {
        set luck [expr {rand()}]
        if {$luck > .66} {
            puts "killing consumers, undelivered messages are lost!"
            catch {filament_kill "" kill}
        } elseif {$luck > .33} {
            puts "killing consumers, pending messages to be delivered"
            filament_kill consumer_odd
            filament_kill consumer_even
            catch {filament_kill busy_bee}
        } else {
            puts "ignoring consumers"
            catch {filament_kill busy_bee}
        }
        puts "$FILAMENT killing self"
        filament_kill
    }
    if {$i % 2} {
        puts "produced $i, sending to consumer_odd"
        filament_send $i consumer_odd
    } else {
        puts "produced $i, sending to consumer_even"
        filament_send $i consumer_even
        catch {filament_send "hey! be quiet!" busy_bee}
    }
}

# consumer_odd - in running state, immediately wait for a message
# to be delivered.

filament::new consumer_odd run {
    puts " consumer_odd is waiting"
    filament_wait
} recv {
    foreach {from msg} $MESSAGE {break}
    puts "consumer_odd got message $msg from $from"
}

# consumer_even - in running state, do some trivial work, but every
# fifth invocation, wait for a message

filament::new consumer_even vars {i} init {
    set i 0
} run {
    puts -nonewline " consumer_even is running: "
    if {[incr i] % 5 == 0} {
        puts "now waiting for messages"
        filament_wait
    }
    puts "going to sleep"
    filament_sleep
} recv {
    foreach {from msg} $MESSAGE {break}
    puts "consumer_even got message $msg from $from"
}

# busy_bee - a filament that runs and discards any messages
# randomly fail to test the error catching code

filament::new busy_bee {run recv} {
    if {rand() < .05} {
        error "this will raise an error, causing busy_bee to terminate"
    }
    puts -nonewline " buzzzzzzzz"
    if {[filament_messages]} {
        puts ": has [filament_messages] messages waiting"
        filament_wait
    }
    puts ""
}

# report background errors
proc bgerror {message} {
    puts "bgerror got: $message"
}

# start filaments
puts "starting trivial filaments\n\n"
set final [filament::run return]
puts "\n\nfilaments left waiting in deadlock: $final"
if {[llength $final]} {
    puts ""
    puts "sending an external message to all filaments"
    # note that the first argument (sending filament id) can be anything
    filament::send BIG_DADDY external_message
    filament::run kill
}
puts "trivial done"
An implementation of the Language Shootout cheap-concurrency benchmark:
#####################################################################
# shootout cheap-concurrency
# http://shootout.alioth.debian.org/debian/benchmark.php?test=message&lang=all

package require filament

# set argv 3000
set N [lindex $argv 0]

set Nthreads 500

for {set i 1} {$i < $Nthreads} {incr i} {
    set next [expr {$i + 1}]
    filament::new t$i vars {next} init "set next t$next; filament_wait" recv {
        set x [lindex $MESSAGE 1]
        filament_send [expr {$x + 1}] $next
        filament_wait
    }
}
filament::new t$i init {filament_wait} recv {
    set x [lindex $MESSAGE 1]
    filament_uplevel0 set sum [expr {$x + 1}]
    filament_wait
}

proc cheap-concurrency {N} {
    global sum
    set sum 0
    for {set i 0} {$i < $N} {incr i} {
        filament::send "" $sum t1
        filament::run return
    }
    puts $sum
}

cheap-concurrency $N
A simple demo using vwait to restart deadlocked filaments:
#####################################################################
# using vwait to inject messages

package require filament

filament::new getmsg vars {i} init {set i 0; filament_wait} recv {
    puts "filament getmsg got $MESSAGE"
} run {
    if {[incr i] > 5} {
        puts "getmsg: $i done"
        filament_kill
    } else {
        puts "getmsg: $i waiting"
        filament_wait
    }
}

set ::wait_is_over 0
set after_code {
    filament::send mcp wakeup!
    set ::wait_is_over 1
    after 1000 [set ::after_code]
}
after 0 $after_code

filament::run vwait ::wait_is_over
See also: