boost::thread and Tk

http://www.tfh-berlin.de/~sleuthold/files/tcl/tk_threaded.jpg

This is a simple example for learning about threaded applications that use boost::thread in a C++ environment with a Tk front end. It is simple and not perfect; it is only a small showcase to give an idea of how this works and what could be done better.

(Sources: [L1 ])

Sources:


//Tcl_Handling.h
#include <tk.h>

// Application init hook passed to Tk_Main: initialises Tcl and Tk and
// registers the ::tkmain::* job commands declared below.
int Tk_AppInit(Tcl_Interp* interp);
// ::tkmain::startJob op n -- queues a job and returns its id.
int startJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[]);
// ::tkmain::jobFinished id -- returns a boolean: has a result been stored for id?
int jobFinished_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[]);
// ::tkmain::getJobResult id -- returns the stored result of job id.
int getJobResult_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[]);
// ::tkmain::cancelJob id -- asks the job queue to cancel job id.
int cancelJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[]);

//Tcl_Handling.cxx

#include <cassert>

#include "Tcl_Handling.h"
#include "Job_Handling.h"

// Tcl/Tk application initialisation callback (passed to Tk_Main).
// Initialises the Tcl and Tk libraries and then registers the four
// ::tkmain::* job commands; each command receives a pointer to the global
// job queue as its ClientData.
// Returns TCL_OK, or TCL_ERROR when Tcl_Init or Tk_Init fails.
int Tk_AppInit(Tcl_Interp* interp)
{
        // Tk is only initialised when Tcl came up successfully.
        if (Tcl_Init(interp) == TCL_ERROR || Tk_Init(interp) == TCL_ERROR)
                return TCL_ERROR;

        // Name -> implementation table of the commands exported to the script.
        struct CommandEntry
        {
                const char*     name;
                Tcl_ObjCmdProc* proc;
        };
        static const CommandEntry commands[] = {
                { "::tkmain::startJob",     startJob_ObjCmd     },
                { "::tkmain::jobFinished",  jobFinished_ObjCmd  },
                { "::tkmain::getJobResult", getJobResult_ObjCmd },
                { "::tkmain::cancelJob",    cancelJob_ObjCmd    }
        };

        const unsigned commandCount = sizeof(commands) / sizeof(commands[0]);
        for (unsigned i = 0; i < commandCount; ++i)
        {
                Tcl_CreateObjCommand(interp, commands[i].name, commands[i].proc,
                        &g_GetJobQueue(), (Tcl_CmdDeleteProc*)NULL);
        }

        return TCL_OK;
}

// Implements "::tkmain::startJob op n".
//   objv[1]  operation name, forwarded verbatim to JobQueue::AppendJob
//   objv[2]  integer argument, handed to the job as a double
// clientData must point at the JobQueue to use.
// On success the interpreter result is the id returned by AppendJob.
// Returns TCL_ERROR on a wrong argument count, a non-integer n, or a null
// queue pointer.
int startJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[])
{
        if (objc != 3)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "op n");
                return TCL_ERROR;
        }

        const std::string operation(Tcl_GetStringFromObj(objv[1], NULL));

        int count;
        const int parsed = Tcl_GetIntFromObj(interp, objv[2], &count);
        if (parsed != TCL_OK)
                return TCL_ERROR;

        JobQueue* const queue = reinterpret_cast<JobQueue*>(clientData);
        if (queue == 0)
                return TCL_ERROR;

        const JobArg argument(static_cast<double>(count));
        const jobID newId = queue->AppendJob(operation, argument);

        Tcl_SetObjResult(interp, Tcl_NewIntObj(newId));
        return TCL_OK;
}

// Implements "::tkmain::jobFinished id".
// Sets the interpreter result to a boolean telling whether the queue
// (passed as clientData) reports job `id` as finished.
// Returns TCL_ERROR on a wrong argument count, a non-integer id, or a null
// queue pointer.
int jobFinished_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[])
{
        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID wanted;
        if (Tcl_GetIntFromObj(interp, objv[1], &wanted) != TCL_OK)
                return TCL_ERROR;

        JobQueue* const queue = reinterpret_cast<JobQueue*>(clientData);
        if (queue == 0)
                return TCL_ERROR;

        const int done = queue->JobFinished(wanted) ? 1 : 0;
        Tcl_SetObjResult(interp, Tcl_NewBooleanObj(done));
        return TCL_OK;
}



// Implements "::tkmain::getJobResult id".
// Converts the stored JobArg result of job `id` into a Tcl object:
//   BOOL   -> boolean, DOUBLE -> double, STRING -> string,
//   any other type (e.g. the empty result stored for a cancelled job)
//   -> the string "error".
// Returns TCL_ERROR on a wrong argument count, a non-integer id, a null
// queue pointer, or when no result has been stored for `id` yet.
int getJobResult_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[])
{
        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID id;
        if (Tcl_GetIntFromObj(interp, objv[1], &id) != TCL_OK)
                return TCL_ERROR;


        JobQueue* _jobs = reinterpret_cast<JobQueue*>(clientData);

        if (!_jobs)
                return TCL_ERROR;

        // A script asking too early is a script error, not a reason to abort
        // the whole application (the original used assert() here).
        if (!_jobs->JobFinished(id))
        {
                Tcl_SetObjResult(interp, Tcl_NewStringObj("job has not finished yet", -1));
                return TCL_ERROR;
        }

        const JobArg res = _jobs->GetJobResult(id);

        switch (res.GetType())
        {
        case JobArg::BOOL:
                Tcl_SetObjResult(interp, Tcl_NewBooleanObj(res.GetBool() ? 1 : 0));
                break;
        case JobArg::DOUBLE:
                Tcl_SetObjResult(interp, Tcl_NewDoubleObj(res.GetDouble()));
                break;
        case JobArg::STRING:
                Tcl_SetObjResult(interp, Tcl_NewStringObj(res.GetString().c_str(), -1));
                break;
        default:
                // Untyped result: shown to the user as "error".
                Tcl_SetObjResult(interp, Tcl_NewStringObj("error", -1));
        }

        return TCL_OK;
}

// Implements "::tkmain::cancelJob id".
// Forwards the id to JobQueue::Cancel on the queue passed as clientData.
// Leaves the interpreter result untouched on success.
// Returns TCL_ERROR on a wrong argument count, a non-integer id, or a null
// queue pointer.
int cancelJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv[])
{
        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID target;
        const int parsed = Tcl_GetIntFromObj(interp, objv[1], &target);
        if (parsed != TCL_OK)
                return TCL_ERROR;

        JobQueue* const queue = reinterpret_cast<JobQueue*>(clientData);
        if (queue == 0)
                return TCL_ERROR;

        queue->Cancel(target);
        return TCL_OK;
}

//Job_Handling.h

#include <boost/noncopyable.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <cstdlib>
#include <list>
#include <string>
#include <map>


// Identifier assigned to every job; a plain int, so it can be passed to and
// from Tcl with Tcl_NewIntObj / Tcl_GetIntFromObj.
typedef int jobID;

// Counter behind g_Count().
// NOTE(review): `static` at namespace scope in a header gives every
// translation unit that includes this file its own copy; only the copy in
// the file defining g_Count() is ever incremented. Consider moving this
// definition into that .cxx file.
static jobID g_count = 0;
// Returns the next job id (pre-increments g_count).
jobID g_Count();


//quick and dirty
// Small tagged value used both as a job's input argument and as its result.
// GetType() tells which of the double / string / bool payloads is meaningful;
// a default-constructed JobArg has type NAN and carries no payload.
//
// All payload members are initialised by every constructor: JobArg objects
// are copied by value throughout the code, and copying an uninitialised
// bool/double is undefined behaviour.
//
// NOTE(review): the enumerator name NAN collides with the NAN macro that
// <cmath>/<math.h> define on many toolchains; if such a header is included
// before this class the enum will not compile. Renaming it is an interface
// change and is therefore left to a coordinated follow-up.
// NOTE(review): a string literal argument (const char*) selects the bool
// constructor, not the std::string one -- wrap literals in std::string().
class JobArg {
public:
        enum type {DOUBLE, STRING, BOOL, NAN};
        // Initialiser lists follow the member declaration order below.
        JobArg():_t(NAN),_d(0.0),_s(),_b(false) {}
        JobArg(std::string s):_t(STRING),_d(0.0),_s(s),_b(false) {}
        JobArg(double d):_t(DOUBLE),_d(d),_s(),_b(false) {}
        JobArg(bool b):_t(BOOL),_d(0.0),_s(),_b(b) {}

        // Accessors; only the one matching GetType() returns a meaningful
        // payload, the others return the neutral value set by the constructor.
        inline type                        GetType()        const {return _t;}
        inline std::string        GetString()        const {return _s;}
        inline double                GetDouble()        const {return _d;}
        inline bool                        GetBool()        const {return _b;}
private:
        type                _t;
        double                _d;
        std::string _s;
        bool                _b;
};


// Abstract base class for one unit of background work.
// Construction stores the argument and draws a fresh id from g_Count();
// operator() is the thread entry point (defined in the implementation file)
// and derived classes supply the actual work in run().
class JobItem {
public:
        JobItem(JobArg arg)
                : a(arg)
                , id(g_Count())
        {
        }

        void operator()();

        virtual ~JobItem() {}

        // Id assigned at construction time.
        jobID GetJobID() const { return id; }

protected:
        // Argument the job was created with.
        const JobArg& GetArg() const { return a; }

        // The job's actual work; implemented by each concrete job type.
        virtual void run() = 0;

        JobArg a;
        jobID id;
};

// Concrete job whose run() estimates pi from random samples; the argument
// is the number of samples.
class approx_pi : public JobItem
{
public:
        approx_pi(JobArg arg)
                : JobItem(arg)
        {
        }

protected:
        void run();
};

// Concrete job whose run() sums a reciprocal-factorial series; the argument
// bounds the number of terms.
class approx_exp : public JobItem
{
public:
        approx_exp(JobArg arg)
                : JobItem(arg)
        {
        }

protected:
        void run();
};

// Queue of pending jobs plus the table of finished results.
// Jobs are appended by the GUI side, executed one at a time through
// RunNextJob(), and their results are stored by job id.
class JobQueue : private boost::noncopyable
{
public:
        JobQueue(){}

        enum JobType {APPOXPI};

        // Creates a job of the named type and queues it; returns its id.
        jobID AppendJob(const std::string& type, JobArg data);
        // Takes the first queued job, runs it in exe_thread and waits for it.
        void RunNextJob();
        // Number of jobs still waiting in the queue.
        int Size();

        // Stores the result of job `id`.
        void AppendResult(jobID id, JobArg res);
        // True once a result has been stored for `id`.
        bool JobFinished(jobID id) const;
        // Result stored for `id`, or a default JobArg when there is none.
        const JobArg GetJobResult(jobID id) const;
        // Removes a queued job, or interrupts the running thread.
        void Cancel(jobID id);
private:
        std::list<JobItem*> _jobs;
        std::map<jobID, JobArg> _results;
        // mutable so that the const readers above are able to take the lock.
        mutable boost::mutex monitor;
        // dangerous: reassigned by RunNextJob() and interrupted by Cancel()
        // without any synchronisation between the two.
        boost::thread exe_thread;
};

// Endless worker loop: runs queued jobs, sleeping while the queue is empty.
void WorkHorse();

// Accessor for the single global job queue.
JobQueue& g_GetJobQueue();

//Job_Handling.cxx

#include <tk.h>

#include <boost/thread/xtime.hpp>
#include <boost/math/special_functions/gamma.hpp>
#include <iostream>
#include <cassert>

#include "Job_Handling.h"

// Thread entry point of a job.
// Executes run(); when the thread is interrupted while running, an empty
// JobArg is stored as the job's result instead.
void JobItem::operator()()
{
        try
        {
                run();
                return;
        }
        catch (boost::thread_interrupted&)
        {
                // fall through: record the empty result below
        }

        g_GetJobQueue().AppendResult(GetJobID(), JobArg());
}

void approx_pi::run() { 

        assert(GetArg().GetType() == JobArg::DOUBLE);
        int n = static_cast<int>(GetArg().GetDouble());

        int c_count = 0;
        double Xd,Yd;
        for (int i = 0; i < n; ++i)
        {
                Xd = static_cast<double>(rand());
                Yd = static_cast<double>(rand());

                Xd/=static_cast<double>(RAND_MAX);
                Yd/=static_cast<double>(RAND_MAX);

                if ( sqrt(Xd*Xd + Yd*Yd) < 1.0 ) ++c_count;
                boost::this_thread::interruption_point();
        }

        double res = 4.0 * static_cast<double>(c_count) / static_cast<double>(n);

        g_GetJobQueue().AppendResult(GetJobID(),res);
}

void approx_exp::run() { 

        assert(GetArg().GetType() == JobArg::DOUBLE);
        double n = GetArg().GetDouble();

        double e = 1.0;
        double plus = 0.0;
        for (double i = 1.0; i < n; ++i)
        {
                plus = 1.0/boost::math::tgamma(i);
                if (plus < 10e-15)
                        break;
                e+=plus;
                boost::this_thread::interruption_point();
        }
        g_GetJobQueue().AppendResult(GetJobID(),e);
}


// Creates a job for the given operation name and appends it to the queue.
//   type  "pi" -> approx_pi, "exp" -> approx_exp
//   data  argument handed to the job
// Returns the id of the new job, or -1 when `type` names no known
// operation (nothing is queued in that case). The original asserted on an
// unknown type, which aborted debug builds even though callers check for -1.
jobID JobQueue::AppendJob(const std::string& type, JobArg data) {
        boost::mutex::scoped_lock lock(monitor);
        std::cout << "append a job" << std::endl;

        JobItem* job = 0;
        if (type == "pi")
                job = new approx_pi(data);
        else if (type == "exp")
                job = new approx_exp(data);

        if (!job)
                return -1;

        try
        {
                _jobs.push_back(job);
        }
        catch (...)
        {
                // Do not leak the job when the list cannot grow.
                delete job;
                throw;
        }
        return job->GetJobID();
}

void JobQueue::RunNextJob()
{
        monitor.lock();
        std::cout << "start a job" << std::endl;
        std::list<JobItem*>::iterator iter = _jobs.begin();
        //temporary todo: inheritance behavior is difficult with boost?!
        approx_pi* j = dynamic_cast<approx_pi*>(*iter);

        if (j)
        {
                _jobs.pop_front();
                monitor.unlock();

                exe_thread = boost::thread(*j);
                exe_thread.join();
                delete j;
        }
        else
        {
                approx_exp* j = dynamic_cast<approx_exp*>(*iter);
                _jobs.pop_front();
                monitor.unlock();

                exe_thread = boost::thread(*j);
                exe_thread.join();
                delete j;
        }

        

        std::cout << "job finished" << std::endl;
}

int JobQueue::Size() {
        boost::mutex::scoped_lock lock(monitor);
        return static_cast<int>(_jobs.size());
}

// Stores `res` as the result of job `id`, under the queue mutex.
// An already stored result for the same id is kept (map::insert does not
// overwrite).
void JobQueue::AppendResult(jobID id, JobArg res)
{
        boost::mutex::scoped_lock lock(monitor);
        // value_type instead of std::make_pair<jobID, JobArg>(id, res):
        // with explicit template arguments make_pair takes rvalue references
        // since C++11 and no longer accepts these lvalues.
        _results.insert(std::map<jobID, JobArg>::value_type(id, res));
}

// True when a result has been stored for job `id`.
bool JobQueue::JobFinished(jobID id) const
{
        // _results is written through AppendResult() from the job thread, so
        // the lookup has to hold the same mutex. This method is const, hence
        // the cast to obtain a lockable reference.
        boost::mutex::scoped_lock lock(const_cast<boost::mutex&>(monitor));
        return _results.find(id) != _results.end();
}

// Returns a copy of the result stored for job `id`, or a default-constructed
// JobArg when no result exists.
const JobArg JobQueue::GetJobResult(jobID id) const
{
        // _results is written through AppendResult() from the job thread, so
        // the lookup has to hold the same mutex. This method is const, hence
        // the cast to obtain a lockable reference.
        boost::mutex::scoped_lock lock(const_cast<boost::mutex&>(monitor));
        std::map<jobID, JobArg>::const_iterator iter = _results.find(id);
        if (iter != _results.end())
                return iter->second;
        return JobArg();
}

// Cancels job `id`.
// A job that is still queued is removed and destroyed, and an empty JobArg
// is stored as its result. Otherwise the currently executing thread is
// interrupted.
// The queue is searched while holding the mutex (the original walked the
// list unlocked while the worker may pop from it), and the result is stored
// on this queue rather than through the global accessor.
// NOTE(review): when `id` is not queued, the running thread is interrupted
// without checking that it is actually executing job `id`.
void JobQueue::Cancel(jobID id)
{
        bool removed = false;
        {
                boost::mutex::scoped_lock lock(monitor);
                std::list<JobItem*>::iterator iter;
                for (iter = _jobs.begin(); iter != _jobs.end(); ++iter)
                {
                        if ((*iter)->GetJobID() == id)
                        {
                                delete *iter;
                                _jobs.erase(iter);
                                removed = true;
                                break;
                        }
                }
        }

        if (removed)
        {
                // AppendResult() takes the mutex itself, so it must be called
                // after the lock above has been released.
                AppendResult(id, JobArg());
                return;
        }

        //current exe thread
        exe_thread.interrupt();
}

// The single process-wide job queue, handed out by g_GetJobQueue().
JobQueue jobs;

void WorkHorse()
{
        while(1)
        {
                if(g_GetJobQueue().Size())
                {
                        g_GetJobQueue().RunNextJob();
                }
                else
                {
                        boost::xtime xt;
                        boost::xtime_get(&xt, boost::TIME_UTC);
                        xt.sec += 1;
                        boost::thread::sleep(xt);
                        std::cout << "wait for work " << std::endl;
                }
        }
}

// Hands out a reference to the global job queue object `jobs`.
JobQueue& g_GetJobQueue()
{
        JobQueue& theQueue = jobs;
        return theQueue;
}

// Advances the global job counter by one and returns the new value.
jobID g_Count()
{
        g_count += 1;
        return g_count;
}

//Tk_Main_Threaded.cpp
// Tk_Main_Threaded.cpp : Defines the entry point for the console application.
//

#include <tk.h>
#include <boost/thread/thread.hpp>

#include "Job_Handling.h"
#include "Tcl_Handling.h"


// Thread functor that runs the Tk application.
// Keeps the program's command-line arguments and, when invoked, passes them
// to Tk_Main together with Tk_AppInit as the initialisation callback.
class TclThread
{
public:
        TclThread(int aargc, char** aargv)
                : argc(aargc)
                , argv(aargv)
        {
        }

        // Thread entry point.
        void operator()()
        {
                Tk_Main(argc, argv, Tk_AppInit);
        }

private:
        int argc;
        char** argv;
};

int main(int argc, char* argv[])
{
        boost::thread_group threads;

        TclThread tcl(argc, argv);

        threads.create_thread(tcl);
        threads.create_thread(WorkHorse);

        threads.join_all();

        return 0;
}


//main.tcl
# Add the current directory to Tcl's auto_path search list.
lappend auto_path .


# Namespace holding the GUI procedures and their state. The variable `ns`
# stores the namespace's fully qualified name; the procedures use it to build
# callback and -variable names.
namespace eval ::tkmain {

    variable ns [namespace current]
}


# Builds the main window and starts the periodic GUI update.
# Layout: a top row with the "Start" button, the operation radio buttons
# (pi / exp / a disabled "dummy") and the argument entry, and below it the
# "Jobs" label frame that receives one row per started job.
# Initialises the namespace variables title, op, textVar, opFrame and
# pending_jobs.
proc ::tkmain::BuildGui {} {

    variable ns
    variable title "thread test : "

    # --- top row -----------------------------------------------------------
    set top [frame .fStart]
    set startButton [button $top.startJob -text "Start" -command ${ns}::start]

    variable op "pi"
    set choices [frame $top.radio]
    foreach item {pi exp dummy} {
        set rb [radiobutton $choices.radio${item} \
                -text $item \
                -value $item \
                -variable ${ns}::op]
        # "dummy" is shown but cannot be selected.
        if {$item eq "dummy"} {
            $rb configure -state disabled
        }
        pack $rb -anchor w
    }

    variable textVar "100000000"
    set argEntry [entry $top.textField -textvariable ${ns}::textVar]

    # --- job list ----------------------------------------------------------
    variable opFrame [labelframe .opFrame -text "Jobs"]

    pack $startButton $choices $argEntry -side left -padx 5 -pady 5
    pack $top -fill x -expand false -anchor n -padx 10 -pady 10
    pack $opFrame -fill both -expand true -padx 10 -pady 10

    wm geometry . 400x500

    # Ids of jobs whose result has not been displayed yet.
    variable pending_jobs [list]

    UpdateGUI
}
# Adds a row for a newly started job to the "Jobs" frame and remembers the
# job as pending.
#   id   job id returned by startJob; also used in the row's widget name
#   op   operation name, shown in the label
#   arg  argument the job was started with, shown in the label
# The row consists of a "Cancel" button (invokes cancelJob for this id) and a
# label reading "op(arg) = ".
proc ::tkmain::AddJobControl {id op arg} {
    
    variable ns
    variable opFrame
    
    set tFrame [frame $opFrame.tFrame${id}]
    # Build the callback with [list] so that each element stays exactly one
    # word when the command is evaluated later.
    set b [button $tFrame.b -text "Cancel" -command [list ${ns}::cancelJob $id]]
    set l [label $tFrame.l -text "$op\($arg\) = "]
    
    pack $b $l -side left
    pack $tFrame -fill x
    
    variable pending_jobs
    lappend pending_jobs $id
}
# Shows the result of a finished job in its row and drops the job from the
# pending list.
#   id   job id whose row is updated
#   res  result text appended to the row's label
# The row's "Cancel" button is disabled.
proc ::tkmain::AddResult {id res} {

    variable opFrame
    variable pending_jobs

    set row $opFrame.tFrame${id}

    $row.b configure -state disabled
    set caption [$row.l cget -text]
    $row.l configure -text "$caption $res"

    set pos [lsearch $pending_jobs $id]
    if {$pos >= 0} {
        set pending_jobs [lreplace $pending_jobs $pos $pos]
    }
}
# Callback of the "Start" button: starts a job for the selected operation
# with the entry's value as argument and, unless startJob returned -1, adds
# a row for it to the job list.
proc ::tkmain::start {} {
    variable op
    variable textVar

    set jobId [startJob $op $textVar]
    if {$jobId == -1} {
        return
    }
    AddJobControl $jobId $op $textVar
}
# Periodic GUI refresh: puts the current time into the window title, checks
# the pending jobs for results, and re-schedules itself after 100 ms.
proc ::tkmain::UpdateGUI {} {

    variable ns
    variable title

    set now [clock format [clock seconds] -format %H:%M:%S]
    wm title . "$title $now"

    UpdateJobs

    after 100 ${ns}::UpdateGUI
}
# Displays the result of every pending job that has finished.
# The loop iterates over the list value as it was on entry, so AddResult may
# shorten pending_jobs while it runs.
proc ::tkmain::UpdateJobs {} {
    variable pending_jobs
    foreach id $pending_jobs {
        if {![jobFinished $id]} {
            continue
        }
        AddResult $id [getJobResult $id]
    }
}

# Script entry point: build the window and start the periodic update.
::tkmain::BuildGui