Arjen Markus (6 February 2008) I promised in Tupleserver to post the source code for the application I wrote based on the idea of tuple spaces. (See Tupleserver for an explanation of what it is all about.)
I like to think of this as a simple way to do grid computing: rather than use a single computer, you use a whole (variable) set of computers to do the work. They do not even need to be at the same location or share disks and so on.
The application consists of the following files: runclient.tcl and runclient.cfg (the steering client and its configuration), runprog.tcl and runprog.cfg (the computational client and its configuration), and primes.f90 (the example program that does the actual computation).
The code in runclient.tcl is not quite flawless - it forgets about one computation and I should really solve that issue.
The nice thing about the tupleserver is that your own code does not have to worry much about synchronisation, and not at all about race conditions or inconsistent data. The various parts can also run on completely different machines. I tested it in several configurations, one of which had the client on a Linux machine, the computational client on a Windows PC and the server on a second Windows PC. It should be possible to run multiple ordinary clients and computational clients, but I have not used that configuration.
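Judging from the scripts on this page, the clients talk to the tuple server with three line-oriented commands: `open`, `out` and `in`. The following is a minimal sketch of helpers that build those command lines. The proc names `tsOpenCmd`, `tsOutCmd` and `tsInCmd` are hypothetical and not part of the server or of the original scripts; the exact semantics of `in` (presumably: hand over a tuple matching the pattern, with `*` as wildcard) are assumed from the usual tuple-space convention.

```tcl
# Hypothetical helpers (not part of the original scripts) that build the
# command lines runclient.tcl and runprog.tcl send to the tuple server.

# "open <space>": attach to a named tuple space
proc tsOpenCmd {space} {
    return "open $space"
}

# "out <space> <tuple>": store a tuple; [list] takes care of the quoting
proc tsOutCmd {space tuple} {
    return "out $space [list $tuple]"
}

# "in <space> <pattern>": ask for a tuple matching the pattern
proc tsInCmd {space pattern} {
    return "in $space [list $pattern]"
}

puts [tsOpenCmd LOGsrv]                      ;# open LOGsrv
puts [tsOutCmd LOGsrv [list TASK 0 1 {}]]    ;# out LOGsrv {TASK 0 1 {}}
puts [tsInCmd LOGsrv {TASK * * *}]           ;# in LOGsrv {TASK * * *}
```

Using `[list $tuple]` instead of wrapping the tuple in literal braces keeps the command line a well-formed Tcl list even when the tuple contains unbalanced braces.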
Note: the configuration determines among other things where to find the server.
To use this under Windows, for instance:
runclient.tcl:
    # runclient.tcl --
    #     Prepare the input for the computations, send the files to
    #     the tuple server and receive the results.
    #     An external program is used to prepare the input files
    #
    #     Variables in runclient.cfg file:
    #     Tuple server's address:                 tupleserver
    #     Tuple server's port:                    serverport
    #     External program to prepare the input:  program
    #     List of input files:                    inputfiles
    #
    #     TODO:
    #     Proper handling of end-of-socket-connection
    #     Server: end server
    #

    # connect_tuplespace --
    #     Connect to the tuple server
    #
    # Arguments:
    #     name        Name/address of the tuple server
    #     namespace   Namespace for the commands
    #
    # Result:
    #     Channel to the server
    #
    # Side effects:
    #     Connection to the tuple server established, file event handler
    #     installed
    #
    proc connect_tuplespace {name namespace} {
        global servername
        global tupleserver
        global serverport

        set s [socket $tupleserver $serverport]
        fconfigure $s -buffering line
        puts $s "open $name"
        gets $s
        fileevent $s readable [list handle_command $s $name $namespace]

        set servername $name
        return $s
    }

    # handle_command --
    #     Handle the commands sent by the server
    #
    # Arguments:
    #     s           Server connection
    #     ts          Tuplespace we are using
    #     ns          Namespace for the commands
    #
    # Result:
    #     None
    #
    # Side effects:
    #     The commands received from the server will have side effects
    #
    proc handle_command {s ts ns} {
        if { [eof $s] } {
            close $s
            set ::forever 1
            return
        }
        gets $s resp

        # get next lines if response has embedded new lines
        while {![info complete $resp]} {
            append resp \n[gets $s]
        }

        set status [lindex $resp 0]
        set resp   [lindex $resp end]

        if { $status eq "error" } {
            puts stderr $resp
            return
        }

        set cmd  [lindex $resp 0]
        set args [lrange $resp 1 end]

        if { $cmd == "" } {
            return
        }

        puts "Received: $cmd -- $s -- $args"
        if { [catch {eval ::${ns}::$cmd $s $args} error] } {
            puts stderr "$error - $::errorInfo"
        }

        #
        # Get the first tuple for a task that has been completed
        #
        #puts $s "in $ts {TASKDONE * * *}"
        #puts "IN TASKDONE"
    }

    # logsrv --
    #     Define commands understood by logsrv
    #
    namespace eval logsrv {}

    # TASKDONE --
    #     Get a description of the completed task
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #     files       Number of files
    #     dummy       Not used
    #
    proc ::logsrv::TASKDONE {s task files dummy} {
        global servername
        global filecount

        set filecount $files

        #
        # Request the first file
        #
        puts $s "in $servername {RESULT $task * *}"
        flush $s
        puts "IN RESULT"
    }

    # RESULT --
    #     Get the result file
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #     filename    Name of the file
    #     contents    Contents of the file
    #
    proc ::logsrv::RESULT {s task filename contents} {
        global servername
        global filecount
        global tasks
        global expected

        if { ! [file exists out$task] } {
            file mkdir out$task
        }

        set filename [file join out$task $filename]

        set outfile [open $filename w]
        puts -nonewline $outfile [string map {# \n} $contents]
        close $outfile

        #
        # Request the next file, if any
        #
        incr filecount -1
        if { $filecount > 0 } {
            puts $s "in $servername {RESULT $task * *}"
            flush $s
            puts "IN RESULT"
        } else {
            incr tasks
            if { $tasks < $expected } {
                puts $s "in $servername {TASKDONE * * *}"
                puts "IN TASKDONE"
            } else {
                close $s
                set ::forever 1
            }
        }
    }

    # runProgram --
    #     Run the external program that prepares the input
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #
    # Result:
    #     Either "continue" or "done" - the last to indicate
    #     all tasks have been scheduled.
    #
    proc ::logsrv::runProgram {s task} {
        global servername
        global filecount
        global program
        global inputfiles

        #set oldpwd [pwd]
        #cd $task

        #
        # Simple communication with the external program
        #
        puts "Scheduling task $task ..."
        set outfile [open "schedule" w]
        puts $outfile $task
        close $outfile

        #
        # Run the program and send the input
        #
        if { [catch {
                 eval $program
             } msg] } {
            #puts $s "out $servername [list TASKDONE $task ERROR $msg]"
            puts "Error: $msg"
        } else {
            puts $s "out $servername {[list TASK $task [llength $inputfiles] {}]}"
            puts "OUT TASK $task"
            foreach f $inputfiles {
                set infile [open $f]
                set contents [read $infile]
                close $infile
                puts $s "out $servername {[list INPUT $task $f [string map {\n #} $contents]]}"
                puts "OUT INPUT $task"
            }
        }

        set rc "continue"
        if { [file exists "done"] } {
            puts "All tasks scheduled"
            set rc "done"
        }

        #cd $oldpwd
        flush $s

        return $rc
    }

    # main --
    #     Connect to the server:
    #     - Get the connection parameters
    #     - Get the computational program's name
    #     - Wait "forever"
    #
    source runclient.cfg

    set server [connect_tuplespace LOGsrv logsrv]
    puts "Client connected to tuple server"

    set task     0
    set tasks    0
    set expected 0
    while {1} {
        set rc [::logsrv::runProgram $server $task]
        if { $rc eq "done" } {
            break
        }
        incr task
        incr expected
    }

    puts $server "in LOGsrv {TASKDONE * * *}"

    vwait forever
runclient.cfg:
    # Configuration file for the steering client
    #
    set tupleserver localhost
    set serverport  4040
    set program     setupComp
    set inputfiles  {primes.inp}

    #
    # Use the locally defined procedure to set up the runs
    #
    proc setupComp {} {
        set infile [open "schedule"]
        set count [gets $infile]
        close $infile

        if { $count < 10 } {
            set outfile [open "primes.inp" w]
            puts $outfile [expr {$count*1000}]
            puts $outfile [expr {($count+1)*1000}]
            close $outfile
        } else {
            close [open "done" w]
        }
    }
runprog.tcl:
    # runprog.tcl --
    #     Run an external program using input data from the tuple server
    #     The program in this file waits for jobs on the server and
    #     when it gets one, runs the external program, waits for it to
    #     return and sends the results back.
    #
    #     Variables in runprog.cfg file:
    #     Tuple server's address:   tupleserver
    #     Tuple server's port:      serverport
    #     External program:         program
    #     Result files to be sent:  resultfiles
    #

    # connect_tuplespace --
    #     Connect to the tuple server
    #
    # Arguments:
    #     name        Name/address of the tuple server
    #     namespace   Namespace for the commands
    #
    # Result:
    #     None
    #
    # Side effects:
    #     Connection to the tuple server established, file event handler
    #     installed
    #
    proc connect_tuplespace {name namespace} {
        global servername
        global tupleserver
        global serverport

        set s [socket $tupleserver $serverport]
        fconfigure $s -buffering line
        puts $s "open $name"
        gets $s
        fileevent $s readable [list handle_command $s $name $namespace]

        set servername $name

        puts $s "in $name {TASK * * *}"
    }

    # handle_command --
    #     Handle the commands sent by the server
    #
    # Arguments:
    #     s           Server connection
    #     ts          Tuplespace we are using
    #     ns          Namespace for the commands
    #
    # Result:
    #     None
    #
    # Side effects:
    #     The commands received from the server will have side effects
    #
    proc handle_command {s ts ns} {
        if { [eof $s] } {
            close $s
            set ::forever 1
            # Do not touch the closed channel any more
            return
        }
        gets $s resp

        # get next lines if response has embedded new lines
        while {![info complete $resp]} {
            append resp \n[gets $s]
        }

        set status [lindex $resp 0]
        set resp   [lindex $resp end]

        if { $status eq "error" } {
            puts stderr $resp
            return
        }

        set cmd  [lindex $resp 0]
        set args [lrange $resp 1 end]

        if { $cmd == "" } {
            return
        }

        if { [catch {eval ::${ns}::$cmd $s $args} error] } {
            puts stderr "$error - $::errorInfo"
        }

        #
        # Get the first tuple for a new task
        #
        puts $s "in $ts {TASK * * *}"
    }

    # logsrv --
    #     Define commands understood by logsrv
    #
    namespace eval logsrv {}

    # TASK --
    #     Get a description of the task
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #     files       Number of files
    #     dummy       Not used
    #
    proc ::logsrv::TASK {s task files dummy} {
        global servername
        global filecount

        set filecount $files

        #
        # Request the first file
        #
        puts "Task received: $task - $files files"
        puts $s "in $servername {INPUT $task * *}"
        flush $s
    }

    # INPUT --
    #     Get the input file
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #     filename    Name of the file
    #     contents    Contents of the file
    #
    proc ::logsrv::INPUT {s task filename contents} {
        global servername
        global filecount

        puts "Input received: task $task - $filename"
        if { ! [file exists $task] } {
            file mkdir $task
        }

        set filename [file join $task $filename]

        set outfile [open $filename w]
        puts -nonewline $outfile [string map {# \n} $contents]
        close $outfile

        #
        # Request the next file, if any
        #
        incr filecount -1
        if { $filecount > 0 } {
            puts $s "in $servername {INPUT $task * *}"
            flush $s
        } else {
            runProgram $s $task
        }
    }

    # runProgram --
    #     Run the external program
    #
    # Arguments:
    #     s           Server connection
    #     task        Task number
    #
    proc ::logsrv::runProgram {s task} {
        global servername
        global filecount
        global program
        global resultfiles

        set oldpwd [pwd]
        cd $task

        if { [catch {
                 eval $program
             } msg] } {
            puts $s "out $servername {[list TASKDONE $task ERROR $msg]}"
            puts "Error: $msg"
        } else {
            puts $s "out $servername {[list TASKDONE $task [llength $resultfiles] {}]}"
            foreach f $resultfiles {
                set infile [open $f]
                set contents [read $infile]
                close $infile
                puts $s "out $servername {[list RESULT $task $f [string map {\n #} $contents]]}"
            }
        }

        cd $oldpwd
        flush $s
    }

    # main --
    #     Connect to the server:
    #     - Get the connection parameters
    #     - Get the computational program's name
    #     - Wait "forever"
    #
    source runprog.cfg

    connect_tuplespace LOGsrv logsrv
    puts "Computing server connected to tuple server"

    vwait forever
runprog.cfg:
    # Configuration file for the computational client
    #
    set tupleserver localhost
    set serverport  4040
    set program     "exec ../primes.exe"
    set resultfiles {primes.out primes.log}
primes.f90:
    ! primes.f90 --
    !     Program to compute the primes between two numbers.
    !     It reads the bounds from primes.inp and writes two
    !     files: primes.out and primes.log
    !     Nothing serious, just a slightly less than trivial
    !     program to illustrate the tuple server
    !
    program primes
        implicit none

        integer :: vmin
        integer :: vmax
        integer :: candidate1
        integer :: candidate2
        integer :: count

        open( 10, file = 'primes.inp' )
        open( 20, file = 'primes.out' )
        open( 30, file = 'primes.log' )

        read( 10, * ) vmin
        read( 10, * ) vmax
        close( 10 )

        count = 0
        candidate1 = 6 * (vmin/6) + 1
        candidate2 = 6 * (vmin/6) + 5

        do while ( candidate1 <= vmax )
            if ( isprime(candidate1) ) then
                count = count + 1
                write(20,*) candidate1
            endif
            if ( isprime(candidate2) ) then
                count = count + 1
                write(20,*) candidate2
            endif
            candidate1 = candidate1 + 6
            candidate2 = candidate2 + 6
        enddo

        write( 30, * ) 'Number of primes found: ', count

    contains

    ! isprime --
    !     Naive function to determine if a number is prime
    !
    logical function isprime( x )
        integer :: x
        integer :: fmax
        integer :: f

        fmax = sqrt(real(x))
        f    = 5

        isprime = .true.
        do while ( f <= fmax )
            if ( mod(x,f) == 0 ) then
                isprime = .false.
                exit
            endif
            f = f + 2
        enddo
    end function isprime

    end program primes
tcleval (10 February 2008) The tuplespace is line buffered. That is OK for text messages, but what if the file for the computation or the messages contain a '#' character? It will be replaced by '\n', as the string map above suggests. I suggest that, for transferring binary files, the tuplespace should be binary oriented, and the messages could be like those of ed2k (eMule/eDonkey2000), where you have to specify the length of the message in binary.
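A small sketch of the problem described here, using the same two `string map` calls as the scripts above. The proc names `encodeLines` and `decodeLines` are made up for this illustration.

```tcl
# The encoding used by the scripts above: newlines become '#' on sending
# and every '#' becomes a newline on receiving.
proc encodeLines {text} {
    return [string map {\n #} $text]
}
proc decodeLines {text} {
    return [string map {# \n} $text]
}

# An input file that happens to start with a '#' comment line
set original "# header comment\nvalue 1\n"
set received [decodeLines [encodeLines $original]]

puts [expr {$received eq $original ? "intact" : "corrupted"}]   ;# corrupted
```

The leading '#' of the original comes back as a newline, so any file that already contains '#' does not survive the round trip.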
DKF: The way to ship truly arbitrary strings is pretty much as described. You need a little code like this to write and read the binary data (be aware that the reader assumes that the channel is blocking, and everything assumes that the channel is in full binary mode):
    proc writeMsg {channel string} {
        set enc [encoding convertto utf-8 $string]
        puts -nonewline $channel [binary format I [string length $enc]]$enc
    }
    proc readMsg {channel} {
        binary scan [read $channel 4] I len
        return [encoding convertfrom utf-8 [read $channel $len]]
    }
Writing a reader that can work properly in non-blocking mode is left as an exercise.
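One possible way to approach that exercise, as a sketch only: collect whatever bytes have arrived in a buffer and extract messages from it once the 4-byte length and the full body are present. The names `extractMsgs`, `onReadable` and the global array `inbuf` are hypothetical, and the channel is assumed to be in binary, non-blocking mode.

```tcl
# Pull every complete length-prefixed message out of the byte buffer held
# in the variable named by bufVar; an incomplete tail stays in the buffer.
proc extractMsgs {bufVar} {
    upvar 1 $bufVar buf
    set msgs {}
    while {[string length $buf] >= 4} {
        binary scan $buf I len
        set len [expr {$len & 0xffffffff}]   ;# treat the length as unsigned
        if {[string length $buf] < 4 + $len} {
            break                            ;# body not complete yet
        }
        set body [string range $buf 4 [expr {3 + $len}]]
        lappend msgs [encoding convertfrom utf-8 $body]
        set buf [string range $buf [expr {4 + $len}] end]
    }
    return $msgs
}

# Readable-event handler: append what is available, deliver complete messages
proc onReadable {channel callback} {
    global inbuf
    append inbuf($channel) [read $channel]
    foreach msg [extractMsgs inbuf($channel)] {
        uplevel #0 $callback [list $msg]
    }
    if {[eof $channel]} {
        close $channel
        unset inbuf($channel)
    }
}

# Demonstration without a socket: feed one frame in two pieces
set frame [binary format I 5]hello
set buf [string range $frame 0 5]
puts [llength [extractMsgs buf]]     ;# 0 - the message is still incomplete
append buf [string range $frame 6 end]
puts [extractMsgs buf]               ;# hello
```

To use it on a real connection one would configure the channel with `fconfigure $s -translation binary -blocking 0` and install `fileevent $s readable [list onReadable $s handleMsg]`, where `handleMsg` is whatever command should receive each message.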
AM (11 february 2008) I am quite aware that the code as given above is not quite capable of transferring binary data. (And the current version of MJ's tuple server takes care of multiline messages itself). An alternative we discussed briefly is to use base64 encoding, but the encoding DKF describes would work even better (I think it will at least be faster).
MJ - Note that currently, the only restriction on the tuple is that it should be a valid Tcl list. For sending binary data, it's much better to use something like base64. The reason binary is not used is that you would then have to specify things like the character encoding and the line-ending convention, either in band or in the 'protocol' spec. This seems like overkill atm.
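A minimal sketch of the base64 approach, assuming Tcl 8.6 or later for `binary encode base64` and `binary decode base64` (older versions would need the base64 package from Tcllib instead). The proc names `packFile` and `unpackFile` are hypothetical; the `RESULT` tuple layout follows the scripts above.

```tcl
# Build a RESULT tuple whose last element is the base64-encoded file data.
# Base64 text uses only letters, digits, '+', '/' and '=', so the tuple
# stays a valid one-line Tcl list whatever bytes the file contains.
proc packFile {task filename data} {
    return [list RESULT $task $filename [binary encode base64 $data]]
}

# Reverse of packFile: returns a list of task, file name and raw data
proc unpackFile {tuple} {
    lassign $tuple tag task filename encoded
    return [list $task $filename [binary decode base64 $encoded]]
}

# Bytes that trouble the line-based scheme: '#', newline, NUL, 0xFF, left brace
set data [binary format c* {35 10 0 255 123}]
set tuple [packFile 3 primes.out $data]
lassign [unpackFile $tuple] task filename restored

puts [expr {$restored eq $data ? "round trip ok" : "round trip failed"}]
```

For real files, the data should be read and written on channels configured with `-translation binary`, so that no end-of-line or encoding conversion happens outside the base64 step.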