Arjen Markus (5 June 2011) I have been fascinated by the idea of "tuplespaces" for a long time now. The only drawback of the concept is that, because of its generality, it is a trifle difficult to implement in languages like C or Fortran. Here is a scaled-down version that accepts tuples of three elements only: (type, tag, data). The type identifies the structure and meaning of the data; the tag can be used to identify a particular record.
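The data server recognises the following operations: out (the client sends a record to the server), in (the client gets a copy of a record, which stays on the server) and read (as in, but the record is removed from the server).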
The operations in and read can either return immediately or wait for a suitable record. Records need not be unique by type and tag, but only the first suitable record is ever returned. You can also ask for a record of non-specific type or tag, as the type and tag in the request are used as masks (an empty type or tag field is treated as "*").
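The mask behaviour can be tried out in isolation. The following is a minimal sketch, not the server's own code: the helper name firstMatch and the sample records are made up for illustration, and lassign assumes Tcl 8.5 or later. It treats an empty mask as "*" and returns only the first record that fits, as described above.

```tcl
# Hypothetical helper (not part of dataserver.tcl): return the first
# record whose type and tag match the given glob-style masks, or an
# empty list if nothing matches.
proc firstMatch {records type tag} {
    # An empty mask means "match anything"
    if { $type eq "" } { set type "*" }
    if { $tag  eq "" } { set tag  "*" }

    foreach record $records {
        lassign $record recType recTag recData
        if { [string match $type $recType] && [string match $tag $recTag] } {
            return $record
        }
    }
    return {}
}

# Made-up sample records: {type tag data}
set records {
    {unix-normal-submit  0 {first job}}
    {unix-normal-submit  0 {second job}}
    {windows-fast-submit 7 {third job}}
}

puts [firstMatch $records unix-normal-submit ""]   ;# empty tag: any tag
puts [firstMatch $records "windows-*" 7]           ;# wildcard in the type
puts [firstMatch $records "" 99]                   ;# no record has tag 99
```

Because the first two records share type and tag, the second one can only be reached after the first has been removed.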
One immediate use of this simplified data server I present below: a queuing system for computational jobs. The idea is this:
submit.tcl collects the data on the job (which user-ID, which queue (a way of partitioning the set of computational servers), time to schedule it, the working directory, the actual command to run) and constructs a data record of type "<platform>-<queue>-submit". The tag is not used and the data part consists of all the other information.
jobserver.tcl requests from the data server a record of the type "<platform>-<queue>-submit", where <platform> is the name of the platform it runs on (Linux, Windows, ...) and <queue> is the queue to which the computer is assigned.
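As a rough illustration of the record that travels between the two programs, the sketch below builds a request the way submit.tcl is described to do and unpacks it again on the receiving side. The user name, directory and command are invented values, and lassign assumes Tcl 8.5 or later.

```tcl
# Made-up job description: user, start time, working directory, command
set platform unix
set queue    normal
set jobInfo  [list arjen 0:00 /home/arjen/work {ls -l}]

# The record type encodes platform and queue; the tag is not used (0)
set type    "$platform-$queue-submit"
set request [list out $type 0 $jobInfo]
puts "Request line: $request"

# The receiving side can take the line apart again as a Tcl list
lassign $request operation recType recTag recData
lassign $recData user time pwd command
puts "Run '$command' in $pwd for $user, not before $time"
```

Building the line with list rather than string concatenation keeps commands that contain spaces or braces intact as a single field.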
To be really useful as a practical queueing system, more is needed:
As a proof of concept, this is not bad, I'd say. The code is fairly compact and easy to debug. A nice feature is that the system can be used in a heterogeneous environment, that is: Linux, Windows and OSX machines can all participate, as long as the scheduled commands are suitable for the target platform and the working directory can be suitably translated.
Possible subsequent steps:
To run the queueing system, the following steps are required:
It is probably easiest to see what is happening if you run all of them on your local machine first.
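Judging from the command-line options the three scripts accept, a local trial run might be set up along the following lines. This is only a sketch: it builds and prints the three command lines (data server first, then a job server, then a submission) rather than starting anything, and the port, the queue name and the job "ls -l" are assumptions chosen for the example.

```tcl
# Use the same Tcl interpreter that runs this script
set tclsh [info nameofexecutable]

# The three programs, in the order in which they need to be started
set steps [list \
    [list $tclsh dataserver.tcl -port 8181] \
    [list $tclsh jobserver.tcl -host localhost -port 8181 -queue normal] \
    [list $tclsh submit.tcl -host localhost -port 8181 -queue normal ls -l]]

foreach step $steps {
    puts [join $step " "]
}

# To actually launch a step in the background one could use, for example:
#     exec {*}$step &
```

Running each command in its own terminal window instead makes it easier to follow the messages that the data server and the job server print.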
Here is the code for the data server:
    # dataserver.tcl --
    #     Scaled-down version of a tuplespace data server:
    #     - Each tuple has the same fields:
    #       - The sender
    #       - A type (identifying the type of the associated data)
    #       - A tag (identifying the data, but it does not need to
    #         be unique)
    #       - The data themselves - no selection possible on this
    #         field, as the contents are treated as opaque
    #     - Operations are:
    #       out:  the client sends the data to the server
    #       in:   the client gets a copy of the data, selection on
    #             type and tag (both optional)
    #       read: as "in", but the record is removed from the
    #             server
    #       Both in and read can be waited for or not
    #
    #     The server listens to port 8181 by default
    #
    #     Limitations:
    #     - If the requests are not specific enough, only the
    #       first data record that matches is returned.
    #       Perhaps an "allin" method should be added.
    #

    # DataServer --
    #     Namespace for the data server
    #
    namespace eval ::DataServer {
        variable server
        variable storedData {}
    }

    # AcceptClient --
    #     Accept the incoming request from a client to connect
    #
    # Arguments:
    #     sock        Client socket
    #     host        Name of the host the client runs on
    #     port        Port number for the client
    #
    proc ::DataServer::AcceptClient {sock host port} {
        fconfigure $sock -blocking 0
        fileevent $sock readable [list ::DataServer::ReadData $sock]
    }

    # ReadData --
    #     Read the raw data from the client
    #
    # Arguments:
    #     sock        Client socket
    #
    proc ::DataServer::ReadData {sock} {

        if { [catch {gets $sock line} len] || [eof $sock] } {
            catch {
                close $sock
                puts "Client closed connection"
            }
        } elseif { $len >= 0 } {
            foreach {request type tag data} $line {break}
            HandleRequest $sock $request $type $tag $data
        }
    }

    # HandleRequest --
    #     Examine the incoming request and dispatch
    #
    # Arguments:
    #     client      Identifier of the client connection
    #     request     Type of request
    #     type        Type of the data (empty for any)
    #     tag         Tag of the data (empty for any)
    #     data        (Optional) the data themselves
    #
    # Result:
    #     None
    #
    # Side effects:
    #     Depend on the request
    #
    proc ::DataServer::HandleRequest {client request type tag {data {}}} {

        switch -- $request {
            "out" {
                StoreData $client $type $tag $data
            }
            "in" {
                SearchDataAndSend $client $type $tag 0
            }
            "read" {
                SearchDataAndSend $client $type $tag 1
            }
            default {
                # Report the problem to the client (the original called
                # an undefined procedure here)
                puts $client [list ERROR "Unknown request: $request"]
                flush $client
            }
        }
    }

    # StoreData --
    #     Store the data
    #
    # Arguments:
    #     client      Identifier of the client connection
    #     type        Type of the data
    #     tag         Tag of the data
    #     data        (Optional) the data themselves
    #
    # Result:
    #     None
    #
    # Side effects:
    #     The record is appended to the stored data
    #
    proc ::DataServer::StoreData {client type tag {data {}}} {
        variable storedData

        lappend storedData [list $client $type $tag $data]
    }

    # SearchDataAndSend --
    #     Search the first record of data that fits the request
    #     and send the result to the client
    #
    # Arguments:
    #     client      Identifier of the client connection
    #     type        Type of the data (empty for any)
    #     tag         Tag of the data (empty for any)
    #     remove      Remove the data or not
    #
    # Result:
    #     None
    #
    # Side effects:
    #     Send the result to the client, remove the data
    #     if requested
    #
    proc ::DataServer::SearchDataAndSend {client type tag remove} {
        variable storedData

        if { $type == "" } {
            set type "*"
        }
        if { $tag == "" } {
            set tag "*"
        }

        # Keep track of the matching record separately from the loop
        # counter, so that "no match" is not mistaken for a match on
        # the last record
        set found -1
        set index -1
        foreach record $storedData {
            incr index
            foreach {client_ type_ tag_ data} $record {break}

            if { [string match $type $type_] && [string match $tag $tag_] } {
                puts $client [list DATA $type_ $tag_ $data]
                set found $index
                break
            }
        }

        if { $found >= 0 && $remove } {
            set storedData [lreplace $storedData $found $found]
        }
        if { $found < 0 } {
            puts $client NONE
        }
        flush $client
    }

    # startServer --
    #     Start up the server
    #
    # Arguments:
    #     argv        The command-line arguments
    #
    # Result:
    #     None
    #
    # Side effects:
    #     Starts the server
    #
    proc ::DataServer::startServer {argv} {
        variable server

        set port 8181

        foreach {key value} $argv {
            switch -- $key {
                "-port" {
                    set port $value
                }
                default {
                    # Nothing
                }
            }
        }

        puts "Starting data server ..."
        set server [socket -server ::DataServer::AcceptClient $port]

        vwait forever
    }

    # main --
    #     Start the server
    #
    ::DataServer::startServer $::argv
The jobserver is implemented via this code:
    # jobserver.tcl --
    #     Program to get jobs from the data server and run them
    #
    #     Arguments:
    #     -host host       The host where the data server is running
    #     -port port       Port that the data server is listening to
    #     -queue queue     Name of the queue this job server belongs to
    #
    #     Limitations:
    #     - The user that should run the job is ignored
    #
    #     Notes:
    #     - The defaults should come from a configuration file

    # DataClient --
    #     Namespace for the general procedures
    #
    namespace eval ::DataClient {
        variable server     ;# Connection to the server
        variable forServer  ;# Wait for server
    }

    # connect --
    #     Connect to the data server
    #
    # Arguments:
    #     host        Name of the host that the server is running on
    #     port        Port for the server (8181 by default)
    #
    proc ::DataClient::connect {host port} {
        variable server

        set server [socket $host $port]
        fileevent $server readable [list ::DataClient::ReadServerData $server]
    }

    # GetData --
    #     Get data from the server
    #
    # Arguments:
    #     request     Type of request (in/read)
    #     type        Type of data record
    #     tag         Identifier for the data
    #     wait        Wait for a record or not
    #
    proc ::DataClient::GetData {request type tag {wait 0}} {
        variable server
        variable serverData

        while {1} {
            puts $server [list $request $type $tag]
            flush $server
            set serverData {}
            vwait ::DataClient::forServer

            if { ! $wait || $serverData ne "NONE" } {
                break
            }

            # Nothing suitable yet: pause briefly before asking again,
            # so that the server is not flooded with requests
            after 1000
        }

        return $serverData
    }

    # in --
    #     Get data from the server, but leave them in
    #
    # Arguments:
    #     type        Type of data record
    #     tag         Identifier for the data
    #     wait        Wait for a record or not
    #
    proc ::DataClient::in {type tag {wait 0}} {
        return [GetData in $type $tag $wait]
    }

    # read --
    #     Get data from the server, and have them removed
    #
    # Arguments:
    #     type        Type of data record
    #     tag         Identifier for the data
    #     wait        Wait for a record or not
    #
    proc ::DataClient::read {type tag {wait 0}} {
        return [GetData read $type $tag $wait]
    }

    # out --
    #     Send data to the server
    #
    # Arguments:
    #     type        Type of data record
    #     tag         Identifier for the data
    #     data        The data themselves
    #
    proc ::DataClient::out {type tag data} {
        variable server

        puts $server [list out $type $tag $data]
        flush $server
    }

    # ReadServerData --
    #     Read data from the server
    #
    # Arguments:
    #     server      Server connection
    #
    proc ::DataClient::ReadServerData {server} {
        variable forServer
        variable serverData

        if { [catch {gets $server serverData} len] || [eof $server] } {
            catch {
                close $server
            }
            set serverData "NONE"
            # Wake up GetData as well, otherwise it waits forever
            set forServer 1
        } elseif { $len >= 0 } {
            set forServer 1
        }
    }

    # ReadAndRecord --
    #     Read the output from the job
    #
    # Arguments:
    #     fin         Channel to the running job
    #     fout        Channel to the log file
    #
    proc ReadAndRecord {fin fout} {

        if { [catch {gets $fin line} len] || [eof $fin] } {
            catch {
                close $fin
            }
            set ::jobEnds 1
        } elseif { $len >= 0 } {
            puts $fout $line
        }
    }

    # main --
    #     Analyse the arguments (if any) and get the jobs from the
    #     data server
    #
    set host     localhost
    set port     8181
    set time     0:00
    set queue    normal
    set platform $::tcl_platform(platform)

    foreach {key value} $::argv {
        switch -- $key {
            "-host" {
                set host $value
            }
            "-port" {
                set port $value
            }
            "-queue" {
                set queue $value
            }
            default {
                break
            }
        }
    }

    ::DataClient::connect $host $port

    #
    # Start the endless loop of requesting jobs from the
    # data server
    #
    while {1} {
        set jobData [::DataClient::read $platform-$queue-submit 0 1]

        puts "Job data: $jobData"
        foreach {dummy thisQueue tag commandData} $jobData {break}
        foreach {user time pwd command} $commandData {break}

        #
        # If the job should not run yet, put it back
        # Question: is this an "honest" algorithm?
        #
        # TODO: update the job record (running, remove from queue altogether)
        #
        set scheduleTime [clock scan $time]
        if { $scheduleTime > [clock seconds] } {
            # Put back the job description only (not the whole reply)
            # and do not run the job in this round
            ::DataClient::out $platform-$queue-submit $tag $commandData
            after 1000
            continue
        }

        puts "Starting job: $command (in $pwd)"
        cd $pwd
        set jobChannel [open "|$command" r]
        set jobOutput  [open [lindex $command 0].out[clock seconds] w]
        fileevent $jobChannel readable [list ReadAndRecord $jobChannel $jobOutput]
        vwait jobEnds
        close $jobOutput
    }
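The "put it back" test in the job loop depends on how clock scan interprets a bare time of day. The sketch below isolates that check in a hypothetical helper, jobIsDue, which is not part of the job server; it assumes that the free-form clock scan accepts times such as "0:00" or "18:00" and uses the -base option so that the outcome can be reproduced for a fixed moment.

```tcl
# Hypothetical helper: is a job with the given start time due?
#     time    Time of day, e.g. "0:00" or "18:00"
#     now     Moment to compare with, in seconds (default: current time)
proc jobIsDue {time {now ""}} {
    if { $now eq "" } {
        set now [clock seconds]
    }
    # A bare time of day is taken to be on the same day as the base time
    set scheduleTime [clock scan $time -base $now]
    return [expr {$scheduleTime <= $now}]
}

# A fixed reference moment: noon on 5 June 2011, local time
set noon [clock scan "2011-06-05 12:00:00" -format "%Y-%m-%d %H:%M:%S"]

puts "0:00 due at noon?  [jobIsDue 0:00 $noon]"
puts "18:00 due at noon? [jobIsDue 18:00 $noon]"
```

One consequence of comparing times of day only is that a job submitted late in the evening with a start time shortly after midnight is considered due at once, since that time of day has already passed on the current date. Passing a full date and time would avoid this.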
And the submit program is implemented in this way:
    # submit.tcl --
    #     Submit a new job
    #
    #     Arguments:
    #     -host host           The host where the data server is running
    #     -port port           Port that the data server is listening to
    #     -after time          Time of day that the job should start after
    #     -queue queue         Name of the queue in which the job is placed
    #     -platform platform   Platform that the job is intended for
    #
    #     All other arguments are treated as being the job that
    #     should be run
    #
    #
    #     Limitations:
    #     - The user that should run the job is the user of the
    #       job server.
    #     - No accompanying programs to cancel the job or to get
    #       an overview
    #
    #     Notes:
    #     - The defaults should come from a configuration file

    # DataClient --
    #     Namespace for the general procedures
    #
    namespace eval ::DataClient {
        variable server     ;# Connection to the server
        variable forServer  ;# Wait for server
    }

    # connect --
    #     Connect to the data server
    #
    # Arguments:
    #     host        Name of the host that the server is running on
    #     port        Port for the server (8181 by default)
    #
    proc ::DataClient::connect {host port} {
        variable server

        set server [socket $host $port]
    }

    # out --
    #     Send data to the server
    #
    # Arguments:
    #     type        Type of data record
    #     tag         Identifier for the data
    #     data        The data themselves
    #
    proc ::DataClient::out {type tag data} {
        variable server

        puts $server [list out $type $tag $data]
        flush $server
    }

    # main --
    #     Analyse the arguments (if any) and send the job to the
    #     data server
    #
    set host     localhost
    set port     8181
    set time     0:00
    set queue    normal
    set platform $::tcl_platform(platform)

    # Index of the first argument that is not an option
    set count    0

    foreach {key value} $::argv {
        switch -- $key {
            "-host" {
                set host $value
            }
            "-port" {
                set port $value
            }
            "-queue" {
                set queue $value
            }
            "-after" {
                set time $value
            }
            "-platform" {
                set platform $value
            }
            default {
                break
            }
        }
        # Each option consumes two arguments: the key and its value
        incr count 2
    }

    set command [lrange $::argv $count end]

    ::DataClient::connect $host $port

    # tcl_platform(user) is available on all platforms, unlike an
    # environment variable such as USERID
    ::DataClient::out $platform-$queue-submit 0 [list $::tcl_platform(user) $time [pwd] $command]

    puts "Submitted job: $command"
(Note that the submit program runs only briefly: it needs to connect to the server, send the data and then finish.)