Version 0 of Exchanging data between processes

Updated 2011-06-06 09:14:15 by arjen

Arjen Markus (5 June 2011) I have been fascinated by the idea of "tuplespaces" for a long time now. The only drawback of the concept is that, because of its generality, it is a trifle difficult to implement in languages like C or Fortran. Here is a scaled-down version that accepts tuples of three elements only: (type, tag, data). The type identifies the structure and meaning of the data; the tag can be used to identify a particular record.

The data server recognises the following operations:

  • out: the client sends data to the data server to be stored
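  • in: the client asks for a data record, identified by type and tag; the server keeps its copy
  • read: the client asks for a data record, identified by type and tag; the data record is removed from the server (note that classic Linda tuplespaces use these two names the other way around: there "in" removes the tuple and "rd" leaves it in place)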

The operations in and read can either return immediately or wait for a suitable record. Records need not be unique by type and tag, but only the first suitable record is ever returned. You can also ask for a record of non-specific type or tag, as the type and tag in the request are used as masks (an empty type or tag field is treated as "*").
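The matching rule can be illustrated with a small in-memory sketch. It is not the server code itself: the procedure names storeRecord and fetchRecord and the global list records are made up for this illustration, and returning an empty list for "no match" is an assumption of the sketch. The masks are handed to string match, so glob-style patterns are assumed to be acceptable.

```tcl
# Sketch only: an in-memory store illustrating mask matching and
# first-match retrieval. All names here are hypothetical.
set records {}

proc storeRecord {type tag data} {
    global records
    lappend records [list $type $tag $data]
}

# Return the first record that matches the masks, or an empty list.
# If remove is true, the record is deleted from the store.
proc fetchRecord {type tag remove} {
    global records
    if { $type eq "" } { set type "*" }
    if { $tag  eq "" } { set tag  "*" }

    set index 0
    foreach record $records {
        foreach {type_ tag_ data} $record {break}
        if { [string match $type $type_] && [string match $tag $tag_] } {
            if { $remove } {
                set records [lreplace $records $index $index]
            }
            return $record
        }
        incr index
    }
    return {}
}

storeRecord result run-1 {1.0 2.0}
storeRecord result run-2 {3.0 4.0}
storeRecord mesh   grid  {10 20}

puts [fetchRecord result ""    0]   ;# first "result" record, kept
puts [fetchRecord ""     run-2 1]   ;# any type, tag run-2, removed
puts [fetchRecord m*     ""    0]   ;# glob-style mask on the type
puts [llength $records]             ;# two records remain
```

Because only the first match is returned, a request with an empty type and tag always yields the oldest record in the store.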

One immediate use of this simplified data server is presented below: a queueing system for computational jobs. The idea is this:

  • The data server is running on one computer
  • One or more computers connect to this data server via the program jobserver.tcl. They connect to the server and wait for data records of a particular type
  • You can use the other program, submit.tcl, to submit jobs, to be run immediately or, if necessary, to be started only after a certain time of day
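The "only after a certain time of day" check can be sketched as follows. This is a minimal illustration, assuming the time is given as a bare time of day such as 18:30; the procedure name isDue and its optional second argument are hypothetical and exist only to make the sketch testable with a fixed clock.

```tcl
# Sketch only: decide whether a job with a "start after" time of day
# may run. The procedure name isDue is hypothetical.
proc isDue {timeOfDay {now ""}} {
    if { $now eq "" } {
        set now [clock seconds]
    }
    # A bare time such as "18:30" is interpreted on the date of -base
    set scheduleTime [clock scan $timeOfDay -base $now]
    return [expr {$scheduleTime <= $now}]
}

set noon [clock scan "2011-06-06 12:00:00" -format "%Y-%m-%d %H:%M:%S"]
puts [isDue 0:00  $noon]   ;# midnight has passed: the job may run
puts [isDue 18:30 $noon]   ;# still in the future: the job must wait
```

One consequence of comparing against a time on the current day: a job submitted at 19:00 with a start time of 18:00 is considered due at once, so this scheme cannot postpone a job to the next day.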

submit.tcl collects the data on the job (which user-ID, which queue (a way of partitioning the set of computational servers), time to schedule it, the working directory, the actual command to run) and constructs a data record of type "<platform>-<queue>-submit". The tag is not used and the data part consists of all the other information.

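jobserver.tcl requests from the data server a record of the type "<platform>-<queue>-submit", where <platform> is the name of the platform it runs on (Linux, Windows, ...) and <queue> is the queue to which the computer is assigned. Note that in the code below both programs take the default platform name from tcl_platform(platform), which is "unix" on Linux and OSX and "windows" on Windows.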
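To make the record layout concrete, here is a small sketch of the one-line messages involved. The user name, directory and command are made-up example values, and no data server is contacted: the reply is constructed by hand in the form the server code further down uses (DATA, type, tag, data).

```tcl
# Sketch only: the one-line messages exchanged with the data server.
# The user name, directory and command are made-up example values.
set platform unix
set queue    normal
set type     $platform-$queue-submit

# The data part: user, start time, working directory, command
set jobData  [list arjen 18:30 /home/arjen/work {myprog -n 10}]

# What submit.tcl would write to the socket
set request  [list out $type 0 $jobData]
puts $request

# What the data server would send back for a matching "read" request
set reply [list DATA $type 0 $jobData]

# Unpack the reply in the same way as the job server does
foreach {keyword type_ tag_ commandData} $reply {break}
foreach {user time pwd command} $commandData {break}
puts "$user wants to run '$command' in $pwd after $time"
```

Each message is a Tcl list on a single line, which keeps the parsing trivial. The price is that data containing newlines would presumably not survive the line-oriented gets on the receiving side.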

To be really useful as a practical queueing system, more is needed:

  • Properly handle the user-ID
  • Provide means to cancel a (possibly running) job
  • Show what jobs are running and what jobs are waiting

As a proof of concept, this is not bad, I'd say. The code is fairly compact and easy to debug. A nice feature is that the system can be used in a heterogeneous environment, that is: Linux, Windows, OSX machines can all participate, as long as the commands scheduled are suitable for the target platform and the working directory can be suitably translated.

Possible subsequent steps:

  • Complete the work on the queueing system
  • Use the data server to exchange data between computational programs written in, say, Fortran, which was my initial motivation to experiment with this.

*Running the queueing system*

To run the queueing system, the following steps are required:

  • Start the data server program
  • Start one or more job servers with the appropriate options so they can find the data server
  • Submit one or more jobs of your choice

It is probably easiest to see what is happening if you run all of them on your local machine first.

*Code*

Here is the code for the data server:

# dataserver.tcl --
#     Scaled-down version of a tuplespace data server:
#     - Each tuple has the same fields:
#       - The sender
#       - A type (identifying the type of the associated data)
#       - A tag (identifying the data, but it does not need to
#         be unique)
#       - The data themselves - no selection possible on this
#         field, as the contents are treated as opaque
#     - Operations are:
#       out:  the client sends the data to the server
#       in:   the client gets a copy of the data, selection on
#             type and tag (both optional)
#       read: as "in", but the record is removed from the
#             server
#       Both in and read can be waited for or not
#
#     The server listens to port 8181 by default
#
#     Limitations:
#     - If the requests are not specific enough, only the
#       first data record that matches is returned.
#       Perhaps an "allin" method should be added.
#

# DataServer --
#     Namespace for the data server
#
namespace eval ::DataServer {
    variable server
    variable storedData {}
}

# AcceptClient --
#     Accept the incoming request from a client to connect
#
# Arguments:
#     sock       Client socket
#     host       Name of the host the client runs on
#     port       Port number for the client
#
proc ::DataServer::AcceptClient {sock host port} {
    fconfigure $sock -blocking 0
    fileevent $sock readable [list ::DataServer::ReadData $sock]
}

# ReadData --
#     Read the raw data from the client
#
# Arguments:
#     sock       Client socket
#
proc ::DataServer::ReadData {sock} {
    variable incomingData

    if { [catch {gets $sock line} len] || [eof $sock] } {
        catch {
            close $sock
            puts "Client closed connection"
        }
    } elseif { $len >= 0 } {
        foreach {request type tag data} $line {break}

        HandleRequest $sock $request $type $tag $data
    }
}

# HandleRequest --
#     Examine the incoming request and dispatch
#
# Arguments:
#     client      Identifier of the client connection
#     request     Type of request
#     type        Type of the data (empty for any)
#     tag         Tag of the data (empty for any)
#     data        (Optional) the data themselves
#
# Result:
#     None
#
# Side effects:
#     Depend on the request
#
proc ::DataServer::HandleRequest {client request type tag {data {}}} {

    switch -- $request {
        "out" {
            StoreData $client $type $tag $data
        }
        "in" {
            SearchDataAndSend $client $type $tag 0
        }
        "read" {
            SearchDataAndSend $client $type $tag 1
        }
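        default {
            # Report the problem to the client (the clients below do
            # not treat ERROR replies specially)
            puts $client [list ERROR "Unknown request: $request"]
            flush $client
        }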
    }
}

# StoreData --
#     Store the data
#
# Arguments:
#     client      Identifier of the client connection
#     type        Type of the data (empty for any)
#     tag         Tag of the data (empty for any)
#     data        (Optional) the data themselves
#
# Result:
#     None
#
# Side effects:
#     Depend on the request
#
proc ::DataServer::StoreData {client type tag {data {}}} {
    variable storedData

    lappend storedData [list $client $type $tag $data]
}

# SearchDataAndSend --
#     Search the first record of data that fits the request
#     and send the result to the client
#
# Arguments:
#     client      Identifier of the client connection
#     type        Type of the data (empty for any)
#     tag         Tag of the data (empty for any)
#     remove      Remove the data or not
#
# Result:
#     None
#
# Side effects:
#     Send the result to the client, remove the data
#     if requested
#
proc ::DataServer::SearchDataAndSend {client type tag remove} {
    variable storedData

    if { $type == "" } {
        set type "*"
    }
    if { $tag == "" } {
        set tag "*"
    }
    #
    # Keep track of the matching record separately: the loop index
    # alone cannot tell us whether a match was found
    #
    set found -1
    set index -1
    foreach record $storedData {
        incr index
        foreach {client_ type_ tag_ data} $record {break}

        if { [string match $type $type_] && [string match $tag $tag_] } {
            set found $index
            puts $client [list DATA $type_ $tag_ $data]
            break
        }
    }

    if { $found >= 0 && $remove } {
        set storedData [lreplace $storedData $found $found]
    }

    if { $found < 0 } {
        puts $client NONE
    }
    flush $client
}

# startServer --
#     Start up the server
#
# Arguments:
#     argv        The command-line arguments
#
# Result:
#     None
#
# Side effects:
#     Starts the server
#
proc ::DataServer::startServer {argv} {
    variable server

    set port 8181

    foreach {key value} $argv {
        switch -- $key {
            "-port" {
                set port $value
            }
            default {
                # Nothing
            }
        }
    }

    puts "Starting data server ..."
    set server [socket -server ::DataServer::AcceptClient $port]

    vwait forever
}

# main --
#     Start the server
#
::DataServer::startServer $::argv

The jobserver is implemented via this code:

# jobserver.tcl --
#     Program to get jobs from the data server and run them
#
#     Arguments:
#     -host host          The host where the data server is running
#     -port port          Port that the data server is listening to
#     -queue queue        Name of the queue this job server belongs to
#
#     Limitations:
#     - The user that should run the job is ignored
#
#     Notes:
#     - The defaults should come from a configuration file

# DataClient --
#     Namespace for the general procedures
#
namespace eval ::DataClient {
    variable server           ;# Connection to the server
    variable forServer        ;# Wait for server
}

# connect --
#     Connect to the data server
#
# Arguments:
#     host        Name of the host that the server is running on
#     port        Port for the server (8181 by default)
#
proc ::DataClient::connect {host port} {
    variable server

    set server [socket $host $port]

    fileevent $server readable [list ::DataClient::ReadServerData $server]
}

# GetData --
#     Get data from the server
#
# Arguments:
#     request     Type of request (in/read)
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::GetData {request type tag {wait 0}} {
    variable server
    variable serverData

    while {1} {
        puts $server [list $request $type $tag]
        flush $server

        set serverData {}

        vwait ::DataClient::forServer

        if { ! $wait || $serverData ne "NONE" } {
            break
        }

        after 500   ;# Pause briefly, so as not to flood the server
    }

    return $serverData
}

# in --
#     Get data from the server, but leave them in
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::in {type tag {wait 0}} {

    return [GetData in $type $tag $wait]
}

# read --
#     Get data from the server, and have them removed
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     wait        Wait for a record or not
#
proc ::DataClient::read {type tag {wait 0}} {

    return [GetData read $type $tag $wait]
}

# out --
#     Send data to the server
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     data        The data themselves
#
proc ::DataClient::out {type tag data} {
    variable server

    puts $server [list out $type $tag $data]
    flush $server
}

# ReadServerData --
#     Read data from the server
#
# Arguments:
#     server      Server connection
#
proc ::DataClient::ReadServerData {server} {
    variable forServer
    variable serverData

    if { [catch {gets $server serverData} len] || [eof $server] } {
        catch {
            close $server
        }
        set serverData "NONE"
        set forServer  1      ;# Wake up GetData, otherwise it waits forever
    } elseif { $len >= 0 } {
        set forServer 1
    }
}

# ReadAndRecord --
#     Read the output from the job
#
# Arguments:
#     fin         Channel to the running job
#     fout        Channel to the log file
#
proc ReadAndRecord {fin fout} {
    variable jobEnds

    if { [catch {gets $fin line} len] || [eof $fin] } {
        catch {
            close $fin
        }
        set ::jobEnds 1
    } elseif { $len >= 0 } {
        puts $fout $line
    }
}

# main --
#     Analyse the arguments (if any) and get jobs from the
#     data server
#
set host     localhost
set port     8181
set time     0:00
set queue    normal
set platform $::tcl_platform(platform)

set count 0
foreach {key value} $::argv {
    switch -- $key {
        "-host" {
             set host $value
        }
        "-port" {
             set port $value
        }
        "-queue" {
             set queue $value
        }
        default {
             break
        }
    }
    incr count
}

::DataClient::connect $host $port

#
# Start the endless loop of requesting jobs from the
# data server
#
while {1} {
    set jobData [::DataClient::read $platform-$queue-submit 0 1]
    puts "Job data: $jobData"

    foreach {dummy thisQueue tag commandData} $jobData     {break}
    foreach {user time pwd command}           $commandData {break}

    #
    # If the job should not run yet, put it back
    # Question: is this an "honest" algorithm?
    #
    # TODO: update the job record (running, remove from queue altogether)
    #
    set scheduleTime [clock scan $time]
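    if { $scheduleTime > [clock seconds] } {
        # Put back the job data only (not the complete reply), wait
        # a moment and ask for a job again instead of running this one
        ::DataClient::out $platform-$queue-submit 0 $commandData
        after 1000
        continue
    }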

    puts "Starting job: $command (in $pwd)"

    cd $pwd
    set jobChannel [open "|$command" r]
    set jobOutput  [open [lindex $command 0].out[clock seconds] w]
    fileevent $jobChannel readable [list ReadAndRecord $jobChannel $jobOutput]

    vwait jobEnds
    close $jobOutput
}

And the submit program is implemented in this way:

# submit.tcl --
#     Submit a new job
#
#     Arguments:
#     -host host          The host where the data server is running
#     -port port          Port that the data server is listening to
#     -after time         Time of day that the job should start after
#     -queue queue        Name of the queue in which the job is placed
#     -platform platform  Platform that the job is intended for
#
#     All other arguments are treated as being the job that
#     should be run
#
#
#     Limitations:
#     - The user that should run the job is the user of the
#       job server.
#     - No accompanying programs to cancel the job or to get
#       an overview
#
#     Notes:
#     - The defaults should come from a configuration file

# DataClient --
#     Namespace for the general procedures
#
namespace eval ::DataClient {
    variable server           ;# Connection to the server
    variable forServer        ;# Wait for server
}

# connect --
#     Connect to the data server
#
# Arguments:
#     host        Name of the host that the server is running on
#     port        Port for the server (8181 by default)
#
proc ::DataClient::connect {host port} {
    variable server

    set server [socket $host $port]
}

# out --
#     Send data to the server
#
# Arguments:
#     type        Type of data record
#     tag         Identifier for the data
#     data        The data themselves
#
proc ::DataClient::out {type tag data} {
    variable server

    puts $server [list out $type $tag $data]
    flush $server
}

# main --
#     Analyse the arguments (if any) and send the job to the
#     data server
#
set host     localhost
set port     8181
set time     0:00
set queue    normal
set platform $::tcl_platform(platform)

set count 0
foreach {key value} $::argv {
    switch -- $key {
        "-host" {
             set host $value
        }
        "-port" {
             set port $value
        }
        "-queue" {
             set queue $value
        }
        "-after" {
             set time $value
        }
        "-platform" {
             set platform $value
        }
        default {
             break
        }
    }
    incr count 2    ;# Each option consumes two arguments
}

set command [lrange $::argv $count end]


::DataClient::connect $host $port

::DataClient::out $platform-$queue-submit 0 [list $::tcl_platform(user) $time [pwd] $command]
puts "Submitted job: $command"

(Note that the submit program runs only briefly: it needs to connect to the server, send the data and then finish.)