Version 3 of Asynchronous Script Evaluation via Coroutines and Channels

Updated 2016-04-07 21:50:34 by pooryorick

Asynchronous Script Evaluation via Coroutines and Channels is a working example of putting the pieces together to harness an interpreter over a channel.

Description

PYK 2016-04-06:

Given a channel with a Tcl interpreter connected to the other side, what is the minimal scaffolding needed to use that interpreter effectively as an asynchronous eval mechanism? Conceptually it's a simple enough task, but there were enough details to attend to that it took some time to get right. The little system below is small enough to serve as a working example of event-driven coroutines and channels, and is also an example of the "wire protocol" described in Tool Protocol Language: a sequence of lists separated by newlines. This is a slight twist on the format of a standard Tcl script, but it's perfect for the task of moving arbitrary data through a channel, and a worthy alternative to netstrings. Wrapping each message as a list containing one item makes it possible for the receiver to use info complete to separate out the messages. Null characters in the data pose no problem. The first item in each list is the message; additional items might be used to transport out-of-band data or for control streams.
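The framing described above can be sketched locally, without any channel at all. This is only an illustration under stated assumptions: the names frame and unframe are hypothetical and are not part of the system below, which does the same work inline.

```tcl
# Hypothetical helper: wrap the message as a one-element list so that
# newlines, braces and null characters inside it are quoted safely.
proc frame {message} {
    return [list $message]
}

# Hypothetical helper: feed the receiver one line at a time.  Returns 1 and
# stores the message in the caller's variable once a complete list arrived.
proc unframe {line bufvar msgvar} {
    upvar 1 $bufvar buf $msgvar msg
    append buf $line\n
    if {![info complete $buf]} {
        return 0
    }
    set msg [lindex $buf 0]
    set buf {}
    return 1
}

# A message containing a newline and a null character.
set message "puts {hello\nworld}\x00 trailing"
set wire [frame $message]

# Simulate line-by-line arrival, as [gets] would deliver it.
set buf {}
set received {}
foreach line [split $wire \n] {
    if {[unframe $line buf received]} break
}
puts [expr {$received eq $message}]
```

The message spans two lines on the wire, and the receiver only treats it as complete once info complete reports that the braces are balanced.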

There are several other examples of a similar nature, but this one is more of a building block, not tied to sockets like comm is, focused on doing just one thing right. I anticipate using this system to build out a minimal distributed computing system.

To stay out of the way, the system creates no commands or variables in the remote interpreter.

Synopsis

init chan receiver
send chan message

Usage

init takes a read/write channel connected on the other end to an interpreter. This could be an interpreter in the same thread, in another thread, in another process, or on a remote host; init doesn't know and doesn't care, and neither does the rest of the system. The second argument to init is a script to send to the other interpreter to install the receiver on that end. receive generates this script. By default, the generated script configures the other interpreter to evaluate the scripts that it receives and return the results. The default command evaluator, to which the incoming script is appended, is:

namespace eval :: $command

To change this, use the process argument. For example, to simply echo the received script:

init $chan [receive process list]

Or to provide a little more detail in the echo:

init $chan [receive process {list timestamp [clock seconds] script}]
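To see what a given process prefix does with an incoming script, the substitution can be tried locally. This is a sketch only: demo is a hypothetical helper, and in the real system the substitution happens inside the script that receive generates, with evaluation taking place in the remote interpreter.

```tcl
# Hypothetical helper: substitute the prefix into a body the same way the
# receiver template does, then run it with the incoming script in $command.
proc demo {process command} {
    set body [string map [list @process@ $process] {@process@ $command}]
    apply [list command $body] $command
}

set script {expr {6 * 7}}

# Default evaluator: the script is evaluated in the global namespace.
set evaluated [demo {namespace eval ::} $script]
puts $evaluated

# Echo: the script comes back as a one-element list.
set echoed [demo list $script]
puts $echoed

# Echo with detail: the script is the last element of a four-element list.
set detailed [demo {list timestamp [clock seconds] script} $script]
puts $detailed
```

In each case the incoming script is appended to the prefix as a single additional word, so the prefix decides whether it is evaluated, echoed, or wrapped.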

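send arranges for a message to be sent over the channel, yields until the complete reply has arrived, and returns the result of the remote evaluation. Because it yields, send must be called from within a coroutine, and the event loop must be running for the reply to be delivered.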
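The yield-until-readable pattern that send relies on can be shown in isolation. The following is a minimal sketch, assuming Tcl 8.6 for coroutine and chan pipe; readline and the coroutine name reader are hypothetical and not part of the system.

```tcl
# Hypothetical helper: read one line from a non-blocking channel, yielding
# to the event loop until the line is available.
proc readline {chan} {
    # Resume the calling coroutine whenever the channel becomes readable.
    chan event $chan readable [list [info coroutine]]
    while 1 {
        yield
        if {[gets $chan line] >= 0} break
        if {[eof $chan]} {
            chan event $chan readable {}
            return -code error [list {eof on channel} $chan]
        }
    }
    # Turn the event handler off again before returning.
    chan event $chan readable {}
    return $line
}

# A pipe stands in for the connection to the other interpreter.
lassign [chan pipe] rd wr
chan configure $rd -blocking 0 -buffering line
chan configure $wr -buffering line

coroutine reader apply {{rd} {
    set ::got [readline $rd]
}} $rd

# Write later, from the event loop, so that the coroutine really has to wait.
after 10 [list puts $wr hello]
vwait ::got
puts $::got
close $rd
close $wr
```

The code inside the coroutine reads as if it were blocking, while the interpreter remains free to service other events between the yield and the resume.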

Implementation

Also available as ycl chan interp.

# Only call init once for an interpreter. Thereafter, use [send [receive
# ...]] to swap receivers.
proc init {chan receive} {
    chan configure $chan -encoding utf-8 -buffering line -blocking 1
    puts $chan $receive
    set response [gets $chan]
    if {[lindex $response 0] ne {ok}} {
        return -code error [list {failed to initialize child Tcl process} \
            {response was} $response]
    }
    chan configure $chan -blocking 0
}

proc receive args {
    dict update args eofclose eofclose input input process process {}
    if {![info exists input]} {
        set input stdin
    }
    if {![info exists eofclose]} {
        set eofclose 0
    }
    if {$eofclose} {
        set eofclose {}
    } else {
        set eofclose {
            if {[eof $chan]} {
                close $chan
                exit
            }
        }
    }
    if {![info exists process]} {
        set process {namespace eval ::}
    }

    # $input is quoted as a list. $process and $eofclose are inserted
    # verbatim as script fragments.
    string map [list @eofclose@ $eofclose @input@ [list $input] @process@ $process] {
        apply {args {
            catch {
                set chan @input@
                chan configure $chan -buffering line -encoding utf-8 -blocking 0 
                chan event $chan readable [list apply [list {chan command} {
                    set count [gets $chan line]

                    if {$count >= 0} {
                        append command $line\n
                        # $command is guaranteed to end in \n, so no need to
                        # add it here.
                        if {[info complete $command]} {

                            # Remove the message from the package.
                            set command [lindex $command 0]

                            if {$command ne {}} {
                                catch {@process@ $command} cres copts
                                set command {}
                                puts [list $cres $copts]
                            }
                        }
                        chan event $chan readable [list {*}[
                            lrange [info level 0] 0 end-1] $command]
                    } 
                    @eofclose@
                } [namespace current]] $chan {}]
                lindex ok
            } cres copts
            puts [list $cres $copts]
        }}

        # Engage infinite improbability drive.
        vwait {
                               o           o
              |\'o.O'=(_/|\'o.O'=(_/|\'o.O'=(_/|\'o.O'=(_/|
              )=U_ \U/| _)=U_ \U/| _)=U _\U /|_)=U _\U /|_)
              ./|\'o.O' o./|\'o.O' o./| \'o.O'o./| \'o.O'o.
              \'o=(___)= \'o=(___)= \'o =(___)=\'o =(___)=\
              =(___)U =O'=(___)U =O'=(___) U=O'=(___) U=O'=
              /|\'o.O'=U_/|\'o.O'=U_/|\'o.O'=U_/|\'o.O'=U_/
                               o           o           
        }
    }
}

proc send {chan message} {
    chan event $chan readable [list [info coroutine]]

    # Package the message as a list for safe transport.
    puts $chan [list $message]

    set response {}

    while 1 {
        yield
        set count [gets $chan moreresponse]
        if {$count < 0} {
            if {[eof $chan]} {
                return -code error [list {eof on reply channel} $chan]
            }
            continue
        } else {
            if {[eof $chan]} {
                append response $moreresponse
            } else {
                append response $moreresponse\n
            }
        }
        if {[info complete $response]} break 
    }

    # Turn the event handler off. Among other things, this allows additional
    # calls to [init] to succeed.
    chan event $chan readable {}

    # The channel should be line-buffered, which isn't often the case
    # for asynchronous channels, but it works for our purposes.
    lassign $response[set response {}] res opts
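    # Set the keys separately: [dict set] with several keys would build a
    # nested dictionary instead of two top-level entries.
    dict set opts -chan $chan
    dict set opts -message $message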
    return -options $opts $res
}
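The way results and errors travel back can be tried without a channel. This is a sketch under stated assumptions: pack_result and unpack_result are hypothetical names for what the receiver and send each do inline.

```tcl
# Remote side (hypothetical helper): capture the result and the return
# options of the script and ship both as a two-element list.
proc pack_result {script} {
    catch {uplevel #0 $script} cres copts
    return [list $cres $copts]
}

# Local side (hypothetical helper): unpack the list and hand the result to
# the caller with the original return options, so errors are raised again.
proc unpack_result {response} {
    lassign $response res opts
    return -options $opts $res
}

# A successful script simply yields its result.
set ok [unpack_result [pack_result {expr {1 + 2}}]]
puts $ok

# A failing script raises the same error on the local side.
set code [catch {unpack_result [pack_result {error boom}]} msg opts]
puts [list $code $msg [dict get $opts -errorcode]]
```

Because the options dictionary of an error includes a multi-line -errorinfo, a packed reply can span several lines, which is why send keeps appending lines until info complete is satisfied.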

And an example:

apply [list {} {
    while 1 {
        set cmdname [info cmdcount]
        if {[namespace which $cmdname] ne "[namespace current]::$cmdname"} {
            break
        }
    }
    coroutine $cmdname apply [list {} {

        # $chan should remain line-buffered and blocking.
        set chan [open |[list [info nameofexecutable] - 2>@stderr] r+]
        init $chan [receive]

        set response [send $chan {clock seconds}]
        puts [list {clock seconds result} $response {on channel} $chan]

        set response [send $chan {info cmdcount}]
        puts [list {cmdcount result} $response {on channel} $chan]

        set response [send $chan {
            proc responder message {
                return [list {message was} $message]
            }
        }]

        set response [send $chan {
            # As with any script, the result of the script is the result of
            # the last command.
            lindex {blah blah blah}
            clock seconds
        }]
        puts [list {clock seconds list result} $response {on channel} $chan]


        # Errors propagate back to the mothership.
        catch {send $chan ooga} cres copts

        # Swap out receivers.
        send $chan [receive process {list timestamp [clock seconds] script}]

        set response [send $chan {clock seconds}]
        puts [list {clock seconds list result} $response {on channel} $chan]


        # ... and so on ...
    } [namespace current]]
    vwait forever
} [namespace current]]