Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/root/TThreadPool.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 // @(#)root/thread:$Id$
0002 // Author: Anar Manafov   20/09/2011
0003 
0004 /*************************************************************************
0005  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers.               *
0006  * All rights reserved.                                                  *
0007  *                                                                       *
0008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0010  *************************************************************************/
0011 
0012 #ifndef ROOT_TThreadPool
0013 #define ROOT_TThreadPool
0014 
0015 //////////////////////////////////////////////////////////////////////////
0016 //                                                                      //
0017 // TThreadPool                                                          //
0018 //                                                                      //
0019 //                                                                      //
0020 //////////////////////////////////////////////////////////////////////////
0021 
0022 // ROOT
0023 #include "RtypesCore.h"
0024 #include "TMutex.h"
0025 #include "TCondition.h"
0026 #include "TThread.h"
0027 // STD
0028 #include <queue>
0029 #include <vector>
0030 #include <iostream>
0031 #include <sstream>
0032 #include <utility>
0033 #ifdef _MSC_VER
0034 #define sleep(s) _sleep(s)
0035 #else
0036 #include <unistd.h>
0037 #endif
0038 
0039 
0040 //////////////////////////////////////////////////////////////////////////
0041 //                                                                      //
0042 // TNonCopyable                                                         //
0043 // Base class that makes derived classes non-copyable.                  //
0044 //                                                                      //
0045 //////////////////////////////////////////////////////////////////////////
class TNonCopyable {
   // Copy construction and copy assignment are declared in the (default)
   // private section and intentionally left without a definition, so any
   // attempt to copy a derived object is rejected.
   TNonCopyable(const TNonCopyable &);
   const TNonCopyable &operator=(const TNonCopyable &);

protected:
   // Construction and destruction are reserved for derived classes.
   TNonCopyable() {}
   ~TNonCopyable() {}
};
0054 
0055 //////////////////////////////////////////////////////////////////////////
0056 //                                                                      //
0057 // TThreadPoolTaskImp                                                   //
0058 // A base class for thread pool tasks. Users must inherit their         //
0059 // task classes from it.                                                //
0060 // Example:                                                             //
0061 //        class TTestTask: public TThreadPoolTaskImp<TTestTask, int>    //
0062 //                                                                      //
0063 //        in this example,                                              //
0064 //           TTestTask - is a user class, which implements              //
0065 //                       thread pool task object.                       //
0066 //           int - is a type of argument to TTestTask::run method.      //
0067 //                                                                      //
0068 // Please see the tutorial "tutorials/thread/threadPool.C" for          //
0069 // more details on how to use TThreadPool.                              //
0070 //                                                                      //
0071 //////////////////////////////////////////////////////////////////////////
// CRTP base: aTask publicly derives from TThreadPoolTaskImp<aTask, aParam>
// and supplies
//    bool runTask(aParam &param);
// which run() forwards to.
template <class aTask, class aParam>
class TThreadPoolTaskImp {
public:
   // Calls aTask::runTask(param) on this object and returns its result.
   // `this` must be the base-class subobject of an aTask instance.
   bool run(aParam &param) {
      // static_cast applies the proper base-to-derived pointer adjustment;
      // reinterpret_cast is only correct when the base lies at offset zero
      // inside aTask (not guaranteed, e.g. with multiple inheritance).
      aTask *pThis = static_cast<aTask *>(this);
      return pThis->runTask(param);
   }
};
0080 
0081 //////////////////////////////////////////////////////////////////////////
0082 //                                                                      //
0083 // TThreadPoolTask                                                      //
0084 // This is a supporting class for TThreadPool.                          //
0085 // It wraps user task objects in order to pass task arguments in a      //
0086 // type-safe way.                                                       //
0087 //                                                                      //
0088 //////////////////////////////////////////////////////////////////////////
0089 template <class aTask, class aParam>
0090 class TThreadPoolTask {
0091 public:
0092    typedef TThreadPoolTaskImp<aTask, aParam> task_t;
0093 
0094 public:
0095    TThreadPoolTask(task_t &task, aParam &param):
0096       fTask(task),
0097       fTaskParam(param) {
0098    }
0099    bool run() {
0100       return fTask.run(fTaskParam);
0101    }
0102 
0103 private:
0104    task_t &fTask;
0105    aParam fTaskParam;
0106 };
0107 
0108 //////////////////////////////////////////////////////////////////////////
0109 //                                                                      //
0110 // TThreadPool                                                          //
0111 // This class implements a simple Thread Pool pattern.                  //
0112 // So far it supports only one type of queue - FIFO                     //
0113 //                                                                      //
0114 // Please see the tutorial "tutorials/thread/threadPool.C" for          //
0115 // more details on how to use TThreadPool.                              //
0116 //                                                                      //
0117 //////////////////////////////////////////////////////////////////////////
0118 template <class aTask, class aParam>
0119 class TThreadPool : public TNonCopyable {
0120 
0121    typedef TThreadPoolTask<aTask, aParam> task_t;
0122    typedef std::queue<task_t*>            taskqueue_t;
0123    typedef std::vector<TThread*>          threads_array_t;
0124 
0125 public:
0126    TThreadPool(size_t threadsCount, bool needDbg = false):
0127       fStopped(false),
0128       fSuccessfulTasks(0),
0129       fTasksCount(0),
0130       fIdleThreads(threadsCount),
0131       fSilent(!needDbg) {
0132       fThreadNeeded = new TCondition(&fMutex);
0133       fThreadAvailable = new TCondition(&fMutex);
0134       fAllTasksDone = new TCondition(&fMutexAllTasksDone);
0135 
0136       for (size_t i = 0; i < threadsCount; ++i) {
0137          TThread *pThread = new TThread(&TThreadPool::Executor, this);
0138          fThreads.push_back(pThread);
0139          pThread->Run();
0140       }
0141 
0142       fThreadJoinHelper = new TThread(&TThreadPool::JoinHelper, this);
0143 
0144       if (needDbg) {
0145          fThreadMonitor = new TThread(&TThreadPool::Monitor, this);
0146          fThreadMonitor->Run();
0147       }
0148    }
0149 
0150    ~TThreadPool() {
0151       Stop();
0152       // deleting threads
0153       threads_array_t::const_iterator iter = fThreads.begin();
0154       threads_array_t::const_iterator iter_end = fThreads.end();
0155       for (; iter != iter_end; ++iter)
0156          delete(*iter);
0157 
0158       delete fThreadJoinHelper;
0159 
0160       delete fThreadNeeded;
0161       delete fThreadAvailable;
0162       delete fAllTasksDone;
0163    }
0164 
0165    void AddThread() {
0166       TLockGuard lock(&fMutex);
0167       TThread *pThread = new TThread(&TThreadPool::Executor, this);
0168       fThreads.push_back(pThread);
0169       pThread->Run();
0170       ++fIdleThreads;
0171    }
0172 
0173    void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
0174       {
0175          DbgLog("Main thread. Try to push a task");
0176 
0177          TLockGuard lock(&fMutex);
0178          task_t *t = new task_t(task, param);
0179          fTasks.push(t);
0180          ++fTasksCount;
0181 
0182          DbgLog("Main thread. the task is pushed");
0183       }
0184       TLockGuard lock(&fMutex);
0185       fThreadNeeded->Broadcast();
0186    }
0187 
0188    void Stop(bool processRemainingJobs = false) {
0189       // prevent more jobs from being added to the queue
0190       if (fStopped)
0191          return;
0192 
0193       if (processRemainingJobs) {
0194          TLockGuard lock(&fMutex);
0195          // wait for queue to drain
0196          while (!fTasks.empty() && !fStopped) {
0197             DbgLog("Main thread is waiting");
0198             fThreadAvailable->Wait();
0199             DbgLog("Main thread is DONE waiting");
0200          }
0201       }
0202       // tell all threads to stop
0203       {
0204          TLockGuard lock(&fMutex);
0205          fStopped = true;
0206          fThreadNeeded->Broadcast();
0207          DbgLog("Main threads requests to STOP");
0208       }
0209 
0210       // Waiting for all threads to complete
0211       fThreadJoinHelper->Run();
0212       fThreadJoinHelper->Join();
0213    }
0214 
0215    void Drain() {
0216       // This method stops the calling thread until the task queue is empty
0217 
0218       TLockGuard lock(&fMutexAllTasksDone);
0219       fAllTasksDone->Wait();
0220    }
0221 
0222    size_t TasksCount() const {
0223       return fTasksCount;
0224    }
0225 
0226    size_t SuccessfulTasks() const {
0227       return fSuccessfulTasks;
0228    }
0229 
0230    size_t IdleThreads() const {
0231       return fIdleThreads;
0232    }
0233 
0234 private:
0235    static void* Monitor(void *arg) {
0236       if (NULL == arg)
0237          return NULL;
0238 
0239       TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0240       while (true && !pThis->fStopped) {
0241          std::stringstream ss;
0242          ss
0243                << ">>>> Check for tasks."
0244                << " Number of Tasks: " << pThis->fTasks.size()
0245                << "; Idle threads: " << pThis->IdleThreads();
0246          pThis->DbgLog(ss.str());
0247          sleep(1);
0248       }
0249       return NULL;
0250    }
0251 
0252    static void* Executor(void *arg) {
0253       TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0254 
0255       while (!pThis->fStopped) {
0256          task_t *task(NULL);
0257 
0258          // There is a task, let's take it
0259          {
0260             // Find a task to perform
0261             TLockGuard lock(&pThis->fMutex);
0262             if (pThis->fTasks.empty() && !pThis->fStopped) {
0263                pThis->DbgLog("waiting for a task");
0264 
0265                if (pThis->fThreads.size() == pThis->fIdleThreads) {
0266                   TLockGuard l(&pThis->fMutexAllTasksDone);
0267                   pThis->fAllTasksDone->Broadcast();
0268                }
0269 
0270                // No tasks, we wait for a task to come
0271                pThis->fThreadNeeded->Wait();
0272 
0273                pThis->DbgLog("done waiting for tasks");
0274             }
0275          }
0276 
0277          {
0278             TLockGuard lock(&pThis->fMutex);
0279             if (!pThis->fTasks.empty()) {
0280                --pThis->fIdleThreads;
0281                task = pThis->fTasks.front();
0282                pThis->fTasks.pop();
0283 
0284                pThis->DbgLog("get the task");
0285             } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
0286                TLockGuard l(&pThis->fMutexAllTasksDone);
0287                pThis->fAllTasksDone->Broadcast();
0288             }
0289             pThis->DbgLog("done Check <<<<");
0290          }
0291 
0292          // Execute the task
0293          if (task) {
0294             pThis->DbgLog("Run the task");
0295 
0296             if (task->run()) {
0297                TLockGuard lock(&pThis->fMutex);
0298                ++pThis->fSuccessfulTasks;
0299             }
0300             delete task;
0301             task = NULL;
0302 
0303             TLockGuard lock(&pThis->fMutex);
0304             ++pThis->fIdleThreads;
0305 
0306             pThis->DbgLog("Done Running the task");
0307          }
0308          // Task is done, report that the thread is free
0309          TLockGuard lock(&pThis->fMutex);
0310          pThis->fThreadAvailable->Broadcast();
0311       }
0312 
0313       pThis->DbgLog("**** DONE ***");
0314       return NULL;
0315    }
0316 
0317    static void *JoinHelper(void *arg) {
0318       TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0319       threads_array_t::const_iterator iter = pThis->fThreads.begin();
0320       threads_array_t::const_iterator iter_end = pThis->fThreads.end();
0321       for (; iter != iter_end; ++iter)
0322          (*iter)->Join();
0323 
0324       return NULL;
0325    }
0326 
0327    static bool IsThreadActive(TThread *pThread) {
0328       // so far we consider only kRunningState as activity
0329       return (pThread->GetState() == TThread::kRunningState);
0330    }
0331 
0332    void DbgLog(const std::string &msg) {
0333       if (fSilent)
0334          return;
0335       TLockGuard lock(&fDbgOutputMutex);
0336       std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
0337    }
0338 
0339 private:
0340    taskqueue_t     fTasks;
0341    TMutex          fMutex;
0342    TCondition     *fThreadNeeded;
0343    TCondition     *fThreadAvailable;
0344    TMutex         fMutexAllTasksDone;
0345    TCondition     *fAllTasksDone;
0346    threads_array_t fThreads;
0347    TThread        *fThreadJoinHelper;
0348    TThread        *fThreadMonitor;
0349    volatile bool   fStopped;
0350    size_t          fSuccessfulTasks;
0351    size_t          fTasksCount;
0352    size_t          fIdleThreads;
0353    TMutex          fDbgOutputMutex;
0354    bool            fSilent; // No DBG messages
0355 };
0356 
0357 #endif