Distributed computation

Distributed computation is a form of concurrent computing in which multiple programs, possibly running on different and possibly remote processors, communicate to accomplish a task.

Techniques

two threads run synchronously
parallel adding with tcl processes
A simple mechanical system

Coordinated Data

dQUOB
MMTI Message Passing Protocol and API
JBR (updated by escargo): An interface that is used to operate F5 instrumentation at the MMT Observatory. Very similar to Tequila (below) but operates on global variables. It supports publishing and subscribing of values and commands, and includes synchronous, asynchronous and unacknowledged messages. Clients persistently reconnect to servers and re-establish their subscription states automatically. It works very well and is used to tie dozens of hardware and software control functions at the MMT Observatory together.
Tequila
Uses traces, fileevents and sockets to create distributed arrays, i.e. arrays whose content is shared between several applications and where changes are automatically distributed to them all. It makes use of a central server. It does have the advantage that applications can be built using arrays for data storage, and separated into client/server components with only a few additional lines of code.
Tuplespace
A generic term for what was called at Yale a Lindaspace.
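
The mechanism described for Tequila above (variable traces feeding a transport) can be illustrated in a few lines. This is not Tequila's code: shareArray, onWrite and sendUpdate are hypothetical names, and the "network" is just a list so that the sketch runs in a single process. A real implementation would write each update to a socket instead.

```tcl
# Hypothetical sketch: propagate array writes via a variable trace.
set outbox {}

proc sendUpdate {arrayName key value} {
    # Stand-in for sending the update to a central server over a socket
    lappend ::outbox [list $arrayName $key $value]
}

proc onWrite {name1 name2 op} {
    # name1 is the array name, name2 the element that was written;
    # the trace runs in the context of the code that did the write
    upvar 1 $name1 arr
    sendUpdate $name1 $name2 $arr($name2)
}

proc shareArray {arrayName} {
    trace add variable ::$arrayName write onWrite
}

array set config {}
shareArray config
set config(host) example.org
set config(port) 8080

puts $outbox
```

Because the trace fires on every write, the application keeps using ordinary array syntax and the distribution logic stays out of sight.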

Distributing Work

Distributing a series of tasks
AM: another shot at interacting processes
Ants
Daerth
Threaded asynchronous backpressure-mediated pipelined compute stations.
Swift/T
Tcl implementations of publish-subscribe mechanisms
Tcl has a distinguished record of accomplishment in supporting architectures based on the publish-subscribe pattern.
DisTcl - Distributed Programming Infrastructure for Tcl
This uses Redis as the communication medium between distributed clients and servers.
ControlHost
A publish-subscribe/message passing application from CERN dating from the mid-1990s, recently revived. C/C++ base with code to incorporate into a custom Tcl interpreter.
DRAMA
an environment for writing distributed real time systems. It provides a C language API, support libraries, development tools and user interface development tools. DRAMA provides a consistent look and feel to a networked system which can run on various diverse architectures. The DRAMA library can be loaded into a Tcl interpreter and DRAMA programs can thus be scripted in Tcl. A DRAMA system consists of one or more DRAMA tasks distributed as required across one or more machines possibly of significantly different architectures. DRAMA provides a message system to allow the tasks to communicate whilst hiding architecture specific details.
MX
a portable toolkit for writing data acquisition and control programs. Server-client architecture for accessing multiple device drivers. Tcl GUI and wrapper for library, plus pure Tcl client. Includes Mxplot, a BLT-based replacement for plotgnu. It adds x-y cursors, the ability to zoom, and a button for printing the plot.
TclGearman
A Tcl implementation of Gearman Client/Worker/Admin interface.
SHORE
The Scalable Heterogeneous Object REpository is an object database designed for high scalability on modern computer architectures, with both thread-parallel and multi-server distributed query features. Written in C++ with a Tcl loadable extension included.
cluster
a simple light-weight framework for providing inter-process and inter-machine discovery and communications.
LARD
the Language for Asynchronous Research and Development is a domain-specific language for parallel computation which includes a bytecode compiler and a bytecode interpreter embedded into a Tcl interpreter. Includes an IDE and debugger written in Tcl/Tk. Source code: [L1 ]

http://apt.cs.manchester.ac.uk/projects/tools/lard/tutorial/screenshots/bview3.gif

http://apt.cs.manchester.ac.uk/projects/tools/lard/tutorial/screenshots/blocksort1.gif

See Also

agent
Concepts of Architectural Design for Tcl Applications
grid computing
LYDIAN
A simulation and visualization environment for distributed algorithms.
OPNTcl
An object orientation interpreter for defining, checking and running Petri Nets
NSCL SpecTcl
MPI

Misc

AM 2006-10-10: I have been sketching a package that will do what I want with distributed computation:

  • make setting up such a computation as simple as possible from the client side.
  • make the server side as transparent as possible
  • first: whether threads or separate processes are used to gain concurrency is up to the server and the resources it has available
  • second: whether there is a fixed pool of worker interpreters or not depends on the kind of tasks the client asks for (independent tasks versus tasks that require synchronisation)
  • third: if synchronisation is required, then all tasks to be synchronised will be waited upon

This is just meant to preserve my early-morning thoughts on the matter - scribbles on the back of a print-out may get lost.

Zarutian 2006-10-13T22:09: On last point: will the server handle that?

AM: Yes - you can start tasks as requiring synchronisation - see also my page on A simple mechanical system

AM 2006-10-12: Some further remarks: the packages Thread, comm and tie will be very helpful in the actual implementation.

Here is a sketch of one possible application - searching the web for interesting pages:

set server [connectServer "servername" -poolsize 10]
set keywords [list ...]
set interesting_urls {}
set urls [list ...] ;# Initial list

exportProcs $server [list getWebPage checkWebPage] ;# The procedures that do the actual work
registerHandler $server storeUrls ;# procedure that gathers the results in a convenient form

set count 0
while { 1 } {
    # Set the workers to work
    foreach u $urls {
        independentTask $server task$count [list checkWebPage $u]
        incr count 
    }

    set tasks [waitForTasks $server] ;# Get a list of tasks that have finished, might be empty 

    foreach task $tasks {
        lappend urls $result($task) 
    }

    #
    # Collect the new URLs that were found 
    #
    ...

    #
    # To be added: administration and suitable stopping criteria
}

Well, this is just a sketch - most important aspects:

  • Scheduling tasks
  • Waiting for results
  • All details are taken care of

AM 2006-10-15

Ideas for an easy-to-use distributed computation facility

With all the packages in Tcllib and the core facilities, it should be easy to set up a package that allows anyone to do distributed computations: that is, to have several computers, or several processes on a single computer, work on the solution of a problem simultaneously, so that you get the answer faster or more easily than if everything were done in a single program.

Here are two use cases:

  • Given an initial list of URLs, follow "all" the links contained therein that seem interesting, i.e. that refer to pages that contain information on some subject we are interested in. Obviously, we need to stop at some point, say after collecting 100 URLs or the 50th link level.
  • In a chemical factory the fabrication of some product is divided into several steps, each associated with its own tank. To model the processes in these tanks we can design programs that take care of the details in each tank. Raw materials and the intermediate products are transferred to the various tanks. If each tank is represented by a separate computational program, we need some sort of interaction and coordination between them.

This has led me to consider the following set of procedures:

  • connectServer "server" ?options?: connect to some (computational) server process, either on this machine or on some other computer.
  • independentTask "serverID" "taskname" "script": start a task (called "taskname" and implemented in "script") via this server. The task is run without any coordination with others
  • syncTask "serverID" "taskname" "script": start a task, similar to an independent task, but the results only become available when all other tasks that need synchronisation are also done
  • waitForTasks "serverID" "handler": wait for the tasks to finish and handle the results via the handler procedure.
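
To get a feel for how these calls fit together, here is a minimal in-process mock. It is only a sketch under strong assumptions: there is no server and no serverID argument, tasks are queued on the event loop with after rather than run concurrently, syncTask is left out, and the demo namespace and its helper Run are hypothetical names that do not appear in the code further down.

```tcl
# Hypothetical in-process mock of the proposed API.
namespace eval demo {
    variable pending  0    ;# number of tasks not yet finished
    variable finished {}   ;# list of {taskname code result}
    variable tick     0    ;# changed whenever a task finishes
}

proc demo::independentTask {taskname script} {
    variable pending
    incr pending
    after 0 [list ::demo::Run $taskname $script]
}

proc demo::Run {taskname script} {
    variable pending
    variable finished
    variable tick
    set code [catch {uplevel #0 $script} result]
    lappend finished [list $taskname $code $result]
    incr pending -1
    incr tick
}

proc demo::waitForTasks {handler} {
    variable pending
    variable finished
    # Stay in the event loop until every queued task has reported back
    while { $pending > 0 } {
        vwait ::demo::tick
    }
    set done {}
    foreach item $finished {
        lassign $item taskname code result
        {*}$handler $taskname $code $result
        lappend done $taskname
    }
    set finished {}
    return $done
}

proc showResult {taskname code result} {
    puts "$taskname: code=$code result=$result"
}

demo::independentTask square {expr {7*7}}
demo::independentTask broken {error "no such page"}
set done [demo::waitForTasks showResult]
```

The handler receives the return code as well as the result, so a failing task (the second one here) is reported rather than silently lost.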

There would have to be methods to transfer procedures and data to the server, like "exportProcs serverID list-of-procs", so that we can install the problem-specific code on the server. In fact, it should be possible to transfer complete compiled programs in that way.
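
One detail to watch when transferring procedures: info args alone does not report default values of arguments. The following sketch shows one way to build a script that recreates a procedure faithfully. The helper name serializeProc is hypothetical, and a fresh interpreter stands in for the remote server.

```tcl
# Hypothetical helper: turn a proc into a script that recreates it,
# keeping default argument values.
proc serializeProc {name} {
    set arglist {}
    foreach arg [info args $name] {
        if { [info default $name $arg value] } {
            lappend arglist [list $arg $value]
        } else {
            lappend arglist $arg
        }
    }
    return [list proc $name $arglist [info body $name]]
}

proc scale {x {factor 2}} {
    return [expr {$x * $factor}]
}

set script [serializeProc scale]

# Simulate the receiving side with a fresh interpreter
set worker [interp create]
interp eval $worker $script
puts [interp eval $worker {scale 21}]
interp delete $worker
```

Namespaces are not handled here either; the procedure is recreated under the same name it was given to serializeProc.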

The server process takes care of all the details:

  • Use threads or separate processes to carry out the actual computations
  • Provide persistency for the input and output data so that you can simply schedule a job, log off and collect the results the next day

The packages comm and Thread can be used to implement the communication between the client program and the server. The data persistency comes from tie or Tequila.

(As suggested by Michael Schlenker, we can even use the [unknown] command to automatically get the procedures for us. And we can use a distributed VFS to exchange the data and provide persistency)
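
The [unknown] idea might look roughly like this on the server side. It is a sketch only: the registry dictionary is a hypothetical stand-in for asking the client for a definition, and the original handler is kept so that auto-loading continues to work.

```tcl
# Hypothetical sketch: define missing procedures on demand via [unknown].
# "registry" stands in for whatever the remote side would be asked for.
set registry [dict create \
    cube {proc cube {x} {expr {$x*$x*$x}}}]

# Keep the standard handler (if there is one) as a fallback
if { [llength [info commands ::unknown]] } {
    rename ::unknown ::original_unknown
} else {
    proc ::original_unknown {args} {
        return -code error "invalid command name \"[lindex $args 0]\""
    }
}

proc ::unknown {args} {
    set cmd [lindex $args 0]
    if { [dict exists $::registry $cmd] } {
        # Install the definition, then retry the original call
        uplevel #0 [dict get $::registry $cmd]
        return [uplevel 1 $args]
    }
    return [uplevel 1 [list ::original_unknown {*}$args]]
}

puts [cube 3]
```

After the first call the procedure exists, so later calls no longer go through the handler.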

I have created a very first version of what I have in mind - see below.

It is very preliminary:

  • Connecting to a server is currently very clumsy - the client code starts a server on the local machine. I do not know how to make the [comm] package pick a particular port; that would be much easier.

schlenk: comm config -port does what you want, or comm new with the appropriate options.

  • There are no threads or multiple interpreters yet, so only one task can actually run at a time.
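
As an indication of what a pool of worker interpreters could look like, here is a small sketch using only core Tcl. The names createPool and runTasks are hypothetical. Note that interpreters isolate state but do not run in parallel; real concurrency would still need the Thread package or separate processes.

```tcl
# Hypothetical sketch: a fixed pool of worker interpreters, used round-robin.
proc createPool {size setupScript} {
    set pool {}
    for {set i 0} {$i < $size} {incr i} {
        set worker [interp create]
        interp eval $worker $setupScript
        lappend pool $worker
    }
    return $pool
}

proc runTasks {pool tasks} {
    # tasks is a list of name/script pairs;
    # the result is a dict mapping each name to {code result}
    set results {}
    set next 0
    foreach {name script} $tasks {
        set worker [lindex $pool $next]
        set code [catch {interp eval $worker $script} result]
        dict set results $name [list $code $result]
        set next [expr {($next + 1) % [llength $pool]}]
    }
    return $results
}

set pool [createPool 3 {
    proc computeSquare {x} { return [expr {$x*$x}] }
}]

set results [runTasks $pool {
    task1 {computeSquare 3}
    task2 {computeSquare 4}
    task3 {computeSquare oops}
}]

dict for {name outcome} $results {
    puts "$name -> $outcome"
}

foreach worker $pool { interp delete $worker }
```

Each task's return code is stored next to its result, so an error in one worker (task3 here) does not stop the others.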

I offer this code not because it is useful, but to get the discussion going.


Client side:

# dcclient.tcl
#     Package for distributed computation - the client side
#
#     Note:
#     The package depends on several others:
#     comm     - for general remote/inter-process communication
#     Thread   - for multithreading
#     dcserver - for local servicing
#
#     prepareWorker
#     exportVariables
#     importVariables
#     exportFile
#     importFile
#
package require comm
#package require DCServer
source dcserver.tcl

namespace eval ::DCClient {
    variable self 0
    variable channel
    variable waiting
    variable finished_tasks
    variable result_data

    namespace export connectServer independentTask syncTask
    namespace export waitForTasks exportProcs exportData

    #
    # Initialise the comm package, get rid of the double eval
    # semantics now
    #
    ::comm::comm config -local 0
    ::comm::comm hook eval {return [uplevel #0 $buffer]}
}

# connectServer --
#     Connect to the server process
# Arguments:
#     server     Name/ID of the server
#     args       List of options
# Result:
#     ID of the server or error if failed
# Notes:
#     Two server names are special:
#     self      - Use this process as the server
#     localhost - Starts a second process that acts as a computation server
#
proc ::DCClient::connectServer {server args} {
    variable self
    variable channel

    switch -- $server {
        "self" {
            set self 1
            set id [StartServer $self]
        }
        "localhost" {
            set self 0
            set id [StartServer $self]
        }
        default {
            #
            # TODO: finding out the remote server is a problem!
            #
            set self 0
            ::comm::comm config -local 0
            set id $server
            set channel($id) [::comm::comm connect $server]
        }
    }

    if { [lindex $args 0] eq "-poolsize" } {
        #
        # The pool size is a server-side setting
        #
        set size [lindex $args 1]
        if { $self } {
            ::DCServer::SetPoolSize $size
        } else {
            ::comm::comm send -async $id ::DCServer::SetPoolSize $size
        }
    }
    return $id
}

# independentTask --
#     Start a task that does not depend on others (no synchronisation needed)
# Arguments:
#     id         ID of the server
#     task       Name of the task
#     script     Script to be run in the worker interpreter
# Result:
#     None
#
proc ::DCClient::independentTask {id task script} {
    variable self

    if { $self } {
        # Runs synchronously; the result is not yet passed on to waitForTasks
        ::DCServer::IndependentTask [::comm::comm self] $task $script
    } else {
        set count 0
        set success 0
        while { $count < 10 } {
            if { [catch {
                ::comm::comm send -command ::DCServer::GetTaskResult $id \
                    ::DCServer::IndependentTask [::comm::comm self] $task $script
            } msg] } {
                after 1000
                incr count
            } else {
                set success 1
                break
            }
        }
        if { ! $success } {
            return -code error $msg
        }
    }
}

# syncTask --
#     Start a task that requires synchronisation with others
# Arguments:
#     id         ID of the server
#     task       Name of the task
#     script     Script to be run in the worker interpreter
# Result:
#     None
#
proc ::DCClient::syncTask {id task script} {
    variable self

    if { $self } {
        # Runs synchronously; the result is not yet passed on to waitForTasks
        ::DCServer::SyncTask [::comm::comm self] $task $script
    } else {
        ::comm::comm send -command ::DCServer::GetTaskResult $id \
            ::DCServer::SyncTask [::comm::comm self] $task $script
    }
}

# exportProcs --
#     Export one or more procedures to the server
# Arguments:
#     id         ID of the server
#     procs      List of procedures to be sent
# Result:
#     None
# Note:
#     No provision for namespaces yet
#
proc ::DCClient::exportProcs {id procs} {
    variable self

    set code {}
    foreach p $procs {
        lappend code $p [info args $p] [info body $p]
    }

    if { $self } {
        ::DCServer::ReceiveExportedProcs $code
    } else {
        ::comm::comm send -async $id \
            ::DCServer::ReceiveExportedProcs $code
    }
}

# exportData --
#     Export data for a particular task on the server
# Arguments:
#     id         ID of the server
#     task       Task name
#     values     List of variable name/value pairs
# Result:
#     None
# Note:
#     All variables defined in the global namespace at the moment
#
proc ::DCClient::exportData {id task values} {
    variable self

    if { $self } {
        ::DCServer::ReceiveExportedData $task $values
    } else {
        ::comm::comm send -async $id \
            ::DCServer::ReceiveExportedData $task $values
    }
}

# StartServer --
#     Start a local server or use this process as the server
# Arguments:
#     self       Whether to use this process or another one
# Result:
#     ID of the server
#
proc ::DCClient::StartServer {self} {

    if { $self } {
        # ... use the Thread package (not implemented yet)
        return -code error "in-process server is not implemented yet"
    } else {
        file delete "_server_.id"
        set outfile [open "_server_.tcl" w]
        puts $outfile "#package require DCServer"
        puts $outfile "source dcserver.tcl"
        puts $outfile "::DCServer::InitServer"
        puts $outfile "vwait done"
        close $outfile
        exec [info nameofexecutable] _server_.tcl &
        while {![file exists _server_.id] } {
            after 1000
        }
        after 2000
        set infile [open "_server_.id"]
        set id [gets $infile]
        close $infile
        file delete "_server_.id"
        file delete "_server_.tcl"
    }
    return $id
}

# GetTaskResult --
#     Get the result of some task
# Arguments:
#     server     Which server
#     task       Name of the task
#     code       Return code
#     result     Result string of the task
# Result:
#     None
# Side effects:
#     Sets the variable finished_tasks and the array result_data
# Note:
#     This assumes that each _independent_ task has a unique
#     name among the active or scheduled tasks. Otherwise
#     the results will be overwritten
#
proc ::DCClient::GetTaskResult {server task code result} {
    variable finished_tasks
    variable result_data

    lappend finished_tasks $server $task
    set result_data($server,$task,code)   $code
    set result_data($server,$task,result) $result

    set ::DCClient::waiting 0
}

# waitForTasks --
#     Wait for one or more tasks to finish
# Arguments:
#     id         ID of the server (empty for all servers)
#     handler    Name of the procedure to be called to handle the results
# Result:
#     List of tasks that finished
#
proc ::DCClient::waitForTasks {id handler} {
    variable self
    variable waiting
    variable finished_tasks
    variable result_data

    #
    # Wait for any task to be finished.
    # This will work for the following reason:
    # - One or more tasks are scheduled in the server process
    # - When finished they schedule another task in the client process
    # - This gets handled as soon as we enter the event loop
    # Hm, what about Tk?
    #
    vwait ::DCClient::waiting

    #
    # Filter the list of tasks, update the list of finished tasks
    # and handle the results of the selected tasks
    #
    set tasks    {}
    set new_list {}
    foreach {server task} $finished_tasks {
        if { $server eq $id || $id eq "" } {
            lappend tasks $server $task
        } else {
            lappend new_list $server $task
        }
    }
    set finished_tasks $new_list

    foreach {server task} $tasks {
        $handler $server $task $result_data($server,$task,code) \
            $result_data($server,$task,result)

        #
        # Clean up the results - to avoid memory leaks
        #
        unset result_data($server,$task,code)
        unset result_data($server,$task,result)
    }

    return $tasks
}

# main --
#     Test code
#
proc getResult {server task code result} {
   puts "Result: $server $task $code -- $result"
}
proc computeSquare {x} {
    return [expr {$x*$x}]
}

namespace import ::DCClient::*
set id [connectServer localhost]
puts "Connected: $id ..."
exportData $id task1 {v 3}
exportProcs $id computeSquare
independentTask $id task1 {computeSquare $v}
puts "Task started ..."
set tasks [waitForTasks $id getResult]
puts "Finished - $tasks"

Server side:

# dcserver.tcl
#     Package for distributed computation - the server side
#
#     Note:
#     The package depends on several others:
#     comm     - for general remote/inter-process communication
#     Thread   - for multithreading
#     dcserver - for local servicing
#
package require comm

namespace eval ::DCServer {
    variable self           0
    variable exported_procs {}
    variable finished_tasks {}
    variable sync_tasks     {}

    #
    # Initialise the comm package, get rid of the double eval
    # semantics now
    #
    ::comm::comm config -local 0
    ::comm::comm hook eval {return [uplevel #0 $buffer]}
}

# InitServer --
#     Initialise the server process
# Arguments:
#     None
# Result:
#     None
# Side effects:
#     Writes the server's ID to a file "_server_.id"
#
proc ::DCServer::InitServer {} {

    set outfile [open "_server_.id" w]
    puts $outfile [::comm::comm self]
    close $outfile
}

# IndependentTask --
#     Starts an independent task
# Arguments:
#     client       ID of the client process
#     task         Name of the task
#     script       Script to be executed
# Result:
#     A list of the client ID, the task name and the result of the script
# Note:
#     A very simple implementation for now! No concurrent tasks
#     possible
#
proc ::DCServer::IndependentTask {client task script} {
    ImportProcs
    ImportData $task

    set result [uplevel #0 $script]
    puts "IndependentTask: $result"
    return [list $client $task $result]
}

# SyncTask --
#     Starts a task that requires synchronisation with others
# Arguments:
#     client       ID of the client process
#     task         Name of the task
#     script       Script to be executed
# Result:
#     A list of the client ID, the task name and the result of the script
# Note:
#     A very simple implementation for now! No concurrent tasks
#     possible
#
proc ::DCServer::SyncTask {client task script} {
    variable sync_tasks

    lappend sync_tasks $task
    set result [uplevel #0 $script]
    puts "SyncTask: $result"
    return [list $client $task $result]
}

# GetTaskResult --
#     Capture the result of the task and send it back
# Arguments:
#     args         List of key/value pairs
# Result:
#     None
# Note:
#     Simple implementation, no error checking
#
proc ::DCServer::GetTaskResult {args} {
    array set data $args

    puts "Server: $args"

    foreach {client task result} $data(-result) {break}
    ::comm::comm send -async $client ::DCClient::GetTaskResult \
        $data(-id) $task $data(-code) $result
}

# GetFinishedTasks --
#     Return a list of finished tasks
# Arguments:
#     None
# Result:
#     List of finished tasks or an empty string if there are none
#
proc ::DCServer::GetFinishedTasks {} {
    variable finished_tasks

    set result $finished_tasks
    set finished_tasks {}

    return $result
}

# SetPoolSize --
#     Set the pool size (before any task has been set)
# Arguments:
#     size         Number of worker interpreters
# Result:
#     None
# Note:
#     The setting takes effect when the first independent
#     task is started - these tasks use the pool
#
proc ::DCServer::SetPoolSize {size} {
    variable poolsize

    set poolsize $size
}

# ReceiveExportedProcs --
#     Receive the exported procedures and store the code for later use
# Arguments:
#     code         List of procedure names, argument lists and bodies
# Result:
#     None
# Note:
#     The procedures are passed on to the handlers with the _first_
#     task that is started
#
proc ::DCServer::ReceiveExportedProcs {code} {
    variable exported_procs

    set exported_procs $code
}

# ReceiveExportedData --
#     Receive the exported data and store the values for later use
# Arguments:
#     task         Task associated with the data
#     data         List of variable names and values
# Result:
#     None
# Note:
#     The values are passed on to the handler when the associated
#     task is started
#
proc ::DCServer::ReceiveExportedData {task data} {
    variable exported_data

    set exported_data($task) $data
}

# ImportData --
#     Use the transferred data to set one or more variables in the
#     handler
# Arguments:
#     task         Task for which to set the data
# Result:
#     None
# Note:
#     This will be more complex! We will need to select the handler/interpreter
#
proc ::DCServer::ImportData {task} {
    variable exported_data

    if { [info exists exported_data($task)] } {
        foreach {var value} $exported_data($task) {
            uplevel #0 [list set $var $value]
        }
    }
}

# ImportProcs --
#     Use the transferred code to define one or more procedures in the
#     handler
# Arguments:
#     None
# Result:
#     None
# Note:
#     This will be more complex! The same code for each handler
#
proc ::DCServer::ImportProcs {} {
    variable exported_procs

    foreach {proc arglist body} $exported_procs {
        uplevel #0 [list proc $proc $arglist $body]
    }
}