Version 1 of pheap

Updated 2009-02-04 11:53:36 by fm

pheap - Priority Queue

A pairing-heap priority queue in Tcl. The aim is to illustrate and implement the pairing-heap data structure in a readable and practically usable way. The heap is represented by "pointer-like" connected elements, which are kept in a Tcl array 'PH'. Since Tcl has no pointers, item names are used instead. That leaves the details of the algorithm openly visible and makes it easy to re-implement it later in a lower-level language such as Java or C#.
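
The representation can be tried out in isolation. The following is a minimal sketch, not part of the package: it builds a three-item heap by hand, using the same record layout as the code below (a list of cost, child, prev, next stored under the item name), and walks the children of the root. The helper name children is hypothetical.

```tcl
# Each item is stored under its name as a 4-element list:
#   0 = cost, 1 = first child, 2 = prev (parent or left sibling), 3 = next sibling
# The empty string plays the role of a null pointer.
array set PH {}
set PH(a) [list 1 b {} {}]   ;# root: cost 1, first child is b
set PH(b) [list 5 {} a c]    ;# first child of a, next sibling is c
set PH(c) [list 3 {} b {}]   ;# second child of a, prev is its left sibling b
set root a

# Hypothetical helper: list the direct children of an item by following
# the .child link once and then the .next links.
proc children {arrName item} {
    upvar 1 $arrName PH
    set result {}
    set c [lindex $PH($item) 1]
    while {$c ne {}} {
        lappend result $c
        set c [lindex $PH($c) 3]
    }
    return $result
}

puts [children PH $root]   ;# prints: b c
```

Because the links are plain names, every "pointer" can be inspected with puts or parray, which is the readability the description above is aiming at.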

Synopsis

package require Tcl 8.5
package require fm::pheap ?0.1?
::pheap::pheap ?-compare ascii|dictionary|numeric|command? ?-decreasing? ?pheapName?
pheapName clear
pheapName contains item ?prioVar?
pheapName destroy ?name?
pheapName hasmin ?itemVar? ?prioVar?
pheapName popmin ?itemVar? ?prioVar?
pheapName remove item ?prioVar?
pheapName setp item ?prio?
pheapName size

Code

package require Tcl 8.5
package provide fm::pheap 0.1
# ########################################################################
namespace eval pheap {
    namespace export pheap sort
    variable counter 0
    variable MAX_INT [expr {int((1<<31)-1)}]
}
# ########################################################################
# ************************************************************************
proc pheap::_usage {pref name} {
    return -code error "${pref}: should be \"$name ?-compare ascii|dictionary|numeric|command? ?-decreasing? ?name?\""
}
# ************************************************************************
proc pheap::cmp-dictionary {a b} {
    if {$a eq $b} { return 0 }
    # need to use lsort to access -dictionary sorting
    set x [lsort -dictionary [list $a $b]]
    if {[lindex $x 0] eq $a} { return -1 }
    return 1
}
# ************************************************************************
proc pheap::cmp-dictionary-decr {a b} {
    if {$a eq $b} { return 0 }
    # need to use lsort to access -dictionary sorting
    set x [lsort -dictionary [list $a $b]]
    if {[lindex $x 0] eq $a} { return 1 }
    return -1
}
# ************************************************************************
proc pheap::cmp-numeric {a b} { expr {($a<$b)?-1:(($a==$b)?0:1)} }
# ************************************************************************
proc pheap::cmp-numeric-decr {a b} { expr {($a<$b)?1:(($a==$b)?0:-1)} }
# ************************************************************************
proc pheap::cmp-ascii-decr {a b} {
    set x [::string compare $a $b]
    expr {($x<0)?1:($x>0)?-1:0}
}
# ************************************************************************
proc pheap::cmp-userdef-decr {cmp a b} {
    set x [{*}$cmp $a $b]
    expr {($x<0)?1:($x>0)?-1:0}
}
# ************************************************************************
    ;# ::pheap::pheap  ?-compare ascii|dictionary|numeric|command?
    ;#                 ?-decreasing?   ?pheapName?
    ;# 
    ;# This command creates a new prioqueue object with an associated
    ;# global Tcl command whose name is pheapName, which will be returned.
    ;# If no name is given, some name will be generated using the
    ;# format ::pheap$nr. This command may be used to invoke any of
    ;# the defined operations on the priority queue.
    ;# 
    ;# The option -compare selects the comparing function.
    ;# It can be a userdefined command prefix that gets called with
    ;# two priorities appended, or one of the predefined functions:
    ;#  * 'ascii' does 'string compare'
    ;#  * 'dictionary' behaves like 'lsort -dictionary'
    ;#  * 'numeric' uses 'expr {$a<$b}'
    ;# Default is '-compare numeric'.
    ;# 
    ;# To get the inverse comparison you may use the flag -decreasing.
    ;# 
proc pheap::pheap {args} {
    set len [llength $args]
    variable counter
    set id [incr counter]

    set dir increasing
    set cmp numeric
    set cmdName {}
    
    for {set i 0} {$i < $len} {incr i} {
        switch -- [set opt [lindex $args $i]] {
            -decreasing { set dir decreasing }
            -compare    { set cmp [lindex $args [incr i]] }
            default {
                if {$i == $len-1} {
                    set cmdName $opt
                } else {
                    _usage "unknown option '$opt'" [lindex [info level 0] 0]
                }
            }
        }
    }
    if {$cmdName eq {}} { set cmdName  ::pheap$id }

    if {[info commands $cmdName] ne ""} {
        error "command \"$cmdName\" already exists, unable to create pheap"
    }

    if {$dir eq "increasing"} {  ;# default
        switch -- $cmp {
            ascii      { set cmp "::string compare" }
            dictionary { set cmp "::pheap::cmp-dictionary" }
            numeric    { set cmp "::pheap::cmp-numeric" }
            default    {  ;# keep user-defined cmp.
            }
        }
    } else {  ;# decreasing
        switch -- $cmp {
            ascii      { set cmp "::pheap::cmp-ascii-decr" }
            dictionary { set cmp "::pheap::cmp-dictionary-decr" }
            numeric    { set cmp "::pheap::cmp-numeric-decr" }
            default    { set cmp [list "::pheap::cmp-userdef-decr" $cmp] }
        }
    }


    # create variables containing the data
    array set ::pheap::ph$id {}
    set ::pheap::root$id     {}
    set ::pheap::cmp$id      $cmp
    set ::pheap::cmdName$id  $cmdName


    # Create the command that represents that object
    set map [dict create]
    dict set map clear        [list ::pheap::clear    $id]
    dict set map contains     [list ::pheap::contains $id]
    dict set map destroy      [list ::pheap::destroy  $id]
    dict set map hasmin       [list ::pheap::hasmin   $id]
    dict set map popmin       [list ::pheap::popmin   $id]
    dict set map remove       [list ::pheap::remove   $id]
   #dict set map set          [list ::pheap::setp     $id]
    dict set map setp         [list ::pheap::setp     $id]
    dict set map size         [list ::pheap::size     $id]


    # Unofficial functions:
    dict set map demote       [list ::pheap::demote   $id]
    dict set map dump         [list ::pheap::dump     $id]
    dict set map get          [list ::pheap::get      $id]
    dict set map id           [list ::pheap::id       $id]
    dict set map index        [list ::pheap::index    $id]
    dict set map insert       [list ::pheap::insert   $id]
   #dict set map isempty      [list ::pheap::isempty  $id]
   #dict set map isfilled     [list ::pheap::isfilled $id]
    dict set map keep         [list ::pheap::keep     $id]
    dict set map merge        [list ::pheap::merge    $id]
    dict set map names        [list ::pheap::names    $id]
    dict set map peek         [list ::pheap::peek     $id]
    dict set map pop          [list ::pheap::pop      $id]
    dict set map priority     [list ::pheap::priority $id]
    dict set map promote      [list ::pheap::promote  $id]
    dict set map root         [list ::pheap::root     $id]
    dict set map top          [list ::pheap::top      $id]
   #dict set map value        [list ::pheap::value    $id]
   #dict set map xx           [list ::pheap::xx       $id]

    namespace ensemble create  -map $map  -command $cmdName
    set cmdName
}
# ************************************************************************
proc pheap::size {id} { array size ::pheap::ph$id }
# ************************************************************************
    ;# Remove all items from this priority queue.
proc pheap::clear {id} {
    upvar #0 ::pheap::ph$id PH   ::pheap::root$id root
    unset PH
    array set PH {}
    set root {}
}
# ************************************************************************
    ;# Return 1 if the selected item is in this priority queue, else 0. 
proc pheap::contains {id a {aCost {}}} {
    upvar #0 ::pheap::ph$id PH
    if {[info exists PH($a)]} {
        if {[llength [info level 0]] == 4} {
            upvar $aCost cost
            set cost [lindex $PH($a) 0]
        }
        return 1
    }
    return 0
}
# ************************************************************************
    ;# Destroy the prioqueue, including its storage space and associated command.
    ;# Example:  $q destroy $q
    ;# 
proc pheap::destroy {id {a {}}} {
    set cmdName [set ::pheap::cmdName$id]
    # If there has been a rename to $cmdName then 'rename $cmdName {}' is wrong.
    if {$a eq {}} { catch {rename $cmdName {}} } else { rename $a {} }
    unset -nocomplain \
         ::pheap::ph$id \
         ::pheap::cmp$id \
         ::pheap::root$id \
         ::pheap::cmdName$id
}
# ************************************************************************
    ;# Get or set the priority of the item. If called with one argument,
    ;# it just returns the priority of item. If prio is given, it does a
    ;#     pheapName promote|demote|insert item prio
    ;# depending on whether the item already is in the priority queue and
    ;# whether the new prio is smaller or bigger than before.
    ;# Then prio is returned.
    ;# 
proc pheap::setp {id item {cost {}}} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root  ::pheap::cmp$id cmp
    if {[llength [info level 0]] == 3} { ;# get priority
        if {[info exists PH($item)]} { return [lindex $PH($item) 0] }
        error "*** [set ::pheap::cmdName$id] no such item: $item"
    }
    if {[info exists PH($item)]} {
        set c [{*}$cmp $cost [lindex $PH($item) 0]]
        if {$c < 0} {
            promote $id $item $cost
        } elseif {!$c} {
            lset PH($item) 0 $cost
        } else {
            demote $id $item $cost
        }
    } elseif {$root eq {}} {
        set PH($item) [list $cost {} {} {}]
        set root $item
    } else {
        lassign $PH($root) rcost rchild rprev rnext
        if {[{*}$cmp $cost $rcost] <= 0} { ;# item becomes new root
            lset PH($root) 2 $item
            set PH($item) [list $cost $root {} {}]
            set root $item
        } else {               ;# item becomes first-child of root
            lset PH($root) 1 $item
            set PH($item) [list $cost {} $root $rchild]
            if {$rchild ne {}} { lset PH($rchild) 2 $item }
        }
    }
    set cost
}
# ************************************************************************
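    ;# Remove item $a from the priority queue and return 1.
    ;# Robust:  $a need not be in the pheap; in that case 0 is returned
    ;# and nothing is touched.  If aCost is given, the priority of the
    ;# removed item is stored in that variable.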
    ;# 
proc pheap::remove {id a {aCost _}} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root
    if {![info exists PH($a)]} { return 0 }  ;# don't touch anything

    if {[llength [info level 0]] == 4} { upvar $aCost cost }
    if {$a eq $root} { return [popmin $id a cost] }

    lassign $PH($a) cost xchild xprev xnext

    if {$xchild eq {}} {
        if {[lindex $PH($xprev) 1] eq $a} {  ;# .child
            lset PH($xprev) 1 $xnext
        } else {  ;# .next
            lset PH($xprev) 3 $xnext
        }
        if {$xnext ne {}} { lset PH($xnext) 2 $xprev }
    } else {  ;# xchild replaces a
        set xchild [_twoPass $id $xchild]
        lset PH($xchild) 2 $xprev
        lset PH($xchild) 3 $xnext

        if {[lindex $PH($xprev) 1] eq $a} {  ;# .child
            lset PH($xprev) 1 $xchild
        } else {  ;# .next
            lset PH($xprev) 3 $xchild
        }
        if {$xnext ne {}} { lset PH($xnext) 2 $xchild }
    }
    unset PH($a)
    return 1
}
# ************************************************************************
    ;# Get minimum and its cost and remove it from the heap.
    ;# 
proc pheap::popmin {id {aElem _} {aCost _}} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root
    if {$root eq {}} { return 0 }  ;# don't touch anything

    switch [llength [info level 0]] {
        3 { upvar $aElem x }
        4 { upvar $aElem x  $aCost cost }
    }
    set x $root
    lassign $PH($root) cost rchild rprev rnext
    #assert {[_testMin $id]}

    if {$rchild eq {}} {
        set root {}
    } else {
        set root [_twoPass $id $rchild]
        lset PH($root) 2 {}  ;# .prev
        #assert {[lindex $PH($root) 2] eq {}}; assert {[lindex $PH($root) 3] eq {}}
    }

    unset PH($x)
    return 1
}
# ************************************************************************
    ;# Return 0 if empty, else set item and prio and return 1.
    ;# 
proc pheap::hasmin {id {item _} {prio _}} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root
    if {$root eq {}} { return 0 }  ;# don't touch anything
    
    switch [llength [info level 0]] {
        3 { upvar $item x }
        4 { upvar $item x  $prio cost }
    }
    set x $root
    set cost [lindex $PH($root) 0]
    return 1
}
# ************************************************************************
    ;# Set a new, smaller priority $cost for object $a.
    ;# 'decreaseKey'  := 'decrease cost'
    ;# 
proc pheap::promote {id a acost} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root
    lset PH($a) 0 $acost
    if {$a ne $root} {  ;# Remove a together with its children from the heap.
        lassign $PH($a) xcost xchild xprev xnext
        if {$xnext ne {}} { lset PH($xnext) 2 $xprev }
        if {[lindex $PH($xprev) 1] eq $a} {  ;# (.child == a)
            lset PH($xprev) 1 $xnext
        } else {  ;# (.next == a)
            lset PH($xprev) 3 $xnext
        }
        # Prepare for '_compLink'.
        # Unnecessary:  lset PH($a) 2 {}  ;# .prev
        lset PH($a) 3 {}  ;# .next
        # Re-insert a at the very top, i.e. 'insert', but keep its children!
        lset PH($root) 3 $a  ;# .next (implicit para for '_compLink')
        ;#      root -  a
        ;#      |    +  |
        ;#      A       B
        set root [_compLink $id $root]
        #assert {[lindex $PH($root) 2] eq {}}; assert {[lindex $PH($root) 3] eq {}}
    }
    set acost
}
# ************************************************************************
    ;# Set a new, bigger priority $cost for object $a.
    ;# 
    ;# xxx Not tested yet!
    ;# 
proc pheap::demote {id a acost} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root  ::pheap::cmp$id cmp
    lset PH($a) 0 $acost
    lassign $PH($a) mcost mchild mprev mnext

    if {$a eq $root} {
        if {$mchild eq {}} { return $acost }

        set x [_twoPass $id $mchild]  ;# remove siblings
        lassign $PH($x) xcost xchild xprev xnext
        if {[{*}$cmp $acost $xcost] > 0} {
            ;# remove root and insert it with modified cost
            lset PH($root) 1 {}
            set PH($x) [list $xcost $xchild {} $root]
            set root [_compLink $id $x]
            #assert {[lindex $PH($root) 2] eq {}}; assert {[lindex $PH($root) 3] eq {}}
        }
    } elseif {$mchild ne {}} {
        # Merge all children of a first, so that x is the smallest of them.
        # (Testing only the first child could miss a smaller sibling.)
        set x [_twoPass $id $mchild]
        if {[{*}$cmp [lindex $PH($x) 0] $acost] < 0} {
            # x would violate the heap order below a:
            # detach x (now the only child of a) and insert it at root.
            lset PH($a) 1 {}

            # Re-insert x at the very top as first child of root.
            # We know:
            #     (root.cost <= x.cost)  &&
            #     (root.child ne {})     &&
            #     (root.child ne x)
            lassign $PH($root) rcost rchild rprev rnext

            lset PH($root)   1 $x      ;# .child
            lset PH($x)      2 $root   ;# .prev
            lset PH($rchild) 2 $x      ;# .prev
            lset PH($x)      3 $rchild ;# .next
        }
    }
    set acost
}
# ************************************************************************
proc pheap::insert {id item cost} {
    upvar #0 ::pheap::ph$id PH  ::pheap::root$id root  ::pheap::cmp$id cmp
    if {$root eq {}} {
        set PH($item) [list $cost {} {} {}]
        set root $item
    } else {
        lassign $PH($root) rcost rchild rprev rnext
        if {[{*}$cmp $cost $rcost] <= 0} { ;# item becomes new root
            lset PH($root) 2 $item
            set PH($item) [list $cost $root {} {}]
            set root $item
        } else {               ;# item becomes first-child of root
            lset PH($root) 1 $item
            set PH($item) [list $cost {} $root $rchild]
            if {$rchild ne {}} { lset PH($rchild) 2 $item }
        }
    }
}
# ************************************************************************
    ;#                  x -      y - C
    ;#                  |    +   |
    ;#                  A        B
    ;#   -------------------- -------------------
    ;#          (x<=y)               (x>y)
    ;#
    ;#           x - C                y - C
    ;#           |                    |
    ;#           y - A                x - B
    ;#           |                    |
    ;#           B                    A
    ;#
proc pheap::_compLink {id x} {
    upvar #0 ::pheap::ph$id PH  ::pheap::cmp$id cmp

    lassign $PH($x) xcost xchild xprev xnext
    set y $xnext ;# implicit para, instead of:  assert (xnext=={})
    lassign $PH($y) ycost ychild yprev ynext
    # Note:  Due to the 'implicit para' trick, ($yprev eq $x) and
    #   ($xnext eq $y) hold initially, but $xprev can be arbitrary!
    #   Only ($xnext eq $y) is used here, i.e. yprev may contain garbage.

    if {[{*}$cmp $xcost $ycost] <= 0} {  ;# x stays root.
        set PH($x) [list $xcost $y      $xprev $ynext]
        set PH($y) [list $ycost $ychild $x     $xchild]

        if {$ynext ne {}}  { lset PH($ynext) 2 $x }  ;# C.prev is now x
        if {$xchild ne {}} { lset PH($xchild) 2 $y } ;# A.prev is now y
        return $x
    }

    # y becomes root.
    set PH($x) [list $xcost $xchild $y     $ychild]
    set PH($y) [list $ycost $x      $xprev $ynext]

    if {$xprev ne {}} {
        if {[lindex $PH($xprev) 1] eq $x} {      ;# xprev.child is now y
            lset PH($xprev) 1 $y
        } else {                                 ;# xprev.next is now y
            # assert {[lindex $PH($xprev) 3] eq $x}
            lset PH($xprev) 3 $y
        }
    }
    if {$ychild ne {}} { lset PH($ychild) 2 $x } ;# B.prev is now x
    set y
}
# ************************************************************************
    ;# Reduce siblings of child x to null and return new child.
    ;#     assert {$x ne {}}
    ;# Uses O(log(n)) amortized time?!
    ;# The real workhorse of the pairing heap.
    ;# Uses "two-pass merging" of siblings until no siblings are left.
    ;# 
    ;#         |
    ;#         x - c1 - c2 - c3 - c4
    ;#         |   |    |    |    |  
    ;#         A   B1   B2   B3   B4
    ;#     -----------------------------
    ;# pass1:
    ;#         |
    ;#         x - c1c2 - c3c4 
    ;#         |    |      |   
    ;#         A   B1B2   B3B4 
    ;#     -----------------------------
    ;# pass2:
    ;#         |
    ;#         xc1c2c3c4
    ;#         |
    ;#         AB1B2B3B4
    ;#
proc pheap::_twoPass {id x} {
    upvar #0 ::pheap::ph$id PH

    # list:   0=.cost 1=.child 2=.prev 3=.next
    lassign $PH($x) kcost kchild kprev knext
    if {$knext eq {}} { return $x }

    # pass1:  left-to-right merging pairs of children.
    while {1} {
        lassign $PH($x) xcost xchild xprev xnext
        if {$xnext eq {}} { break }

        set y [_compLink $id $x]
        lassign $PH($y) ycost ychild yprev ynext

        if {$ynext eq {}} { set x $y;  break }
        set x $ynext
    }

    # pass2:  right-to-left merging with the current result.
    while {1} {
        lassign $PH($x) xcost xchild xprev xnext
        if {$xprev eq $kprev} break

        set x [_compLink $id $xprev]
    }
    set x
}
# ************************************************************************
    ;# Sorts the list.
    ;# Mainly as an example to illustrate pheap and for testing.
    ;# 
proc pheap::sort {alist args} {
    set q [::pheap::pheap {*}$args ::pheap[incr ::pheap::counter]]
    foreach i $alist { $q setp [incr cnt] $i }

    set rval {}
    while {[$q popmin m c]} { lappend rval $c }
    $q destroy
    set rval
}
# ************************************************************************

Example1. Sort a list using pheap

proc mysort {alist args} {
    set q [::pheap::pheap {*}$args ::mypheap]
    foreach i $alist { $q setp [incr cnt] $i }

    set rval {}
    while {[$q popmin m c]} { lappend rval $c }
    $q destroy
    set rval
}
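
Example2. Basic queue operations

A small sketch of the operations listed in the synopsis, assuming the package above has been loaded as fm::pheap. The queue name ::jobs and the item names are made up for illustration; each item is inserted only once, and the expected results in the comments follow from the numeric default comparison.

```tcl
package require fm::pheap

set q [::pheap::pheap ::jobs]     ;# default is -compare numeric
$q setp compile 2
$q setp test    5
$q setp lint    1

puts [$q size]                    ;# 3
if {[$q hasmin item prio]} {
    puts "$item $prio"            ;# lint 1 (the item stays in the queue)
}
if {[$q contains test p]} {
    puts "test has priority $p"   ;# test has priority 5
}
$q remove test                    ;# take an item out regardless of its position

set order {}
while {[$q popmin item prio]} { lappend order $item }
puts $order                       ;# lint compile
$q destroy
```

Note that hasmin, popmin, contains and remove take variable names, not values, and report success through their return value, so they can be used directly as loop or if conditions.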

Remarks

Since this is the first version of this package, some of the functions are still experimental and might change or vanish in a future release. The function destroy might lose its parameter name once I find out how to determine the command name that called it (info level 0 and info frame 2 didn't seem to do the trick).

As it is now, pheap relies on a hash table (array or dict) as the backbone of its implementation, which favors functions that use an item as index. For an implementation in C, one can easily adapt the algorithmic ideas without using any hash table and store all pointers in the items themselves.

Comparison with package struct::prioqueue: prioqueue lacks the sometimes essential functionality of setp or promote, and its implementation seems to be a bubble sort in disguise, which is detrimental to performance when dealing with many items. pheap, on the other hand, reflects more closely the intention and performance characteristics of a priority queue, and it seems to be more versatile, too. (The interface of get and peek in struct::prioqueue makes me frown, too. IMHO the distinction between lists and strings should be blurred less, not more.)

Functions using count as parameter require sorting at least the first count items, which makes them potentially costly. Therefore these functions are in the experimental section.

While example 1 shows one usage of priority queues, it is a bit uncharacteristic, because usually the items are the important part and not the priorities. More typical are algorithms like Dijkstra's shortest path, or A*, where the items are nodes and the priorities are distances or costs. Those algorithms greatly benefit from pheap, where cost estimates can dynamically be adapted.
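
As a sketch of that more typical usage, here is a small Dijkstra shortest-path procedure on top of pheap. It assumes that the package above is loaded and that setp on an item already in the queue lowers its priority as described in the setp comment. The procedure name dijkstra and the graph format (a dict mapping each node to a flat list of neighbor/weight pairs, with non-negative weights) are assumptions made for this illustration.

```tcl
package require fm::pheap

# graph: dict mapping node -> {neighbor weight neighbor weight ...}
# Returns a dict mapping every reachable node to its distance from start.
proc dijkstra {graph start} {
    set q [::pheap::pheap]               ;# auto-named queue, numeric priorities
    set dist [dict create $start 0]
    $q setp $start 0
    while {[$q popmin u d]} {
        if {![dict exists $graph $u]} continue   ;# node without outgoing edges
        dict for {v w} [dict get $graph $u] {
            set nd [expr {$d + $w}]
            if {![dict exists $dist $v] || $nd < [dict get $dist $v]} {
                dict set dist $v $nd
                $q setp $v $nd           ;# insert v, or lower the cost of a queued v
            }
        }
    }
    $q destroy
    return $dist
}

set g {
    a {b 7 c 2}
    c {b 3 d 8}
    b {d 1}
}
puts [dijkstra $g a]   ;# prints: a 0 b 5 c 2 d 6
```

Here the nodes are the items and the tentative distances are the priorities; the single setp call covers both the first insertion of a node and every later improvement of its distance.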

Bugs, Ideas, Feedback

This document, and the package it describes, will undoubtedly contain bugs and other problems. Please report such problems with the title "pheap" to [string map {at @ x . ext .com} [append _ Florian x Murr "at" siemens ext]]. Please also report any ideas for enhancements you may have for either the package or the documentation.

