Version 1 of Dataflow programming

Updated 2009-10-04 18:24:38 by sarnold

Sarnold: Dataflow programming is a programming paradigm in which one defines relations between variables, whose values are then updated automatically.

Spreadsheets are one special case of dataflow programming; functional reactive programming is another. Lisp has had a dataflow framework called Cells for some time, while Python users can choose among a few dataflow frameworks. Among these is Python Trellis, a dataflow framework with a very interesting approach to event-driven programming.

Here is my modest approach trying to emulate this functionality in pure Tcl.

observe::var varname ?initial-value?
observe::rule varname body
observe::action body

observe::var declares a namespace variable named varname with an optional initial value.
When this variable is used inside a rule or an action, each update (write) to the variable
triggers that action (for a rule, the actions that depend on the rule).

observe::rule defines a namespace variable named varname whose value is computed
by a Tcl code body evaluated inside the same namespace.
The observe package looks for each "variable" command in body in order to register the dependencies
between the variables defined by observe::var and observe::rule.

observe::action defines a code body which will be evaluated each time one of the variables
it depends on (rule variables included) has changed.

Note: the moment at which a rule is evaluated is not deterministic, so rule bodies should be free of side effects and I/O operations.
In fact, a rule variable is (currently) recomputed each time the variable is read, by a trace on the actual variable.

package require Tcl 8.5

namespace eval observe {
        # Bookkeeping arrays, keyed by the fully-qualified client namespace $ns:
        #   observers($ns,$var)  list of {fwd ruleName} / {act actionId} pairs
        #                        registered as depending on variable $var
        #   actions($ns)         number of actions registered in $ns
        #   actions($ns,$id)     script body of action number $id
        #   locks($ns)           re-entrancy counter used by the write trace
        #   locks($ns,atomic)    1 while an [atomically] block is running
        #   locks($ns,list)      names of the variables written during that block
        # The arrays are declared with [variable] so that a same-named global
        # can never be picked up by accident; the dummy elements force each of
        # them to be created as an array.  The "rule" array is not used by the
        # code below and is only kept because it has always been created here.
        variable observers
        variable actions
        variable rule
        variable locks
        set observers(dummy) 1
        set actions(dummy) 1
        set rule(dummy) 1
        set locks(dummy) 0

        # atomically body
        #   Evaluates body in the caller's context while postponing every
        #   action of the caller's namespace; once body has finished, the
        #   actions depending on the variables written meanwhile are run,
        #   each one at most once.
        #   This is only an emulation of atomic computation: nothing is
        #   rolled back (to do things right we would have to use STM).
        #   If body raises an error the pending notifications are dropped and
        #   the error is re-raised; in every case the atomic flag is reset so
        #   that later writes are observed normally again.
        proc atomically body {
                set ns [uplevel 1 namespace current]
                variable locks
                set locks($ns,atomic) 1
                # the write trace may not have initialised this namespace yet
                if {![info exists locks($ns,list)]} {
                        set locks($ns,list) ""
                }
                set code [catch {uplevel 1 $body} result options]
                set written $locks($ns,list)
                set locks($ns,atomic) 0
                set locks($ns,list) ""
                if {$code != 1} {
                        set deps ""
                        foreach name $written {
                                lappend deps {*}[deepseek $ns $name]
                        }
                        compdeps [lsort -unique $deps] $ns
                }
                if {$code != 0} {
                        # propagate error / return / break / continue unchanged
                        return -options $options $result
                }
        }

        # _var name ?value?
        #   Declares namespace variable name in the caller's context and, when
        #   a value is given, assigns it.  The commands are built with [list]
        #   so that names and values containing whitespace survive uplevel.
        proc _var {name args} {
                assert {[llength $args] < 2}
                # create the variable
                uplevel 1 [list variable $name]
                if {[llength $args]==1} {
                        uplevel 1 [list set $name [lindex $args 0]]
                }
        }

        # deepseek ns name
        #   Performs a deep seek on the dependencies of variable name in
        #   namespace ns: "fwd" entries (rules) are followed recursively and
        #   only the final {type id} entries (actions) are returned.
        #   Duplicates are not removed here.
        proc deepseek {ns name} {
                variable observers
                set res ""
                foreach entry $observers($ns,$name) {
                        lassign $entry type target
                        if {$type eq "fwd"} {
                                lappend res {*}[deepseek $ns $target]
                        } else {
                                lappend res [list $type $target]
                        }
                }
                set res
        }

        # clear ns ?trace-args?
        #   Unset-trace callback: forgets the lock counter of namespace ns so
        #   that the next write initialises it again.  Safe to call repeatedly.
        proc clear {ns args} {
                variable locks
                unset -nocomplain locks($ns)
        }

        # write ns name ?trace-args?
        #   Write-trace callback installed by [var].  Inside an [atomically]
        #   block the write is only recorded; otherwise the dependent actions
        #   are run at once.  A write performed while actions of the same
        #   namespace are already running is ignored.
        proc write {ns name args} {
                variable locks
                if {![info exists locks($ns)]} {
                        set locks($ns) 0
                        # an enclosing [atomically] may already have set these
                        # two entries: never overwrite them here
                        if {![info exists locks($ns,atomic)]} {
                                set locks($ns,atomic) 0
                        }
                        if {![info exists locks($ns,list)]} {
                                set locks($ns,list) ""
                        }
                        # fully qualified: traces run in the context of
                        # whoever unsets the variable
                        trace add variable ${ns}::$name unset [list ::observe::clear $ns]
                }
                if {$locks($ns,atomic)} {
                        lappend locks($ns,list) $name
                        return
                }
                incr locks($ns)
                if {$locks($ns) > 1} {
                        incr locks($ns) -1
                        # abort the current computation
                        return
                }
                set code [catch {
                        compdeps [lsort -unique [deepseek $ns $name]] $ns
                } result options]
                # always release the lock, even when an action failed,
                # otherwise every later write would be silently ignored
                if {[info exists locks($ns)]} {
                        incr locks($ns) -1
                }
                if {$code != 0} {
                        return -options $options $result
                }
        }

        # compdeps dependencies ns
        #   Given a list of {type id} dependencies, evaluates the body of each
        #   "act" entry inside namespace ns.
        proc compdeps {dependencies ns} {
                variable actions
                foreach dep $dependencies {
                        lassign $dep type id
                        if {$type eq "act"} {
                                namespace eval $ns $actions($ns,$id)
                        }
                }
        }

        # var name ?value?
        #   Declares an observed variable in the caller's namespace, with an
        #   optional initial value, and starts with no dependents.
        proc var {name args} {
                set ns [uplevel 1 namespace current]
                # create the variable; [list] keeps a value containing
                # whitespace as one single argument
                uplevel 1 [list ::observe::_var $name {*}$args]
                # set the trace
                trace add variable ${ns}::$name write [list ::observe::write $ns $name]
                # register as no-op
                variable observers
                set observers($ns,$name) ""
        }

        # raise ns varname msg ?trace-args?
        #   Trace callback that always fails with "msg: ns::varname".
        proc raise {ns varname msg args} {
                error "$msg: ${ns}::$varname"
        }

        # rule name body
        #   Declares variable name in the caller's namespace as a computed
        #   value: body is evaluated once immediately and then again on every
        #   read of the variable; writing to the variable is an error.
        #   Each variable named by a "variable" command in body must already
        #   be known to the package.
        proc rule {name body} {
                set ns [uplevel 1 namespace current]
                # create the variable
                uplevel 1 [list ::observe::_var $name [uplevel 1 $body]]

                variable observers
                # the variable existence
                set observers($ns,$name) ""

                foreach var [getvarrefs $body] {
                        # dependencies
                        assert {[info exists observers($ns,$var)]} "cannot observe variable '${ns}::$var' which does not exist"
                        lappend observers($ns,$var) [list fwd $name]
                }
                # rules are computed on-demand and read-only
                trace add variable ${ns}::$name write [list ::observe::raise $ns $name "cannot write to rule"]
                trace add variable ${ns}::$name read [list ::observe::_rule $ns $name $body]
        }

        # _rule ns name body ?trace-args?
        #   Read-trace callback: recomputes rule variable name from body.
        proc _rule {ns name body args} {
                set ${ns}::$name [namespace eval $ns $body]
        }

        # getvarrefs body
        #   Returns the names given as first argument to the "variable"
        #   commands found at the top level of body.  Only plain names made of
        #   letters, digits and underscores are kept.
        proc getvarrefs body {
                set res ""
                foreach cmd [cmdSplit $body] {
                        set cmd [wordSplit $cmd]
                        switch -- [lindex $cmd 0] {
                                variable - ::variable {
                                        set name [lindex $cmd 1]
                                        # only watch current namespace's variables
                                        if {[regexp {^[a-zA-Z_0-9]+$} $name]} {lappend res $name}
                                }
                        }
                }
                set res
        }

        # action body
        #   Registers body to be evaluated in the caller's namespace whenever
        #   one of the variables it names with a "variable" command changes.
        #   Each of those variables must already be known to the package.
        proc action body {
                set ns [uplevel 1 namespace current]
                variable observers
                variable actions
                incr actions($ns)
                set actions($ns,$actions($ns)) $body
                foreach var [getvarrefs $body] {
                        assert {[info exists observers($ns,$var)]} "cannot observe variable '${ns}::$var' which does not exist"
                        lappend observers($ns,$var) [list act $actions($ns)]
                }
        }

        # the assertion common procedure: evaluates cond as an expression in
        # the caller's context and raises msg when it is false
        proc assert {cond {msg "assertion failed"}} {
                if {![uplevel 1 [list expr $cond]]} {error $msg}
        }

        # from the Tcler's wiki cmdSplit page
        # Splits a script into the list of its top-level commands, dropping
        # empty commands and comments.
        proc cmdSplit {body} {
            set commands {}
            set chunk ""
            foreach line [split $body "\n"] {
                append chunk $line
                if {[info complete "$chunk\n"]} {
                    # $chunk ends in a complete Tcl command, and none of the
                    # newlines within it end a complete Tcl command.  If there
                    # are multiple Tcl commands in $chunk, they must be
                    # separated by semi-colons.
                    set cmd ""
                    foreach part [split $chunk ";"] {
                        append cmd $part
                        if {[info complete "$cmd\n"]} {
                            set cmd [string trimleft $cmd]
                            # Drop empty commands and comments
                            if {![string match {} $cmd] \
                                    && ![string match \#* $cmd]} {
                                lappend commands $cmd
                            }
                            if {[string match \#* $cmd]} {
                                set cmd "\#;"
                            } else {
                                set cmd ""
                            }
                        } else {
                            # No complete command yet.
                            # Replace semicolon and continue
                            append cmd ";"
                        }
                    }
                    set chunk ""
                } else {
                    # No end of command yet.  Put the newline back and continue
                    append chunk "\n"
                }
            }
            if {![string match {} [string trimright $chunk]]} {
                return -code error "Can't parse body into a\
                        sequence of commands.\n\tIncomplete\
                        command:\n-----\n$chunk\n-----"
            }
            return $commands
        }

        # from http://wiki.tcl.tk/cmdSplit (written by S. Arnold)
        # Splits one complete command into its words.  Words are separated by
        # spaces only; runs of spaces between two words do not produce empty
        # words.
        proc wordSplit {command} {
                if {![info complete $command]} {error "non complete command"}
                set res ""; # the list of words
                set chunk ""
                foreach word [split $command " "] {
                        # an empty piece between two words only means that
                        # several spaces were used as separator
                        if {$chunk eq "" && $word eq ""} {
                                continue
                        }
                        # testing each word until the word being tested makes
                        # the command up to it complete
                        # example:
                        # set "a b"
                        # set -> complete, 1 word
                        # set "a -> not complete
                        # set "a b" -> complete, 2 words
                        append chunk $word
                        if {[info complete "$res $chunk"]} {
                                lappend res $chunk
                                set chunk ""
                        } else {
                                append chunk " "
                        }
                }
                set res
        }
}



# testing
# Convenience alias for interactive use: the command "s" sources observe.tcl.
interp alias "" s "" source observe.tcl

# Smoke test for the observe package.  Rebuilds a scratch namespace "obs"
# holding two observed variables (x, y), a rule (hypot) computed from them and
# an action printing all three, then assigns both variables inside one
# observe::atomically block through obs::test.
proc test {} {
        # discard the namespace left over from a previous run, if any
        catch {namespace delete obs}
        namespace eval obs {
                observe::var x 0.0
                observe::var y 0.0
                observe::rule hypot {
                        variable x
                        variable y
                        expr {sqrt($x*$x+$y*$y)}
                }
                # assigns _x to x and _y to y within a single atomically block
                proc test {_x _y} {
                        observe::atomically {
                                variable x
                                variable y
                                set x $_x
                                set y $_y
                        }
                }
                observe::action {
                        variable x
                        variable y
                        variable hypot
                        puts "Hypot $x $y : $hypot"
                }
        }
        obs::test 4 3
}