A poor man's grid computing

Arjen Markus (6 February 2008) I promised in Tupleserver to post the source code for the application I wrote based on the idea of tuple spaces. (See Tupleserver for an explanation of what it is all about.)

I like to think of this as a simple way to do grid computing: rather than use a single computer, you use a whole (variable) set of computers to do the work. They do not even need to be at the same location or to share disks.

The application consists of the following files:

  • runclient.tcl and runclient.cfg: The source code for the client that sets up the computations and the corresponding configuration file (it also defines a small procedure to prepare the input for the computations)
  • runprog.tcl and runprog.cfg: The source code for the "computational client" - it picks up the input files, runs the computation and sends the results back. Of course it too requires a little configuration file.
  • primes.f90: A slightly less than trivial program whose main task is to produce two different output files (I wanted more than one file to check the synchronisation in the client). But you are free to insert a different program.

The code in runclient.tcl is not quite flawless - it forgets about one computation and I should really solve that issue.

The nice thing about the tupleserver is that your own code does not have to worry much about synchronisation, and not at all about race conditions or inconsistent data. The various parts can also run on completely different machines: I tested it in various configurations, one of which had the client on a Linux machine, the computational client on a Windows PC and the server on a second Windows PC. It should be possible to run multiple ordinary clients and computational clients, but I have not used that configuration.
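
To make the idea concrete, here is a minimal in-memory sketch of the two operations the clients below rely on: "out" puts a tuple into the space, "in" removes and returns the first tuple that matches a pattern. This is not the Tupleserver code. The namespace sketch and its procedures are made up for illustration, and the sketch assumes that "*" in a pattern matches any single field, as patterns like {TASK * * *} in the code below suggest. Unlike a real tuple space, this "in" returns an empty list instead of waiting when nothing matches.

```tcl
# Hypothetical in-memory tuple space, for illustration only
# (assumption: "*" in a pattern matches any single field)
namespace eval sketch {
    variable tuples {}
}

# out -- add a tuple to the space
proc sketch::out {tuple} {
    variable tuples
    lappend tuples $tuple
}

# matches -- check whether a tuple fits a pattern, field by field
proc sketch::matches {pattern tuple} {
    if { [llength $pattern] != [llength $tuple] } {
        return 0
    }
    foreach p $pattern t $tuple {
        if { $p ne "*" && $p ne $t } {
            return 0
        }
    }
    return 1
}

# in -- remove and return the first matching tuple ({} if there is none)
proc sketch::in {pattern} {
    variable tuples
    set idx 0
    foreach tuple $tuples {
        if { [matches $pattern $tuple] } {
            set tuples [lreplace $tuples $idx $idx]
            return $tuple
        }
        incr idx
    }
    return {}
}

# The steering client puts a task and its input file into the space ...
sketch::out {TASK 0 1 {}}
sketch::out {INPUT 0 primes.inp 0#1000#}

# ... and a computational client takes them out again
set task  [sketch::in {TASK * * *}]
set input [sketch::in {INPUT 0 * *}]
puts "Task:  $task"
puts "Input: $input"
```

Because "in" removes the tuple, two computational clients asking for {TASK * * *} can never receive the same task, which is why the clients need no locking of their own.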

Note: the configuration determines among other things where to find the server.

To use this under Windows, for instance:

  • Start the tupleserver (via wish, as it presents a simple GUI)
  • Start two DOS-boxes
  • Start the client and the computational client (or multiple computational clients) in the two DOS-boxes.

runclient.tcl:

# runclient.tcl --
#     Prepare the input for the computations, send the files to
#     the tuple server and receive the results.
#     An external program is used to prepare the input files
#
#     Variables in runclient.cfg file:
#         Tuple server's address:                 tupleserver
#         Tuple server's port:                    serverport
#         External program to prepare the input:  program
#         List of input files:                    inputfiles
#
# TODO:
#     Proper handling of end-of-socket-connection
#     Server: end server
#

# connect_tuplespace --
#     Connect to the tuple server
#
# Arguments:
#     name           Name of the tuple space to open on the server
#     namespace      Namespace for the commands
#
# Result:
#     Channel to the server
#
# Side effects:
#     Connection to the tuple server established, file event handler
#     installed
#
proc connect_tuplespace {name namespace} {
    global servername
    global tupleserver
    global serverport

    set s [socket $tupleserver $serverport]
    fconfigure $s -buffering line
    puts $s "open $name"
    gets $s
    fileevent $s readable [list handle_command $s $name $namespace]
    set servername $name

    return $s
}

# handle_command --
#     Handle the commands sent by the server
#
# Arguments:
#     s            Server connection
#     ts           Tuplespace we are using
#     ns           Namespace for the commands
#
# Result:
#     None
#
# Side effects:
#     The commands received from the server will have side effects
#
proc handle_command {s ts ns} {
    if { [eof $s] } {
        close $s
        set ::forever 1
        return
    }
    gets $s resp
    # get next lines if response has embedded new lines
    while {![info complete $resp]} {
        append resp \n[gets $s]
    }

    set status [lindex $resp 0]
    set resp [lindex $resp end]
    if { $status eq "error" } {
        puts stderr $resp
        return
    }
    set cmd [lindex $resp 0]
    set args [lrange $resp 1 end]

    if { $cmd == "" } {
        return
    }

    puts "Received: $cmd -- $s -- $args"
    if { [catch {eval ::${ns}::$cmd $s $args} error] } {
        puts stderr "$error - $::errorInfo"
    }

    #
    # Get the first tuple for a task that has been completed
    #
    #puts $s "in $ts {TASKDONE * * *}"
    #puts "IN TASKDONE"
}

# logsrv --
#     Define commands understood by logsrv
#
namespace eval logsrv {}

# TASKDONE --
#     Get a description of the completed task
#
# Arguments:
#     s           Server connection
#     task        Task number
#     files       Number of files
#     dummy       Not used
#
proc ::logsrv::TASKDONE {s task files dummy} {
    global servername
    global filecount

    set filecount $files

    #
    # Request the first file
    #
    puts $s "in $servername {RESULT $task * *}"
    flush $s
    puts "IN RESULT"
}

# RESULT --
#     Get the result file
#
# Arguments:
#     s           Server connection
#     task        Task number
#     filename    Name of the file
#     contents    Contents of the file
#
proc ::logsrv::RESULT {s task filename contents} {
    global servername
    global filecount
    global tasks
    global expected

    if { ! [file exists out$task] } {
        file mkdir out$task
    }
    set filename [file join out$task $filename]
    set outfile  [open $filename w]
    puts -nonewline $outfile [string map {# \n} $contents]
    close $outfile

    #
    # Request the next file, if any
    #
    incr filecount -1
    if { $filecount > 0 } {
        puts $s "in $servername {RESULT $task * *}"
        flush $s
        puts "IN RESULT"
    } else {
        incr tasks
        if { $tasks < $expected } {
            puts $s "in $servername {TASKDONE * * *}"
            puts "IN TASKDONE"
        } else {
            close $s
            set ::forever 1
        }
    }
}

# runProgram --
#     Run the external program that prepares the input
#
# Arguments:
#     s           Server connection
#     task        Task number
#
# Result:
#     Either "continue" or "done"; the latter indicates that
#     all tasks have been scheduled.
#
proc ::logsrv::runProgram {s task} {
    global servername
    global filecount
    global program
    global inputfiles

    #set oldpwd [pwd]
    #cd $task

    #
    # Simple communication with the external program
    #
    puts "Scheduling task $task ..."
    set outfile [open "schedule" w]
    puts $outfile $task
    close $outfile

    #
    #  Run the program and send the input
    #
    if { [catch {
             eval $program
         } msg] } {
       #puts $s "out $servername [list TASKDONE $task ERROR $msg]"
       puts "Error: $msg"
    } else {
       puts $s "out $servername {[list TASK $task [llength $inputfiles] {}]}"
       puts "OUT TASK $task"
       foreach f $inputfiles {
           set infile [open $f]
           set contents [read $infile]
           close $infile
           puts $s "out $servername {[list INPUT $task $f [string map {\n #} $contents]]}"
           puts "OUT INPUT $task"
       }
    }

    set rc "continue"
    if { [file exists "done"] } {
        puts "All tasks scheduled"
        set rc "done"
    }

    #cd $oldpwd
    flush $s

    return $rc
}

# main --
#     Connect to the server:
#     - Get the connection parameters
#     - Get the computational program's name
#     - Wait "forever"
#
source runclient.cfg

set server [connect_tuplespace LOGsrv logsrv]
puts "Client connected to tuple server"

set task     0
set tasks    0
set expected 0
while {1} {
   set rc [::logsrv::runProgram $server $task]
   if { $rc eq "done" } {
       break
   }
   incr task
   incr expected
}
puts $server "in LOGsrv {TASKDONE * * *}"

vwait forever

runclient.cfg:

# Configuration file for the steering client
#
set tupleserver localhost
set serverport  4040
set program     setupComp
set inputfiles  {primes.inp}

#
# Use the locally defined procedure to set up the runs
#
proc setupComp {} {
    set infile [open "schedule"]
    set count  [gets $infile]
    close $infile

    if { $count < 10 } {
        set outfile [open "primes.inp" w]
        puts $outfile [expr {$count*1000}]
        puts $outfile [expr {($count+1)*1000}]
        close $outfile
    } else {
        close [open "done" w]
    }
}

runprog.tcl:

# runprog.tcl --
#     Run an external program using input data from the tuple server
#     The program in this file waits for jobs on the server and
#     when it gets one, runs the external program, waits for it to
#     return and sends the results back.
#
#     Variables in runprog.cfg file:
#         Tuple server's address:  tupleserver
#         Tuple server's port:     serverport
#         External program:        program
#         Result files to be sent: resultfiles
#

# connect_tuplespace --
#     Connect to the tuple server
#
# Arguments:
#     name           Name of the tuple space to open on the server
#     namespace      Namespace for the commands
#
# Result:
#     None
#
# Side effects:
#     Connection to the tuple server established, file event handler
#     installed
#
proc connect_tuplespace {name namespace} {
    global servername
    global tupleserver
    global serverport

    set s [socket $tupleserver $serverport]
    fconfigure $s -buffering line
    puts $s "open $name"
    gets $s
    fileevent $s readable [list handle_command $s $name $namespace]
    set servername $name
    puts $s "in $name {TASK * * *}"
}

# handle_command --
#     Handle the commands sent by the server
#
# Arguments:
#     s            Server connection
#     ts           Tuplespace we are using
#     ns           Namespace for the commands
#
# Result:
#     None
#
# Side effects:
#     The commands received from the server will have side effects
#
proc handle_command {s ts ns} {
    if { [eof $s] } {
        close $s
        set ::forever 1
        # The channel is closed, so do not try to read from it
        return
    }
    gets $s resp
    # get next lines if response has embedded new lines
    while {![info complete $resp]} {
        append resp \n[gets $s]
    }

    set status [lindex $resp 0]
    set resp [lindex $resp end]
    if { $status eq "error" } {
        puts stderr $resp
        return
    }
    set cmd [lindex $resp 0]
    set args [lrange $resp 1 end]

    if { $cmd == "" } {
        return
    }

    if { [catch {eval ::${ns}::$cmd $s $args} error] } {
        puts stderr "$error - $::errorInfo"
    }

    #
    # Get the first tuple for a new task
    #
    puts $s "in $ts {TASK * * *}"
}

# logsrv --
#     Define commands understood by logsrv
#
namespace eval logsrv {}

# TASK --
#     Get a description of the task
#
# Arguments:
#     s           Server connection
#     task        Task number
#     files       Number of files
#     dummy       Not used
#
proc ::logsrv::TASK {s task files dummy} {
    global servername
    global filecount

    set filecount $files

    #
    # Request the first file
    #
    puts "Task received: $task - $files files"
    puts $s "in $servername {INPUT $task * *}"
    flush $s
}

# INPUT --
#     Get the input file
#
# Arguments:
#     s           Server connection
#     task        Task number
#     filename    Name of the file
#     contents    Contents of the file
#
proc ::logsrv::INPUT {s task filename contents} {
    global servername
    global filecount

    puts "Input received: task $task - $filename"
    if { ! [file exists $task] } {
        file mkdir $task
    }
    set filename [file join $task $filename]
    set outfile  [open $filename w]
    puts -nonewline $outfile [string map {# \n} $contents]
    close $outfile

    #
    # Request the next file, if any
    #
    incr filecount -1
    if { $filecount > 0 } {
        puts $s "in $servername {INPUT $task * *}"
        flush $s
    } else {
        runProgram $s $task
    }
}

# runProgram --
#     Run the external program
#
# Arguments:
#     s           Server connection
#     task        Task number
#
proc ::logsrv::runProgram {s task} {
    global servername
    global filecount
    global program
    global resultfiles

    set oldpwd [pwd]
    cd $task
    if { [catch {
             eval $program
         } msg] } {
       puts $s "out $servername {[list TASKDONE $task ERROR $msg]}"
       puts "Error: $msg"
    } else {
       puts $s "out $servername {[list TASKDONE $task [llength $resultfiles] {}]}"
       foreach f $resultfiles {
           set infile [open $f]
           set contents [read $infile]
           close $infile
           puts $s "out $servername {[list RESULT $task $f [string map {\n #} $contents]]}"
       }
    }
    cd $oldpwd
    flush $s
}

# main --
#     Connect to the server:
#     - Get the connection parameters
#     - Get the computational program's name
#     - Wait "forever"
#
source runprog.cfg

connect_tuplespace LOGsrv logsrv
puts "Computing server connected to tuple server"

vwait forever

runprog.cfg:

# Configuration file for the computational client
#
set tupleserver localhost
set serverport  4040
set program     "exec ../primes.exe"
set resultfiles {primes.out primes.log}

primes.f90:

! primes.f90 --
!     Program to compute the primes between two numbers.
!     It reads the bounds from primes.inp and writes two
!     files: primes.out and primes.log
!     Nothing serious, just a slightly less than trivial
!     program to illustrate the tuple server
!
program primes

    implicit none
    integer :: vmin
    integer :: vmax
    integer :: candidate1
    integer :: candidate2
    integer :: count

    open( 10, file = 'primes.inp' )
    open( 20, file = 'primes.out' )
    open( 30, file = 'primes.log' )

    read( 10, * ) vmin
    read( 10, * ) vmax
    close( 10 )

    count      = 0
    candidate1 = 6 * (vmin/6) + 1
    candidate2 = 6 * (vmin/6) + 5

    do while ( candidate1 <= vmax )

        if ( isprime(candidate1) ) then
            count = count + 1
            write(20,*) candidate1
        endif

        if ( isprime(candidate2) ) then
            count = count + 1
            write(20,*) candidate2
        endif

        candidate1 = candidate1 + 6
        candidate2 = candidate2 + 6
    enddo

    write( 30, * ) 'Number of primes found: ', count
contains

! isprime --
!     Naive function to determine if a number is prime
!
logical function isprime( x )
    integer :: x
    integer :: fmax
    integer :: f

    fmax = sqrt(real(x))
    f    = 5

    isprime = .true.
    do while ( f <= fmax )
        if ( mod(x,f) == 0 ) then
            isprime = .false.
            exit
        endif

        f = f + 2
    enddo
end function isprime
end program primes

tcleval (10 February 2008) The tuplespace is line buffered, which is fine for text messages. But what if the file for the computation, or a message, contains a '#' character? On receipt it will be replaced by a newline, as the string map calls above show. I suggest that, for transferring binary files, the tuplespace should be binary oriented, with messages framed as in ed2k (eMule/eDonkey2000), where you have to specify the length of the message in binary.
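
The problem is easy to reproduce with the same two string map calls that runclient.tcl and runprog.tcl use. The file contents below are made up for the demonstration:

```tcl
# Made-up file contents with a '#' in them
set original "first line\n# a line that starts with a hash\nlast line\n"

# What the sender does: newlines become '#'
set encoded [string map {\n #} $original]

# What the receiver does: every '#' becomes a newline,
# including the one that was part of the file
set decoded [string map {# \n} $encoded]

if { $decoded eq $original } {
    puts "Round trip preserved the contents"
} else {
    puts "Round trip changed the contents"
}
```

With these contents the second branch is taken: the '#' that belonged to the file comes back as an extra newline.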

DKF: The way to ship truly arbitrary strings is pretty much as described. You need a little code like this to write and read the binary data (be aware that the reader assumes that the channel is blocking, and everything assumes that the channel is in full binary mode):

proc writeMsg {channel string} {
   set enc [encoding convertto utf-8 $string]
   puts -nonewline $channel [binary format I [string length $enc]]$enc
}
proc readMsg {channel} {
   binary scan [read $channel 4] I len
   return [encoding convertfrom utf-8 [read $channel $len]]
}

Writing a reader that can work properly in non-blocking mode is left as an exercise.
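
One possible shape for that exercise, as a sketch rather than a tested implementation: collect whatever bytes have arrived in a per-channel buffer and only hand out messages that are complete. The names frame, extractMessages, onReadable and the array pending are made up here. The sketch assumes the same framing as writeMsg above (4-byte big-endian length followed by UTF-8 data) and a channel configured with -blocking 0 -translation binary.

```tcl
# frame -- build one message: 4-byte length followed by the UTF-8 bytes
#          (same layout as writeMsg, but returned instead of written)
proc frame {string} {
    set enc [encoding convertto utf-8 $string]
    return [binary format I [string length $enc]]$enc
}

# extractMessages -- remove all complete messages from the buffer
#                    variable and return them as a list
proc extractMessages {bufVar} {
    upvar 1 $bufVar buf
    set messages {}
    while { [string length $buf] >= 4 } {
        binary scan $buf I len
        if { [string length $buf] < 4 + $len } {
            break   ;# payload incomplete: wait for more data
        }
        set payload [string range $buf 4 [expr {3 + $len}]]
        set buf     [string range $buf [expr {4 + $len}] end]
        lappend messages [encoding convertfrom utf-8 $payload]
    }
    return $messages
}

# onReadable -- fileevent handler: read what is available, deliver
#               each complete message to the callback
proc onReadable {channel callback} {
    global pending
    append pending($channel) [read $channel]
    foreach msg [extractMessages pending($channel)] {
        uplevel #0 [linsert $callback end $msg]
    }
    if { [eof $channel] } {
        close $channel
        unset pending($channel)
    }
}

# Possible use (handleMessage is a hypothetical procedure taking one argument):
#     fconfigure $sock -blocking 0 -translation binary
#     fileevent $sock readable [list onReadable $sock handleMessage]
```

Keeping the parsing in extractMessages, separate from the channel handling, means it can be exercised without a socket by feeding it a message in two pieces.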

AM (11 february 2008) I am quite aware that the code as given above is not quite capable of transferring binary data. (And the current version of MJ's tuple server takes care of multiline messages itself). An alternative we discussed briefly is to use base64 encoding, but the encoding DKF describes would work even better (I think it will at least be faster).

MJ - Note that currently, the only restriction on the tuple is that it should be a valid Tcl list. For sending binary data, it's much better to use something like base64. The reason binary is not used is that you will then have to specify stuff like the character encoding and the line ending convention either in band or in the 'protocol' spec. This seems like overkill atm.
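
A minimal sketch of the base64 route, assuming Tcl 8.6 or later, where binary encode base64 and binary decode base64 are built in (with older versions the base64 package from Tcllib offers the same service). The tuple layout mirrors the RESULT tuples used above; the file contents are made up.

```tcl
# Made-up file contents, including a '#' and newlines
set contents "1009\n1013\n# not a prime, just a comment\n"

# Sender: encode the bytes so that the tuple field contains only
# letters, digits, '+', '/' and '='
set payload [binary encode base64 [encoding convertto utf-8 $contents]]
set tuple   [list RESULT 3 primes.out $payload]

# The tuple is a single line, so it survives a line-buffered channel
puts "Tuple as sent: $tuple"

# Receiver: decode the fourth field to get the contents back
set received [encoding convertfrom utf-8 \
                  [binary decode base64 [lindex $tuple 3]]]

if { $received eq $contents } {
    puts "Contents transferred unchanged"
}
```

The price is a payload that is about a third larger than the original data, which is the trade-off against the length-prefixed binary format discussed above.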