Version 9 of Distributed computation

Updated 2006-10-15 15:16:39

* agents


AM (10 october 2006) I have been sketching a package that will do what I want with distributed computation:

  • make setting up such a computation as simple as possible from the client side.
  • make the server side as transparant as possible
  • first: whether threads are used or separate processes to gain concurrency is up to the server and what resources it has available)
  • second: whether there is a fixed pool of worker interpreters or not, depends on the kind tasks the client asks for (independent tasks versus tasks that require synchronisation)
  • third: if synchronisation is required, then all tasks to be synchronised will be waited upon

This is just meant to preserve my early-morning thoughts on the matter - scribbles on the back of a print-out may get lost.

Zarutian 13. oktober 22:09 2006: on last point: will the server handle that? AM Yes - you can start tasks as requiring synchronisation - see also my page on A simple mechanical system

AM (12 october 2006) Some further remarks: the packages Thread, comm and tie will be very helpful in the actual implementation.

Here is a sketch of one possible application - searching the web for interesting pages:

   set server [connectServer "servername" -poolsize 10]
   set keywords [list ...]
   set interesting_urls {}
   set urls [list ...] ;# Initial list

   exportProcs $server [list getWebPage checkWebPage] ;# The procedures that do the actual work
   registerHandler $server storeUrls ;# procedure that gathers the results in a convenient form

   set count 0
   while { 1 } {
       # Set the workers to work
       foreach u $urls {
           independentTask $server task$count [list \
               checkWebPage $u
           }
           incr count 
       }

       set tasks [waitForTasks $server] ;# Get a list of tasks that have finished, might be empty 

       foreach task $tasks {
           lappend urls $result($task) 
       }

       #
       # Collect the new URLs that were found 
       #
       ...

       #
       # To be added: administration and suitable stopping criteria
   }

Well, this is just a sketch - most important aspects:

  • Scheduling tasks
  • Waiting for results
  • All details are taken care of

AM (15 october 2006)

Ideas for an easy-to-use distributed computation facility

With all the packages in Tcllib and the core facilities it should be easy to set up a package that will allow anyone to do distributed computations, that is have several computers or several processes on a single computer work out the solution of a problem simultaneously, so that you get the answer faster or easier than if everything was done in a single program.

Here are two use cases:

  • Given an initial list of URLs, follow "all" the links contained therein that seem interesting, i.e. that refer to pages that contain information on some subject we are interested in. Obviously, we need to stop at some point, say after collecting 100 URLs or the 50th link level.
  • In a chemical factory the fabrication of some product is divided into several steps, each step is associated with its own tank. To model the processes in these tanks we can design programs that take care of the details in each tank. Raw materials and the intermediate products are transferred to the various tanks. If each tank is represented via a separate computational program, we need some sort of interaction and coordination between them.

This has led me to consider the following set of procedures:

  • connectServer "server" ?options?: connect to some (computational) server process, either on this machine or on some other computer.
  • independentTask "serverID" "taskname" "script": start a task (called "taskname" and implemented in "script") via this server. The

task is run without any coordination with others

  • syncTask "serverID" "taskname" "script": start a task, similar to an independent task, but the results only become available when all

other tasks that need synchronisation are also done

  • waitForTasks "serverID" "handler": wait for the tasks to finish and handle the results via the handler procedure.

There would have to be methods to transfer procedures and data to the server, like "exportProcs serverID list-of-procs", so that we can install the problem-specific code on the server. In fact, it should be possible to transfer complete compiled programs in that way.

The server process takes care of all the details:

  • Use threads or separate processes to carry out the actual computations
  • Provide persistency for the input and output data so that you can simply schedule a job, log off and collect the results the next day

The packages comm and Threads can be used to implement the communication between the client program and the server. The data persistency comes from tie or tequila.

(As suggested by Michael Schlenker, we can even use the [unknown] command to automatically get the procedures for us. And we can use a distributed VFS to exchange the data and provide persistency)

I have created a very first version of what I have in mind - see below.

It is very preliminary:

  • Connecting to a server is currently very clumsy - the client code starts a server on the local machine. I do not know how to make the

[comm] package pick a particular port, that would be much easier.

  • There are no threads or multiple interpreters yet, so there is only one actual task that can be run simultaneously.

I offer this code not because it is useful, but to get the discussion going.


Client side:

 # dcclient.tcl
 #     Package for distributed computation - the client side
 #
 #     Note:
 #     The package depends on several others:
 #     comm     - for general remote/inter-process communication
 #     Threads  - for multithreading
 #     dcserver - for local servicing
 #
 #     prepareWorker
 #     exportVariables
 #     importVariables
 #     exportFile
 #     importFile
 #
 package require comm
 #package require DCServer
 source dcserver.tcl

 namespace eval ::DCClient {
     variable self 0
     variable channel
     variable waiting
     variable finished_tasks
     variable result_data

     namespace export connectServer independentTask syncTask
     namespace export waitForTasks exportProcs exportData

     #
     # Initialise the comm package, get rid of the double eval
     # semantics now
     #
     ::comm::comm config -local 0
     ::comm::comm hook eval {return [uplevel #0 $buffer]}
 }

 # connectServer --
 #     Connect to the server process
 # Arguments:
 #     server     Name/ID of the server
 #     args       List of options
 # Result:
 #     ID of the server or error if failed
 # Notes:
 #     Two server names are special:
 #     self      - Use this process as the server
 #     localhost - Starts a second process that acts as a computation server
 #
 proc ::DCClient::connectServer {server args} {
     variable channel

     switch -- $server {
         "self" {
             set self 1
             set id [StartServer $self]
         }
         "localhost" {
             set self 0
             set id [StartServer $self]
         }
         default {
             #
             # TODO: finding out the remote server is a problem!
             #
             ::comm::comm config -local 0
             set id $server
             set channel($id) [::comm::comm connect $server]
         }
     }

     if { [llength $args] > 0 } {
         if { [lindex $args 0] == "-poolsize" } {
             SetPoolSize $id [lindex $args 1]
         }
     }
     return $id
 }

 # independentTask --
 #     Start a task that does not depend on others (no synchronisation needed)
 # Arguments:
 #     id         ID of the server
 #     task       Name of the task
 #     script     Script to be run in the worker interpreter
 # Result:
 #     None
 #
 proc ::DCClient::independentTask {id task script} {
     variable self

     if { $self } {
         ::DCServer::IndependentTask $task $script
     } else {
         set count 0
         set success 0
         while { $count < 10 } {
             if { [catch {
                 ::comm::comm send -command ::DCServer::GetTaskResult $id \
                     ::DCServer::IndependentTask [::comm::comm self] $task $script
             } msg] } {
                 after 1000
                 incr count
             } else {
                 set success 1
                 break
             }
         }
         if { ! $success } {
             return -code error $msg
         }
     }
 }

 # syncTask --
 #     Start a task that requires synchronisation with others
 # Arguments:
 #     id         ID of the server
 #     task       Name of the task
 #     script     Script to be run in the worker interpreter
 # Result:
 #     None
 #
 proc ::DCClient::syncTask {id task script} {
     variable self

     if { $self } {
         ::DCServer::SyncTask $task $script
     } else {
         ::comm::comm send -command ::DCServer::GetTaskResult $id \
             ::DCServer::SyncTask [::comm::comm self] $task $script
     }
 }

 # exportProcs --
 #     Export one or more procedures to the server
 # Arguments:
 #     id         ID of the server
 #     procs      List of procedures to be sent
 # Result:
 #     None
 # Note:
 #     No provision for namespaces yet
 #
 proc ::DCClient::exportProcs {id procs} {
     variable self

     set code {}
     foreach p $procs {
         lappend code $p [info args $p] [info body $p]
     }

     if { $self } {
         ::DCServer::ReceiveExportedProcs $code
     } else {
         ::comm::comm send -async $id \
             ::DCServer::ReceiveExportedProcs $code
     }
 }

 # exportData --
 #     Export data for a particular task on the server
 # Arguments:
 #     id         ID of the server
 #     task       Task name
 #     values     List of variable name/value pairs
 # Result:
 #     None
 # Note:
 #     All variables defined in the global namespace at the moment
 #
 proc ::DCClient::exportData {id task values} {
     variable self

     if { $self } {
         ::DCServer::ReceiveExportedData $task $values
     } else {
         ::comm::comm send -async $id \
             ::DCServer::ReceiveExportedData $task $values
     }
 }

 # StartServer --
 #     Start a local server or use this process as the server
 # Arguments:
 #     self       Whether to use this process or another one
 # Result:
 #     None
 #
 proc ::DCClient::StartServer {self} {

     if { $self } {
         ... use the threads package
     } else {
         file delete "_server_.id"
         set outfile [open "_server_.tcl" w]
         puts $outfile "#package require DCServer"
         puts $outfile "source dcserver.tcl"
         puts $outfile "::DCServer::InitServer"
         puts $outfile "vwait done"
         close $outfile
         exec [info nameofexecutable] _server_.tcl &
         while {![file exists _server_.id] } {
             after 1000
         }
         after 2000
         set infile [open "_server_.id"]
         set id [gets $infile]
         close $infile
         file delete "_server_.id"
         file delete "_server_.tcl"
     }
     return $id
 }

 # GetTaskResult --
 #     Get the result of some task
 # Arguments:
 #     server     Which server
 #     task       Name of the task
 #     code       Return code
 #     result     Result string of the task
 # Result:
 #     None
 # Side effects:
 #     Sets the variable finished_tasks and the array result_data
 # Note:
 #     This assumes that each _independent_ task has a unique
 #     name among the active or scheduled tasks. Otherwise
 #     the results will be overwritten
 #
 proc ::DCClient::GetTaskResult {server task code result} {
     variable finished_tasks
     variable result_data

     lappend finished_tasks $server $task
     set result_data($server,$task,code)   $code
     set result_data($server,$task,result) $result

     set ::DCClient::waiting 0
 }

 # waitForTasks --
 #     Wait for one or more tasks to finish
 # Arguments:
 #     id         ID of the server (empty for all servers)
 #     handler    Name of the procedure to be called to handle the results
 # Result:
 #     List of tasks that finished
 #
 proc ::DCClient::waitForTasks {id handler} {
     variable self
     variable waiting
     variable finished_tasks
     variable result_data

     #
     # Wait for any task to be finished.
     # This will work for the following reason:
     # - One or more tasks are scheduled in the server process
     # - When finished they schedule another task in the client process
     # - This gets handled as soon as we enter the event loop
     # Hm, what about Tk?
     #
     vwait ::DCClient::waiting

     #
     # Filter the list of tasks, update the list of finished tasks
     # and handle the results of the selected tasks
     #
     set new_list {}
     foreach {server task} $finished_tasks {
         if { $server eq $id || $id eq "" } {
             lappend tasks $server $task
         } else {
             lappend new_list $server $task
         }
     }
     set finished_tasks $new_list

     foreach {server task} $tasks {
         $handler $server $task $result_data($server,$task,code) \
             $result_data($server,$task,result)

         #
         # Clean up the results - to avoid memory leaks
         #
         unset result_data($server,$task,code)
         unset result_data($server,$task,result)
     }

     return $tasks
 }

 # main --
 #     Test code
 #
 proc getResult {server task code result} {
    puts "Result: $server $task $code -- $result"
 }
 proc computeSquare {x} {
     return [expr {$x*$x}]
 }

 namespace import ::DCClient::*
 set id [connectServer localhost]
 puts "Connected: $id ..."
 exportData $id task1 {v 3}
 exportProcs $id computeSquare
 independentTask $id task1 {computeSquare $v}
 puts "Task started ..."
 set tasks [waitForTasks $id getResult]
 puts "Finished - $tasks"

Server side:

 # dcserver.tcl
 #     Package for distributed computation - the server side
 #
 #     Note:
 #     The package depends on several others:
 #     comm     - for general remote/inter-process communication
 #     Threads  - for multithreading
 #     dcserver - for local servicing
 #
 package require comm

 namespace eval ::DCServer {
     variable self           0
     variable exported_procs {}

     #
     # Initialise the comm package, get rid of the double eval
     # semantics now
     #
     ::comm::comm config -local 0
     ::comm::comm hook eval {return [uplevel #0 $buffer]}
 }

 # InitServer --
 #     Initialise the server process
 # Arguments:
 #     None
 # Result:
 #     None
 # Side effects:
 #     Writes the server's ID to a file "_server_.id"
 #
 proc ::DCServer::InitServer {} {

     set outfile [open "_server_.id" w]
     puts $outfile [::comm::comm self]
     close $outfile
 }

 # IndependentTask --
 #     Starts an independent task
 # Arguments:
 #     client       ID of the client process
 #     task         Name of the task
 #     script       Script to be executed
 # Result:
 #     A list of the task name and whatever the result of the script
 # Note:
 #     A very simple implementation for now! No concurrent tasks
 #     possible
 #
 proc ::DCServer::IndependentTask {client task script} {
     ImportProcs
     ImportData $task

     set result [uplevel #0 $script]
     puts "IndependentTask: $result"
     return [list $client $task $result]
 }

 # SyncTask --
 #     Starts a task that requires synchronisation with others
 # Arguments:
 #     client       ID of the client process
 #     task         Name of the task
 #     script       Script to be executed
 # Result:
 #     A list of the task name and whatever the result of the script
 # Note:
 #     A very simple implementation for now! No concurrent tasks
 #     possible
 #
 proc ::DCServer::SyncTask {client task script} {
     variable sync_tasks

     lappend sync_tasks $task
     set result [uplevel #0 $script]
     puts "SyncTask: $result"
     return [list $client $task $result]
 }

 # GetTaskResult --
 #     Capture the result of the task and send it back
 # Arguments:
 #     args         List of key/value pairs
 # Result:
 #     None
 # Note:
 #     Simple implementation, no error checking
 #
 proc ::DCServer::GetTaskResult {args} {
     array set data $args

     puts "Server: $args"

     foreach {client task result} $data(-result) {break}
     ::comm::comm send -async $client ::DCClient::GetTaskResult \
         $data(-id) $task $data(-code) $result
 }

 # GetFinishedTasks --
 #     Return a list of finished tasks
 # Arguments:
 #     None
 # Result:
 #     List of finished tasks or an empty string if there are none
 #
 proc ::DCServer::GetFinishedTasks {} {
     variable finished_tasks

     set result $finished_tasks
     set finished_tasks {}

     return $result
 }

 # SetPoolSize --
 #     Set the pool size (before any task has been set)
 # Arguments:
 #     size         Number of worker interpreters
 # Result:
 #     None
 # Note:
 #     The setting takes effect when the first independent
 #     task is started - these tasks use the pool
 #
 proc ::DCServer::SetPoolSize {size} {
     variable poolsize

     set poolsize $size
 }

 # ReceiveExportedProcs --
 #     Receive the exported procedures and store the code for later use
 # Arguments:
 #     code         List of procedure names, argument lists and bodies
 # Result:
 #     None
 # Note:
 #     The procedures are passed on to the handlers with the _first_
 #     task that is started
 #
 proc ::DCServer::ReceiveExportedProcs {code} {
     variable exported_procs

     set exported_procs $code
 }

 # ReceiveExportedData --
 #     Receive the exported data and store the values for later use
 # Arguments:
 #     task         Task associated with the data
 #     data         List of variable names and values
 # Result:
 #     None
 # Note:
 #     The values are passed on to the handler when the associated
 #     task is started
 #
 proc ::DCServer::ReceiveExportedData {task data} {
     variable exported_data

     set exported_data($task) $data
 }

 # ImportData --
 #     Use the transferred data to set one or more variables in the
 #     handler
 # Arguments:
 #     task         Task for which to set the data
 # Result:
 #     None
 # Note:
 #     This will be more complex! We will need to select the handler/interpreter
 #
 proc ::DCServer::ImportData {task} {
     variable exported_data

     if { [info exists exported_data($task)] } {
         foreach {var value} $exported_data($task) {
             uplevel #0 [list set $var $value]
         }
     }
 }

 # ImportProcs --
 #     Use the transferred code to define one or more procedures in the
 #     handler
 # Arguments:
 #     None
 # Result:
 #     None
 # Note:
 #     This will be more complex! The same code for each handler
 #
 proc ::DCServer::ImportProcs {} {
     variable exported_procs

     foreach {proc arglist body} $exported_procs {
         uplevel #0 [list proc $proc $arglist $body]
     }
 }

Category Interprocess Communication