Version 7 of Dataflow programming

Updated 2011-02-13 14:34:43 by RLE

Sarnold Dataflow programming is a programming paradigm where one defines relations between variables, and each new value for a variable make the variables depending on them being automatically recomputed, based on functional dependencies between them. [L1 ]

A special case of dataflow programming is spreadsheets, another is functional reactive programming. LISP has since some time a framework for dataflow programming called Cells, while Python fans can have a few other dataflow frameworks. Among these is Python Trellis, a dataflow framework with very interesting approach to event-driven programming.

Here is my modest approach trying to emulate this functionality in pure Tcl.


Documentation

observe::var varname ?initial-value?
observe::rule varname body
observe::action body
observe::atomically body
observe::filter varname newvalname body ?initial-value?

observe::var declares a namespace variable named varname with an optional initial value.
When this variable is used inside a rule or an action, each update (write) to the variable would
raise the action.

observe::rule defines a namespace variable named varname whose value is computed
by a Tcl code body evaluated inside the same namespace.
The observe package finds each variable command in body to register the dependencies
between the variable defined by observe::var and observe::rule.

observe::action defines a code body which will be evaluated each time a variable dependency
has changed, including rules.

Note: rules compute values in a non-deterministic way, so they should be side-effect and IO-operations free.
In fact, a rule variable is computed (currently) each time the variable is read, by a trace on the actual variable.

observe::atomically delays update traces (actions) until the code body is evaluated entirely.
If the body modifies first variable A, and then variable B, an action depending on both variables
would be triggered after the body is finished.

observe::filter creates a namespace variable with a filter on write accesses. When the variable 'varname' is set,
the code within 'body' is invoked within the same namespace, with the new value as 'newvalname' and the previous
value of 'varname' as 'this'. The return value of this body is assigned to 'varname'.
For example the following code implements a stack:
    observe::filter stack newval {lappend this $newval}
    foreach x {1 2 3 4 5} {set stack $x}
    puts $stack ; # should print "1 2 3 4 5"
An initial-value may be provided, otherwise the variable will be set to the empty string at its creation.

Source

package require Tcl 8.5

namespace eval observe {
        # to create the array
        set observers(dummy) 1
        set actions(dummy) 1
        set rule(dummy) 1
        set locks(dummy) 0
        
        # this is an emulation of atomic computation
        # to do things right we would have to use STM
        # and we do not, of course!
        proc atomically body {
                set ns [uplevel 1 namespace current]
                variable locks
                set locks($ns,atomic) 1
                uplevel 1 [list eval $body]
                set locks($ns,atomic) 0
                set deps ""
                foreach name $locks($ns,list) {
                        lappend deps {*}[deepseek $ns $name]
                }
                set locks($ns,list) ""
                compdeps [lsort -unique $deps] $ns
        }
                
        proc _var {ns name args} {
                # create the variable
                uplevel 1 variable $name
                assert {[llength $args] < 2}
                if {[llength $args]==1} {
                        uplevel 1 set $name $args
                }
                # register as no-op
                variable observers
                set observers($ns,$name) ""
        }
        
        proc deepseek {ns name} {
                # performs a deep seek on dependencies, retaining only actions
                variable observers
                set res ""
                set l $observers($ns,$name)
                foreach n $l {
                        lassign $n type name
                        if {$type eq "fwd"} {
                                lappend res {*}[deepseek $ns $name]
                        } else {
                                lappend res [list $type $name]
                        }
                }
                set res
        }
        
        proc clear {ns args} {
                variable locks
                unset locks($ns)
        }
        
        proc write {ns name args} {
                variable locks
                if {![info exists locks($ns)]} {
                        set locks($ns) 0
                        set locks($ns,atomic) 0
                        set locks($ns,list) ""
                        trace add variable ${ns}::$name unset [list clear $ns]
                }
                if {$locks($ns,atomic)} {
                        lappend locks($ns,list) $name
                        return
                }
                incr locks($ns)
                if {$locks($ns) > 1} {
                        incr locks($ns) -1
                        # abort the current computation
                        return
                }
                compdeps [lsort -unique [deepseek $ns $name]] $ns
                incr locks($ns) -1; # maybe locks are bugged ?
        }
        
        # given a list of dependencies, computes all actions
        proc compdeps {dependencies ns} {
                variable actions
                foreach dep $dependencies {
                        # evaluates all actions
                        lassign $dep type id
                        if {$type eq "act"} {
                                namespace eval $ns $actions($ns,$id)
                        }
                }
        }
        
                
        
        proc filter {name newval body {def ""}} {
                set ns [uplevel 1 namespace current]
                # create the variable
                foreach v [list $name __read$name __write$name] {catch {unset ${ns}::$v}}
                uplevel 1 [list ::observe::_var $ns $name $def]
                uplevel 1 [list variable __write$name]
                uplevel 1 [list variable __read$name $def]
                                
                
                # rules are computed on-demand and read-only
                trace add variable ${ns}::$name write [list ::observe::modify $ns $name]
                trace add variable ${ns}::__write$name write [list ::observe::_modify $ns \
                        [string map [list %newval $newval %ns $ns %name $name %body $body] {
                                variable __write%name
                                variable __read%name
                                set this $__read%name
                                set %newval $__write%name
                                set __read%name [eval {%body}]
                                }]]
                trace add variable ${ns}::$name read [list ::observe::_filter $ns $name]
        }
        
        # gets the cached value
        proc _filter {ns name args} {
                set ${ns}::$name [set ${ns}::__read$name]
        } 
                        
        # modify a filtered value
        proc modify {ns name args} {
                set ${ns}::__write$name [set ${ns}::$name]
                write $ns $name
        }
        
        proc _modify {ns body args} {
                namespace eval $ns $body
        }
        
        proc reset {name {val ""}} {
                set ns [namespace qualifiers $name]
                set name [namespace tail $name]
                set ${ns}::__read$name $val
                set ${ns}::$name
        }
                
        
        # the K combinator
        proc K {a b} {set a}
        
        # defines an observed variable
        proc var {name args} {
                set ns [uplevel 1 namespace current]
                # create the variable
                uplevel 1 ::observe::_var $ns $name {*}$args
                # set the trace
                trace add variable ${ns}::$name write [list ::observe::write $ns $name]
                
        }
        
        proc raise {ns varname msg args} {
                error "$msg: ${ns}::$varname"
        }
        
        proc rule {name body} {
                set ns [uplevel 1 namespace current]
                # create the variable
                uplevel 1 [list ::observe::_var $ns $name [uplevel 1 $body]]
                                
                variable observers
                foreach var [getvarrefs $body] {
                        # dependencies
                        assert {[info exists observers($ns,$var)]} "cannot observe variable '${ns}::$name' which does not exist"
                        lappend observers($ns,$var) [list fwd $name]
                }
                # rules are computed on-demand and read-only
                trace add variable ${ns}::$name write [list ::observe::raise $ns $name "cannot write to rule"]
                trace add variable ${ns}::$name read [list ::observe::_rule $ns $name $body]
        }
        
        proc _rule {ns name body args} {
                set ${ns}::$name [namespace eval $ns $body]
        }
                
        proc getvarrefs body {
                set res ""
                foreach cmd [cmdSplit $body] {
                        set cmd [wordSplit $cmd]
                        switch -- [lindex $cmd 0] {
                                variable - ::variable {
                                        set name [lindex $cmd 1]
                                        # only watch current namespace's variables
                                        if {[regexp {^[a-zA-Z_0-9]+$} $name]} {lappend res $name}
                                }
                        }
                }
                set res
        }
        
        proc action body {
                set ns [uplevel 1 namespace current]
                variable observers
                variable actions
                incr actions($ns)
                set actions($ns,$actions($ns)) $body                
                foreach var [getvarrefs $body] {
                        assert {[info exists observers($ns,$var)]} "cannot observe variable '${ns}::$var' which does not exist"
                        lappend observers($ns,$var) [list act $actions($ns)] 
                }
        }
        
        # the assertion common procedure
        proc assert {cond {msg "assertion failed"}} {
                if {![uplevel 1 [list expr $cond]]} {error $msg}
        }
        
        # from the Tcler's wiki cmdSplit page
         proc cmdSplit {body} {
            set commands {}
            set chunk ""
            foreach line [split $body "\n"] {
                append chunk $line
                if {[info complete "$chunk\n"]} {
                    # $chunk ends in a complete Tcl command, and none of the
                    # newlines within it end a complete Tcl command.  If there
                    # are multiple Tcl commands in $chunk, they must be
                    # separated by semi-colons.
                    set cmd ""
                    foreach part [split $chunk ";"] {
                        append cmd $part
                        if {[info complete "$cmd\n"]} {
                            set cmd [string trimleft $cmd]
                            # Drop empty commands and comments
                            if {![string match {} $cmd] \
                                    && ![string match \#* $cmd]} {
                                lappend commands $cmd
                            }
                            if {[string match \#* $cmd]} {
                                set cmd "\#;"
                            } else {
                                set cmd ""
                            }
                        } else {
                            # No complete command yet.
                            # Replace semicolon and continue
                            append cmd ";"
                        }
                    }
                    set chunk ""
                } else {
                    # No end of command yet.  Put the newline back and continue
                    append chunk "\n"
                }
            }
             if {![string match {} [string trimright $chunk]]} {
                return -code error "Can't parse body into a\
                        sequence of commands.\n\tIncomplete\
                        command:\n-----\n$chunk\n-----"
            }
            return $commands
         }


        # from http://wiki.tcl.tk/cmdSplit (written by S. Arnold)
        proc wordSplit {command} {
        if {![info complete $command]} {error "non complete command"}
        set res ""; # the list of words
        set chunk ""
        foreach word [split $command " "] {
            # testing each word until the word being tested makes the
            # command up to it complete
                        # example:
                        # set "a b"
                        # set -> complete, 1 word
                        # set "a -> not complete
                        # set "a b" -> complete, 2 words
                        append chunk $word
                        if {[info complete "$res $chunk"]} {
                                        lappend res $chunk
                                        set chunk ""
                        } else {
                                        append chunk " "
                        }
                }
        set res
        }
}



# testing
interp alias "" s "" source observe.tcl

proc test {} {
        catch {namespace delete obs}
        namespace eval obs {
                observe::var x 0.0
                observe::var y 0.0
                observe::rule hypot {
                        variable x
                        variable y
                        expr {sqrt($x*$x+$y*$y)}
                }
                proc test {_x _y} {
                        observe::atomically {
                                variable x
                                variable y
                                set x $_x
                                set y $_y
                        }
                }
                observe::action {
                        variable x
                        variable y
                        variable hypot
                        variable distances
                        puts "Hypot $x $y : $hypot"
                }
                
        }
        obs::test 4 3
}

proc test2 {} {
        catch {namespace delete obs2}
        namespace eval obs2 {
                observe::filter stack newval {
                        lappend this $newval
                }
                proc act {list} {
                        variable stack
                        foreach x $list {set stack $x}
                        puts $stack
                }
        }
        obs2::act {1 2 3 4}
}