coroutines and sockets

Coroutines can be used to turn blocking-style socket code into non-blocking code. Here's a simple example.

First, a blocking-style echo server:

namespace eval echo {
  proc echo {sock host port} {
    while {![eof $sock]} {
      puts $sock [gets $sock]
    }
  }
  proc accept {sock host port} {
    chan configure $sock -blocking 1 -buffering line
    echo $sock $host $port
    close $sock
  }
}

socket -server echo::accept 1234
vwait forever

Because echo uses blocking reads, this server can only service one client at a time. A concurrent version has to use non-blocking sockets, with significant impact on how echo is written:

namespace eval echo {
  proc echo {sock host port} {
    if {[eof $sock]} {
      close $sock
      return
    }
    puts $sock [gets $sock]
  }
  proc accept {sock host port} {
    chan configure $sock -blocking 0 -buffering line
    chan event $sock readable [list [namespace current]::echo $sock $host $port]
  }
}

socket -server echo::accept 1234
vwait forever

The transformation to echo is quite simple, but if it were a more complex protocol it would effectively need to be turned into a state machine, making the code harder to write.
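To make the state-machine point concrete, here is a rough sketch of an event-driven handler for a made-up two-line protocol: the client sends a name, then a message, and the server answers with a greeting. The protocol itself, the greet namespace, and the state and name arrays are all invented for this illustration. Note how the handler has to remember, per socket, which step it is in.

```tcl
namespace eval greet {
  variable state   ;# hypothetical per-socket step: "name" or "message"
  variable name    ;# hypothetical per-socket name from the first step

  proc readable {sock} {
    variable state
    variable name
    if {[chan gets $sock line] < 0} {
      if {[chan eof $sock]} {
        # peer went away: forget everything we know about it
        unset -nocomplain state($sock) name($sock)
        close $sock
      }
      # otherwise no complete line has arrived yet; wait for the next event
      return
    }
    switch -- $state($sock) {
      name {
        set name($sock) $line
        set state($sock) message
      }
      message {
        puts $sock "Hello $name($sock), you said: $line"
        set state($sock) name
      }
    }
  }

  proc accept {sock host port} {
    variable state
    set state($sock) name
    chan configure $sock -blocking 0 -buffering line
    chan event $sock readable [list [namespace current]::readable $sock]
  }
}
```

To try it, start it the same way as the echo servers above, with socket -server greet::accept on a port of your choice followed by vwait forever. Every additional protocol step means another branch in the switch and more per-socket state to track.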

Wouldn't it be nice if we could keep the blocking-style implementation of the connection handler, and yet safely handle multiple clients at once? Coroutines to the rescue!

namespace eval echo {
  proc gets {args} {
    if {[info coroutine] eq ""} {
      return [::gets {*}$args]
    } else {
      set sock [lindex $args end]
      while {[set data [::gets $sock]] eq ""} {
        if {[eof $sock]} {
          puts "EOF on $sock"
          return -code return {}
        }
        yield
      }
      return $data
    }
  }
  proc echo {sock host port} {
    while {![eof $sock]} {
      puts $sock [gets $sock]
    }
  }
  proc handle {sock host port} {
    puts "connect on $sock (from $host:$port)"
    yield
    echo $sock $host $port
    close $sock
  }
  namespace eval clients {}
  proc accept {sock host port} {
    coroutine clients::$host:$port handle $sock $host $port
    chan configure $sock -blocking 0 -buffering line
    chan event $sock readable [namespace current]::clients::$host:$port
  }
}

socket -server echo::accept 1234
vwait forever

I've cheated a little here by putting echo in its own namespace so that a local version of gets can be provided (taken from co_gets at coroutine-enabled event handling), but notice that the implementation of echo is exactly the same as the original blocking version!

To explain what's going on here in simple terms: when we receive a new connection, we spawn a coroutine for it, naming it clients::$host:$port. The coroutine call in accept invokes handle, which immediately yields back to accept. At this point, clients::$host:$port is a command which will resume the coroutine at whatever point it last yielded. The subsequent chan event command ensures the coroutine will be invoked whenever data becomes available.
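The create, yield, and resume cycle can be seen without any sockets at all. The following is a minimal sketch, assuming Tcl 8.6; the names worker, demo, and the global log list are invented for the illustration. The two yields stand in for the yield in handle and the yield in gets.

```tcl
proc worker {label} {
  lappend ::log "$label: started"
  yield                          ;# first suspension, like the yield in handle
  lappend ::log "$label: resumed once"
  yield                          ;# second suspension, like the yield in gets
  lappend ::log "$label: finished"
}

set ::log {}
coroutine demo worker A          ;# runs worker up to its first yield
lappend ::log "main: after coroutine"
demo                             ;# resumes worker after the first yield
lappend ::log "main: after first resume"
demo                             ;# resumes after the second yield; worker returns
lappend ::log "main: demo still exists? [llength [info commands demo]]"
puts [join $::log \n]
```

The log shows control bouncing between the two sides: "A: started", "main: after coroutine", "A: resumed once", "main: after first resume", "A: finished", and finally "main: demo still exists? 0", because the coroutine command is deleted once its body returns. In the server, it is the event loop rather than the main script that calls the coroutine command.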

The other important detail is the implementation of gets: since the socket is actually in non-blocking mode, ::gets will return an empty string if no complete line is available. When this is the case, we want to put the current coroutine back to sleep and resume on the next chan event, which is just a call to yield!
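One side effect of testing for an empty string is that a blank line sent by the client looks the same as "no data yet", so it is never echoed. A possible variant, sketched here under the assumption of Tcl 8.6, uses the two-argument form of ::gets, which returns -1 when no complete line could be read (either at EOF or because more data is needed). The name line_gets is hypothetical; inside the echo namespace it would take the place of gets.

```tcl
proc line_gets {sock} {
  if {[info coroutine] eq ""} {
    # not in a coroutine: fall back to the ordinary gets
    return [::gets $sock]
  }
  while {[::gets $sock line] < 0} {
    if {[chan eof $sock]} {
      # same EOF behaviour as the version above: make the caller return
      return -code return {}
    }
    # no complete line yet: sleep until the next readable event
    yield
  }
  return $line   ;# may legitimately be the empty string
}
```

The only behavioural difference from the version above is that an empty line is returned to the caller instead of causing another yield.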

Note that if we hit eof on the socket during a call to echo::gets, the current implementation uses return -code return, which makes the calling echo::echo return immediately instead of raising an error. Returning {} from gets itself would be more consistent with gets in blocking mode, while raising an error would (I thought) give echo::handle more opportunity to robustly recover from failures.
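If EOF is to be reported as a real error, as the note above considers, one way to sketch it (assuming Tcl 8.6 for try) is shown below. The name eof_gets and the error code {ECHO EOF} are made up for this example; this handle is a variant of the one above, not a drop-in replacement for it.

```tcl
proc eof_gets {sock} {
  while {[::gets $sock line] < 0} {
    if {[chan eof $sock]} {
      # report EOF as an error with a hypothetical, trappable error code
      return -code error -errorcode {ECHO EOF} "EOF on $sock"
    }
    yield
  }
  return $line
}

proc handle {sock host port} {
  yield
  try {
    while 1 {
      puts $sock [eof_gets $sock]
    }
  } trap {ECHO EOF} {msg} {
    puts "client $host:$port disconnected: $msg"
  } finally {
    # runs on EOF and on any other error, so the socket is always closed
    close $sock
  }
}
```

With this shape the connection handler decides what EOF means, and the finally clause keeps the cleanup in one place. Errors other than {ECHO EOF} still propagate out of the coroutine after the socket has been closed.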

The following diagram of control flow helped me to understand:


 main           coro
 ----           ----
 [socket]
 [event loop]
 [accept]
                [handle]
                [yield]
 [event loop]
 [chan event]
                [echo]
                [gets]
                [yield]
 [event loop]
 [chan event]
                [gets]
                [puts]
                [gets]
                [yield]
 [event loop]

The limitation here is, of course, that gets needs to be replaced. It would be neater with reflected channels, but I couldn't figure out how to make that work.

DKF: You can't yield through the I/O subsystem itself; that would be just too brain-boggling for us to code up at the moment (and likely to be that way for many future moments too). What you can do is put the coroutine-aware redefinitions of the I/O commands in a package, so that you don't have to repeat that work yourself. A package like this one perhaps…