Version 14 of Asynchronous Script Evaluation via Coroutines and Channels

Updated 2016-07-03 19:25:28 by pooryorick

Asynchronous Script Evaluation via Coroutines and Channels is a working example of putting the pieces together to harness an interpreter over a channel.

Description

PYK 2016-04-06:

Given a channel with a Tcl interpreter connected to the other side, what would be the minimal scaffolding necessary to effectively use that interpreter as an asynchronous eval mechanism? Conceptually it's a simple enough task, but there were enough details to attend to that it took some time to get right. The little system below is small enough to serve as a working example of event-driven coroutines and channels, and is also an example of the "wire protocol" described in Tool Protocol Language: a sequence of lists separated by newlines. This is a slight twist on the format of a standard Tcl script, but it's perfect for the task of moving arbitrary data through a channel, and a worthy alternative to netstrings. Wrapping each message as a list containing one item makes it possible for the receiver to use info complete to separate out the messages. Null characters in the data pose no problem. The first item in each list is the message; additional items might be used to transport out-of-band data or for control streams.
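The framing can be tried without any channel at all. The following is only a sketch of the idea, not the code used by the system below: envelope and unwrap_lines are hypothetical helper names, and the "wire" is just a string split into lines.

```tcl
# Hypothetical helper: wrap a message as a one-item list for transport.
proc envelope message {
    return [list $message]
}

# Hypothetical helper: accumulate lines until they form a complete list,
# then take the first item of that list as the message.
proc unwrap_lines lines {
    set buffer {}
    set messages {}
    foreach line $lines {
        append buffer $line\n
        if {[info complete $buffer]} {
            # Blank input yields an empty list and carries no message.
            if {[llength $buffer] > 0} {
                lappend messages [lindex $buffer 0]
            }
            set buffer {}
        }
    }
    return $messages
}

# A multi-line script, and data with a null character and an unbalanced brace.
set first "set x 1\nset y {a b}"
set second "binary \u0000 data and an unbalanced \{ brace"

# On the wire the two messages occupy three lines.
set wire [envelope $first]\n[envelope $second]
set received [unwrap_lines [split $wire \n]]

puts [llength $received]
puts [expr {[lindex $received 0] eq $first && [lindex $received 1] eq $second}]
```

The receiver never needs a length prefix: an envelope that spans several lines is simply incomplete, according to info complete, until its last line arrives.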

There are several other examples of a similar nature, but this one is more of a building block, not tied to sockets like comm is, focused on doing just one thing right. I anticipate using this system to build out a minimal distributed computing system.

To stay out of the way, the system creates no commands or variables in the remote interpreter.

Synopsis

init sendchan receivechan receiver
accept ?option value ...?
send sendchan receivechan message

Usage

init takes two channels connected on the other end to an interpreter: one to write to and one to read from. The same read/write channel may be given for both. The interpreter could be in the same thread, in another thread, in another process, or on a remote host; init doesn't know and doesn't care, and neither does the rest of the system. The third argument to init is a script to send to the other interpreter to install the receiver on that end. accept generates this script. By default, it configures the other interpreter to evaluate the scripts it receives and to return the results. The default command evaluator, to which the incoming script is appended, is:

namespace eval :: $command

To change this, pass the process option to accept. For example, to simply echo the received script:

init $writechan $readchan [accept process list]

Or to provide a little more detail in the echo:

init $writechan $readchan [accept process {list timestamp [clock seconds] script}]
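What a given process prefix does to an incoming script can be sketched locally, with no channel involved. run_with is a hypothetical helper, not part of the system; it substitutes the prefix into a template with string map, as accept does, and evaluates the result with the script in $command.

```tcl
# Hypothetical helper: evaluate $command under the given process prefix.
# The prefix is substituted as a script fragment, so a command substitution
# in it, such as [clock seconds], runs each time a script is processed.
proc run_with {process command} {
    set script [string map [list @process@ $process] {@process@ $command}]
    catch $script cres copts
    return [list $cres [dict get $copts -code]]
}

# The default prefix evaluates the script in the global namespace.
puts [run_with {namespace eval ::} {expr {6 * 7}}]

# The echo prefix returns the script itself, as a one-item list.
puts [run_with list {expr {6 * 7}}]

# The detailed echo returns a dictionary-like list that includes the script.
puts [run_with {list timestamp [clock seconds] script} {expr {6 * 7}}]
```

Because the incoming script is appended as a single word, the prefix can be any command that accepts one more argument.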

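send must be called from within a coroutine. It writes the message to the channel, yields until a complete response has arrived, and then returns the result of the remote evaluation. If the remote script raised an error, send raises the same error locally.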
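How a result or an error makes the round trip can be sketched in isolation. The two procedures below are hypothetical stand-ins for the two ends of the channel, assuming only what the implementation below also relies on: catch captures the result and the return options, and return -options replays them.

```tcl
# Hypothetical stand-in for the remote side: evaluate a script and pack
# the result and the return options into a two-item list.
proc remote_reply script {
    catch {namespace eval :: $script} cres copts
    return [list $cres $copts]
}

# Hypothetical stand-in for the local side: unpack a reply and return it
# as if the script had been evaluated locally.
proc local_return reply {
    lassign $reply res opts
    return -options $opts $res
}

# A successful script yields its result.
puts [local_return [remote_reply {expr {1 + 2}}]]

# A failing script raises the same error on the local side.
set code [catch {local_return [remote_reply {error {something broke}}]} msg]
puts [list $code $msg]
```

Since the reply is an ordinary list, it travels over the channel in the same kind of envelope as the request.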

Demos

interps
Demonstrates the use of ycl chan interp in conjunction with ycl coro relay.

Implementation

Also available as ycl chan interp.

# Only call init once for an interpreter . Thereafter , use [send [accept
# ...]] to swap accept .
proc init {send receive accept} {
    chan configure $send -encoding utf-8 -buffering line -blocking 1
    puts $send $accept
    set response [gets $receive]
    if {[lindex $response 0] ne {ok}} {
        return -code error [list {failed to initialize child Tcl process} \
            {response was} $response]
    }
    chan configure $send -blocking 0
}

proc accept args {
    set args [dict merge {input stdin eofclose 1 output stdout process {namespace eval ::}} $args]
    dict update args eofclose eofclose input input output output process process {}
    if {$eofclose} {
        set eofclose {
            if {[eof $chan]} {
                close $chan
                exit
            }
        }
    } else {
        set eofclose {}
    }

    # $input and $output are quoted as lists . $process and $eofclose are
    # substituted as script fragments .
    string map [list @eofclose@ $eofclose @input@ [list $input] @output@ [
        list $output] @process@ $process] {
        apply {{} {
            catch {
                set chan @input@
                chan configure @output@ -buffering line
                chan configure $chan -buffering line -encoding utf-8 -blocking 0 
                chan event $chan readable [list apply [list {chan command delay} {
                    set count [gets $chan line]

                    if {$count < 0} {
                        chan event $chan readable {}
                        # ramp down
                        after $delay [list chan event $chan readable [list {*}[
                            lrange [info level 0] 0 end-2] $command [
                                expr {min(1000, $delay + 1
                                    + int($delay * .01))}]]]
                    } else {
                        append command $line\n
                        # $command is guaranteed to end in \n , so no need to
                        # add it here .
                        if {[info complete $command]} {

                            # Remove the message from the envelope .
                            set command [lindex $command 0]

                            if {$command ne {}} {
                                catch {@process@ $command} cres copts
                                set command {}
                                puts @output@ [list $cres $copts]
                            }
                        }
                        chan event $chan readable [list {*}[
                            lrange [info level 0] 0 end-2] $command 0]
                    }
                    @eofclose@
                } [namespace current]] $chan {} 10]
                lindex ok
            } cres copts
            puts @output@ [list $cres $copts]
            # Engage infinite improbability drive .

            vwait {
                                   o           o
                  |\'o.O'=(_/|\'o.O'=(_/|\'o.O'=(_/|\'o.O'=(_/|
                  )=U_ \U/| _)=U_ \U/| _)=U _\U /|_)=U _\U /|_)
                  ./|\'o.O' o./|\'o.O' o./| \'o.O'o./| \'o.O'o.
                  \'o=(___)= \'o=(___)= \'o =(___)=\'o =(___)=\
                  =(___)U =O'=(___)U =O'=(___) U=O'=(___) U=O'=
                  /|\'o.O'=U_/|\'o.O'=U_/|\'o.O'=U_/|\'o.O'=U_/
                                   o           o           
            }
        }}

    }
}

proc send {send receive message} {
    chan event $receive readable [list [info coroutine]]

    # Envelop the message as a list for safe transport .
    puts $send [list $message]

    set response {}

    while 1 {
        yield
        set count [gets $receive moreresponse]
        if {$count < 0} {
            if {[eof $receive]} {
                return -code error [list {eof on reply channel} $receive]
            }
            continue
        } else {
            if {[eof $receive]} {
                append response $moreresponse
            } else {
                append response $moreresponse\n
            }
        }
        if {[info complete $response]} break 
    }

    # Turn the event handler off . Among other things , this allows additional
    # calls to [init] to succeed .
    chan event $receive readable {}

    # The channel should be line-buffered , which isn't often the case
    # for asynchronous channels , but it works for our purposes .
    lassign $response[set response {}] res opts
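    # Set each key separately . A single [dict set] with several keys would
    # create a nested dictionary instead .
    dict set opts -send $send
    dict set opts -receive $receive
    dict set opts -message $message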
    return -options $opts $res
}

And an example:

apply [list {} {
    while 1 {
        set cmdname [info cmdcount]
        if {[namespace which $cmdname] ne "[namespace current]::$cmdname"} {
            break
        }
    }
    coroutine $cmdname apply [list {} {

        # $chan should remain line-buffered and blocking .
        set chan [open |[list [info nameofexecutable] - 2>@stderr] r+]
        init $chan $chan [accept]

        set response [send $chan $chan {clock seconds}]
        puts [list {clock seconds result} $response {on channel} $chan]

        set response [send $chan $chan {info cmdcount}]
        puts [list {cmdcount result} $response {on channel} $chan]

        set response [send $chan $chan {
            proc responder message {
                return [list {message was} $message]
            }
        }]

        set response [send $chan $chan {
            # As with any script , the result of the script is the result of
            # the last command
            lindex {blah blah blah}
            clock seconds
        }]
        puts [list {clock seconds list result} $response {on channel} $chan]


        # Errors propagate back to the mothership .
        catch {send $chan $chan {bad command}} cres copts

        # Swap out receivers .
        send $chan $chan [accept process {list timestamp [clock seconds] script}]

        set response [send $chan $chan {clock seconds}]
        puts [list {clock seconds list result} $response {on channel} $chan]


        # ... and so on ...
    } [namespace current]]
    vwait forever
} [namespace current]]