Version 4 of playing channels with coroutines

Updated 2009-11-14 11:53:29 by jima

jima (2009-11-14)


The idea.

Seeing lots of stuff lately like tcl coroutines, go goroutines, go channels... I wanted to try to have a coroutine that would act as a buffered communication channel among other coroutines.

Here is my crude first attemp (apologies for crude code quality and for the fact that the script has to be killed abruptly to see what has happened).

The idea, not fully implemented, is that there are two "objects" (two obj coroutines) A and B that have a slot each named just "slot". A wants to write messages to slot and B reads them from slot.

The intermediate channel object CHAN (implemented as a coroutine called lchan for list and channel) has a limit buffer of three messages. When A tries to write for the fourth time it blocks and it does not unblock untill B picks one message from the channel.

The blocking and unblocking mechanism is just to rely on a trace to a global variable.

I have used CGM's co_vwait method (with Peter Spjuth's modification) from coroutine-enabled event handling and the idea of coroutines as simple objects (as discussed by NEM and jcw in Coroutines for event-based programming).

This is similar in spirit to the queues NEM did in A Thread-Safe Message Queue but coroutine based, not thread based.

Hope to be polishing this hack if its promising enough...


The code.

proc co_vwait_callback {coro args} {$coro}

proc co_vwait varname {
 upvar $varname var
 puts waiting($varname,[info coroutine])
 set callback "co_vwait_callback [info coroutine]"
 trace add variable var write $callback 
 yield
 trace remove variable var write $callback
 puts waited($varname,[info coroutine])
 after idle [info coroutine]
 yield
}

proc obj {
 args
} {
 lassign $args VL_T VL_N VL_N_coro
 puts =obj=crea($args)
 while 1 {
  set args [lassign [yield $args] VL_key]
  switch $VL_key {
   in {
    puts =obj=in($args)
    set VL_res [$VL_N_coro [list out {*}$args]]
    puts =obj=in=ret=($VL_res)       
   }
   out {
    puts =obj=out($args)
    set VL_res [$VL_N_coro [list in {*}$args]]
    while {![string equal $VL_res ok]} {
     co_vwait $VL_res
     puts RETRY
     set VL_res [$VL_N_coro [list in {*}$args]]
    }
    puts =obj=out=ret=($VL_res) 
   }
  }
 }
}
proc lchan {
 args
} {
 lassign $args VL_max VL_var
 set VL_buf [list]
 puts =lchan=crea($args)
 while 1 {
  set args [lassign [yield $args] VL_key]
  switch $VL_key {
   in {
    puts =lchan=in($args)
    if {[llength $VL_buf] == $VL_max} {
     puts =lchan=in=BLOCK($args)
     set args $VL_var
    } else {     
     lappend VL_buf $args
     puts =lchan=in=LEN([llength $VL_buf]) 
     set args ok
    }
   }
   out {
    puts =lchan=out($args)
    if {[llength $VL_buf] == $VL_max} {
     puts =lchan=out=UNBLOCK($args)
     set $VL_var {}
    }    
    set args [lindex $VL_buf 0]
    set VL_buf [lrange $VL_buf 1 end]    
   }
  }
 }
}

coroutine A obj out slot CHAN
coroutine B obj in slot CHAN

coroutine CHAN lchan 3 ::VG_CHAN

A {out slot yi}
A {out slot er}
A {out slot san}
A {out slot si}
B {in slot}

vwait forever