coroutine-enabled event handling

See the tcllib packages coroutine and coroutine::auto for nicely polished versions of CGM's code below.


CGM Here are quick-and-dirty versions of coroutine-enabled after, gets and vwait, with a simple test program. This uses the new coroutine facilities in tcl8.6a2.
APN I believe this, or a variant, has been folded into the coroutine module in tcllib.
CGM Yes, the tcllib coroutine module looks very like a cleaned up and expanded version of the code I originally posted here, though that's not mentioned in the source.
AK My apologies for that, I usually am good about giving attribution. Have made a note to self now to fix this.
CGM Thanks Andreas!

When run:

  1. Three coroutines are started from proc counter; each increments and prints a local counter at a different speed, using co_after for its delays.
  2. One coroutine is started from proc count_input; it reads lines of input with co_gets, echoes each line preceded by its line count, and then assigns the input to ::trigger1 if it is an integer, otherwise to ::trigger2.
  3. Two coroutines are started (procs waiter1 and waiter2) which use co_vwait to wait in parallel for updates to ::trigger1 and ::trigger2, then add or append the new values to a running total or buffer and report them.

Note that each coroutine maintains state in its local variables, but they all run effectively in parallel.


 # Helpers lifted from wiki:
 proc spawn cmd {
     set k [gensym]
     coroutine $k {*}$cmd
 }
 proc gensym {{prefix "::coroutine"}} {
     variable gensymid
     return $prefix[incr gensymid]
 }
 #########################################################################
 # co_after delay 
 # - calling coroutine pauses for (at least) delay milliseconds and then 
 # continues, while allowing other events to be processed in the meantime. 
 
 proc co_after ms {
     after $ms [info coroutine]
     yield
 }
 
 proc counter {id ms} {
     set count 0
     while 1 {
         puts "$id [incr count]"
         co_after $ms
     }
 }
 
 spawn {counter A 2000}
 spawn {counter {B  } 3000}
 spawn {counter {C    } 4000}
 
 #########################################################################
 # co_gets channelId 
 # - calling coroutine reads a line from specified channel.  If no data is 
 # available the coroutine waits for data, while allowing other events to 
 # be processed. 
 
 proc co_gets chan {
     # Note: We need a loop here because even if more data became available
     # on the channel it may not be a complete line. Since the regular gets
     # blocks until a complete line is present, so must we.
     set line ""
     while 1 {
         fileevent $chan readable [info coroutine]
         yield
         fileevent $chan readable {}
         if {[gets $chan line] >= 0 || [eof $chan]} break
     }
     return $line
 }
 
 proc count_input {} {
     set count 0
     while 1 {
         set input [co_gets stdin]
         puts "INPUT LINE [incr count]: $input"
         if {[string is integer -strict $input]} {
             set ::trigger1 $input
         } else {
             set ::trigger2 $input
         }
     }
 }
 
 spawn count_input
 
 #########################################################################
 # co_vwait variable 
 # - calling coroutine waits until the named variable is set and then 
 # continues.  Unlike standard vwait, these calls do not nest, ie. 
 # multiple co_vwaits wait in parallel and can be activated in any order.
 
 # Trace callback: resume the waiting coroutine; the trace's extra
 # arguments (name1 name2 op) are collected in args and ignored
 proc co_vwait_callback {coro args} {$coro}
 
 proc co_vwait varname {
     upvar $varname var
     set callback "co_vwait_callback [info coroutine]"
     trace add variable var write $callback
     yield
     trace remove variable var write $callback
 }
 
 proc waiter1 {} {
     set total 0
     while 1 {
         co_vwait ::trigger1
         puts "WAITER1 TOTAL: [incr total $::trigger1]"
     }
 }
 
 proc waiter2 {} {
     set buffer {}
     while 1 {
         co_vwait ::trigger2
         puts "WAITER2 BUFFER: [append buffer $::trigger2]"
     }
 }
 
 spawn waiter1
 spawn waiter2
 
 #########################################################################
 
 vwait forever

Peter Spjuth 20090903: I noticed that the coroutine waiting for the variable executed immediately when someone wrote to the variable, so the code writing the variable got some strange side effects. To return to stricter cooperative multitasking, I delayed waking the waiter until idle time:

 proc co_vwait varname {
    upvar $varname var
    set callback "co_vwait_callback [info coroutine]"
    trace add variable var write $callback
    yield
    trace remove variable var write $callback
    # Delay continuing until idle time, to avoid side effects in the
    # code writing the variable.
    after idle [info coroutine]
    yield
 }

NEM Very nice! I particularly like co_gets: it makes non-blocking I/O just about as convenient as the blocking version.

AK I wonder, should we create a tcl::unsupported::co namespace or similar where we can collect these commands? It could allow us to get rid of the 'co_' prefix. For completeness we will need a 'co_read' as well. For polish, go the extra mile and make them fully API-compatible with the originals. Oh, what do we get from [infoCoroutine], or [info coroutine], if there is no coroutine running? The empty string? If we can determine whether we are in a coro or not, we can implement something which switches the internal behaviour automatically, i.e. regular [gets] outside of a coro, and the coro-specific gets if a coro is running. Anyone up for it?

(MS sez: note that [info coroutine] returns an FQN, so {} is unambiguously "not in a coroutine": a coroutine named {} returns '::')
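AK's co_read, requested above for completeness, could be sketched along the same lines as co_gets. This is only a sketch, not code from this page: it assumes the channel has been configured -blocking 0, since a blocking [read] would stall the whole event loop.

```tcl
# Sketch of a coroutine-enabled read (AK's proposed co_read):
# collect nbytes bytes (or fewer at EOF) while other events are
# still processed.  Assumes the channel is configured -blocking 0.
proc co_read {chan nbytes} {
    set data ""
    while 1 {
        fileevent $chan readable [info coroutine]
        yield
        fileevent $chan readable {}
        append data [read $chan [expr {$nbytes - [string length $data]}]]
        if {[string length $data] >= $nbytes || [eof $chan]} break
    }
    return $data
}
```

Like co_gets, it parks the coroutine on a fileevent and loops until enough data has accumulated.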

AK: Now I have to look at my own pop3 retriever code again, which is fully event-based, and quite difficult to follow. Especially the highlevel control code pulling one message after the other and handling any errors which occur. Inverting that loop into an event chain was murder when I did it a few years ago. As a coro it should be trivial.

AK: Another place we could simplify through a coro would be the spaghetti which is Tcllib's ftp package.

NEM 2008-09-08: I have a bare-bones coroutine ensemble at Coroutines for event-based programming. [info coroutine] (or at least the current version) does return the empty string when not in a coroutine. Putting some of this stuff into Tcllib would be a good idea (I'll happily volunteer to maintain it). In the short term, I don't see that coros could clean up Tcllib event-based code, as that would limit the code to working only with 8.6+. It's easier to structure everything in terms of callbacks and then just allow an easy way to wrap a coroutine around a call to make it look synchronous (as in the http example below).

AK: Definitely agree that this is not a short-term thing. Also agree that having this in Tcllib would be very nice.
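The automatic switching AK describes, combined with MS's note that [info coroutine] returns the empty string outside a coroutine, might look like the following sketch. The name gets_auto and the exact return-value handling are my invention, not code from this page:

```tcl
# Hypothetical auto-switching gets: plain blocking [gets] outside a
# coroutine, event-driven waiting inside one.
proc gets_auto {chan {varName ""}} {
    if {[info coroutine] eq ""} {
        # Not in a coroutine: fall through to the regular command
        if {$varName eq ""} {
            return [::gets $chan]
        }
        upvar 1 $varName line
        return [::gets $chan line]
    }
    # In a coroutine: wait for a complete line without blocking the
    # event loop (same technique as co_gets)
    set result ""
    while 1 {
        fileevent $chan readable [info coroutine]
        yield
        fileevent $chan readable {}
        if {[gets $chan result] >= 0 || [eof $chan]} break
    }
    if {$varName eq ""} {
        return $result
    }
    upvar 1 $varName line
    set line $result
    # Mimic gets' return value: character count, or -1 at EOF
    if {$result eq "" && [eof $chan]} {
        return -1
    }
    return [string length $result]
}
```

The same dispatch-on-[info coroutine] trick works for after, read and vwait, which is essentially what Tcllib's coroutine::auto package later did.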


NEM Here's another example - asynchronous HTTP requests using a synchronous interface:

package require http
# Note: an https URL additionally requires the tls package, registered
# via http::register

proc get url {
    # The -command callback resumes this coroutine with the http token,
    # which yield then returns as get's result
    http::geturl $url -command [info coroutine]
    yield
}
proc main {} {
    set t [get https://wiki.tcl-lang.org/4]
    puts [http::data $t]
    http::cleanup $t
}
coroutine m main

ZB When trying to run it by just invoking "m", it answers: can't read "state(body)": no such variable

NEM The coroutine is already running, you don't need to invoke it: just start the event loop:

 vwait forever

MHN in c.l.t I recently posted some issues with coroutine::auto, but I feel like this code is better "maintained" here:

#!/usr/bin/env tclsh
package require Tcl 8.6

# wrapper: run this script in a coroutine and wait for that coroutine to exit
if {{} eq [info coroutine]} {
   # make sure that vwait forever is the outer-most event loop: enter it immediately
   after 0 [list coroutine [info script] apply {script {source $script; exit}} [info script]]
   vwait forever
}

# demonstrate the order of execution: helper code
# timeline is a list of integers 1 2 3 ...
global timeline
proc reset {} {
   set ::timeline {}
}
proc verify {} {
   update
   global timeline
   foreach cur $timeline {
      if {[incr i] != $cur} {
         puts stderr "unexpected order of execution ($i != $cur): $timeline"
         return false
      }
   }
   return true
}
reset
trace add variable timeline write [format {apply {args {puts stderr "timeline write ([expr {[clock seconds] - %d}]s): $::timeline"}}} [clock seconds]]

###############################
puts stderr "legacy after"
reset
after 1000 {lappend timeline 3}
lappend timeline 1
after 2000 ;# synchronous after blocks asynchronous after
lappend timeline 2
verify

puts stderr "legacy vwait" ;# from man vwait
reset
after 1000 {
   lappend timeline 2
   vwait b ;# nested vwait blocks ...
   lappend timeline 4
}
after 2000 {
   lappend timeline 3
   set a 10 ;# ... releasing of ...
}
after 3000 {
   # release deadlock
   set b 42
}
lappend timeline 1
vwait a ;# ... the outer vwait
set b 42
lappend timeline 5
verify

puts stderr "legacy update"
reset
after 200 {
   lappend timeline 2
}
after 100 {
   lappend timeline 1
   after 0 {
      lappend timeline 5
   }
}
after 300 {
   lappend timeline 3
}
after 400
after 0 {
   lappend timeline 4
}
verify

# Using coroutine::auto I expect the following without changing any of the above code:
# * after:
#   - timeline lappend changes from 3 1 2 to 2 1 3
# * vwait:
#   - timeline lappend changes from 2 4 3 1 5 to 2 5 3 1 4
#   - no deadlock occurs
# * update:
#   - timeline lappend changes from 2 1 5 3 4 to 3 1 2 4 5 (because of after, not update)
# XXX still need to "hook" all callback code and make it a coroutine
# XXX only works with legacy update!

###############################
package require coroutine::auto
###############################

puts stderr "coroutine after"
reset
after 1000 ::coroutine::util create eval {{
   lappend ::timeline 2
}}
lappend timeline 1
after 2000 ;# after 1000 comes before after 2000
lappend timeline 3
verify

puts stderr "coroutine vwait"
package require coroutine::auto
reset
after 1000 ::coroutine::util create eval {{
   lappend timeline 2
   vwait ::b ;# released just before verify, continued during update in verify
   lappend timeline 5
}}
after 2000 ::coroutine::util create eval {{
   lappend timeline 3
   set ::a 10
}}
lappend timeline 1
vwait ::a ;# released after 2000
lappend timeline 4
set ::b 42
verify

puts stderr "coroutine update"
reset
after 200 {
   lappend timeline 3
}
after 100 {
   lappend timeline 1
   after 0 {
      lappend timeline 2
   }
}
after 300 {
   lappend timeline 4
}
after 400
after 0 {
   lappend timeline 5
}
verify

MHN When running this script, we can see that the wrapped update and the wrapped vwait don't play nicely together. I tracked it down to the decoupling introduced by Peter Spjuth: ::coroutine::util::update queues an "after 0" to resume, while vwait queues another "after idle" after that ...

Therefore ::coroutine::util::update does not "bring the application “up to date” by entering the event loop repeatedly until all pending events (including idle callbacks) have been processed", but seems to only process events that have been registered before update was called.


CGM Here is another little hack I came up with recently to handle multiple concurrent Expect sessions, each in its own coroutine. I needed this for a script which logs in to multiple remote systems and collects information by running a sequence of commands on each. Some of these systems may respond slowly or not at all, so it's important to save time by querying them in parallel, which can be done with expect_background. But without coroutines, the state of each connection needs to be managed via global arrays indexed on spawn_id. This becomes painfully complex when a sequence of commands needs to be run on each system, with later commands depending on the results of earlier ones. Running a coroutine for each connection makes this much simpler.

Finding a general way of integrating Expect with coroutines is difficult, not only because the expect command has many more options than after, gets and vwait, but because it is a flow-control command. Also the control of execution flow needs to happen in the connection-specific coroutine, which is not the coroutine that expect_background will be triggered in. My workaround for this is to have the actual expect_background command yield an index indicating which expected pattern matched, then the connection-specific coroutine can do a switch/while/if on this index.

So I have written a simplified co_expect which can be called from a coroutine handling one connection. This sets up expect_background handling to resume the same coroutine when one of a set of patterns is matched on that connection. The coroutine then yields to wait for input, and the return value from the yield is an index indicating which of the specified patterns was matched.

 ##########################################################################
 # co_expect spawn_id ?regex regex regex ...?
 # - set up coroutine-enabled Expect processing for the specified spawn_id.
 # The other arguments are regex patterns to be looked for on the spawn_id.
 # This call does not block, the calling coroutine should yield to wait for
 # input.  When matching input arrives, the yield will return the index of
 # the matched pattern, i.e. 0 for the first pattern, 1 for second, etc.
 
 proc co_expect {sid args} {
 
     set expect_args [list -i $sid]
     set pos -1
     foreach pattern $args {
         lappend expect_args -re $pattern [list [info coroutine] [incr pos]]
     }
 
     after idle [list expect_background {*}$expect_args]
 }
 

Here is one example of how it can be used, collecting a list of file names and then grepping for a pattern in those files. Multiple coroutines can run this code in parallel, one for each remote system being queried. In each co_expect call, op1> is the system prompt that will indicate that the command finished, returning index 0 to terminate the while loop, and the second pattern is to match and extract the required output. Then add_match is code to display the results in a common GUI.

    exp_send "ls -1 --color=never $::filepat\n"

    set files {}

    co_expect $spawn_id op1> {\n([^\r]+)\r}
    while {[yield]} {
        set file $::expect_out(1,string)
        lappend files $file
    }

    foreach file $files {
        exp_send "gzgrep -i -n '$::target' $file | cut -c-1000\n"

        co_expect $spawn_id op1> {\n(\d+):([^\r]+)\r}
        while {[yield]} {
            set line $::expect_out(1,string)
            set text $::expect_out(2,string)
            add_match $machine $file $line $text
        }
    }

CGM Update 2021-05-27: My presentation about co_expect from EuroTcl 2019 eventually got posted at https://www.youtube.com/watch?v=xjLveEjR72U&list=PLHNnTryxvDnckbpWgFfdSEhDtiB7bzx20&index=14 . Watching this prompted me to revisit the code. I realised that it's not really very difficult to make a more general version, supporting most of the flexibility of the original expect command. So here is my updated, more general co_expect:

package require Expect

# Remove and return first element from the named list
proc lshift name {
    upvar $name list
    set list [lassign $list first]
    return $first
}

# Transform a list of possibly-quoted elements by removing newlines
# between elements but not within elements
proc flatter input {
    foreach line [split $input \n] {
        append lines $line
        if {[info complete $lines]} {
            append lines " "
        } else {
            append lines "\n"
        }
    }
    return $lines
}

# Coroutine-enabled version of the expect command,
# allows other coroutines to execute while waiting for input
proc co_expect args {

    # For the all-braced form of the command,
    # get the usual substitutions done in the caller's scope
    if {[llength $args] == 1} {
        set args [uplevel 1 list [flatter [lindex $args 0]]]
    }
    set resume [info coroutine]
    set timeout $::timeout
    set timeout_action {}
    upvar spawn_id spawn_id
    set expect_args {}
    set cleanup_args {}

    # Mangle the arguments into something we can pass to expect_background.
    # Note that expect_background doesn't support timeouts, so we have to
    # handle those separately.
    # Most options can just be passed through, but actions need to be wrapped
    # in a callback to this coroutine.
    while {[llength $args]} {
        set arg [lshift args]
        switch -glob -- $arg {
            -timeout {set timeout [lshift args]}
            timeout {set timeout_action [lshift args]}
            default {
                # This matches expect's literal "default" keyword (not the
                # switch fallback, which only applies when "default" is the
                # last pattern); expect's default fires on timeout and eof
                set timeout_action [lshift args]
                lappend expect_args eof [list $resume $timeout_action]
            }
            -i {
                set sid [lshift args]
                lappend expect_args -i $sid
                lappend cleanup_args -i $sid
            }
            -* {lappend expect_args $arg}
            * {lappend expect_args $arg [list $resume [lshift args]]}
        }
    }

    # Link expect_out in the caller's scope to the global one
    uplevel {unset -nocomplain expect_out; global expect_out}
    
    # Setting our expectations
    expect_background {*}$expect_args

    while 1 {
        # Set up timeout processing
        if {$timeout > 0} {
            set ms [expr {$timeout * 1000}]
            set timeout_id [after $ms [list $resume $timeout_action]]
        }
        # Wait for input, when it arrives we will be passed back
        # the action to perform in the caller's scope
        set action [yield]
        if {$timeout > 0} {after cancel $timeout_id}
        set rc [catch {uplevel $action} result]

        # If exp_continue was called, loop back to wait for more input.
        # Note that we do not support the -continue_timer flag
        if {$rc == -101} continue

        # Otherwise cancel background processing for the same spawn ids
        catch [list expect_background {*}$cleanup_args]
        # and return the result from the action we ran
        return -code $rc $result
    }
}

And now a sample program to show how this could be used:

#######################################################################
# Define a simple demo which starts a shell, feeds it a few commands,
# and processes their output.
# Then we start 3 parallel instances of this with different starting delays.
# In a serious application we would probably be spawning remote shells
# to multiple systems.

proc demo delay {
    set name [info coroutine]
    puts "$name STARTED, will delay $delay seconds"
    spawn bash
    co_expect -ex $

    exp_send "echo $name;sleep $delay;date\r"
    co_expect -re {[^\n]*202.} {puts "$name GOT '$expect_out(0,string)'"} \
                timeout {puts "$name TIMED OUT"}

    co_expect -ex $
    exp_send "ls -lart /\r"
    co_expect {
            -re {(\S+) -> (\S+)} {
            puts "$name SYMLINK $expect_out(1,string) HAS TARGET $expect_out(2,string)"
            exp_continue}
        default {puts "$name CROAK"}
        -ex $
    }
    puts "$name DONE"
}

log_user 0
coroutine tom demo 4
coroutine dick demo 8
coroutine harry demo 12

# something else to run in parallel
proc pinger {} {puts PING; after 3000 pinger}

pinger

vwait godot

AMG: See also async for another take on using coroutines to manage I/O with any number of child processes. Async provides various modes for separating or merging stdout and stderr from the child process. I may someday expand it to also handle client and server sockets; the current process's stdin and stdout; communication between coroutines, interpreters, and threads; timed events and timeouts; asynchronous file I/O (chan copy); and other event sources and sinks such as Tk. Also I'm contemplating expanding its architecture to allow for a single get or read call to receive one of any number of event sources rather than only a single child process.