Dataflow programming

'''[http://en.wikipedia.org/wiki/Dataflow_programming%|%Dataflow
programming]''' is a programming paradigm in which program instructions are
organized into '''steps''', and the next step of a program is determined by
the outputs of the previous step, which become the inputs to that next step.



** See Also **

   [flow-based programming]:   like dataflow programming, but articulates the connections as channels and the inputs/outputs as messages 

   [functional programming]:   in which '''monads''' are essentially flow-based mechanisms for interacting with the stateful world

   [puredata]:   an open-source dataflow programming environment

   [streams]:   [NEM]: Functional reactive programming/dataflow also come to mind


   [TFP], by [George Peter Staplin]:   '''T'''cl '''F'''low-based '''P'''rogramming

   [BWise], by [TV%|%Theo Verelst]:   

   [http://hal.archives-ouvertes.fr/docs/00/07/38/23/PDF/RR-2868.pdf%|%Reactive Scripts%|%] project:   uses Reactive-C to add reactive functions to a Tcl interpreter, offering the possibility to use reactive programming in an interpretative context.

   [TclProp]:   

   [ycl%|%ycl let]:   Associates a variable with the variables it depends on and a routine that is called with the input variable values to update the value of the variable.  This single routine can be used to construct a lazy dataflow program, i.e., a program that only performs the work needed to update a variable when the value of that variable is actually needed. 


** Dataflow Programs **

   [make]:   A ubiquitous dataflow program.

   [spreadsheet%|%spreadsheet]:   A general interactive table-based customizable dataflow program.


-----
[http://www-sop.inria.fr/members/Charles.Andre/SyncCharts/%|%SyncCharts%|%] is a graphical program written in Tcl/Tk that allows the specification of reactive behavior, as well as the synchronous programming of applications.

SyncCharts adhere to the "Synchronous Paradigm". They support:

   *     Hierarchy: nested macrostates
   *     Concurrency: orthogonal constellations (kinds of state graphs)
   *     Communication: instantaneous broadcasting of signals
   *     Preemption: abortion and suspension


Any syncChart can be automatically translated into an [http://en.wikipedia.org/wiki/Esterel%|%Esterel%|%] program, so the user can take advantage of the software environment developed for synchronous programming. 

[http://www-sop.inria.fr/members/Charles.Andre/SyncCharts/panel.gif]
-----

** Reference **

   [http://en.wikipedia.org/wiki/Dataflow_programming%|%Dataflow Programming], Wikipedia:   

   [http://expressionflow.com/2013/11/17/dataflow-and-flow-based-programming-two-approaches-to-visual-programming/%|%Dataflow and Flow-Based Programming – Two Approaches To Visual Programming], Tomi Maila, 2013-11-17:   

   [http://dataflowbook.com/%|%Dataflow & Reactive Programming Systems], Matt Carkci, 2013:   



** Non-Tcl Tools **

   [http://www.clarity-support.co.uk/products/clarity/%|%Clarity Programming Language]:   

   [http://noflojs.org/%|%NoFlo]:   visual control flows for JavaScript

   [http://www-sop.inria.fr/meije/rp/ReactiveC/%|%Reactive-C]:   a [C]-based language for reactive programming. The idea is to allow a programming style close to C, in which program behaviors are defined in terms of reactions to activations. Reactive-C is a preprocessor for reactive programming in C. Reactive-C is used as a "reactive assembly language" to implement several reactive formalisms.


   [http://en.wikipedia.org/wiki/SIGNAL_%28programming_language%29%|%Signal]:   a programming language based on '''synchronized''' dataflow



** Description **

In dataflow programming, programs are represented as graphs where the
nodes are units of computation and the edges are the data connections between
the units. The data connections in large part supplant the use of variables in
other programming paradigms, which localizes the inter-operational state of
the program into the state of the connections. In [functional programming] languages, '''monads''' play a comparable role.
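
As a rough illustration of the graph view, here is a minimal sketch in Tcl. The node names, the `evaluate` procedure, and the representation of the graph as a dict are all assumptions made for this example; no particular dataflow system works exactly this way. Each node names a command and the nodes that feed it, so the intermediate values travel along the edges rather than being stored in named variables.

======
# Each node maps to {command inputs}; the input lists are the edges.
set graph [dict create \
    a   {{const 3} {}} \
    b   {{const 4} {}} \
    sum {add {a b}} \
    out {square sum}]

proc const {v} {return $v}
proc add {x y} {expr {$x + $y}}
proc square {x} {expr {$x * $x}}

# Evaluate a node by first evaluating the nodes connected to its inputs.
proc evaluate {graph node} {
    lassign [dict get $graph $node] cmd inputs
    set values {}
    foreach in $inputs {
        lappend values [evaluate $graph $in]
    }
    {*}$cmd {*}$values
}

puts [evaluate $graph out]   ;# (3 + 4) squared, i.e. 49
======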

When the next step is chosen by matching on the outputs of the previous one,
a dataflow system must wait for the entire group of outputs to arrive, so that
it knows what pattern to match against. This is called '''synchronous
dataflow'''. When the steps are pre-connected, the pattern matching becomes
unnecessary, and each individual output can be passed along to the next step
as it becomes available.  This is called '''asynchronous dataflow''', and is
the strategy adopted in [flow-based programming]. 
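
The difference can be sketched in a few lines of Tcl. This is only one possible reading of the two styles. The procedure names `sync_add`, `double` and `sink`, and the port names `a` and `b`, are made up for the illustration.

======
# Synchronous style: the step fires only once every input has arrived.
proc sync_add {inputs} {
    # $inputs is a dict mapping a port name to a value
    if {[dict exists $inputs a] && [dict exists $inputs b]} {
        return [expr {[dict get $inputs a] + [dict get $inputs b]}]
    }
    return {}  ;# still waiting for the other input
}

# Asynchronous style: "double" is wired to "sink" ahead of time, so each
# value moves downstream as soon as it is produced.
proc sink {x} {lappend ::received $x}
proc double {x} {sink [expr {$x * 2}]}

set partial [sync_add {a 1}]        ;# empty: input b is missing
set complete [sync_add {a 1 b 2}]   ;# 3
set received {}
foreach v {1 2 3} {double $v}       ;# received is now {2 4 6}
======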

One way to implement dataflow programming in Tcl is to use `[trace]` to
recompute the value of a variable when values it depends on change.
[spreadsheet%|%Spreadsheets] are an example of dataflow programming.  Another is functional reactive programming.
[LISP] has for some time had a framework for dataflow programming called [http://common-lisp.net/project/cells/%|%Cells], and there are several dataflow frameworks for [Python], among them [https://pypi.python.org/pypi/Trellis%|%Trellis], which takes a very interesting dataflow approach
to event-driven programming.
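
The `[trace]` approach can be sketched as follows. The variable names and the `recompute_area` procedure are hypothetical, and the example is eager: it recomputes on every write to an input.

======
set width 2
set height 3

# Recompute the dependent variable from its inputs.  Trace callbacks are
# passed extra arguments (name1 name2 op), which are ignored here.
proc recompute_area args {
    global width height area
    set area [expr {$width * $height}]
}

# Re-run the computation whenever either input is written.
trace add variable width write recompute_area
trace add variable height write recompute_area

recompute_area   ;# establish the initial value
set width 10     ;# fires the write trace
puts $area       ;# 30
======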

[TV]: From '90 or so there was the (early open source) 'Khoros' flow-based programming system for image processing and other things, which was good. I think it has gone commercial, and the original does not seem
to be around on the web anymore (IIRC there was a university licence to sign). Also there was APE.

Later that was followed by the still popular (commercial) AVS.

Probably digital circuit design (from loooong before VHDL) and its numerous tools is one of the earliest applications of FBP.



** A System by [Sarnold] **

[Sarnold]:

Here is my modest attempt at emulating this functionality in pure Tcl.

----

'''Documentation'''

======
observe::var varname ?initial-value?
observe::rule varname body
observe::action body
observe::atomically body
observe::filter varname newvalname body ?initial-value?
======

`observe::var` declares a namespace variable named varname with an optional initial value.
When this variable is used inside a rule or an action, each update (write) to the variable
triggers the action.

`observe::rule` defines a namespace variable named varname whose value is computed
by a Tcl code body evaluated inside the same namespace.
The '''observe''' package scans body for `variable` commands in order to register the dependencies
between the variables defined by `observe::var` and `observe::rule`.

`observe::action` defines a code body which will be evaluated each time a variable dependency
has changed, including rules.

'''Note''': when and how often a rule is evaluated is not predictable, so rules should be free of side effects, including I/O operations, or at least the program should not be critically sensitive to them. 
In fact, a rule variable is (currently) recomputed by a read trace on the actual variable each time the variable is read.
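
The read-trace mechanism mentioned in the note can be illustrated on its own. The following is a standalone sketch, not code taken from the '''observe''' package, and the procedure name `compute_hypot` is made up for the illustration. It also shows why a rule body with side effects would be troublesome: the body runs on every read.

======
set x 3
set y 4
set hypot {}

# Runs each time hypot is read; trace arguments are ignored.
proc compute_hypot args {
    global x y hypot
    set hypot [expr {sqrt($x*$x + $y*$y)}]
}
trace add variable hypot read compute_hypot

puts $hypot   ;# 5.0
set x 6
set y 8
puts $hypot   ;# 10.0, recomputed on this read
======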

`observe::atomically` delays update traces (actions) until the code body is evaluated entirely.
If the body first modifies ''$A'', and then ''$B'', an action depending on both variables
would be triggered after the body is finished.

`observe::filter` creates a namespace variable with a filter on write accesses. When the variable 'varname' is set,
the code within 'body' is invoked within the same namespace, with the new value as 'newvalname' and the previous
value of 'varname' as 'this'. The return value of this body is assigned to 'varname'.
For example the following code implements a stack:

======
observe::filter stack newval {lappend this $newval}
foreach x {1 2 3 4 5} {set stack $x}
puts $stack ; # should print "1 2 3 4 5"
======

An initial value may be provided, otherwise the variable will be set to the empty string at its creation.

----

'''Source'''

======
package require Tcl 8.5

namespace eval observe {
    # to create the array
    set observers(dummy) 1
    set actions(dummy) 1
    set rule(dummy) 1
    set locks(dummy) 0
    
    # this is an emulation of atomic computation
    # to do things right we would have to use STM
    # and we do not, of course!
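    proc atomically body {
        set ns [uplevel 1 namespace current]
        variable locks
        # initialize the per-namespace state here as well: otherwise the
        # first write inside the body would reset the atomic flag
        if {![info exists locks($ns)]} {
            set locks($ns) 0
            set locks($ns,list) {}
        }
        set locks($ns,atomic) 1
        uplevel 1 [list eval $body]
        set locks($ns,atomic) 0
        set deps {}
        foreach name $locks($ns,list) {
            lappend deps {*}[deepseek $ns $name]
        }
        set locks($ns,list) {}
        compdeps [lsort -unique $deps] $ns
    }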
        
    proc _var {ns name args} {
        # create the variable
        uplevel 1 variable $name
        assert {[llength $args] < 2}
        if {[llength $args] == 1} {
            uplevel 1 set $name $args
        }
        # register as no-op
        variable observers
        set observers($ns,$name) {} 
    }
    
    proc deepseek {ns name} {
        # performs a deep seek on dependencies, retaining only actions
        variable observers
        set res {} 
        set l $observers($ns,$name)
        foreach n $l {
            lassign $n type name
            if {$type eq {fwd}} {
                lappend res {*}[deepseek $ns $name]
            } else {
                lappend res [list $type $name]
            }
        }
        set res
    }
    
    proc clear {ns args} {
        variable locks
        unset locks($ns)
    }
    
    proc write {ns name args} {
        variable locks
        if {![info exists locks($ns)]} {
            set locks($ns) 0
            set locks($ns,atomic) 0
            set locks($ns,list) {} 
            trace add variable ${ns}::$name unset [list clear $ns]
        }
        if {$locks($ns,atomic)} {
            lappend locks($ns,list) $name
            return
        }
        incr locks($ns)
        if {$locks($ns) > 1} {
            incr locks($ns) -1
            # abort the current computation
            return
        }
        compdeps [lsort -unique [deepseek $ns $name]] $ns
        incr locks($ns) -1; # maybe locks are bugged ?
    }
    
    # given a list of dependencies, computes all actions
    proc compdeps {dependencies ns} {
        variable actions
        foreach dep $dependencies {
            # evaluates all actions
            lassign $dep type id
            if {$type eq {act}} {
                namespace eval $ns $actions($ns,$id)
            }
        }
    }
    
    proc filter {name newval body {def {}}} {
        set ns [uplevel 1 namespace current]
        # create the variable
        foreach v [list $name __read$name __write$name] {
            catch {unset ${ns}::$v}
        }
        uplevel 1 [list ::observe::_var $ns $name $def]
        uplevel 1 [list variable __write$name]
        uplevel 1 [list variable __read$name $def]

        # writes go through the filter body; reads return the cached result
        trace add variable ${ns}::$name write [
                        list ::observe::modify $ns $name]
        trace add variable ${ns}::__write$name write [
                        list ::observe::_modify $ns [
                                string map [
                                list %newval $newval %ns $ns %name $name %body $body] {

                variable __write%name
                variable __read%name
                set this $__read%name
                set %newval $__write%name
                set __read%name [eval {%body}]
                }]]
        trace add variable ${ns}::$name read [
                        list ::observe::_filter $ns $name]
    }
    
    # gets the cached value
    proc _filter {ns name args} {
        set ${ns}::$name [set ${ns}::__read$name]
    } 
            
    # modify a filtered value
    proc modify {ns name args} {
        set ${ns}::__write$name [set ${ns}::$name]
        write $ns $name
    }
    
    proc _modify {ns body args} {
        namespace eval $ns $body
    }
    
    proc reset {name {val {}}} {
        set ns [namespace qualifiers $name]
        set name [namespace tail $name]
        set ${ns}::__read$name $val
        set ${ns}::$name
    }
        
    # the K combinator
    proc K {a b} {set a}
    
    # defines an observed variable
    proc var {name args} {
        set ns [uplevel 1 namespace current]
        # create the variable
        uplevel 1 ::observe::_var $ns $name {*}$args
        # set the trace
        trace add variable ${ns}::$name write [list ::observe::write $ns $name]
        
    }
    
    proc raise {ns varname msg args} {
        error "$msg: ${ns}::$varname"
    }
    
    proc rule {name body} {
        set ns [uplevel 1 namespace current]
        # create the variable
        uplevel 1 [list ::observe::_var $ns $name [uplevel 1 $body]]
                
        variable observers
        foreach var [getvarrefs $body] {
            # dependencies
            assert {[info exists observers($ns,$var)]} \
                "cannot observe variable '${ns}::$var' which does not exist"
            lappend observers($ns,$var) [list fwd $name]
        }
        # rules are computed on-demand and read-only
        trace add variable ${ns}::$name write [
                        list ::observe::raise $ns $name "cannot write to rule"]
        trace add variable ${ns}::$name read [
                        list ::observe::_rule $ns $name $body]
    }
    
    proc _rule {ns name body args} {
        set ${ns}::$name [namespace eval $ns $body]
    }
        
    proc getvarrefs body {
        set res {} 
        foreach cmd [scriptSplit $body] {
            set cmd [cmdSplit $cmd]
            switch -- [lindex $cmd 0] {
                variable - ::variable {
                    set name [lindex $cmd 1]
                    # only watch current namespace's variables
                    if {[regexp {^[a-zA-Z_0-9]+$} $name]} {lappend res $name}
                }
            }
        }
        set res
    }
    
    proc action body {
        set ns [uplevel 1 namespace current]
        variable observers
        variable actions
        incr actions($ns)
        set actions($ns,$actions($ns)) $body        
        foreach var [getvarrefs $body] {
            assert {[info exists observers($ns,$var)]} \
                        "cannot observe variable '${ns}::$var' which does not exist"

            lappend observers($ns,$var) [list act $actions($ns)] 
        }
    }
    
    # the assertion common procedure
    proc assert {cond {msg {assertion failed}}} {
        if {![uplevel 1 [list expr $cond]]} {error $msg}
    }
    
    # from the Tcler's wiki cmdSplit page
     proc scriptSplit {body} {
        set commands {}
        set chunk {} 
        foreach line [split $body \n] {
        append chunk $line
        if {[info complete $chunk\n]} {
            # $chunk ends in a complete Tcl command, and none of the
            # newlines within it end a complete Tcl command.  If there
            # are multiple Tcl commands in $chunk, they must be
            # separated by semi-colons.
            set cmd {} 
            foreach part [split $chunk \;] {
            append cmd $part
            if {[info complete $cmd\n]} {
                set cmd [string trimleft $cmd]
                # Drop empty commands and comments
                if {$cmd ne {} && ![string match #* $cmd]} {
                    lappend commands $cmd
                }
                if {[string match \#* $cmd]} {
                    set cmd #\;
                } else {
                    set cmd {} 
                }
            } else {
                # No complete command yet.
                # Replace semicolon and continue
                append cmd \;
            }
            }
            set chunk {} 
        } else {
            # No end of command yet.  Put the newline back and continue
            append chunk \n
        }
        }
         if {[string trimright $chunk] ne {}} {
        return -code error "Can't parse body into a\
            sequence of commands.\n\tIncomplete\
            command:\n-----\n$chunk\n-----"
        }
        return $commands
     }


    # from http://wiki.tcl.tk/cmdSplit (by S. Arnold et al)
    # replaced with cmdSplit2 by pooryorick, 2014-10-07

    proc cmdSplit cmd {
        if {![info complete $cmd]} {
            error [list {not a complete command} $cmd]
        }
        set words {}
        set logical {}
        set cmd [string trimleft $cmd[set cmd {}]]
        while {[regexp {([^\s]*)(\s+)(.*)} $cmd full first delim last]} {
            append logical $first
            if {[info complete $logical\n]} {
                lappend words $logical
                set logical {}
            } else {
                append logical $delim
            }
            set cmd $last[set last {}]
        }
        if {$cmd ne {}} {
            append logical $cmd
        }
        if {$logical ne {}} {
            lappend words $logical 
        }
        return $words
    }
}


# testing
interp alias {} s {} source observe.tcl

proc test {} {
    catch {namespace delete obs}
    namespace eval obs {
        observe::var x 0.0
        observe::var y 0.0
        observe::rule hypot {
            variable x
            variable y
            expr {sqrt($x*$x+$y*$y)}
        }
        proc test {_x _y} {
            observe::atomically {
                variable x
                variable y
                set x $_x
                set y $_y
            }
        }
        observe::action {
            variable x
            variable y
            variable hypot
            puts "Hypot $x $y : $hypot"
        }
        
    }
    obs::test 4 3
}

proc test2 {} {
    catch {namespace delete obs2}
    namespace eval obs2 {
        observe::filter stack newval {
            lappend this $newval
        }
        proc act list {
            variable stack
            foreach x $list {set stack $x}
            puts $stack
        }
    }
    obs2::act {1 2 3 4}
}
======

<<categories>> Design | Functional Programming | Language