[AMG]: The `async` package manages asynchronous communication with any number of child processes.

** Example **

======
coroutine xxx apply {{} {
    set id1 [async::exec sort]
    chan puts $id1 b\na\nc
    set id2 [async::exec sort]
    async::close -direction stdin $id1
    chan puts $id2 z\na\nc
    async::close -direction stdin $id2
    chan puts [async::get $id2]
    chan puts [async::read -limit 2 -dropNewline $id2]
    chan puts [async::read -dropNewline $id1]
    async::close $id2
    async::close $id1
    set ::end 1
}}
vwait end
======

The above will print "a c a b c", each character on a separate line.

** Commands **

*** [[`async::exec`]] ***

Executes a child process command. The returned channel identifier can be used to asynchronously communicate with the child process via the [[`async::`*]] and selected [[`[chan]` *]] commands. The valid [[`chan`]] subcommands are: `[puts]`, `[flush]`, `[chan copy%|%copy%|%]` (recommend ''`outputChan`'' only), `[chan configure%|%configure%|%]`, `[chan push%|%push%|%]`, and `[chan pop%|%pop%|%]`. The channel is configured to be nonblocking with binary translation and line buffering. Please do not change the blocking mode. To block, instead omit the `-noWait` switch to [[`async::get`]] and [[`async::read`]].

*** [[`async::channels`]] ***

Returns a list of all currently open asynchronous channel IDs.

*** [[`async::close`]] ***

Closes a currently open asynchronous channel. A channel may be partially closed, for example to send the `sort`(1) program EOF on its `stdin`, after which it will start writing results to its `stdout`. Unread data is discarded, with the possible exception of `stderr`: if `stderr` is being closed and `-stderr ignore` was not used, any unread `stderr` data is thrown with `ASYNC STDERR`.

`async::close` ?`-direction` `all`|`stdin`|`stdout`|`stderr`? ?`-stderr` `ignore`|`throw`? ''`id`''

&| `-direction all` | Fully closes channel. The default. |&
&| `-direction stdin` | Closes `stdin` component of channel. |&
&| `-direction stdout` | Closes `stdout` component of channel. |&
&| `-direction stderr` | Closes `stderr` component of channel. |&
&| `-stderr ignore` | Ignores any unread data on `stderr`. |&
&| `-stderr throw` | Throws any stderr data with `ASYNC STDERR`. The default. |&
&| ''`id`'' | The asynchronous channel identifier returned by [[`async::exec`]]. |&

*** [[`async::get`]] ***

Gets one line of text from the `stdout` and/or `stderr` of an asynchronous child process, according to the value of the `-stderr` switch, which defaults to `-stderr throw`. The trailing newline is stripped, unless the `-keepNewline` switch is used. [Yield]s until the requested data is available, unless the `-noWait` switch is used. The result is returned or written into a variable, in which case the number of characters read is returned. In the case of `-noWait` with a variable, -1 is returned rather than 0 if the requested data is not immediately available. At end of file, the last line of `stdout` or `stderr` is considered complete even if it does not end in a newline. If called at end of file after all data has already been read and returned, `ASYNC EOF` is thrown.

`async::get` ?`-noWait`? ?`-keepNewline`? ?`-variable` ''`varName`''? ?`-stderr` ''`mode`''? ''`id`''

&| `-noWait` | Returns immediately rather than yielding if data is not available. |&
&| `-keepNewline` | Does not strip the trailing newline. |&
&| `-variable` ''`varName`'' | Name of variable into which to write the result. |&
&| `-stderr ignore` | Ignores `stderr` and reads only `stdout`. |&
&| `-stderr merge` | Reads from either `stderr` or `stdout`, whichever is available. |&
&| `-stderr tag` | Like `merge`, except the return value is a tagged list. |&
&| `-stderr read` | Ignores `stdout` and reads only `stderr`. |&
&| `-stderr throw` | Returns `stdout` normally and throws `stderr` with `ASYNC STDERR`. |&
&| ''`id`'' | The asynchronous channel identifier returned by [[`async::exec`]]. |&

*** [[`async::read`]] ***

Gets a block of data from the `stdout` and/or `stderr` of an asynchronous child process, according to the value of the `-stderr` switch, which defaults to `-stderr throw`. [Yield]s until the requested data is available, unless the `-noWait` switch is used. If called at end of file after all data has already been read and returned, `ASYNC EOF` is thrown.

`async::read` ?`-noWait`? ?`-dropNewline`? ?`-limit` ''`size`''? ?`-stderr` ''`mode`''? ''`id`''

&| `-noWait` | Returns immediately rather than yielding if data is not available. |&
&| `-dropNewline` | Strips the trailing newline. |&
&| `-limit` ''`size`'' | Maximum number of characters to read and return. |&
&| `-stderr ignore` | Ignores `stderr` and reads only `stdout`. |&
&| `-stderr merge` | Reads from either `stderr` or `stdout`, whichever is available. |&
&| `-stderr tag` | Like `merge`, except the return value is a tagged list. |&
&| `-stderr read` | Ignores `stdout` and reads only `stderr`. |&
&| `-stderr throw` | Returns `stdout` normally and throws `stderr` with `ASYNC STDERR`. |&
&| ''`id`'' | The asynchronous channel identifier returned by [[`async::exec`]]. |&

*** [[`async::fill`]] ***

Reads all available data from `stdout` and `stderr` into internal buffers. The current [coroutine] [yield]s until at least one character is successfully read or end of file is reached. This procedure need not be called by user code.

*** [[`async::flow`]] ***

Reads all immediately available data from `stdout` or `stderr` into internal buffers. This procedure need not be called by user code. Returns the number of characters that were read, 0 if no data was available, or -1 if at end of file or the requested direction was locally closed.

** Discussion **

[AMG]: The code appears to work but is only very lightly tested at this point. I will be putting it through its paces shortly, but I had to get a first version out before I could integrate it with anything.
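
----

The core mechanism described above (a pipeline whose `stdin` is half-closed, and a [coroutine] that [yield]s until the child's `stdout` is readable) can be sketched with plain Tcl 8.6 commands, independent of this package. This is only an illustration under stated assumptions, not the package's implementation: `slurp`, `demo`, `result`, and `done` are hypothetical names, and a `sort` program is assumed to be on the `PATH`.

======
package require Tcl 8.6

# Hypothetical helper (not part of the async package): collect everything the
# child writes to its stdout, yielding to the event loop while waiting.
proc slurp {ch} {
    set data {}
    while {![chan eof $ch]} {
        # Resume the current coroutine when the channel becomes readable.
        chan event $ch readable [info coroutine]
        yield
        chan event $ch readable {}
        append data [chan read $ch]
    }
    return $data
}

coroutine demo apply {{} {
    set ch [open |sort r+]
    chan puts $ch "b\na\nc"
    chan flush $ch
    # Half-close: send EOF on the child's stdin but keep its stdout open.
    chan close $ch write
    # Switch to nonblocking mode only for the reading phase.
    chan configure $ch -blocking 0
    set ::result [slurp $ch]
    chan close $ch
    set ::done 1
}}

# The coroutine has yielded inside slurp; run the event loop until it finishes.
vwait done
puts -nonewline $result
======

Unlike [[`async::read`]], this sketch handles only `stdout` and keeps no per-channel buffers. It is meant to show why the caller has to run inside a coroutine with the event loop active.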
----

[AMG]: I'm a little hesitant about a few design choices.

   * Is it really the right thing to have separate [[`async::get`]] and [[`async::read`]] commands? I did this to mirror [[`[chan gets]`]] and [[`[chan read]`]], but then my switch syntax diverged considerably. There's so much code overlap, I could combine the two, but perhaps it's simpler to use if they are kept separate.
   * Do I really want to have the block size, line termination mode, etc. apply to `stderr` when using `-stderr throw`? Or should I have `-stderr throw` always just throw all available `stderr` text just the way it appears?

----

[AMG]: Maybe this can be included in [tcllib] someday.

** Code **

======
# async.tcl
# Andy Goth

# Require [dict], [throw], [chan pipe], [chan close dir], and [coroutine].
package require Tcl 8.6

# Create namespace.
namespace eval ::async {}

# Asynchronous channel internal data.
set ::async::Channels {}

# async::exec --
# Executes a child process command. The returned channel identifier can be used
# to asynchronously communicate with the child process via the [async::*] and
# selected [chan *] commands. The valid [chan] subcommands are: puts, flush,
# copy (recommend outputChan only), configure, push, and pop. The channel is
# configured to be nonblocking with binary translation and line buffering.
# Please do not change the blocking mode. To block, instead omit the -noWait
# switch to [async::get] and [async::read].
proc ::async::exec {args} {
    variable Channels

    # Create a pipe to collect stderr from the child process.
    lassign [chan pipe] chanStderr chanStderrWrite

    # Execute the child process.
    set chanInOut [open |[list {*}$args 2>@ $chanStderrWrite] a+]
    chan close $chanStderrWrite

    # Configure stdin/stdout and stderr channels.
    chan configure $chanInOut -blocking 0 -translation binary -buffering line
    chan configure $chanStderr -blocking 0 -translation binary -buffering line

    # Initialize the channel buffers.
    dict set Channels $chanInOut open {stdin {} stdout {} stderr {}}
    dict set Channels $chanInOut chan stdout $chanInOut
    dict set Channels $chanInOut chan stderr $chanStderr
    dict set Channels $chanInOut buf {stdout {} stderr {}}
    dict set Channels $chanInOut eof {flag 0 message {}}

    # Return the input/output channel to the caller.
    return $chanInOut
}

# async::channels --
# Returns a list of all currently open asynchronous channel IDs.
proc ::async::channels {} {
    variable Channels
    dict keys $Channels
}

# async::close --
# Closes a currently open asynchronous channel. A channel may be partially
# closed, for example to send the sort(1) program EOF on its stdin after which
# it will start writing results to its stdout. Unread data will be discarded,
# possibly except for stderr. If stderr is being closed and -stderr ignore was
# not used, any unread stderr data is thrown with ASYNC STDERR.
#
# async::close ?-direction all|stdin|stdout|stderr? ?-stderr ignore|throw? id
# -direction all: Fully closes channel. The default.
# -direction stdin: Closes stdin component of channel.
# -direction stdout: Closes stdout component of channel.
# -direction stderr: Closes stderr component of channel.
# -stderr ignore: Ignores any unread data on stderr.
# -stderr throw: Throws any stderr data with ASYNC STDERR. The default.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::close {args} {
    variable Channels

    # Parse arguments.
    set direction all
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::close\
                    ?-direction all|stdin|stdout|stderr?\
                    ?-stderr ignore|throw? id\""
        } elseif {$arg eq "-direction"} {
            if {![llength $args]
             || [lindex $args 0] ni {all stdin stdout stderr}} {
                return -code error "-direction switch must be followed by:\
                        all, stdin, stdout, or stderr"
            }
            set args [lassign $args direction]
        } elseif {$arg eq "-stderr"} {
            if {![llength $args] || [lindex $args 0] ni {ignore throw}} {
                return -code error "-stderr switch must be followed by:\
                        ignore or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

    # Get access to the channel status variables.
    dict with Channels $id {
        # When closing stderr and using -stderr throw, check for unread stderr.
        # Read the buffer from Channels directly: [flow] updates Channels, not
        # the local copies made by the enclosing [dict with].
        if {$stderrMode eq "throw" && $direction in {all stderr}} {
            flow $id stderr
            set stderrData [dict get $Channels $id buf stderr]
        }

        if {$direction eq "all"} {
            # If requested, close all open channels.
            if {[dict exists $open stderr]} {
                chan close [dict get $chan stderr]
            }
            if {[dict exists $open stdin] || [dict exists $open stdout]} {
                chan close [dict get $chan stdout]
            }
            dict unset Channels $id
        } elseif {![dict exists $open $direction]} {
            # Check for double close.
            return -code error "direction \"$direction\" already closed for\
                    asynchronous channel: $id"
        } else {
            # Close only the requested direction.
            if {$direction eq "stdin"} {
                chan close [dict get $chan stdout] write
            } elseif {$direction eq "stdout"} {
                chan close [dict get $chan stdout] read
            } else {
                chan close [dict get $chan stderr]
            }

            # Clean up the asynchronous channel data as appropriate.
            dict unset open $direction
            if {![dict size $open]} {
                dict unset Channels $id
            }
        }

        # Throw any unread stderr data.
        if {[info exists stderrData] && $stderrData ne {}} {
            throw {ASYNC STDERR} $stderrData
        }
    }
}

# async::get --
# Gets one line of text from the stdout and/or stderr of an asynchronous child
# process, according to the value of the -stderr switch, which defaults to
# -stderr throw.
# The trailing newline is stripped, unless the -keepNewline
# switch is used. Yields until the requested data is available, unless the
# -noWait switch is used. The result is returned or written into a variable, in
# which case the number of characters read is returned. In the case of -noWait
# with a variable, -1 is returned rather than 0 if the requested data is not
# immediately available. At end of file, the last line of stdout or stderr is
# considered complete even if it does not end in a newline. If called at end of
# file after all data has already been read and returned, ASYNC EOF is thrown.
#
# async::get ?-noWait? ?-keepNewline? ?-variable varName? ?-stderr mode? id
# -noWait: Returns immediately rather than yielding if data is not available.
# -keepNewline: Does not strip the trailing newline.
# -variable varName: Name of variable into which to write the result.
# -stderr ignore: Ignores stderr and reads only stdout.
# -stderr merge: Reads from either stderr or stdout, whichever is available.
# -stderr tag: Like merge, except the return value is a tagged list.
# -stderr read: Ignores stdout and reads only stderr.
# -stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::get {args} {
    variable Channels

    # Parse arguments.
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::get\
                    ?-noWait? ?-keepNewline? ?-variable varName?\
                    ?-stderr ignore|merge|tag|read|throw? id\""
        } elseif {$arg eq "-noWait"} {
            set noWait {}
        } elseif {$arg eq "-keepNewline"} {
            set keepNewline {}
        } elseif {$arg eq "-variable"} {
            # The variable name is the argument following the switch.
            if {![llength $args]} {
                return -code error "-variable switch must be followed by\
                        a variable name"
            }
            set args [lassign $args varName]
        } elseif {$arg eq "-stderr"} {
            if {![llength $args]
             || [lindex $args 0] ni {ignore merge tag read throw}} {
                return -code error "-stderr switch must be followed by\
                        ignore, merge, tag, read, or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

    # Confirm the requested directions are open.
    dict with Channels $id open {
        if {![info exists stdout] && ![info exists stderr]} {
            return -code error "both stdout and stderr closed for\
                    asynchronous channel: $id"
        } elseif {$stderrMode eq "ignore" && ![info exists stdout]} {
            return -code error "stdout closed for asynchronous channel: $id"
        } elseif {$stderrMode eq "read" && ![info exists stderr]} {
            return -code error "stderr closed for asynchronous channel: $id"
        }
    }

    # Loop until a complete line is available. Return if -noWait was given and
    # both stdout and stderr are empty, incomplete, or unwanted. At end of
    # file, the remainder of the buffer is considered to be complete even if it
    # does not end in newline.
    set rest {}
    while {1} {
        # First try reading from stderr, then stdout.
        foreach {dir skipMode} {stderr ignore stdout read} {
            # Check if this direction is open, not being ignored, and has a
            # complete line or has an incomplete line at end of file.
            set line [dict get $Channels $id buf $dir]
            if {$stderrMode ne $skipMode
             && [dict exists $Channels $id open $dir]
             && ([regexp {([^\n]*\n)(.*)} $line _ line rest]
              || ([dict get $Channels $id eof flag] && $line ne {}))} {
                # Remove the data from the buffer.
                dict set Channels $id buf $dir $rest

                # Unless -keepNewline, strip trailing newline.
                if {![info exists keepNewline]} {
                    regsub {\n$} $line {} line
                }

                # If -stderr throw (the default) and a complete line was read
                # from stderr, throw it rather than return it.
                if {$dir eq "stderr" && $stderrMode eq "throw"} {
                    throw {ASYNC STDERR} $line
                }

                # Build the result. With -stderr tag, the result is a list
                # consisting of "stdout" or "stderr" followed by the received
                # data. Otherwise, the result is the data. -stderr merge is
                # the only case where the caller won't be able to tell where the
                # data came from.
                if {$stderrMode eq "tag"} {
                    set result [list $dir $line]
                } else {
                    set result $line
                }

                # Give the result to the caller.
                if {[info exists varName]} {
                    uplevel 1 [list set $varName $result]
                    return [string length $line]
                } else {
                    return $result
                }
            }
        }

        # At this point, stderr and stdout are both unwanted or unavailable.
        if {[dict get $Channels $id eof flag]} {
            # Throw an exception if at end of file.
            throw {ASYNC EOF} [dict get $Channels $id eof message]
        } elseif {![info exists noWait]} {
            # Fill the stdout/stderr buffers and retry if not -noWait.
            fill $id
        } elseif {[info exists varName]} {
            # If -noWait with a variable, set it empty and return -1.
            uplevel 1 [list set $varName {}]
            return -1
        } else {
            # If -noWait without a variable, return empty.
            return {}
        }
    }
}

# async::read --
# Gets a block of data from the stdout and/or stderr of an asynchronous child
# process, according to the value of the -stderr switch, which defaults to
# -stderr throw. Yields until the requested data is available, unless the
# -noWait switch is used. If called at end of file after all data has already
# been read and returned, ASYNC EOF is thrown.
#
# async::read ?-noWait? ?-dropNewline? ?-limit size? ?-stderr mode? id
# -noWait: Returns immediately rather than yielding if data is not available.
# -dropNewline: Strips the trailing newline.
# -limit size: Maximum number of characters to read and return.
# -stderr ignore: Ignores stderr and reads only stdout.
# -stderr merge: Reads from either stderr or stdout, whichever is available.
# -stderr tag: Like merge, except the return value is a tagged list.
# -stderr read: Ignores stdout and reads only stderr.
# -stderr throw: Returns stdout normally and throws stderr with ASYNC STDERR.
# id: The asynchronous channel identifier returned by [async::exec].
proc ::async::read {args} {
    variable Channels

    # Parse arguments.
    set stderrMode throw
    while {[llength $args]} {
        set args [lassign $args arg]
        if {[info exists id]} {
            return -code error "wrong # args: should be \"async::read\
                    ?-noWait? ?-dropNewline? ?-limit size?\
                    ?-stderr ignore|merge|tag|read|throw? id\""
        } elseif {$arg eq "-noWait"} {
            set noWait {}
        } elseif {$arg eq "-dropNewline"} {
            set dropNewline {}
        } elseif {$arg eq "-limit"} {
            if {![llength $args] || ![string is entier [lindex $args 0]]
             || [lindex $args 0] <= 0} {
                return -code error "-limit switch must be followed by\
                        a positive integer"
            }
            set args [lassign $args limit]
        } elseif {$arg eq "-stderr"} {
            if {![llength $args]
             || [lindex $args 0] ni {ignore merge tag read throw}} {
                return -code error "-stderr switch must be followed by\
                        ignore, merge, tag, read, or throw"
            }
            set args [lassign $args stderrMode]
        } elseif {![dict exists $Channels $arg]} {
            return -code error "not an open asynchronous channel: $arg"
        } else {
            set id $arg
        }
    }

    # Confirm the requested directions are open.
    dict with Channels $id open {
        if {![info exists stdout] && ![info exists stderr]} {
            return -code error "both stdout and stderr closed for\
                    asynchronous channel: $id"
        } elseif {$stderrMode eq "ignore" && ![info exists stdout]} {
            return -code error "stdout closed for asynchronous channel: $id"
        } elseif {$stderrMode eq "read" && ![info exists stderr]} {
            return -code error "stderr closed for asynchronous channel: $id"
        }
    }

    # Loop until a complete block is available. Return if -noWait was given
    # and both stdout and stderr are empty, incomplete, or unwanted. At end
    # of file, the remainder of the buffer is considered to be complete even
    # if its size is less than -limit value.
    while {1} {
        # First try reading from stderr, then stdout.
        foreach {dir skipMode} {stderr ignore stdout read} {
            # Check if this direction is open, not being ignored, and has a
            # complete block or has an incomplete block at end of file.
            set block [dict get $Channels $id buf $dir]
            if {$stderrMode ne $skipMode
             && [dict exists $Channels $id open $dir]
             && (![info exists limit] || [dict get $Channels $id eof flag]
               ? $block ne {} : [string length $block] >= $limit)} {
                # Remove the data from the buffer.
                if {[info exists limit]} {
                    dict set Channels $id buf $dir\
                            [string range $block $limit end]
                    set block [string replace $block $limit end]
                } else {
                    dict set Channels $id buf $dir {}
                }

                # If -dropNewline, strip trailing newline.
                if {[info exists dropNewline]} {
                    regsub {\n$} $block {} block
                }

                # If -stderr throw (the default) and a complete block was
                # read from stderr, throw it rather than return it.
                if {$dir eq "stderr" && $stderrMode eq "throw"} {
                    throw {ASYNC STDERR} $block
                }

                # Return the result. With -stderr tag, the result is a list
                # consisting of "stdout" or "stderr" followed by the received
                # data. Otherwise, the result is the data. -stderr merge
                # is the only case where the caller won't be able to tell
                # where the data came from.
                if {$stderrMode eq "tag"} {
                    return [list $dir $block]
                } else {
                    return $block
                }
            }
        }

        # At this point, stderr and stdout are both unwanted or unavailable.
        if {[dict get $Channels $id eof flag]} {
            # Throw an exception if at end of file.
            throw {ASYNC EOF} [dict get $Channels $id eof message]
        } elseif {![info exists noWait]} {
            # Fill the stdout/stderr buffers and retry if not -noWait.
            fill $id
        } else {
            # If -noWait, return empty.
            return {}
        }
    }
}

# async::fill --
# Reads all available data from stdout and stderr into internal buffers. The
# current coroutine yields until at least one character is successfully read or
# end of file is reached. This procedure need not be called by user code.
proc ::async::fill {id} {
    variable Channels

    # Don't attempt to read when already at end of file or when both stdout
    # and stderr have been closed.
    if {![dict get $Channels $id eof flag]
     && ([dict exists $Channels $id open stdout]
      || [dict exists $Channels $id open stderr])} {
        # Loop until data could be read.
        while {![flow $id stdout] && ![flow $id stderr]} {
            # Schedule this coroutine to be resumed when there is data.
            foreach dir {stdout stderr} {
                if {[dict exists $Channels $id open $dir]} {
                    chan event [dict get $Channels $id chan $dir] readable\
                            [info coroutine]
                }
            }

            # Wait until at least one channel is readable.
            yield

            # Remove the scheduled channel event handlers.
            foreach dir {stdout stderr} {
                if {[dict exists $Channels $id open $dir]} {
                    chan event [dict get $Channels $id chan $dir] readable {}
                }
            }
        }
    }
}

# async::flow --
# Reads all immediately available data from stdout or stderr into internal
# buffers. This procedure need not be called by user code. Returns the number
# of characters that were read, 0 if no data was available, or -1 if at end of
# file or the requested direction was locally closed.
proc ::async::flow {id direction} {
    variable Channels

    # Get access to the channel status variables.
    dict with Channels $id {
        if {[dict get $eof flag] || ![dict exists $open $direction]} {
            # Already at end of file, or the requested direction was closed.
            return -1
        } elseif {[catch {chan read [dict get $chan $direction]} data]
               || ($data eq {} && [chan eof [dict get $chan $direction]])} {
            # At end of file, or an error occurred.
            dict set eof flag 1
            if {$data eq {}} {
                dict set eof message "end of file"
            } else {
                dict set eof message $data
            }
            return -1
        } else {
            # Not at end of file. Possibly got some data. Append to buffer.
            dict append buf $direction $data
            return [string length $data]
        }
    }
}

# vim: set sts=4 sw=4 tw=80 et ft=tcl:
======

<<categories>> Command | Interprocess Communication | Package