Streams

Richard Suchenwirth 2002-05-11 - Streams are a powerful concept in (not only functional) programming. In SICP chapter 3.5, streams are introduced as data structures characterized as "delayed lists", whose elements are produced and returned only on demand (deferred evaluation). This way, a stream can promise to be a potentially endless source of data, while taking only finite time to process and deliver what's really wanted. Other streams may provide a finite but very large number of elements, which would be impractical to process in one go. In Tcl, the two ways of reading a file are a good example:

  • read $fp returns the whole contents, which then can be processed;
  • while {[gets $fp line]>-1} {...} reads line by line, interleaved with processing
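
A minimal sketch of both idioms for comparison (the per-line processing shown, printing each line's length, is just a placeholder):

set fp [open streams.tcl]
set data [read $fp]             ;# the whole file is in memory at once
close $fp

set fp [open streams.tcl]
while {[gets $fp line] > -1} {
    puts [string length $line]  ;# process each line as soon as it arrives
}
close $fp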

The second construct may be less efficient, but is robust for gigabyte-sized files. A simpler example is pipes in Unix/DOS (use TYPE for cat on DOS):

   cat foo.bar | more

where the "cat" delivers lines of the file as long as "more" will take them, and waits otherwise (after all, stdin and stdout are just streams...). Such process chains can be emulated in Tcl with the following rules:

A stream is modelled here as a procedure that returns one stream item on each call. The special item "" (the empty string) indicates that the stream is exhausted. Streams are interesting if they don't deliver the same result on every call, which requires them to maintain state between calls, e.g. in static variables (here implemented with the fancy remember proc). Examples are intgen, which delivers ever-increasing integers, or gets $fp, where the file pointer advances at each call, so that over time all lines of the file are returned.

A filter takes one or more streams, and possibly other arguments, and behaves like a stream itself. Hence, streams can be (and typically are) nested for processing purposes. If a filter meets end-of-stream, it should return that too. Filters may be characterized as "selectors" (which may return only part of their input, like "grep") and/or "appliers" (which call a command on their input and return the result). Note that on infinite streams, selectors may never return, e.g. if you ask for the second even prime...

Streams in general should not be written in brackets (then the Tcl parser would eagerly evaluate them before evaluating the command), but braced, and stream consumers eval the stream at their discretion.
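
To make the difference concrete (using the cat and more procs defined below):

more {cat streams.tcl}     ;# braced: more pulls one line per loop turn
# more [cat streams.tcl]   ;# bracketed: cat would run once, eagerly, and more
#                             would get only that single line as its "stream"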

Before we start, a word of warning: a procedure's state is maintained here in default arguments that get rewritten. To prevent bugs from procedures whose defaults have changed, I've come up with the following simple architecture: procs with static variables are registered as "sproc"s, which records their initial defaults, and with a reset command you can restore the initial values for one or all sprocs:

proc sproc {name head body} {
    set ::sproc($name) $head
    proc $name $head $body
}
proc reset {{what *}} {
    foreach name [array names ::sproc $what] {
        proc $name $::sproc($name) [info body $name]
    }
}

Now let's start with a simple stream source, "cat", which as a wrapper around gets returns the lines of a file one by one until exhausted (EOF), in which case an empty string is returned (this requires that empty lines in the file, which would otherwise look the same, are represented as a single blank):

sproc cat {filename {fp {}} } {
    if {$fp eq ""} {
        remember fp [set fp [open $filename]]
    }
    if {[gets $fp res]<0} {
        remember fp [close $fp] ;# which returns an empty string ;-)
    } elseif {$res eq ""} {set res " "} ;# not end of stream!
    set res
}
# - rewrite a proc's default arg with given value
proc remember {argn value} {
    set procn [lindex [info level -1] 0] ;# caller's name
    set argl {}
    foreach arg [info args $procn] {
        if {[info default $procn $arg default]} {
            if {$arg==$argn} {set default $value}
            lappend argl [list $arg $default]
        } else {
            lappend argl $arg
        }
    }
    proc $procn $argl [info body $procn]
    set value
}
# This simple but infinite stream source produces all non-negative integers:
sproc intgen {{seed -1}} {remember seed [incr seed]}

# This produces all (well, very many) powers of 2:
sproc powers-of-2 {{x 0.5}} {remember x [expr {$x*2}]}
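
# A quick demonstration of the rewritten defaults at work (results shown as
# comments; assumes a fresh interpreter or a preceding reset):
# intgen        ==> 0   ;# the default seed -1 is incremented and remembered
# intgen        ==> 1
# intgen        ==> 2
# reset intgen  ;# restores the registered default {seed -1}
# intgen        ==> 0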

# A filter that reads and displays a stream until user stops it:
proc more {stream} {
    while 1 {
        set res [eval $stream]
        if {$res==""} break ;# encountered end of stream
        puts -nonewline $res; flush stdout
        if {[gets stdin]=="q"} break
    }
}

Usage example:

more {cat streams.tcl}

which crudely emulates the Unix/DOS pipe mentioned above (you'll have to hit <Return> after every line, and "q<Return>" to quit...). more is the most important "end user" of streams, especially if they are infinite. Note however that this implementation needs stdin, which rules out wish on Windows (one might easily write a UI-more that reacts to mouse clicks, though).

A more generic filter takes a condition and a stream, and on each call returns an element of the input stream where the condition holds - if ever one comes along:

proc filter {cond stream} {
    while 1 {
        set res [eval $stream]
        if {$res=="" || [$cond $res]} break
    }
    set res    
}
# Here is a sample usage with famous name:
proc grep {re stream} {
    filter [lambda [list x [list re $re]] {regexp $re $x}] $stream
}
#.... which uses the (less) famous function maker:
proc lambda {args body} {
    set name [info level 0]
    proc $name $args $body
    set name
}
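# The trick: the new proc's name is its own defining command, as returned by
# [info level 0], so the value lambda returns can be used directly as a command:
# [[lambda x {expr {$x*$x}}] 5] ==> 25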
# Usage example: more {grep this {cat streams.tcl}}

Friends of syntactic sugar might prefer shell style:

 $ cat streams.tcl | grep this | more

and guess what, we can have that in Tcl too (and not in Scheme !-), by writing a proc with the fancy name "$", which also resets all sprocs (in Unix, this could be the shell prompt that you don't type, but in Tcl the command name always has to be the first word):

proc $ args {
    reset
    set cmd {}
    foreach arg $args {
        if {$arg != "|"} {
            lappend tmp $arg
        } else {
            set cmd [expr {$cmd==""? $tmp: [lappend tmp $cmd]}]
            set tmp {}
        }
    }
    uplevel 1 [lappend tmp $cmd]
}
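
For illustration, the "|" separators are folded right to left, so the shell-style line above is rewritten by [$] into the nested form used earlier:

# $ cat streams.tcl | grep this | more
# is turned into
# more {grep this {cat streams.tcl}}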

To prove that we haven't cheated by using exec, let's introduce a line counter filter:

sproc -n {stream {n 0}} {
    set res [eval $stream]
    if {$res!=""} {set res [remember n [incr n]]:$res}
}

This can be added to filter chains, to count lines in the original file, or only the results from grep:

$ cat streams.tcl | -n | grep this | more
$ cat streams.tcl | grep this | -n | more

We further observe that more has a similar structure to filter, so we could also rewrite it in terms of that:

proc more2 stream {
    filter [lambda x {
        puts -nonewline $x; flush stdout
        expr {[gets stdin]=="q"}
    }] $stream
}
# Here is another stream producer that returns elements from a list:
sproc streamlist {list {todo {}} {firstTime 1} } {
    if {$firstTime} {set todo $list; remember firstTime 0}
    remember todo [lrange $todo 1 end]
    lindex $todo 0
}
# This one repeats its list endlessly, so better use it with 'more':
sproc infinite-streamlist {list {todo {}} } {
    initially todo $list
    remember  todo [lrange $todo 1 end]
    lindex   $todo 0
}
# This is sugar for first-time assignment of static variables:
proc initially {varName value} {
    upvar 1 $varName var
    if {$var==""} {set var $value}
}
# But for a simple constant stream source, just use [subst]:
# more {subst 1} ;# will produce as many ones as you wish

# This filter collects its input (should be finite ;-) into a list:
proc collect stream {
    set res {}
    while 1 {
        set element [eval $stream]
        if {$element==""} break
        lappend res $element
    }
    set res
}
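
For example, collecting a finite streamlist gives back the original list (after a reset, since streamlist keeps state between calls):

# collect {streamlist {a b c}} ==> a b c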

The sort filter is unusual in that it consumes its whole (finite!) input, sorts it, and acts as a stream source on the output:

sproc sort {stream {todo {}} {firstTime 1}} {
    if {$firstTime} {
        set todo [lsort [collect $stream]]
        remember firstTime 0
    }
    remember todo [lrange $todo 1 end]
    lindex $todo 0
}
# $ streamlist {foo bar grill a} | sort | collect => a bar foo grill
 
proc apply {f stream} {$f [eval $stream]}
#... This can be plugged into a filter chain to see what's going on:
proc observe stream {apply [lambda y {puts $y; set y}] $stream}
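# For instance, plugged into the pipe notation above, observe prints every line
# as it is pulled through, while more still shows only the grep matches:
# $ cat streams.tcl | observe | grep this | more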

# ... or, to get a stream of even numbers, starting from 0:
more {apply [lambda x {expr {$x*2}}] intgen}

Now for the example in SICP: find the second prime in the interval between 10000 and 1000000.

sproc interval {from to {current {}} } {
    initially current $from
    if {$current<=$to} {
        remember current [expr {$current+1}]
    }
}
proc prime? x {
    if {$x<2} {return 0}
    set max [expr {sqrt($x)}]
    set try 2
    while {$try<=$max} {
        if {$x%$try == 0} {return 0}
        incr try [expr {2-($try==2)}]
    }
    return 1
}
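# Quick check (10007 and 10009 happen to be the first two primes above 10000,
# which matches the stream-index result below):
# prime? 10007 ==> 1
# prime? 10008 ==> 0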
proc stream-index {stream index} {
    for {set i 0} {$i<=$index} {incr i} {
        set res [eval $stream]
    }
    set res
}
sproc stream-range {stream from to {pos 0}} {
    while {$pos<$from} {
        set res [eval $stream] ;# ignore elements before 'from'
        if {$res==""} return   ;# might be end-of-stream
        incr pos
    }
    if {$to!="end" && $pos > $to} return
    remember pos [incr pos]
    eval $stream
}
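
An example of stream-range (after a reset, so intgen starts from 0 again): take the elements at positions 3 through 6 of the integer stream:

# collect {stream-range intgen 3 6} ==> 3 4 5 6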

stream-index {filter prime? {interval 10000 1000000}} 1 ==> 10009

Another idea from SICP is a "smoothing" function that averages each pair of consecutive values from the input stream. For this we also need to introduce a short-term memory in the filter:

sproc average {stream {previous {}} } {
    if {$previous==""} {set previous [eval $stream]}
    remember previous [set current [eval $stream]]
    if {$current!=""} {expr {($previous+$current)/2.}}
}

which, tested on an n-element stream, returns n-1 averages:

collect {average {streamlist {1 2 3 4 5}}} ==> 1.5 2.5 3.5 4.5

Yet another challenge was to produce an infinite stream of pairs {i j} of positive integers, i <= j, ordered by their sum, so that more pairs produces consecutively

 {1 1} {1 2} {1 3} {2 2} {1 4} {2 3} {1 5} {2 4} {3 3} {1 6} ...

Here's my solution that does just that:

sproc pairs {{last {}} } {
    if {$last eq ""} {
        set last [list 1 1] ;# start of iteration
    } else {
        foreach {a b} $last break
        if {$a >= $b-1} {
            set last [list 1 [expr {$a+$b}]] ;# next sum level
        } else {
            set last [list [incr a] [incr b -1]]
        }
    }
    remember last $last
}

Ramanujan numbers: The pairs generator can be used to find Ramanujan numbers, which can be represented as the sum of two integer cubes in more than one way. Here I use a global array for recording results:

sproc Ramanujan {stream {firstTime 1}} {
    if {$firstTime} {array unset ::A; remember firstTime 0} ;# ::A may not yet exist on the first call
    while 1 {
        set pair [eval $stream]
        foreach {a b} $pair break
        set n [expr {$a*$a*$a + $b*$b*$b}]
        if {[info exists ::A($n)]} {
            lappend ::A($n) $pair
            break
        } else {set ::A($n) [list $pair]}
    }
    list $n $::A($n)
}

more {Ramanujan pairs} ;# or: $ pairs | Ramanujan | more

delivers in hardly noticeable time the R. numbers 1729, 4104, 13832... Or, how's this infinite Fibonacci number generator, which on more fibo produces all the F. numbers (0, 1, 1, 2, 3, 5, 8, 13, 21...) you might want?

sproc fibo {{a ""} {b ""}} {
    if {$a==""} {
        remember a 0
    } elseif {$b==""} {
        remember b 1
    } else {
        if {$b > 1<<30} {set b [expr {double($b)}]} ;# go floating-point to avoid integer overflow
        remember a $b
        remember b [expr {$a+$b}]
    }
}

Discussion

With the above code, it was possible to reproduce quite a bit of the behavior of streams as documented in SICP, not as data structures but with Tcl procs (though procs are data too, in some sense...). What's missing is the capability to randomly address parts of a stream, as is possible in Scheme (and of course their claim to do without assignment, or mutable data...). Tcl lists just don't follow LISP's CAR/CDR model (though KBK demonstrated in Tcl and LISP that this structure can be emulated, also with procs), but rather C's flat Tcl_Obj*[] style.

The absence of lexical scoping also led to constructs like sproc/reset, which plug a gap but aren't exactly elegant. Tcl's clear line between local and global variables allows something like closures only by rewriting default arguments, as done in remember (or as in Python).

Don't take this as a fundamental critique of Tcl, though - its underlying model is far simpler and more elegant than LISP's (what with "special forms", "reader macros"...), and yet powerful enough to do just about everything possible...

vfs - does the discussion on VFS under exec make sense? Could interaction between these 'Tcl-streams' and 'exec-pipe-streams' be possible by diverting Tcl's TclpCreateProcess in appropriate ways? The idea would be that 'exec cat foo | tclproc | more' could pipe the contents of 'foo' to a tclproc and then pipe the results of tclproc to more... where some of these components are Tcl procs, and some are standard executables, and Tcl doesn't care!


From the Tcl chatroom - re Streams, the Icon language and generators ...

dkf: So the idea would be sort of a list where the elements of the list have not all been created yet?

suchenwi: For instance: proc intgen {{seed -1}} {remember [incr seed]} ;# integer generator - "remember" rewrites the pseudo-static default argument.

dkf: Or maybe a pair of a value and a command to generate the next value in the sequence...

suchenwi: Donal: Exactly - and might be infinite. Lazy evaluation, on demand. This pair approach is how they do it in Scheme (see SICP).

dkf: The pair approach is also used in StandardML. It'd be feasible to create real lazy lists in Tcl if conversion to string was allowed to fail. (Or if we had a way to represent finite prefixes of infinite strings, of course.)

suchenwi: Such lazy lists should also be foreach-able.

dkf: Having such "lists" would be beneficial to Tcl, particularly because you could use them in places (notably fconfigure on sockets) where generating some values is very slow but not all software wants the expensive-to-generate bits.

suchenwi: What should llength return? NaN? -1?

dkf: [llength $infiniteList] either should throw an error or not return at all.

suchenwi: -1 looks better to me. or error, ok - but not "not return at all". When playing with Streams, I had endlessly endless loops... Better quick kill w/error.

dkf: No, because that'll cause *lots* of problems elsewhere in the core. (negative offsets are used in quite a few places in the core to indicate end-relative indexing.)

suchenwi: Yes, I withdrew my -1 proposal.

stevel: combine generators with threads and some form of stream (perhaps a memchan), and you have pipes + filters. Only much nicer than the Unix model - these would be cross-platform and highly scalable. Sounds like a nice project for an undergraduate thesis ;-)

dkf: Instead of a stream of bytes, you have a stream of structured values. Or even a tree (because the first element could be an infinite list itself...) As I said, the main problem is that there is currently no facility in Tcl for conversion-to-string to fail.

stevel: actually, my idea is to leave your data sitting in something like metakit, and just pass handles around.


30sep03 jcw - The read/gets model deals with streams. Memchan is a way to simulate (not too big) byte streams. Would it be possible to extend the channel model so gets returns some other unit than lines of text? What if one had an, eh, stream which could be accessed as a channel, but with gets returning Tcl_Obj's, i.e. arbitrary data structures? Maybe puts could be adjusted as well, to inject objects into such a stream without forcing them to a string representation? Maybe the channel could have an extra setting which defines what "puts" inserts at the end (normally a newline, but in this case nothing would be useful)? Maybe, like memchan, one could also have a fifo-style variant with queueing behavior, but for Tcl_Obj's instead of strings/bytes?

In other words: maybe the channel model can be expanded to cover this generator- and producer/consumer capability?

Lots of maybe's...


RS: Sounds very promising... One might put such streams in the center of the model, and let files, sockets and such just be special cases...

jcw - Good point. Hey, we could perhaps introduce new (non-stringifying) commands "get" and "put"! - RS keeps on thinking: lists could also be special cases of streams; and/or their value representation. - jcw does too: get/put would differ from gets/puts, in that they maintain "object boundaries" in the stream, and if mixed, put+gets could force "stringification" and stop at newlines as usual, while puts+get could use [info complete ...] to grab whatever text makes a single list item? With a bit of luck, put/get could even work across byte streams such as sockets, using tcl's list quoting to get items across, of arbitrary shape? ... anyway, quite a stream of thought here, not sure where the boundaries are - :o)

Oh, and to get back to the starting point: "foreach x ... { ... }" for streams would need an equivalent... "geteach x $fd { ... }" ? -jcw


AK From my pov you seem to try to twist channels, which are streams of bytes, into message queues, which are streams of messages/packets/objects. Sort of like the difference between TCP/IP and UDP/IP.


Jpaulm For a discussion of recursive functions and "lazy cons" in the FBP context, see http://www.jpaulmorrison.com/fbp/recurs.htm. By the way, have you guys looked at Comega? IMO they have some very interesting generalizations of the . and [] notations for streams...

See also