Version 0 of boost::thread and Tk

Updated 2008-11-17 21:10:24 by SL

http://www.tfh-berlin.de/~sleuthold/pics/tk_threaded.jpg

This is a simple example to learn something about threaded applications using boost::thread's in a C++ environment with a Tk front end. It's simple and not perfect. It is only a little showcase to get an idea how that works and what could be done better.

Sources:


===== //Tcl_Handling.h #include <tk.h>

int Tk_AppInit(Tcl_Interp* interp); int startJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv); int jobFinished_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv); int getJobResult_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv); int cancelJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv);

//Tcl_Handling.cxx

#include <cassert>

#include "Tcl_Handling.h" #include "Job_Handling.h"

int Tk_AppInit(Tcl_Interp* interp) {

        if (Tcl_Init(interp) == TCL_ERROR)
                return TCL_ERROR;

        if (Tk_Init(interp) == TCL_ERROR)
                return TCL_ERROR;

        //define new commands here
        Tcl_CreateObjCommand(interp, "::tkmain::startJob", startJob_ObjCmd,
                &g_GetJobQueue(), (Tcl_CmdDeleteProc*)NULL);

        Tcl_CreateObjCommand(interp, "::tkmain::jobFinished", jobFinished_ObjCmd,
                &g_GetJobQueue(), (Tcl_CmdDeleteProc*)NULL);

        Tcl_CreateObjCommand(interp, "::tkmain::getJobResult", getJobResult_ObjCmd,
                &g_GetJobQueue(), (Tcl_CmdDeleteProc*)NULL);

        Tcl_CreateObjCommand(interp, "::tkmain::cancelJob", cancelJob_ObjCmd,
                &g_GetJobQueue(), (Tcl_CmdDeleteProc*)NULL);

        return TCL_OK;

}

int startJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv) {

        if (objc != 3)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "op n");
                return TCL_ERROR;
        }

        std::string op_type = Tcl_GetStringFromObj(objv[1],NULL);

        int n;
        if (Tcl_GetIntFromObj(interp, objv[2], &n) != TCL_OK)
                return TCL_ERROR;


        JobQueue* _jobs = reinterpret_cast<JobQueue*>(clientData);

        if (!_jobs)
                return TCL_ERROR;

        jobID id = _jobs->AppendJob(op_type, JobArg(static_cast<double>(n)));

        Tcl_SetObjResult(interp,Tcl_NewIntObj(id));        
        return TCL_OK;

}

int jobFinished_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv) {

        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID id;
        if (Tcl_GetIntFromObj(interp, objv[1], &id) != TCL_OK)
                return TCL_ERROR;


        JobQueue* _jobs = reinterpret_cast<JobQueue*>(clientData);

        if (!_jobs)
                return TCL_ERROR;

        if (_jobs->JobFinished(id))
                Tcl_SetObjResult(interp,Tcl_NewBooleanObj(1));        
        else
                Tcl_SetObjResult(interp,Tcl_NewBooleanObj(0));        
        return TCL_OK;

}

int getJobResult_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv) {

        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID id;
        if (Tcl_GetIntFromObj(interp, objv[1], &id) != TCL_OK)
                return TCL_ERROR;


        JobQueue* _jobs = reinterpret_cast<JobQueue*>(clientData);

        if (!_jobs)
                return TCL_ERROR;

        assert(_jobs->JobFinished(id));
        const JobArg res = _jobs->GetJobResult(id);

        switch (res.GetType())
        {
        case JobArg::BOOL:
                {
                        if (res.GetBool())
                                Tcl_SetObjResult(interp,Tcl_NewBooleanObj(1));        
                        else
                                Tcl_SetObjResult(interp,Tcl_NewBooleanObj(0));        
                        break;
                }
        case JobArg::DOUBLE:
                {
                        Tcl_SetObjResult(interp,Tcl_NewDoubleObj(res.GetDouble()));        
                        break;
                }
        case JobArg::STRING:
                {
                        Tcl_SetObjResult(interp,Tcl_NewStringObj(res.GetString().c_str(), -1));
                        break;
                }
        default:
                Tcl_SetObjResult(interp,Tcl_NewStringObj("error", -1));
        }

        return TCL_OK;

}

int cancelJob_ObjCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj*CONST objv) {

        if (objc != 2)
        {
                Tcl_WrongNumArgs(interp, 1, objv, "id");
                return TCL_ERROR;
        }

        jobID id;
        if (Tcl_GetIntFromObj(interp, objv[1], &id) != TCL_OK)
                return TCL_ERROR;


        JobQueue* _jobs = reinterpret_cast<JobQueue*>(clientData);

        if (!_jobs)
                return TCL_ERROR;

        _jobs->Cancel(id);

        return TCL_OK;

}

//Job_Handling.h

#include <boost/noncopyable.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <cstdlib> #include <list> #include <string> #include <map>

typedef int jobID;

static jobID g_count = 0; jobID g_Count();

//quick and dirty class JobArg { public:

        enum type {DOUBLE, STRING, BOOL, NAN};
        JobArg():_t(NAN) {}
        JobArg(std::string s):_s(s),_t(STRING) {}
        JobArg(double d):_d(d),_t(DOUBLE) {}
        JobArg(bool b):_b(b),_t(BOOL) {}

        inline type                        GetType()        const {return _t;}
        inline std::string        GetString()        const {return _s;}
        inline double                GetDouble()        const {return _d;}
        inline bool                        GetBool()        const {return _b;}

private:

        type                _t;
        double                _d;
        std::string _s;
        bool                _b;

};

class JobItem { public:

        JobItem(JobArg arg):a(arg),id(g_Count()){ }
        void operator()();
        virtual ~JobItem() {};
        jobID GetJobID() const {return id;}

protected:

        const JobArg& GetArg() const {return a;}
        virtual void run() = 0;
        JobArg a;
        jobID id;

};

class approx_pi : public JobItem { public:

        approx_pi(JobArg arg):JobItem(arg){ }

protected:

    void run();

};

class approx_exp : public JobItem { public:

        approx_exp(JobArg arg):JobItem(arg){ }

protected:

    void run();

};

class JobQueue : private boost::noncopyable { public:

        JobQueue(){}

        enum JobType {APPOXPI};

        jobID AppendJob(const std::string& type, JobArg data);
        void RunNextJob();
        int Size();

        void AppendResult(jobID id, JobArg res);
        bool JobFinished(jobID id) const;
        const JobArg GetJobResult(jobID id) const;
        void Cancel(jobID id);

private:

        std::list<JobItem*> _jobs;
        std::map<jobID, JobArg> _results;
        boost::mutex monitor;
        boost::thread exe_thread; //dangerous

};

void WorkHorse();

JobQueue& g_GetJobQueue();

//Job_Handling.cxx

#include <tk.h>

#include <boost/thread/xtime.hpp> #include <boost/math/special_functions/gamma.hpp> #include <iostream> #include <cassert>

#include "Job_Handling.h"

void JobItem::operator()() {

        try
        {
                run(); 
        }
        catch(boost::thread_interrupted&)
        {
                g_GetJobQueue().AppendResult(GetJobID(),JobArg());
        }

}

void approx_pi::run() {

        assert(GetArg().GetType() == JobArg::DOUBLE);
        int n = static_cast<int>(GetArg().GetDouble());

        int c_count = 0;
        double Xd,Yd;
        for (int i = 0; i < n; ++i)
        {
                Xd = static_cast<double>(rand());
                Yd = static_cast<double>(rand());

                Xd/=static_cast<double>(RAND_MAX);
                Yd/=static_cast<double>(RAND_MAX);

                if ( sqrt(Xd*Xd + Yd*Yd) < 1.0 ) ++c_count;
                boost::this_thread::interruption_point();
        }

        double res = 4.0 * static_cast<double>(c_count) / static_cast<double>(n);

        g_GetJobQueue().AppendResult(GetJobID(),res);

}

void approx_exp::run() {

        assert(GetArg().GetType() == JobArg::DOUBLE);
        double n = GetArg().GetDouble();

        double e = 1.0;
        double plus = 0.0;
        for (double i = 1.0; i < n; ++i)
        {
                plus = 1.0/boost::math::tgamma(i);
                if (plus < 10e-15)
                        break;
                e+=plus;
                boost::this_thread::interruption_point();
        }
        g_GetJobQueue().AppendResult(GetJobID(),e);

}

jobID JobQueue::AppendJob(const std::string& type, JobArg data) {

        boost::mutex::scoped_lock lock(monitor);
        jobID id = -1;
        std::cout << "append a job" << std::endl;

        JobItem* job = 0;
        if (type == "pi")
                job = new approx_pi(data);
        else if (type == "exp")
                job = new approx_exp(data);

        if (job)
        {
                _jobs.push_back(job);
                id = job->GetJobID();
        }

        assert(id != -1);
        return id;

}

void JobQueue::RunNextJob() {

        monitor.lock();
        std::cout << "start a job" << std::endl;
        std::list<JobItem*>::iterator iter = _jobs.begin();
        //temporary todo: inheritance behavior is difficult with boost?!
        approx_pi* j = dynamic_cast<approx_pi*>(*iter);

        if (j)
        {
                _jobs.pop_front();
                monitor.unlock();

                exe_thread = boost::thread(*j);
                exe_thread.join();
                delete j;
        }
        else
        {
                approx_exp* j = dynamic_cast<approx_exp*>(*iter);
                _jobs.pop_front();
                monitor.unlock();

                exe_thread = boost::thread(*j);
                exe_thread.join();
                delete j;
        }



        std::cout << "job finished" << std::endl;

}

int JobQueue::Size() {

        boost::mutex::scoped_lock lock(monitor);
        return static_cast<int>(_jobs.size());

}

void JobQueue::AppendResult(jobID id, JobArg res) {

        boost::mutex::scoped_lock lock(monitor);
        _results.insert(std::make_pair<jobID, JobArg>(id, res));

}

bool JobQueue::JobFinished(jobID id) const {

        std::map<jobID, JobArg>::const_iterator iter = _results.find(id);
        return (iter != _results.end());

}

const JobArg JobQueue::GetJobResult(jobID id) const {

        std::map<jobID, JobArg>::const_iterator iter = _results.find(id);
        if (iter != _results.end())
                return iter->second;
        return JobArg();

}

void JobQueue::Cancel(jobID id) {

        std::list<JobItem*>::iterator iter;
        for (iter = _jobs.begin(); iter != _jobs.end(); ++iter)
        {
                if ((*iter)->GetJobID() == id)
                {
                        monitor.lock();
                        delete *iter;
                        _jobs.erase(iter);
                        monitor.unlock();
                        g_GetJobQueue().AppendResult(id,JobArg());
                        return;
                }
        }
        //current exe thread
        exe_thread.interrupt();

}

JobQueue jobs;

void WorkHorse() {

        while(1)
        {
                if(g_GetJobQueue().Size())
                {
                        g_GetJobQueue().RunNextJob();
                }
                else
                {
                        boost::xtime xt;
                        boost::xtime_get(&xt, boost::TIME_UTC);
                        xt.sec += 1;
                        boost::thread::sleep(xt);
                        std::cout << "wait for work " << std::endl;
                }
        }

}

JobQueue& g_GetJobQueue() {

        return jobs;

}

jobID g_Count() {

        return ++g_count;

}

//Tk_Main_Threaded.cpp // Tk_Main_Threaded.cpp : Defines the entry point for the console application. //

#include <tk.h> #include <boost/thread/thread.hpp>

#include "Job_Handling.h" #include "Tcl_Handling.h"

class TclThread { public:

        TclThread(int aargc, char** aargv):argc(aargc),argv(aargv) { }
    void operator()() { init(); }

private:

    void init() { 
                Tk_Main(argc, argv, Tk_AppInit);
        }

private:

        int argc;
        char** argv;

};

int main(int argc, char* argv) {

        boost::thread_group threads;

        TclThread tcl(argc, argv);

        threads.create_thread(tcl);
        threads.create_thread(WorkHorse);

        threads.join_all();

        return 0;

}

//main.tcl lappend auto_path .

namespace eval ::tkmain {

    variable ns [namespace current]

}

proc ::tkmain::BuildGui {} {

    variable ns

    variable title "thread test : "

    set fStart [frame .fStart]

    set startJob [button $fStart.startJob -text "Start" -command ${ns}::start]

    variable op "pi"
    set radio [frame $fStart.radio]
    foreach item {pi exp dummy} {
        set radio${item} [radiobutton $radio.radio${item} \
                -text $item \
                -value $item \
                -variable ${ns}::op]
        if {$item == "dummy"} {
            [set radio${item}] config -state disabled
        }
        pack [set radio${item}] -anchor w
    }

    variable textVar "100000000"
    set textField [entry $fStart.textField -textvariable ${ns}::textVar]

    variable opFrame [labelframe .opFrame -text "Jobs"]

    pack $startJob $radio $textField -side left -padx 5 -pady 5
    pack $fStart -fill x -expand false -anchor n -padx 10 -pady 10
    pack $opFrame -fill both -expand true -padx 10 -pady 10

    wm geometry . 400x500

    variable pending_jobs [list]

    UpdateGUI

} proc ::tkmain::AddJobControl {id op arg} {

    variable ns
    variable opFrame

    set tFrame [frame $opFrame.tFrame${id}]
    set b [button $tFrame.b -text "Cancel" -command "${ns}::cancelJob $id"]
    set l [label $tFrame.l -text "$op\($arg\) = "]

    pack $b $l -side left
    pack $tFrame -fill x

    variable pending_jobs
    lappend pending_jobs $id

} proc ::tkmain::AddResult {id res} {

    variable opFrame
    set f $opFrame.tFrame${id}

    $f.b config -state disabled
    $f.l config -text "[$f.l cget -text] $res"

    variable pending_jobs
    set i [lsearch $pending_jobs $id]
    if {$i != -1} {
        set pending_jobs [lreplace $pending_jobs $i $i]
    }

} proc ::tkmain::start {} {

    variable textVar
    variable op
    set id [startJob $op $textVar]
    if {$id != -1} {
        AddJobControl $id $op $textVar
    }

} proc ::tkmain::UpdateGUI {} {

    variable title
    wm title . "$title [clock format [clock seconds] -format %H:%M:%S]"

    UpdateJobs

    variable ns
    after 100 ${ns}::UpdateGUI

} proc ::tkmain::UpdateJobs {} {

    variable pending_jobs
    foreach job $pending_jobs {
        if {[jobFinished $job]} {
            AddResult $job [getJobResult $job]
        }
    }

}

::tkmain::BuildGui =====