DisTcl - Distributed Programming Infrastructure for Tcl

CGM : DisTcl provides a communication channel between client and server processes, which may be distributed over multiple computers. A service has a name and can process certain forms of request. A service may be implemented by multiple server processes and used by multiple client processes. All these processes may be on different machines. Redis (or one of its compatible forks) is used to communicate between clients and servers, and to cache results. The excellent Retcl is used to interface with Redis.

  • Each request is routed to exactly one server for the specified service, spreading the load evenly across the available servers.
  • Extra server processes can be started or stopped as required depending on load, with no configuration changes needed.
  • The code which computes results can indicate how long those results should be cached, or that they should not be cached at all because they are transient.
  • Client code can use "prefetch" operations to pre-populate the cache with results expected to be needed soon. Note that when only a single client is active, prefetching is the only way to get parallel processing across multiple servers.
  • The Redis command line interface redis-cli can be useful for inspecting cached data and making ad-hoc updates.
  • The communication protocol is very simple and could easily be reimplemented to allow clients and servers in other languages.
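
As a sketch of the prefetch pattern, a client can fan work out across all available servers before collecting the results. The "mand" service name and per-row requests below are hypothetical, chosen to echo the Mandelbrot demo; only distcl::prefetch and distcl::get are real DisTcl calls.

```tcl
# Hypothetical example: queue every row with prefetch (non-blocking),
# then collect the results with get (served from cache or waitlist).
foreach row $rows {
    distcl::prefetch redis mand $row
}
foreach row $rows {
    set pixels($row) [distcl::get redis mand $row]
}
```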

I gave a presentation on DisTcl at EuroTcl 2024; the slides are at https://openacs.org/conf2024/info/download/file/DisTcl.pdf . As a demonstration I showed a version of Mandelbrot modified to distribute the computation over multiple machines, giving a significant speed-up. The code for this demo can be found at https://cmacleod.me.uk/tcl/mand/ .

DisTcl is a Tcl implementation of an idea I originally blogged about in https://colin-macleod.blogspot.com/2020/11/fundis-functional-distributed-processing.html . It was conceived while working on applications distributed across massive corporate datacentres. However, I think it can also be useful in smaller-scale environments.

I am currently using DisTcl as part of my "Newsgrouper" Usenet-to-Web gateway. There is one service named "ng" which is implemented by up to four back-end "newsgetter" processes. Each of these has an NNTP connection to the Usenet server, which allows up to four connections in parallel. The functions they support include reading the list of headers for a newsgroup and reading individual articles. The front-end is a tclhttpd webserver which acts as a DisTcl client of the "ng" service.

Here is a minimal server for a demo service called "flub". Note that the response to "up" will be constantly changing, so it should not be cached (the default). But the response to the second form of request is stable, so we say it should be cached for 5 minutes (300 seconds). Note that if the Redis server is on a remote machine you will need to specify its hostname and possibly login details, using the relevant Retcl facilities.

source distcl.tcl

retcl create redis

proc flubbit {what args} {
    if {$what eq "up"} {
        return [exec uptime]
    } elseif {[string is integer $what]} {
        return -secs2keep 300 [string repeat [join $args] $what]
    } else {
        error "Don't know how to '$what'"
    }
}

distcl::serve redis flub flubbit
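
For a remote Redis server, something along these lines should work; the constructor arguments and the auth command shown are my assumptions, so check the Retcl documentation for the exact interface.

```tcl
# Assumed Retcl usage for a remote Redis - consult the Retcl docs
# for the exact constructor signature and authentication commands.
retcl create redis redis-host.example.com 6379
redis -sync auth s3cret    ;# only if the server requires a password
```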

And here is an interactive client session with that service:

% source distcl.tcl
% retcl create redis
::redis
% distcl::get redis flub up
 15:51:55 up 42 days, 23:29,  3 users,  load average: 0.08, 0.03, 0.01
% distcl::get redis flub 4 abcd.
abcd.abcd.abcd.abcd.
% distcl::get redis flub fizz
Don't know how to 'fizz'
% 
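
Because all state lives in Redis under predictable key names, redis-cli can be used to inspect it. For instance, after the session above one might poke around like this (output illustrative; key names contain the request as a Tcl list, so quoting is needed when the request has spaces):

```shell
redis-cli keys 'v:flub:*'        # list cached values for the flub service
redis-cli get 'v:flub:4 abcd.'   # the cached result, e.g. "0:abcd.abcd.abcd.abcd."
redis-cli ttl 'v:flub:4 abcd.'   # seconds remaining of the 300-second expiry
```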

Here is the DisTcl code. A useful enhancement which I have not yet attempted would be to convert sequences of Redis operations into Lua scripts downloaded to and executed within Redis; this would reduce the number of communication round-trips and also guarantee atomicity of updates.
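
As a sketch of that idea, the server's runlist push-and-check (the rpush followed by the expire) could become a single EVAL call. This is untested, and it assumes retcl passes EVAL through like any other Redis command:

```tcl
# Untested sketch: push the request marker and, if we are the first
# server to claim this request, set the safety expiry - one atomic
# round-trip instead of two.
set claim_script {
    local n = redis.call('RPUSH', KEYS[1], ARGV[1])
    if n == 1 then redis.call('EXPIRE', KEYS[1], 10) end
    return n
}
set runcount [$redis -sync eval $claim_script 1 $runlist $is_get]
```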

# DisTcl - Distributed Programming Infrastructure for Tcl

# DisTcl provides a communication channel between clients and named services.
# Each named service can process certain forms of request.  A service may be
# implemented by multiple server processes and used by multiple client
# processes.  All these processes may be on different machines.  Redis
# (or one of the compatible forks) is used to communicate between clients
# and servers, and to cache results.


# Details of operation:
#
# A request to service "abc" for key "def" will proceed as follows:
# The value may be requested by writing def to redis queue q:abc .
# A prefetch may be requested by writing def to redis queue p:abc .
#
# A server reads "def" from q:abc or p:abc, and pushes 0 (for a prefetch)
# or 1 (for a get) onto list r:abc:def. If this list already had an entry,
# this request is already being computed by another server so skip it, otherwise...
#
# The server computes value "ghi" with status s from key "def" .
# The server writes "s:ghi" to the Value cache v:abc:def .
# The server pushes "s:ghi" to the Waitlist w:abc:def as many times as there
# are get requests recorded in r:abc:def .
# Each client which requested the value for def reads it from queue w:abc:def .
# After this, any client which requests this value will read it from v:abc:def .
#
# Each server also monitors an individual control queue z:id through which
# it can be requested to shut down cleanly.
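#
# As a concrete illustration of the scheme above, using the demo "flub"
# service and the request "4 abcd." (the request is a Tcl list, so keys
# may contain spaces):
#   q:flub           request queue
#   p:flub           prefetch queue
#   r:flub:4 abcd.   runlist while the request is being computed
#   v:flub:4 abcd.   cached value, e.g. "0:abcd.abcd.abcd.abcd."
#   w:flub:4 abcd.   waitlist delivering the result to blocked clients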

package require retcl

namespace eval distcl {

# Loop serving requests; continues until told 'stop' via the control queue.
#
# redis - a retcl connection to redis, authenticated if necessary;
# service - name of the service being provided;
# proc - command to call to process the request and return its value.
# id - optional identifier for this service instance.
proc serve {redis service proc {id {}}} {
    set reqqueue q:$service
    set prequeue p:$service
    if {$id eq {}} {set id [pid]}
    set ctlqueue z:$id
    puts stderr "Control queue is '$ctlqueue'"
    set verbose 0

    while 1 {
        # wait for a request to appear on one of the queues
        set qreq [$redis -sync blpop $ctlqueue $reqqueue $prequeue 60]
        if {$qreq eq "(nil)"} {
            # 60-second timeout with no request; loop again (keeps the connection active)
            continue
        }
        lassign $qreq queue request
        if {$verbose} {puts "QUEUE $queue REQUEST '$request'"}

        # server control request?
        if {$queue eq $ctlqueue} {
            switch -glob -- $request {
                stop break
                v* {set verbose 1}
                q* {set verbose 0}
            }
            continue
        }

        # request is get or prefetch
        set is_get [expr {$queue eq $reqqueue}]
        set runlist r:${service}:$request
        set runcount [$redis -sync rpush $runlist $is_get]
        # is the same request already running on another server?
        if {$runcount > 1} continue

        # set a safety expiry on the runlist in case this server dies
        # mid-computation, then call the request processor
        $redis -sync expire $runlist 10
        set status [catch {$proc {*}$request} value options]
        set result ${status}:$value

        # cache the result if an expiry time was specified
        if {[dict exists $options -secs2keep]} {
            set expiry [dict get $options -secs2keep]
            $redis -sync set v:${service}:$request $result ex $expiry
        }

        # push the result to the waitlist for each client waiting
        set requests [$redis -sync lpop $runlist 999]
        set waiters [tcl::mathop::+ {*}$requests]
        if {$waiters} {
            set waitlist w:${service}:$request
            while {$waiters} {
                $redis -sync rpush $waitlist $result
                incr waiters -1
            }
            $redis -sync expire $waitlist 10
        }
    }
}


# Request the data computed by service for these arguments.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service to call;
# args - one or more arguments to pass to the service.
proc get {redis service args} {
    set key v:${service}:$args
    # try to read the data from the cache
    set res [$redis -sync get $key]
    if {$res eq "(nil)"} {
        # data not in cache, send a request for it
        $redis -sync rpush q:$service $args

        # wait for the data to be returned in the waitlist
        set qres [$redis -sync blpop w:${service}:$args 10]
        # if 10 second timeout expired, report error
        if {$qres eq "(nil)"} {error "Request for '$key' timed out."}
        set res [lindex $qres 1]
    }
    # parse the result and return it
    if {[string index $res 1] ne ":"} {error "Malformed result for '$key'."}
    set status [string index $res 0]
    set value [string range $res 2 end]
    return -code $status $value
}

# Request that a data item be precomputed as it will soon be needed.
# We don't wait for the reply, so multiple prefetches can be issued
# and processed in parallel if multiple servers are available.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service to call;
# args - one or more arguments to pass to the service.
proc prefetch {redis service args} {
    set key v:${service}:$args
    # check if it's already cached
    if {! [$redis -sync exists $key]} {
        # not cached, send request to precompute it
        $redis -sync rpush p:$service $args
    }
}

# Remove a previously-computed data item from the cache.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service;
# args - one or more arguments.
proc forget {redis service args} {
    set key v:${service}:$args
    # waitlist could be left behind by a crash, so delete it too
    set waitlist w:${service}:$args
    $redis -sync del $key $waitlist
}

}