playing channels with coroutines

jima (2009-11-14)


The idea.

Seeing lots of stuff lately like Tcl coroutines, Go goroutines, Go channels... I wanted to try writing a coroutine that acts as a buffered communication channel between other coroutines.

The idea, not fully implemented, is that there are two "objects" (two obj coroutines) A and B that have a slot each named just "slot". A wants to write messages to slot and B reads them from slot.

The intermediate channel object CHAN (implemented as a coroutine called lchan, for list and channel) has a buffer limited to three messages. When A tries to write for the fourth time it blocks, and it does not unblock until B picks one message from the channel.
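To make the buffer limit concrete, here is a minimal sketch of a bounded buffer written as a single coroutine. It is not the lchan code given further down: the names bounded_chan and ch and the put/get message format are made up for the illustration, and a full buffer just answers "full" instead of blocking the writer. It assumes Tcl 8.6.

```tcl
# Hypothetical sketch, not the lchan proc from this page.
# A coroutine holding at most $max messages in a list.
proc bounded_chan {max} {
    set buf [list]
    set reply {}
    while 1 {
        # Return the previous reply to the caller, wait for the next request.
        set request [yield $reply]
        lassign $request op value
        switch -- $op {
            put {
                if {[llength $buf] >= $max} {
                    # Buffer is full: refuse the message.
                    set reply full
                } else {
                    lappend buf $value
                    set reply ok
                }
            }
            get {
                if {[llength $buf] == 0} {
                    set reply empty
                } else {
                    # Hand out the oldest message first.
                    set reply [list ok [lindex $buf 0]]
                    set buf [lrange $buf 1 end]
                }
            }
            default {
                set reply [list error "unknown op $op"]
            }
        }
    }
}

coroutine ch bounded_chan 3
foreach m {yi er san si} {
    puts "put $m -> [ch [list put $m]]"
}
puts "get -> [ch get]"
puts "put si -> [ch {put si}]"
```

With a limit of three, the fourth put is refused until a get frees a place, which is the point where the real channel has to make the writer wait.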

The blocking and unblocking mechanism simply relies on a trace on a global variable.
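Stripped to the bone, the trick looks something like the sketch below: the coroutine puts a write trace on a variable, yields, and the trace callback resumes it when somebody sets the variable. The names resume_cb, waiter, ::flag and ::result are invented for this illustration, and the sketch leaves out the extra hop through the event loop that the real co_vwait does. It assumes Tcl 8.6.

```tcl
# Hypothetical sketch of "block on a variable" using a write trace.

# Trace callback: the trace appends name1 name2 op, which we ignore;
# all we do is resume the waiting coroutine.
proc resume_cb {coro args} {
    $coro
}

proc waiter {varname} {
    set cb [list resume_cb [info coroutine]]
    trace add variable $varname write $cb
    # Suspend here until somebody writes the variable.
    yield waiting
    trace remove variable $varname write $cb
    set ::result "saw [set $varname]"
}

set ::flag {}
set ::result {}
coroutine W waiter ::flag   ;# runs up to the yield
set ::flag go               ;# the write trace resumes W
puts $::result
```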

I have used CGM's co_vwait method (with Peter Spjuth's modification) from coroutine-enabled event handling and the idea of coroutines as simple objects (as discussed by NEM and jcw in Coroutines for event-based programming).

This is similar in spirit to the queues NEM did in A Thread-Safe Message Queue but coroutine based, not thread based.

Below is a second version of the code. This version shows a problem I am running into: the blocking mechanism causes some messages to be lost.


jima (2009-11-15)

Second thoughts...

Well, it seems clear that when the objects are implemented as coroutines running an infinite loop, with a yield in the loop to pass arguments back and forth, issuing further yields inside a method of the object becomes troublesome. That is exactly what the blocking mechanism does.

The yield issued for blocking passes control back to the outside of the object, so the next message sent to the object is "lost": it arrives at the blocking yield instead of being injected at the expected spot (the yield in the while loop).
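The effect can be reproduced without channels or traces. In this minimal sketch (the proc obj, the coroutine O and the messages block and log are all made up for the illustration; Tcl 8.6 assumed) a method yields in the middle of its work, and the next call lands on that inner yield instead of on the dispatching one.

```tcl
# Hypothetical sketch of the "lost message" effect.
proc obj {} {
    set log [list]
    set reply {}
    while 1 {
        # Dispatch point: every message is supposed to arrive here.
        set msg [yield $reply]
        if {$msg eq "block"} {
            # Stands in for a blocking wait inside a method. Control goes
            # back to the caller, and the NEXT call resumes right here.
            set swallowed [yield blocked]
            lappend log "swallowed:$swallowed"
            set reply resumed
        } elseif {$msg eq "log"} {
            set reply $log
        } else {
            lappend log "handled:$msg"
            set reply handled
        }
    }
}

coroutine O obj
puts [O hello]   ;# handled
puts [O block]   ;# blocked
puts [O world]   ;# resumed: "world" never reached the dispatch point
puts [O log]     ;# handled:hello swallowed:world
```

The call O world is consumed as the return value of the inner yield, so it is never dispatched as a message.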

Besides, this kind of channel might be more interesting for coordinating threads, not just coroutines (which are executed sequentially).

If this idea really had to be implemented, there should be a mechanism to process the calls, like the one in the example given by AM in Discrete event modelling with coroutines.


The code.

This is just a helper class that generates the code of the procs the coroutine objects will run.

::oo::class create CC_cobj {
 variable {*}{  
  VM_N
  VM_con
  VM_des
  VM_TD_states
  VM_state_ini
 }
 constructor {
 } {
  set VM_N [lindex [split [self] _] end]
  set VM_con {}
  set VM_des {}
 }
 method pM_ini_state {
  VR_N_state
 } {
  if {[dict exists $VM_TD_states $VR_N_state]} {
   set VM_state_ini $VR_N_state
  } else {
   error unknown_state
  }
 }
 method pM_add_con {
  VR_S
 } {
  set VM_con $VR_S
 }
 method pM_add_des {
  VR_S
 } {
  set VM_des $VR_S
 }
 method pM_add_state {
  VR_N_state VR_N_msg VR_S
 } {
  dict set VM_TD_states $VR_N_state $VR_N_msg $VR_S
 } 
 method pM_generate {
 } {
  set VL_TX {}
  append VL_TX [subst -nocommands {
proc $VM_N \{
 args
\} \{
 #Create.}]
  if {[string length $VM_con]} {
   append VL_TX [subst -nocommands {
 $VM_con    
}]    
  }
  append VL_TX [subst -nocommands -novariables {
 set VL_state }]
  append VL_TX $VM_state_ini
  append VL_TX [subst -nocommands -novariables {
 #Live.
 while 1 \{
  set args [lassign [yield $args] VL_msg]
  switch $VL_state \{}]
  dict for {VL_I_k VL_I_v} $VM_TD_states {
   append VL_TX [format [subst -nocommands -novariables {
   %s \{
    switch $VL_msg \{}] $VL_I_k]
   dict for {VL_I_vk VL_I_vv} $VL_I_v {
    append VL_TX [format [subst -nocommands -novariables {
     %s \{
%s
     \}}] $VL_I_vk $VL_I_vv]
   }
   append VL_TX [subst -nocommands -novariables {
    \}}]
  }
  append VL_TX [subst -nocommands -novariables {
   \}
  \}
 \}
 \#Destroy.}]
  if {[string length $VM_des]} {
   append VL_TX [subst -nocommands {
 $VM_des
}]    
  }
 append VL_TX [subst -nocommands -novariables {
\}}]
  return $VL_TX  
 }
}

This is the present blocking mechanism.

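# Trace callback: ignore the arguments appended by the trace and resume the waiting coroutine.
proc co_vwait_callback {coro args} {$coro}

# Suspend the current coroutine until the variable named varname is written.
proc co_vwait varname {
 upvar $varname var
 puts waiting($varname,[info coroutine])
 set callback "co_vwait_callback [info coroutine]"
 trace add variable var write $callback 
 # Block here; the write trace resumes us.
 yield
 trace remove variable var write $callback
 puts waited($varname,[info coroutine])
 # Get out of the trace context: reschedule from the event loop and yield again.
 after idle [info coroutine]
 yield
}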

A and B are coroutines that run the proc xobj. Here is the proc xobj defined with the help of the auxiliary class.

::CC_cobj create ::CO_xobj

::CO_xobj pM_add_con {
 lassign $args VL_T VL_N VL_N_coro
}

::CO_xobj pM_add_state alive in {
 set VL_res [$VL_N_coro [list out {*}$args]] 
}

::CO_xobj pM_add_state alive out {
 set VL_res [$VL_N_coro [list in [lindex $args end]]]
 while {![string equal $VL_res ok]} {
  co_vwait $VL_res
  set VL_res [$VL_N_coro [list in [lindex $args end]]]
 }
}

::CO_xobj pM_ini_state alive
eval [::CO_xobj pM_generate]

CHAN is a coroutine that runs the proc lchan. Here is the proc lchan defined with the help of the auxiliary class.

::CC_cobj create ::CO_lchan
::CO_lchan pM_add_con {
 lassign $args VL_max VL_var
 set VL_buf [list]
}

::CO_lchan pM_add_state alive in {
 if {[llength $VL_buf] == $VL_max} {
  set args $VL_var
 } else {     
  lappend VL_buf $args
  puts ($VL_buf) 
  set args ok
 }
}

::CO_lchan pM_add_state alive out {
 if {[llength $VL_buf] == $VL_max} {
  set $VL_var {}
 }    
 set args [lindex $VL_buf 0]
 set VL_buf [lrange $VL_buf 1 end]
 puts ($VL_buf)
}

::CO_lchan pM_ini_state alive
eval [::CO_lchan pM_generate]

And this is the "main". It clearly shows the problem of losing the A {out slot wu} call.

coroutine A xobj out slot CHAN
coroutine B xobj in slot CHAN

coroutine CHAN lchan 3 ::VG_CHAN

A {out slot yi}
A {out slot er}
A {out slot san}
A {out slot si}
B {in slot}
B {in slot}
A {out slot wu}
B {in slot}

after 1000 set forever done
vwait forever