Version 7 of playing channels with coroutines

Updated 2009-11-14 19:09:52 by jima

jima (2009-11-14)


The idea.

Having seen a lot lately about Tcl coroutines, Go goroutines and Go channels, I wanted to try a coroutine that acts as a buffered communication channel between other coroutines.

The idea, not fully implemented, is that there are two "objects" (two obj coroutines), A and B, each with a slot named simply "slot". A writes messages to its slot and B reads them from its slot.

The intermediate channel object CHAN (implemented as a coroutine running a proc called lchan, for list and channel) has a buffer limited to three messages. When A tries to write a fourth message it blocks, and it does not unblock until B picks one message from the channel.

The blocking and unblocking mechanism simply relies on a write trace on a global variable.
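To make the trace idea concrete, here is a minimal sketch (Tcl 8.6) of a coroutine that parks itself and is resumed when a global variable is written. The names waiter, wake and ::flag are made up for this illustration and do not appear in the code further down.

```tcl
# Minimal sketch: a write trace on a global variable resumes a parked
# coroutine. waiter, wake and ::flag are hypothetical names.
set ::log {}

proc wake {coro args} {
    # The trace appends name1, name2 and op to the call; they are ignored.
    $coro
}

proc waiter {} {
    lappend ::log parked
    set cb [list wake [info coroutine]]
    trace add variable ::flag write $cb
    yield                       ;# suspend until ::flag is written
    trace remove variable ::flag write $cb
    lappend ::log resumed
}

coroutine w waiter
set ::flag 1                    ;# fires the trace, which resumes w
puts $::log
```

Note that the coroutine is resumed from inside the trace, that is, in the middle of the writer's set command.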

I have used CGM's co_vwait method (with Peter Spjuth's modification) from coroutine-enabled event handling and the idea of coroutines as simple objects (as discussed by NEM and jcw in Coroutines for event-based programming).

This is similar in spirit to the queues NEM did in A Thread-Safe Message Queue but coroutine based, not thread based.

This is a second version of the code. This version shows a problem I am running into: the blocking mechanism somehow causes some messages to be lost.


The code.

This is just a helper class that generates the code of the procs the coroutine objects will run.

::oo::class create CC_cobj {
 variable {*}{  
  VM_N
  VM_con
  VM_des
  VM_TD_states
  VM_state_ini
 }
 constructor {
 } {
  set VM_N [lindex [split [self] _] end]
  set VM_con {}
  set VM_des {}
 }
 method pM_ini_state {
  VR_N_state
 } {
  if {[dict exists $VM_TD_states $VR_N_state]} {
   set VM_state_ini $VR_N_state
  } else {
   error unknown_state
  }
 }
 method pM_add_con {
  VR_S
 } {
  set VM_con $VR_S
 }
 method pM_add_des {
  VR_S
 } {
  set VM_des $VR_S
 }
 method pM_add_state {
  VR_N_state VR_N_msg VR_S
 } {
  dict set VM_TD_states $VR_N_state $VR_N_msg $VR_S
 } 
 method pM_generate {
 } {
  set VL_TX {}
  append VL_TX [subst -nocommands {
proc $VM_N \{
 args
\} \{
 #Create.}]
  if {[string length $VM_con]} {
   append VL_TX [subst -nocommands {
 $VM_con    
}]    
  }
  append VL_TX [subst -nocommands -novariables {
 set VL_state }]
  append VL_TX $VM_state_ini
  append VL_TX [subst -nocommands -novariables {
 #Live.
 while 1 \{
  set args [lassign [yield $args] VL_msg]
  switch $VL_state \{}]
  dict for {VL_I_k VL_I_v} $VM_TD_states {
   append VL_TX [format [subst -nocommands -novariables {
   %s \{
    switch $VL_msg \{}] $VL_I_k]
   dict for {VL_I_vk VL_I_vv} $VL_I_v {
    append VL_TX [format [subst -nocommands -novariables {
     %s \{
%s
     \}}] $VL_I_vk $VL_I_vv]
   }
   append VL_TX [subst -nocommands -novariables {
    \}}]
  }
  append VL_TX [subst -nocommands -novariables {
   \}
  \}
 \}
 \#Destroy.}]
  if {[string length $VM_des]} {
   append VL_TX [subst -nocommands {
 $VM_des
}]    
  }
  append VL_TX [subst -nocommands -novariables {
\}}]
  return $VL_TX  
 }
}
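For orientation, the text produced by pM_generate has roughly the shape of the hand-written proc below: constructor code, an initial state, then an endless loop that yields the previous reply and dispatches the next message first on the state and then on the message name. This is only a sketch; counter, inc and get are hypothetical names chosen for the example.

```tcl
# Hand-written sketch of the shape of a generated proc (Tcl 8.6).
# counter, inc and get are hypothetical, not part of the code on this page.
proc counter {args} {
    # Create: constructor code runs once, with the creation arguments.
    lassign $args VL_count
    set VL_state alive
    # Live: each call to the coroutine delivers one message list.
    while 1 {
        # Yield the previous reply; the first word of the next call
        # is the message name, the rest stays in args.
        set args [lassign [yield $args] VL_msg]
        switch $VL_state {
            alive {
                switch $VL_msg {
                    inc { incr VL_count [lindex $args 0]; set args $VL_count }
                    get { set args $VL_count }
                }
            }
        }
    }
}

coroutine C counter 10
C {inc 5}
puts [C get]
```

Whatever a message handler leaves in args becomes the return value of the call that delivered the message.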

This is the present blocking mechanism.

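# Trace callback: ignore the trace arguments and resume the waiting coroutine.
proc co_vwait_callback {coro args} {$coro}

# Suspend the current coroutine until varname is written.
proc co_vwait varname {
 upvar $varname var
 puts waiting($varname,[info coroutine])
 set callback "co_vwait_callback [info coroutine]"
 trace add variable var write $callback 
 # Park here; the write trace resumes this coroutine.
 yield
 trace remove variable var write $callback
 puts waited($varname,[info coroutine])
 # Hand control back to the writer and continue later from the event loop.
 after idle [info coroutine]
 yield
}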

A and B are coroutines that run the proc xobj. Here is the proc xobj defined with the help of the auxiliary class.

::CC_cobj create ::CO_xobj

::CO_xobj pM_add_con {
 lassign $args VL_T VL_N VL_N_coro
}

::CO_xobj pM_add_state alive in {
 set VL_res [$VL_N_coro [list out {*}$args]] 
}

::CO_xobj pM_add_state alive out {
 set VL_res [$VL_N_coro [list in [lindex $args end]]]
 while {![string equal $VL_res ok]} {
  co_vwait $VL_res
  set VL_res [$VL_N_coro [list in [lindex $args end]]]
 }
}

::CO_xobj pM_ini_state alive
eval [::CO_xobj pM_generate]

CHAN is a coroutine that runs the proc lchan. Here is the proc lchan defined with the help of the auxiliary class.

::CC_cobj create ::CO_lchan
::CO_lchan pM_add_con {
 lassign $args VL_max VL_var
 set VL_buf [list]
}

::CO_lchan pM_add_state alive in {
 if {[llength $VL_buf] == $VL_max} {
  set args $VL_var
 } else {     
  lappend VL_buf $args
  puts ($VL_buf) 
  set args ok
 }
}

::CO_lchan pM_add_state alive out {
 if {[llength $VL_buf] == $VL_max} {
  set $VL_var {}
 }    
 set args [lindex $VL_buf 0]
 set VL_buf [lrange $VL_buf 1 end]
 puts ($VL_buf)
}

::CO_lchan pM_ini_state alive
eval [::CO_lchan pM_generate]

And this is the "main". It clearly shows the problem: the A {out slot wu} call is lost.

coroutine A xobj out slot CHAN
coroutine B xobj in slot CHAN

coroutine CHAN lchan 3 ::VG_CHAN

A {out slot yi}
A {out slot er}
A {out slot san}
A {out slot si}
B {in slot}
B {in slot}
A {out slot wu}
B {in slot}

after 1000 set forever done
vwait forever
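One possible reading of the loss, offered as a guess rather than a diagnosis: when A {out slot wu} is called, A may still be parked in the second yield of co_vwait, waiting for its after idle callback. The call then resumes that yield, its argument is discarded, and A retries the pending "si" message instead. The sketch below (Tcl 8.6) reproduces just that effect in isolation; worker, pause and ::seen are hypothetical names.

```tcl
# Sketch of how a message can vanish: the coroutine is parked in an inner
# yield (standing in for the second yield of co_vwait), so the next call
# resumes that yield and its argument is dropped. Names are hypothetical.
set ::seen {}

proc pause {} {
    yield        ;# the resuming call's argument is returned here and ignored
}

proc worker {} {
    while 1 {
        set msg [yield]
        lappend ::seen $msg
        if {$msg eq "block"} { pause }
    }
}

coroutine W worker
W first
W block
W lost       ;# resumes the yield inside pause; "lost" never reaches lappend
W last
puts $::seen
```

If this reading is right, a fix would have to keep messages that arrive while the coroutine is parked inside co_vwait, for instance by queueing them instead of ignoring the value returned by that yield.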