After Task Scheduler

George Peter Staplin: As a fun experiment with cooperative task scheduling I implemented this prototype.


 # By George Peter Staplin -- Dec 19, 2005 - Jan 5, 2006
 # This implements a task scheduler using [after].
 # It supports (so far) 2 scheduling algorithms -- alternating and sequential.
 # This is version 4 of a prototype.
 
 # Scheduler state: the queue of algorithms waiting for a pass over the
 # tasks, the most recently issued task id, and the (initially empty)
 # task table.
 set ::run_queue {}
 set ::task_counter 0
 array set ::tasks [list]
 
 # Evaluate body as a script in the caller's scope. When the script's
 # result is false, raise an error whose message quotes the script.
 proc assert body {
     set outcome [uplevel 1 $body]
     if {$outcome} {
         return
     }
     return -code error "assertion failed: $body"
 }
 
 # Register a task and return its id.
 #   start    - command called with data when the task is started
 #   iterator - command called with data on each step
 #   end      - command called with data when the task finishes
 #   data     - value handed to each of the three commands
 # The id is appended to tasks(list) and the task begins in the
 # NOT_RUNNING state (valid states: NOT_RUNNING, RUNNING, DONE).
 proc task {start iterator end data} {
     global tasks task_counter
     # Step the counter forward until it names an unused slot.
     for {incr task_counter} {[info exists tasks($task_counter)]} {incr task_counter} {}
     set id $task_counter
     lappend tasks(list) $id
     array set tasks [list \
         $id $id \
         $id,state NOT_RUNNING \
         $id,start $start \
         $id,iterator $iterator \
         $id,end $end \
         $id,data $data]
     return $id
 }
 
 # Put every task listed in tasks(list) back into the NOT_RUNNING state.
 proc reset.tasks {} {
     global tasks
     foreach id $tasks(list) {
         array set tasks [list $id,state NOT_RUNNING]
     }
 }
 
 
 # Request one pass over all tasks using the given scheduling algorithm
 # ("sequential" or "alternating").
 #
 # The request is appended to the global run_queue. When it is the only
 # entry, the pass is scheduled via [after]; otherwise it waits until the
 # passes queued before it have finished.
 #
 # Raises an error, without queueing anything, when the algorithm name
 # is not recognised or when no tasks have been created.
 proc run.tasks algorithm {
     global tasks run_queue
     # Validate before touching the queue: a rejected request must not
     # stay queued, or every later call would believe a pass is running.
     switch -- $algorithm {
         sequential - alternating {}
         default {
             return -code error "invalid scheduler algorithm"
         }
     }
     if {![info exists tasks(list)] || ![llength $tasks(list)]} {
         return -code error "no tasks have been created"
     }
     lappend run_queue $algorithm
     if {[llength $run_queue] > 1} {
         #
         # We will handle this after the current task queue finishes.
         #
         return
     }
     # Build the script with [list] so each argument stays one word.
     after 1 [list start.task $tasks([lindex $tasks(list) 0]) $algorithm]
 }
 
 # Start a task unless it is already RUNNING, which guards against
 # invoking its start command more than once. The state is switched to
 # RUNNING before the start command is called with the task's data.
 proc possibly.start.task task {
     global tasks
     if {$tasks($task,state) eq "RUNNING"} {
         return
     }
     set tasks($task,state) RUNNING
     $tasks($task,start) $tasks($task,data)
 }
 
 # Called once the current pass has no tasks left. Drops the finished
 # entry from run_queue; when another algorithm is waiting, resets all
 # task states, starts the first task and schedules the next pass.
 proc next.run.queue {} {
     global run_queue tasks
     set run_queue [lrange $run_queue 1 end]
     if {[llength $run_queue] > 0} {
         reset.tasks
         set first [lindex $tasks(list) 0]
         possibly.start.task $first
         set algorithm [lindex $run_queue 0]
         after 1 [list start.task $tasks($first) $algorithm]
     }
 }
 
 # Pick the task to run next under the sequential algorithm.
 #   task      - id of the task that just ran
 #   algorithm - unused here; kept so both finders share one signature
 #   end       - true when the task has just finished
 #   queue_var - name of the caller's variable holding the active queue
 # While the task is unfinished it is returned again. Once it has
 # finished it is removed from the queue and its successor is started
 # and returned. Returns "" after calling next.run.queue when the queue
 # has become empty.
 proc find.next.task.sequential {task algorithm end queue_var} {
     if {!$end} {
         return $task
     }
     upvar $queue_var queue
     #
     # Remove the task from the active queue.
     #
     set i [lsearch -exact $queue $task]
     # Same sanity check as the alternating finder: the finished task
     # must be a member of the queue.
     assert {expr {$i >= 0}}
     set queue [lreplace $queue $i $i]
     if {![llength $queue]} {
         next.run.queue
         return ""
     }
     # The removed task may have been the last element; wrap to the
     # front instead of indexing past the end of the queue.
     if {$i >= [llength $queue]} {
         set i 0
     }
     #
     # Start the next task
     #
     set task [lindex $queue $i]
     possibly.start.task $task
     return $task
 }
 
 # Pick the task to run next under the alternating algorithm.
 #   task      - id of the task that just ran (must be in the queue)
 #   algorithm - unused here
 #   end       - true when the task has just finished
 #   queue_var - name of the caller's variable holding the active queue
 # A finished task is removed from the queue, so its successor slides
 # into the same position; otherwise the position advances by one.
 # Running off the end wraps to the front. Returns "" after calling
 # next.run.queue when the queue has become empty.
 proc find.next.task.alternating {task algorithm end queue_var} {
     upvar $queue_var pending
     set i [lsearch -exact $pending $task]
     assert {expr {$i >= 0}}
     if {!$end} {
         incr i
     } else {
         set pending [lreplace $pending $i $i]
         if {[llength $pending] == 0} {
             next.run.queue
             return ""
         }
     }
     set successor [lindex $pending [expr {$i < [llength $pending] ? $i : 0}]]
     possibly.start.task $successor
     return $successor
 }
 
 # Dispatch to find.next.task.<algorithm> and return its result.
 # queue_var names the caller's queue variable; it is linked locally so
 # the algorithm-specific finder can update it in place.
 proc find.next.task {task algorithm end queue_var} {
     upvar 1 $queue_var queue
     set finder find.next.task.$algorithm
     return [$finder $task $algorithm $end queue]
 }
 
 # Run one step of a task, then schedule the following step.
 #   task      - id of the task to step
 #   algorithm - scheduling algorithm used to choose the next task
 #   queue     - list of task ids still active in this pass
 # The task's iterator is called with its data. A false result means
 # the task is finished: its end command is called and its state
 # becomes DONE. The next task is then looked up and, unless there is
 # none, another run.iterator call is scheduled via [after].
 proc run.iterator {task algorithm queue} {
     global tasks
     set more [$tasks($task,iterator) $tasks($task,data)]
     if {$more} {
         set end 0
     } else {
         # The iterator asked to stop: finish this task.
         $tasks($task,end) $tasks($task,data)
         set tasks($task,state) DONE
         set end 1
     }
     set upcoming [find.next.task $task $algorithm $end queue]
     if {$upcoming ne ""} {
         after 1 [list run.iterator $upcoming $algorithm $queue]
     }
 }
 
 
 # Begin a scheduling pass with the given task.
 #   task      - id of the task the pass starts with
 #   algorithm - scheduling algorithm for the pass
 # The task's start command is called with its data unless the task is
 # already RUNNING, then iteration begins over the full task list.
 proc start.task {task algorithm} {
     global tasks
     if {"RUNNING" ne $tasks($task,state)} {
         $tasks($task,start) $tasks($task,data)
         set tasks($task,state) RUNNING
     }
     # Iterate the task that was just started. Using the head of
     # tasks(list) here would step a task whose start command may not
     # have run whenever $task is not the first entry.
     run.iterator $task $algorithm $tasks(list)
 }
 
 #### TESTS ####
 
 
 #
 # This implements a greeting pattern that repeats 100 times.
 # The first message is "well, hello" the last is "goodbye"
 #
 # Start handler of the greeting task: zero the global greeting counter
 # and print the opening message. The data argument is not used.
 proc greeting.start data {
     global counter
     set counter 0
     puts stdout "well, hello"
 }
 
 # Iterator of the greeting task: print "hello" and count the call in
 # the global ::counter. Returns 1 to continue and 0 once the 100th
 # greeting has been printed. The data argument is not used.
 proc greeting.iterator data {
     puts hello
     incr ::counter
     # Stop on the 100th greeting. Testing "> 100" after the increment
     # would let a 101st "hello" through before ending the task.
     if {$::counter >= 100} {
         #end task
         return 0
     }
     #continue
     return 1
 }
 
 # End handler of the greeting task: print the closing message.
 # The data argument is not used.
 proc greeting.end data {
     puts stdout goodbye
 }
 
 
 #
 # This implements a timer pattern that runs for the number of seconds
 # given by the task's data member, which is set at task creation time
 # and passed to the timer.start procedure.
 #
 # Start handler of the timer task: store the deadline (the current
 # clock seconds plus data) in the global timer_end and print it.
 proc timer.start data {
     global timer_end
     set now [clock seconds]
     set timer_end [expr {wide($now) + $data}]
     puts "::timer_end $timer_end"
 }
 
 # Iterator of the timer task: print the current clock seconds, then
 # return 0 once the clock has reached ::timer_end and 1 before that.
 # The data argument is not used.
 proc timer.iterator data {
     puts [format "timer iterator: %s" [clock seconds]]
     # 1 = continue, 0 = end task
     return [expr {[clock seconds] < $::timer_end}]
 }
 
 # End handler of the timer task: announce that the timer has expired.
 # The data argument is not used.
 proc timer.end data {
     puts stdout "BINGO!"
 }
 
 # Create the two demo tasks: the greeting (no data) and a 2-second timer.
 task greeting.start greeting.iterator greeting.end {}
 task timer.start timer.iterator timer.end 2
 #
 # Queue six passes over the tasks, one per listed algorithm, in order.
 #
 foreach t {sequential alternating sequential alternating alternating sequential} {
     run.tasks $t
 }
 #
 # Enter the event loop so the [after] callbacks can run.
 #
 catch {vwait forever}

See also: Concurrency concepts