Arjen Markus (7 September 2006) The idea behind the three programs below is fairly simple: some computations (in my case, numerical ones) can be split into separate pieces that need to exchange only a limited amount of information. I meant this as an example for my Ftcl library (see: Combining Fortran and Tcl in one program), but it can also serve as an example of Distributed computation.
The three programs together simulate a simple mechanical system: a series of bodies connected via springs. The trick, however, is that the motion of each body is simulated by a separate process.
Naturally, this requires coordination: each process simulating a body needs the positions of its neighbours, because they determine the force on the body. And before the simulation can advance to the next time step, all processes must have finished the current one. So a central process (the server) does all the coordination.
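The "all processes must have finished" rule is essentially a barrier. The following is a minimal sketch of that bookkeeping on its own, without any sockets. It is not the code used in the server below: the procedure names `beginStep` and `workerDone` and the array `pending` are hypothetical and only serve the illustration.

```tcl
# beginStep -- (hypothetical) mark every worker as "still computing"
proc beginStep {workers} {
    global pending
    array unset pending
    foreach w $workers {
        set pending($w) 1
    }
}

# workerDone -- (hypothetical) register the result of one worker
#     Returns 1 if this was the last outstanding worker, 0 otherwise
proc workerDone {id} {
    global pending
    unset -nocomplain pending($id)
    return [expr {[array size pending] == 0}]
}

beginStep {1 2 3}
puts [workerDone 2]   ;# two workers still busy
puts [workerDone 1]   ;# one worker still busy
puts [workerDone 3]   ;# all done: the next time step may start
```

Only when `workerDone` reports 1 would a coordinator pass the combined state on and accept a request for the next time step.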
The code below is a bit tricky: it has to take care of the physical arrangement (the topology, if you will) of the bodies and the springs. That led me to introduce the keyword "FREE" to indicate that no spring is attached on that side, and to juggle a bit with extra items at the beginning and end of the various lists. Yes, it should be cleaned up.
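To make the topology bookkeeping concrete, here is a minimal sketch that splits a spring-body-spring list into one triple per body. It is an illustration only: the helper name `splitSystem` is hypothetical, and it assumes the list alternates spring, body, spring, ... with "FREE" marking a missing spring, as in the client program below.

```tcl
# splitSystem -- (hypothetical helper, for illustration only)
#     Turn a spring-body-spring list into a list of
#     {left-spring mass right-spring} triples, one per body.
#     An empty string means "no spring on that side".
proc splitSystem {system} {
    set result {}
    set left   [lindex $system 0]
    foreach {mass right} [lrange $system 1 end] {
        set l $left
        set r $right
        if { $l eq "FREE" } { set l {} }
        if { $r eq "FREE" } { set r {} }
        lappend result [list $l $mass $r]

        # The right spring of this body is the left spring of the next one
        set left $right
    }
    return $result
}

puts [splitSystem {FREE 1.0 0.1 10.0 0.1 1.0 1.0}]
```

Each spring between two bodies shows up twice, once as the right spring of one body and once as the left spring of the next, which is why the list has to be walked with an overlapping window.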
More important, in my opinion, than the actual code is the "architecture". I had to make a number of decisions:
Well, there are numerous extensions and variations one can think of for setting up a system like this. Hopefully the Distributed computation page will end up in a more or less clear-cut design for a more general and flexible set-up.
To use it:
# bs_client.tcl
#     The client program:
#     - Set up the system
#     - Define the initial condition
#     - Start the simulation
#

# run --
#     Start the computation and show the result (via the callback)
# Arguments:
#     server      Name of the server
#     show        Procedure used to show the result
# Result:
#     None
# Side effects:
#     Whatever "show" does
#
proc run {server show} {
    global bs_system bs_state time timestep stoptime

    #
    # Connect to the server
    #
    set channel [socket $server 5678]
    fileevent $channel readable [list getData $channel $show]

    #
    # Start the computation
    #
    puts $channel "ID client"
    puts $channel "SYSTEM $bs_system"
    puts $channel "TIMESTEP $timestep"
    puts $channel "INIT $bs_state"
    puts $channel "TIME"
    flush $channel

    set time 0.0
}

# getData --
#     Get the results from the server and show them
# Arguments:
#     channel     Channel to the server
#     show        Procedure used to show the result
# Result:
#     None
# Side effects:
#     Whatever "show" does
#
proc getData {channel show} {
    global time timestep stoptime

    if { [gets $channel result] >= 0 } {
        # Drop the leading keyword (STATE) before showing the result
        $show $time [lrange $result 1 end]
        set time [expr {$time+$timestep}]
        if { $time <= $stoptime } {
            puts $channel TIME
            flush $channel
        }
    } else {
        if { [eof $channel] } {
            close $channel
            set ::forever 1
        }
    }
}

# bs_system --
#     Register the system setup (body-spring data)
# Arguments:
#     list        List of spring-body-spring properties
# Result:
#     None
# Side effects:
#     Sets bs_system
#
proc bs_system {list} {
    global bs_system
    set bs_system $list
}

# bs_initial --
#     Register the system's initial state
# Arguments:
#     list        List of displacements per body
# Result:
#     None
# Side effects:
#     Sets bs_state
#
proc bs_initial {list} {
    global bs_state
    set bs_state $list
}

# showResult --
#     Display the result in a useful way
# Arguments:
#     time        Current system time
#     list        State of the system (displacements)
# Result:
#     None
#
proc showResult {time list} {
    puts "$time $list"
}

# main --
#
#           S    B   S   B    S   B   S   B
bs_system  {FREE 1.0 0.1 10.0 0.1 1.0 1.0} ;# Implicitly FIXED}
bs_initial { 1.0 0.0 0.0 }
set stoptime 10.0
set timestep 0.02

run localhost showResult

vwait ::forever
# bs_compute.tcl
#     The computational program:
#     - Get the properties of the body and the two springs
#     - Compute the new position
#     - Send the new result to the server
#

# handleInput --
#     Handle input from the server
# Arguments:
#     channel     Channel for the connection
# Result:
#     None
#
proc handleInput {channel} {
    global id position velocity

    if { [gets $channel input] >= 0 } {
        puts "Worker $id >> $input"
        switch -- [lindex $input 0] {
            "NEXT" {
                computeState [lrange $input 1 end]
                puts $channel "NEWSTATE $id $position $velocity"
                flush $channel
                puts "Sent NEWSTATE: $position $velocity"
            }
            default {
                puts "Worker $id: unknown input -$input"
            }
        }
    } else {
        if { [eof $channel] } {
            puts "Worker $id stopped"
            # Braces, not brackets: catch must run the close itself
            catch {close $channel}
            set ::forever 1
        }
    }
}

# computeState --
#     Compute the new position and velocity of the body
# Arguments:
#     info        Time step and the positions of the two neighbours
# Result:
#     None
#
proc computeState {info} {
    global bodymass position velocity left_spring right_spring friction

    foreach {dt left right} $info {break}

    set force 0.0
    if { $left_spring != {} } {
        set force [expr {$force+$left_spring*($left-$position)}]
    }
    if { $right_spring != {} } {
        set force [expr {$force+$right_spring*($right-$position)}]
    }
    set force [expr {$force-$velocity*$friction}]

    set position [expr {$position + $dt * $velocity}]
    set velocity [expr {$velocity + $dt * $force / $bodymass}]
}

# main --
#     Get the command-line parameters and start listening to the
#     server's commands
#
global id bodymass position velocity left_spring right_spring friction

foreach {id bodymass position velocity left_spring right_spring} $argv {break}
puts "Arguments: >>$argv<<"

set friction 0.1

# Test mode ...
if { $id == -1 } {
    set time 0.0
    set dt   0.1
    while { $time < 10.0 } {
        computeState [list $dt 0.0 0.0]
        puts "$time $position"
        set time [expr {$time+$dt}]
    }
} else {
    set channel [socket localhost 5678]
    fileevent $channel readable [list handleInput $channel]
    puts $channel "ID $id"
    flush $channel

    vwait ::forever
}
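The update in computeState is an explicit (forward) Euler step: the position advances with the old velocity, and the velocity advances with the force computed from the old position and velocity. The following is a minimal sketch of the same rule as a stateless procedure, which makes it easy to try out without a server. The name `eulerStep` and its argument order are hypothetical, and both springs are assumed to be present.

```tcl
# eulerStep -- (hypothetical, stateless variant of computeState)
#     Returns a list {new-position new-velocity}
proc eulerStep {dt mass friction kleft kright left right position velocity} {
    set force [expr {$kleft*($left-$position) + $kright*($right-$position)
                     - $friction*$velocity}]
    set newpos [expr {$position + $dt*$velocity}]
    set newvel [expr {$velocity + $dt*$force/$mass}]
    return [list $newpos $newvel]
}

# One body (mass 1.0) between two neighbours held at 0.0,
# released from position 1.0 with zero velocity
set x 1.0
set v 0.0
for {set i 0} {$i < 5} {incr i} {
    foreach {x v} [eulerStep 0.1 1.0 0.1 0.1 1.0 0.0 0.0 $x $v] {break}
    puts "$x $v"
}
```

In the first step the position does not change yet, because the body starts at rest; only the velocity picks up the spring force.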
# bs_server.tcl
#     The server program:
#     - Handle the connection to the client
#     - Dispatch the computations
#     - Collect the results
#

# registerClient --
#     Register the client or the worker process
# Arguments:
#     channel     Channel for the connection
#     addr        Address of the client
#     port        Port to be used
# Result:
#     None
#
proc registerClient {channel addr port} {
    fconfigure $channel -blocking 0 -buffering line
    fileevent $channel readable [list handleInput $channel]
}

# handleInput --
#     Handle input from the client or the worker process
# Arguments:
#     channel     Channel for the connection
# Result:
#     None
#
proc handleInput {channel} {
    global bs_system state timestep

    if { [gets $channel input] >= 0 } {
        puts "Received >> $input"
        set keyword [lindex $input 0]
        switch -- $keyword {
            "ID" {
                storeId $channel [lindex $input 1]
            }
            "SYSTEM" {
                set bs_system [lrange $input 1 end]
            }
            "TIMESTEP" {
                set timestep [lindex $input 1]
            }
            "INIT" {
                set state(position) [lrange $input 1 end]
                set state(record)   [lrange $input 0 end] ;# Note the 0!
            }
            "TIME" {
                nextTime
            }
            "NEWSTATE" {
                newState [lrange $input 1 end]
            }
            default {
                puts "Unknown input: $input"
            }
        }
    } else {
        if { [eof $channel] } {
            # Braces, not brackets: catch must run the close itself
            catch {close $channel}
        }
    }
}

# storeId --
#     Store the ID of the client or worker process
# Arguments:
#     channel     Channel for the connection
#     id          The ID in question
# Result:
#     None
#
proc storeId {channel id} {
    global connect

    set connect(channel,$channel) $id
    set connect(id,$id)           $channel

    if { $id != "client" } {
        lset connect(started) $id 1
        puts "Started: $connect(started)"
        if { [lsearch $connect(started) 0] < 0 } {
            puts "Started: Ready!"
            set ::all_started 1
        }
    }
    puts "Client/worker: $id"
}

# startWorkers --
#     Start the worker processes
# Arguments:
#     None
# Result:
#     None
#
proc startWorkers {} {
    global connect bs_system state

    set connect(started) {X} ;# Dummy to make it possible to start worker IDs at 1
    set state(record)    {X}

    set left [lindex $bs_system 0]
    set id   1
    foreach {body right} [lrange $bs_system 1 end] position $state(position) {
        if { $left == "FREE" } {
            set left {}
        }
        if { $right == "FREE" } {
            set right {}
        }
        puts "Server: started worker process $id"
        exec [info nameofexecutable] bs_compute.tcl $id $body $position 0.0 $left $right >worker$id &

        lappend connect(workers) $id
        lappend connect(started) 0
        lappend state(record)    0

        set left $right
        incr id
    }

    # Add a left and right boundary to the position
    set state(position) [concat 0.0 $state(position) 0.0]
}

# nextTime --
#     Instruct the workers to compute the next state
# Arguments:
#     None
# Result:
#     None
#
proc nextTime {} {
    global connect state timestep

    if { $connect(workers) == {} } {
        startWorkers
        vwait ::all_started
    }

    set left  0
    set right 2
    foreach w $connect(workers) {
        lset state(record) $w 1
        puts $connect(id,$w) \
            "NEXT $timestep [lindex $state(position) $left] [lindex $state(position) $right]"
        flush $connect(id,$w)
        puts "Sent: NEXT - $w"
        incr left
        incr right
    }
}

# newState --
#     Store the new state information of the worker
# Arguments:
#     info        The worker ID and the state
# Result:
#     None
# Side effects:
#     Stores the new state for that body. If
#     all workers have sent their new state,
#     the client can be informed
#
proc newState {info} {
    global connect state

    foreach {id position velocity} $info {break}

    lset state(position) $id $position
    lset state(record)   $id 0
    puts "State: $state(record)"

    if { [lsearch $state(record) 1] < 0 } {
        puts $connect(id,client) "STATE [lrange $state(position) 1 end-1]"
        flush $connect(id,client)
    }
}

# main --
#
#     Prepare the global variables
#
global all_started bs_system timestep connect state

set connect(workers) {}
set connect(started) {}

socket -server [list registerClient] 5678

vwait ::forever