asynchronous threads

CmcC 2012-05-31: Asynchronous Threads is an OO wrapper that makes asynchronous threading with the Thread package a little easier (the latest version is always available here).

Changelog

pyk 2015-04-08: Small modification to reflect that thread::errorproc is not a per-thread setting but a shared configuration that is global across threads. Added a more thorough version of the main-script check, and made some minor stylistic changes.

Implementation

# Async - asynchronous thread wrapper
# Development aid, switched off by the leading 0: when enabled and this file
# is the main script, the current directory is added to the package search path.
if {0 && [info exists argv0] && [string equal $argv0 [info script]]} {
    lappend ::auto_path .
}

package require Thread
#package require Debug
#Debug define thread

# ::Async - TclOO wrapper around one worker thread of the Thread package.
#
# Every instance creates and owns one thread.  Scripts are posted to it with
# the call method; the thread stores each outcome into an element of the
# per-object array "waiter", and a write trace on that array hands the outcome
# to the response method, which runs the callbacks registered for the request.
oo::class create ::Async {
    # response - write-trace callback that processes one asynchronous result
    #
    # var   - name of the traced array (the object's waiter array)
    # count - element index that was written; equals the request number
    #         assigned by the call method
    # op    - trace operation (unused)
    #
    # Any error raised while processing, including one raised by a callback,
    # is reported on stderr instead of being propagated.
    method response {var count op} {
        if {[catch {
            upvar 1 $var result

            # get the async response: a list of return code, result, options
            lassign $result($count) code e eo
            unset result($count)

            # get the callbacks registered for this request.  They are looked
            # up by the index that was actually written rather than by a
            # separate running counter, so a lost or out-of-order response can
            # never pair a result with another request's callbacks.
            variable responder
            variable next
            lassign $responder($count) response error
            unset responder($count)
            set next $count        ;# remember the last response handled

            #Debug.thread {response $next: $var $op -> code:$code e:$e eo:($eo)}

            # invoke the appropriate script to process result
            switch -- $code {
                return - 2 -
                ok - 0 {
                    if {$response ne {}} {
                        #Debug.thread {DO: $response $e}
                        {*}$response $e
                    } else {
                        #Debug.thread {DO EMPTY}
                    }
                }
                default {
                    if {$error eq {}} {
                        # no error callback: re-raise so the enclosing catch
                        # reports it
                        ::return -code $code -options $eo $e
                    } else {
                        {*}$error $code $e $eo
                    }
                }
            }
        } e eo]} {
            puts stderr "ERR: $e $eo"
        }
    }

    # call - asynchronously send call script to thread
    #
    # call     - script evaluated at global level in the worker thread
    # response - command prefix invoked with the result on success
    # error    - command prefix invoked with code, result and options when the
    #            script finishes with any other return code
    #
    # Returns the result of thread::send.  If posting the script fails, the
    # callbacks registered for the request are discarded and the error is
    # re-raised.
    method call {call {response {}} {error {}}} {
        variable id
        variable responder
        variable rcount
        set responder([incr rcount]) [list $response $error]

        #Debug.thread {$id call$rcount ($call) response:($response) error:($error)}

        if {[catch {
            ::thread::send -async $id [list ::_thread::call $call] [namespace current]::waiter($rcount)
        } sent opts]} {
            # the request never reached the thread, so no response will ever
            # arrive for it: do not leave its callbacks behind
            unset -nocomplain responder($rcount)
            return -options $opts $sent
        }
        return $sent
    }

    # construct some pass through commands - their use is not generally recommended
    # Each method NAME runs: thread::NAME $id ?arg ...?
    # NOTE(review): confirm that this argument order matches every listed
    # command in the Thread version in use.
    foreach n {preserve release configure exists broadcast join transfer} {
        method $n args [string map [list %N% $n] {
            variable id
            thread::%N% $id {*}$args
        }]
    }

    # Drop this object's reservation on the thread.
    destructor {
        variable id
        # id is missing when the constructor failed before the thread was
        # created; there is nothing to release in that case
        if {[info exists id]} {
            my release        ;# just delete the thread
        }
    }

    # constructor - create the worker thread
    #
    # args - optional name/value pairs stored in same-named object variables,
    #        optionally followed by one script.  The new thread evaluates
    #        prescript, then that script, then postscript.
    constructor args {
        # an odd number of arguments means the last one is the init script
        if {[llength $args]%2} {
            variable script [lindex $args end]
            set args [lrange $args 0 end-1]
        } else {
            variable script {}
        }

        # defines the helper that call sends requests to
        variable prescript {
            namespace eval ::_thread {
                # call - run the script, return the full result
                proc call {script} {
                    list [catch {uplevel #0 $script} e eo] $e $eo
                }
            }
        }

        # keeps the thread alive, servicing its event queue
        variable postscript {
            ::thread::wait
        }

        # name/value pairs from the caller; these may replace the defaults above
        variable {*}$args
        variable next        ;# last response handled
        variable rcount        ;# last sent request

        variable id [::thread::create -preserved $prescript$script$postscript]
        ::thread::configure $id -eventmark 3

        # every result written into waiter(N) is dispatched to response
        trace add variable [namespace current]::waiter write [list [self] response]
    }
}
# Self test: runs only when this file is the program's main script.
# Both paths get a dummy "..." component appended before normalization and
# are then cut back with [file dirname], so the script path itself is
# normalized as a non-final path component before the comparison.
if {[info exists argv0] && [apply {{script main} {
    set scriptPath [file dirname [file normalize $script/...]]
    set mainPath [file dirname [file normalize $main/...]]
    string equal $scriptPath $mainPath
}} [info script] $argv0]} {

    # Unit Test

    #Debug on thread

    # terror - error sink for the worker threads: tag the report with the
    # id of the thread that prints it
    proc terror args {
        puts stderr [::thread::id]:$args
    }

    # output - success callback and background-error sink of this thread
    proc output args {
        puts stderr $args
    }

    interp bgerror {} output
    ::thread::errorproc terror

    # start a pool of $max worker objects, each with its own bgerror handler
    set max 10

    set i 0
    while {$i < $max} {
        set thread($i) [Async new {
            proc bgerror args {
                puts stderr [::thread::id]:$args
            }
            interp bgerror {} bgerror
        }]
        incr i
    }

    # once the event loop is idle, ask 100 randomly chosen workers for their
    # thread id; each answer is printed by output together with a running
    # request number and the worker's index
    after idle {
        time {
            set i [expr {int(rand() * $max)}]
            $thread($i) call ::thread::id [list output [incr count] $i:]
        } 100
    }

    # run the event loop so that responses can be delivered
    vwait forever
}