Exchanging data between processes

Arjen Markus (5 June 2011) I have been fascinated by the idea of "tuplespaces" for a long time now. The only drawback of the concept is that, because of its generality, it is a trifle difficult to implement in languages like C or Fortran. Here is a scaled-down version that accepts tuples of three elements only: (type, tag, data). The type identifies the structure or meaning of the data; the tag can be used to identify a particular record.

The data server recognises the following operations:

  • out: the client sends data to the data server to be stored
  • in: the client asks for a data record, identified by type and tag; the server keeps its copy
  • read: the client asks for a data record, identified by type and tag; the data record is removed from the server

The operations in and read can either return immediately or wait for a suitable record. Records need not be unique by type and tag, but only the first suitable record is ever returned. You can also ask for a record of non-specific type or tag, as the type and tag in the request are used as masks (an empty type or tag field is treated as "*"). Note that the names are the reverse of the classic Linda convention, where in removes the tuple and rd leaves it in place.
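
The masking rule can be tried out in isolation. The following is a minimal sketch, not part of the data server itself: the procedure name matchRecord and the sample records are invented for illustration. It assumes records are stored as lists of {sender type tag data} and that the masks are applied with string match, so glob patterns such as "unix-*" would work as well.

```tcl
# matchRecord --
#     Hypothetical helper: return the first record whose type and tag
#     match the given masks, or an empty list if there is none.
#     An empty mask is treated as "*" (match anything).
#
proc matchRecord {records typeMask tagMask} {
    if { $typeMask eq "" } { set typeMask "*" }
    if { $tagMask  eq "" } { set tagMask  "*" }

    foreach record $records {
        foreach {sender type tag data} $record {break}
        if { [string match $typeMask $type] && [string match $tagMask $tag] } {
            return $record
        }
    }
    return {}
}

# Sample records (invented): {sender type tag data}
set records {
    {sock1 temperature cell-1 20.5}
    {sock2 pressure cell-1 1013}
    {sock1 temperature cell-2 21.0}
}

puts [matchRecord $records temperature ""]   ;# first temperature record, any tag
puts [matchRecord $records "" cell-2]        ;# any type, tag cell-2
puts [matchRecord $records velocity ""]      ;# no such type: empty result
```

Because only the first match is returned, the second temperature record can only be reached by asking for its tag explicitly.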

One immediate use of this simplified data server is presented below: a queueing system for computational jobs. The idea is this:

  • The data server is running on one computer
  • One or more computers run the program jobserver.tcl, which connects to the data server and waits for data records of a particular type
  • You can use the other program, submit.tcl, to submit jobs, either to be run immediately or, if necessary, to be started only after a certain time of day
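
How a "time of day" can be turned into a decision to run or to wait is sketched below. This is an illustration only: the procedure name jobIsDue is hypothetical, and the optional argument now exists merely to make the example reproducible. It relies on clock scan interpreting a bare time such as 18:30 as that time on the day given by -base.

```tcl
# jobIsDue --
#     Hypothetical helper: return 1 if the given time of day has
#     already passed on the day of "now", 0 otherwise.
#
proc jobIsDue {timeOfDay {now ""}} {
    if { $now eq "" } {
        set now [clock seconds]
    }
    set scheduleTime [clock scan $timeOfDay -base $now]
    return [expr {$scheduleTime <= $now}]
}

# Use a fixed moment (noon on 5 June 2011, local time) for the demonstration
set noon [clock scan "2011-06-05 12:00:00" -format "%Y-%m-%d %H:%M:%S"]

puts [jobIsDue 0:00  $noon]   ;# midnight has passed at noon, so the job is due
puts [jobIsDue 18:30 $noon]   ;# 18:30 is still to come, so the job must wait
```

A consequence of this simple rule is that a time earlier than the current time always counts as "today": a job submitted at 23:00 to start after 1:00 would be due at once.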

submit.tcl collects the data on the job: the user-ID, the queue (a way of partitioning the set of computational servers), the time to schedule it, the working directory and the actual command to run. From these it constructs a data record of type "<platform>-<queue>-submit". The tag is not used (it is always 0) and the data part consists of all the other information.

jobserver.tcl requests from the data server a record of the type "<platform>-<queue>-submit", where <platform> is the platform it runs on, as reported by tcl_platform(platform) (unix, windows, ...), and <queue> is the queue to which the computer is assigned.
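
The layout of a job record, and how the two programs agree on it, can be sketched as follows. The procedure names makeJobRequest and unpackJobReply are hypothetical, as are the sample values; the field order (user, time, working directory, command) follows the description above, and the reply is assumed to be a line consisting of DATA followed by type, tag and data.

```tcl
# makeJobRequest --
#     Hypothetical helper: build the "out" request that a submitting
#     program would send for a job
#
proc makeJobRequest {platform queue user time pwd command} {
    set type "$platform-$queue-submit"
    return [list out $type 0 [list $user $time $pwd $command]]
}

# unpackJobReply --
#     Hypothetical helper: turn the reply to a "read" request into a
#     list of key/value pairs (empty if there was no job)
#
proc unpackJobReply {reply} {
    foreach {keyword type tag jobData} $reply {break}
    if { $keyword ne "DATA" } {
        return {}
    }
    foreach {user time pwd command} $jobData {break}
    return [list user $user time $time pwd $pwd command $command]
}

set request [makeJobRequest unix normal alice 0:00 /home/alice/run1 {./simulate input.dat}]
puts $request

# Mimic the server's answer: the same fields, with DATA instead of out
set reply [linsert [lrange $request 1 end] 0 DATA]
array set job [unpackJobReply $reply]
puts "Run '$job(command)' in $job(pwd) for $job(user)"
```

Packing the job details into a single list keeps the data part opaque to the server, which only ever looks at the type and the tag.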

To be really useful as a practical queueing system, more is needed:

  • Properly handle the user-ID
  • Provide means to cancel a (possibly running) job
  • Show what jobs are running and what jobs are waiting

As a proof of concept, this is not bad, I'd say. The code is fairly compact and easy to debug. A nice feature is that the system can be used in a heterogeneous environment, that is: Linux, Windows and OSX machines can all participate, as long as the commands scheduled are suitable for the target platform and the working directory can be suitably translated.

Possible subsequent steps:

  • Complete the work on the queueing system
  • Use the data server to exchange data between computational programs written in, say, Fortran, which was my initial motivation to experiment with this.

Running the queueing system

To run the queueing system, the following steps are required:

  • Start the data server program
  • Start one or more job servers with the appropriate options so they can find the data server
  • Submit one or more jobs of your choice

It is probably easiest to see what is happening if you run all of them on your local machine first.
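
For such a local trial, the three programs could be started from a small Tcl script along the following lines. This is a sketch under assumptions: the files dataserver.tcl, jobserver.tcl and submit.tcl are taken to be in the current directory, the job "echo hello" is just a placeholder (on Windows a different command would be needed), and the one-second pauses are a crude way to give each program time to start. The submit program is run with its defaults, which match the host, port and queue used for the two servers.

```tcl
set tclsh [info nameofexecutable]
set port  8181

# The three command lines, in the order in which they must be started
set commands [list \
    [list $tclsh dataserver.tcl -port $port] \
    [list $tclsh jobserver.tcl -host localhost -port $port -queue normal] \
    [list $tclsh submit.tcl echo hello]]

foreach command $commands {
    set script [lindex $command 1]
    if { [file exists $script] } {
        puts "Starting: $command"
        exec {*}$command &      ;# run in the background
        after 1000              ;# give the program time to start
    } else {
        puts "Skipped: $script not found in [pwd]"
    }
}
```

The data server and the job server keep running after this script has finished, so they have to be stopped by hand. The output of the job ends up in a file in the working directory, named after the command.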

Code

Here is the code for the data server:

# dataserver.tcl --
#     Scaled-down version of a tuplespace data server:
#     - Each tuple has the same fields:
#       - The sender
#       - A type (identifying the type of the associated data)
#       - A tag (identifying the data, but it does not need to
#         be unique)
#       - The data themselves - no selection possible on this
#         field, as the contents are treated as opaque
#     - Operations are:
#       out:  the client sends the data to the server
#       in:   the client gets a copy of the data, selection on
#             type and tag (both optional)
#       read: as "in", but the record is removed from the
#             server
#       Both in and read can be waited for or not
#
#     The server listens to port 8181 by default
#
#     Limitations:
#     - If the requests are not specific enough, only the
#       first data record that matches is returned.
#       Perhaps an "allin" method should be added.
#

# DataServer --
#     Namespace for the data server
#
namespace eval ::DataServer {
    variable server
    variable storedData {}
}

# AcceptClient --
#     Accept the incoming request from a client to connect
#
# Arguments:
#     sock       Client socket
#     host       Name of the host the client runs on
#     port       Port number for the client
#
proc ::DataServer::AcceptClient {sock host port} {
    fconfigure $sock -blocking 0
    fileevent $sock readable [list ::DataServer::ReadData $sock]
}

# ReadData --
#     Read the raw data from the client
#
# Arguments:
#     sock       Client socket
#
proc ::DataServer::ReadData {sock} {
    variable incomingData

    if { [catch {gets $sock line} len] || [eof $sock] } {
        catch {
            close $sock
            puts "Client closed connection"
        }
    } elseif { $len >= 0 } {
        foreach {request type tag data} $line {break}

        HandleRequest $sock $request $type $tag $data
    }
}

# HandleRequest --
#     Examine the incoming request and dispatch
#
# Arguments:
#     client      Identifier of the client connection
#     request     Type of request
#     type        Type of the data (empty for any)
#     tag         Tag of the data (empty for any)
#     data        (Optional) the data themselves
#
# Result:
#     None
#
# Side effects:
#     Depend on the request
#
proc ::DataServer::HandleRequest {client request type tag {data {}}} {

    switch -- $request {
        "out" {
            StoreData $client $type $tag $data
        }
        "in" {
            SearchDataAndSend $client $type $tag 0
        }
        "read" {
            SearchDataAndSend $client $type $tag 1
        }
        default {
            # Unknown operation: report it to the client as an ERROR line
            puts $client [list ERROR "Unknown request: $request"]
            flush $client
        }
    }
}

# StoreData --
#     Store the data
#
# Arguments:
#     client      Identifier of the client connection
#     type        Type of the data
#     tag         Tag of the data
#     data        The data themselves
#
# Result:
#     None
#
# Side effects:
#     Appends the record to the list of stored data
#
proc ::DataServer::StoreData {client type tag {data {}}} {
    variable storedData

    lappend storedData [list $client $type $tag $data]
}

# SearchDataAndSend --
#     Search the first record of data that fits the request
#     and send the result to the client
#
# Arguments:
#     client      Identifier of the client connection
#     type        Type of the data (empty for any)
#     tag         Tag of the data (empty for any)
#     remove      Remove the data or not
#
# Result:
#     None
#
# Side effects:
#     Send the result to the client, remove the data
#     if requested
#
proc ::DataServer::SearchDataAndSend {client type tag remove} {
    variable storedData

    if { $type == "" } {
        set type "*"
    }
    if { $tag == "" } {
        set tag "*"
    }
    # Keep track of the matching record separately from the loop
    # counter: "found" stays -1 if no record fits the request
    set found -1
    set index -1
    foreach record $storedData {
        incr index
        foreach {client_ type_ tag_ data} $record {break}

        if { [string match $type $type_] && [string match $tag $tag_] } {
            puts $client [list DATA $type_ $tag_ $data]
            set found $index
            break
        }
    }

    if { $found >= 0 && $remove } {
        set storedData [lreplace $storedData $found $found]
    }

    if { $found < 0 } {
        puts $client NONE
    }
    flush $client
}

# startServer --
#     Start up the server
#
# Arguments:
#     argv        The command-line arguments
#
# Result:
#     None
#
# Side effects:
#     Starts the server
#
proc ::DataServer::startServer {argv} {
    variable server

    set port 8181

    foreach {key value} $argv {
        switch -- $key {
            "-port" {
                set port $value
            }
            default {
                # Nothing
            }
        }
    }

    puts "Starting data server ..."
    set server [socket -server ::DataServer::AcceptClient $port]

    vwait forever
}

# main --
#     Start the server
#
::DataServer::startServer $::argv

The jobserver is implemented via this code:

# jobserver.tcl --
#     Program to get jobs from the data server and run them
#
#     Arguments:
#     -host host          The host where the data server is running
#     -port port          Port that the data server is listening to
#     -queue queue        Name of the queue this job server belongs to
#
#     Limitations:
#     - The user that should run the job is ignored
#
#     Notes:
#     - The defaults should come from a configuration file

# DataClient --
#     Namespace for the general procedures
#
namespace eval ::DataClient {
    variable server           ;# Connection to the server
    variable forServer        ;# Wait for server
}

# connect --
#     Connect to the data server
#
# Arguments:
#     host        Name of the host that the server is running on
#     port        Port for the server (8181 by default)
#
proc ::DataClient::connect {host port} {
    variable server

    set server [socket $host $port]

    fileevent $server readable [list ::DataClient::ReadServerData $server]
}

# GetData --
#     Get data from the server
#
# Arguments:
#     request     Type of request (in/read)
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::GetData {request type tag {wait 0}} {
    variable server
    variable serverData

    while {1} {
        puts $server [list $request $type $tag]
        flush $server

        set serverData {}

        vwait ::DataClient::forServer

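        if { ! $wait || $serverData ne "NONE" } {
            break
        }

        # Nothing available yet: pause before asking again, so that
        # the data server is not flooded with requests
        after 500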
    }

    return $serverData
}

# in --
#     Get data from the server, but leave them in
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::in {type tag {wait 0}} {

    return [GetData in $type $tag $wait]
}

# read --
#     Get data from the server, and have them removed
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::read {type tag {wait 0}} {

    return [GetData read $type $tag $wait]
}

# out --
#     Send data to the server
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     data        The data themselves
#
proc ::DataClient::out {type tag data} {
    variable server

    puts $server [list out $type $tag $data]
    flush $server
}

# ReadServerData --
#     Read data from the server
#
# Arguments:
#     server      Server connection
#
proc ::DataClient::ReadServerData {server} {
    variable forServer
    variable serverData

    if { [catch {gets $server serverData} len] || [eof $server] } {
        catch {
            close $server
        }
        set serverData "NONE"
        set forServer  1      ;# Wake up GetData, otherwise it waits forever
    } elseif { $len >= 0 } {
        set forServer 1
    }
}

# ReadAndRecord --
#     Read the output from the job
#
# Arguments:
#     fin         Channel to the running job
#     fout        Channel to the log file
#
proc ReadAndRecord {fin fout} {
    variable jobEnds

    if { [catch {gets $fin line} len] || [eof $fin] } {
        catch {
            close $fin
        }
        set ::jobEnds 1
    } elseif { $len >= 0 } {
        puts $fout $line
    }
}

# main --
#     Analyse the arguments (if any) and get the jobs from the
#     data server
#
set host     localhost
set port     8181
set time     0:00
set queue    normal
set platform $::tcl_platform(platform)

set count 0
foreach {key value} $::argv {
    switch -- $key {
        "-host" {
             set host $value
        }
        "-port" {
             set port $value
        }
        "-queue" {
             set queue $value
        }
        default {
             break
        }
    }
    incr count
}

::DataClient::connect $host $port

#
# Start the endless loop of requesting jobs from the
# data server
#
while {1} {
    set jobData [::DataClient::read $platform-$queue-submit 0 1]
    puts "Job data: $jobData"

    foreach {dummy thisQueue tag commandData} $jobData     {break}
    foreach {user time pwd command}           $commandData {break}

    #
    # If the job should not run yet, put it back
    # Question: is this an "honest" algorithm?
    #
    # TODO: update the job record (running, remove from queue altogether)
    #
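    set scheduleTime [clock scan $time]
    if { $scheduleTime > [clock seconds] } {
        # Too early: put the job details (not the complete reply) back,
        # pause briefly and ask for the next job instead of running this one
        ::DataClient::out $platform-$queue-submit 0 $commandData
        after 1000
        continue
    }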

    puts "Starting job: $command (in $pwd)"

    cd $pwd
    set jobChannel [open "|$command" r]
    set jobOutput  [open [lindex $command 0].out[clock seconds] w]
    fileevent $jobChannel readable [list ReadAndRecord $jobChannel $jobOutput]

    vwait jobEnds
    close $jobOutput
}

And the submit program is implemented in this way:

# submit.tcl --
#     Submit a new job
#
#     Arguments:
#     -host host          The host where the data server is running
#     -port port          Port that the data server is listening to
#     -after time         Time of day after which the job should start
#     -queue queue        Name of the queue in which the job is placed
#     -platform platform  Platform that the job is intended for
#
#     All other arguments are treated as being the job that
#     should be run
#
#
#     Limitations:
#     - The user that should run the job is the user of the
#       job server.
#     - No accompanying programs to cancel the job or to get
#       an overview
#
#     Notes:
#     - The defaults should come from a configuration file

# DataClient --
#     Namespace for the general procedures
#
namespace eval ::DataClient {
    variable server           ;# Connection to the server
    variable forServer        ;# Wait for server
}

# connect --
#     Connect to the data server
#
# Arguments:
#     host        Name of the host that the server is running on
#     port        Port for the server (8181 by default)
#
proc ::DataClient::connect {host port} {
    variable server

    set server [socket $host $port]
}

# out --
#     Send data to the server
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     data        The data themselves
#
proc ::DataClient::out {type tag data} {
    variable server

    puts $server [list out $type $tag $data]
    flush $server
}

# main --
#     Analyse the arguments (if any) and send the job to the
#     data server
#
set host     localhost
set port     8181
set time     0:00
set queue    normal
set platform $::tcl_platform(platform)

set count 0
foreach {key value} $::argv {
    switch -- $key {
        "-host" {
             set host $value
        }
        "-port" {
             set port $value
        }
        "-queue" {
             set queue $value
        }
        "-after" {
             set time $value
        }
        "-platform" {
             set platform $value
        }
        default {
             break
        }
    }
    incr count 2    ;# Each option consumes two arguments: key and value
}

set command [lrange $::argv $count end]


::DataClient::connect $host $port

::DataClient::out $platform-$queue-submit 0 [list $::tcl_platform(user) $time [pwd] $command]
puts "Submitted job: $command"

(Note that the submit program runs only briefly: all it needs to do is connect to the server, send the data and then finish.)
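
The way submit.tcl separates its own options from the job command can be tried out in isolation. The following is a minimal sketch, not the program's actual code: the procedure name splitArguments is hypothetical and only four of the options are included. The point it illustrates is that each option consumes two arguments, so the command starts at twice the number of options recognised.

```tcl
# splitArguments --
#     Hypothetical helper: split an argument list into the recognised
#     options (key/value pairs) and the remaining job command
#
proc splitArguments {argv} {
    array set option {-host localhost -port 8181 -queue normal -after 0:00}

    set count 0
    foreach {key value} $argv {
        if { ![info exists option($key)] } {
            break                 ;# first unknown word: the command starts here
        }
        set option($key) $value
        incr count 2              ;# skip the key and its value
    }

    return [list [array get option] [lrange $argv $count end]]
}

foreach {options command} [splitArguments {-queue fast -after 18:00 ./simulate input.dat}] {break}
array set opt $options

puts "Queue:   $opt(-queue)"
puts "After:   $opt(-after)"
puts "Host:    $opt(-host)"
puts "Command: $command"
```

A job command whose first word happens to be one of the option names would be misread by this scheme; a "--" separator would be one way around that.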