Warning, file /include/root/TThreadPool.h was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #ifndef ROOT_TThreadPool
0013 #define ROOT_TThreadPool
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023 #include "RtypesCore.h"
0024 #include "TMutex.h"
0025 #include "TCondition.h"
0026 #include "TThread.h"
0027
0028 #include <queue>
0029 #include <vector>
0030 #include <iostream>
0031 #include <sstream>
0032 #include <utility>
0033 #ifdef _MSC_VER
0034 #define sleep(s) _sleep(s)
0035 #else
0036 #include <unistd.h>
0037 #endif
0038
0039
0040
0041
0042
0043
0044
0045
0046 class TNonCopyable {
0047 protected:
0048 TNonCopyable() { }
0049 ~TNonCopyable() { }
0050 private:
0051 TNonCopyable(const TNonCopyable&);
0052 const TNonCopyable& operator=(const TNonCopyable&);
0053 };
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072 template <class aTask, class aParam>
0073 class TThreadPoolTaskImp {
0074 public:
0075 bool run(aParam ¶m) {
0076 aTask *pThis = reinterpret_cast<aTask *>(this);
0077 return pThis->runTask(param);
0078 }
0079 };
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089 template <class aTask, class aParam>
0090 class TThreadPoolTask {
0091 public:
0092 typedef TThreadPoolTaskImp<aTask, aParam> task_t;
0093
0094 public:
0095 TThreadPoolTask(task_t &task, aParam ¶m):
0096 fTask(task),
0097 fTaskParam(param) {
0098 }
0099 bool run() {
0100 return fTask.run(fTaskParam);
0101 }
0102
0103 private:
0104 task_t &fTask;
0105 aParam fTaskParam;
0106 };
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118 template <class aTask, class aParam>
0119 class TThreadPool : public TNonCopyable {
0120
0121 typedef TThreadPoolTask<aTask, aParam> task_t;
0122 typedef std::queue<task_t*> taskqueue_t;
0123 typedef std::vector<TThread*> threads_array_t;
0124
0125 public:
0126 TThreadPool(size_t threadsCount, bool needDbg = false):
0127 fStopped(false),
0128 fSuccessfulTasks(0),
0129 fTasksCount(0),
0130 fIdleThreads(threadsCount),
0131 fSilent(!needDbg) {
0132 fThreadNeeded = new TCondition(&fMutex);
0133 fThreadAvailable = new TCondition(&fMutex);
0134 fAllTasksDone = new TCondition(&fMutexAllTasksDone);
0135
0136 for (size_t i = 0; i < threadsCount; ++i) {
0137 TThread *pThread = new TThread(&TThreadPool::Executor, this);
0138 fThreads.push_back(pThread);
0139 pThread->Run();
0140 }
0141
0142 fThreadJoinHelper = new TThread(&TThreadPool::JoinHelper, this);
0143
0144 if (needDbg) {
0145 fThreadMonitor = new TThread(&TThreadPool::Monitor, this);
0146 fThreadMonitor->Run();
0147 }
0148 }
0149
0150 ~TThreadPool() {
0151 Stop();
0152
0153 threads_array_t::const_iterator iter = fThreads.begin();
0154 threads_array_t::const_iterator iter_end = fThreads.end();
0155 for (; iter != iter_end; ++iter)
0156 delete(*iter);
0157
0158 delete fThreadJoinHelper;
0159
0160 delete fThreadNeeded;
0161 delete fThreadAvailable;
0162 delete fAllTasksDone;
0163 }
0164
0165 void AddThread() {
0166 TLockGuard lock(&fMutex);
0167 TThread *pThread = new TThread(&TThreadPool::Executor, this);
0168 fThreads.push_back(pThread);
0169 pThread->Run();
0170 ++fIdleThreads;
0171 }
0172
0173 void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
0174 {
0175 DbgLog("Main thread. Try to push a task");
0176
0177 TLockGuard lock(&fMutex);
0178 task_t *t = new task_t(task, param);
0179 fTasks.push(t);
0180 ++fTasksCount;
0181
0182 DbgLog("Main thread. the task is pushed");
0183 }
0184 TLockGuard lock(&fMutex);
0185 fThreadNeeded->Broadcast();
0186 }
0187
0188 void Stop(bool processRemainingJobs = false) {
0189
0190 if (fStopped)
0191 return;
0192
0193 if (processRemainingJobs) {
0194 TLockGuard lock(&fMutex);
0195
0196 while (!fTasks.empty() && !fStopped) {
0197 DbgLog("Main thread is waiting");
0198 fThreadAvailable->Wait();
0199 DbgLog("Main thread is DONE waiting");
0200 }
0201 }
0202
0203 {
0204 TLockGuard lock(&fMutex);
0205 fStopped = true;
0206 fThreadNeeded->Broadcast();
0207 DbgLog("Main threads requests to STOP");
0208 }
0209
0210
0211 fThreadJoinHelper->Run();
0212 fThreadJoinHelper->Join();
0213 }
0214
0215 void Drain() {
0216
0217
0218 TLockGuard lock(&fMutexAllTasksDone);
0219 fAllTasksDone->Wait();
0220 }
0221
0222 size_t TasksCount() const {
0223 return fTasksCount;
0224 }
0225
0226 size_t SuccessfulTasks() const {
0227 return fSuccessfulTasks;
0228 }
0229
0230 size_t IdleThreads() const {
0231 return fIdleThreads;
0232 }
0233
0234 private:
0235 static void* Monitor(void *arg) {
0236 if (NULL == arg)
0237 return NULL;
0238
0239 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0240 while (true && !pThis->fStopped) {
0241 std::stringstream ss;
0242 ss
0243 << ">>>> Check for tasks."
0244 << " Number of Tasks: " << pThis->fTasks.size()
0245 << "; Idle threads: " << pThis->IdleThreads();
0246 pThis->DbgLog(ss.str());
0247 sleep(1);
0248 }
0249 return NULL;
0250 }
0251
0252 static void* Executor(void *arg) {
0253 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0254
0255 while (!pThis->fStopped) {
0256 task_t *task(NULL);
0257
0258
0259 {
0260
0261 TLockGuard lock(&pThis->fMutex);
0262 if (pThis->fTasks.empty() && !pThis->fStopped) {
0263 pThis->DbgLog("waiting for a task");
0264
0265 if (pThis->fThreads.size() == pThis->fIdleThreads) {
0266 TLockGuard l(&pThis->fMutexAllTasksDone);
0267 pThis->fAllTasksDone->Broadcast();
0268 }
0269
0270
0271 pThis->fThreadNeeded->Wait();
0272
0273 pThis->DbgLog("done waiting for tasks");
0274 }
0275 }
0276
0277 {
0278 TLockGuard lock(&pThis->fMutex);
0279 if (!pThis->fTasks.empty()) {
0280 --pThis->fIdleThreads;
0281 task = pThis->fTasks.front();
0282 pThis->fTasks.pop();
0283
0284 pThis->DbgLog("get the task");
0285 } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
0286 TLockGuard l(&pThis->fMutexAllTasksDone);
0287 pThis->fAllTasksDone->Broadcast();
0288 }
0289 pThis->DbgLog("done Check <<<<");
0290 }
0291
0292
0293 if (task) {
0294 pThis->DbgLog("Run the task");
0295
0296 if (task->run()) {
0297 TLockGuard lock(&pThis->fMutex);
0298 ++pThis->fSuccessfulTasks;
0299 }
0300 delete task;
0301 task = NULL;
0302
0303 TLockGuard lock(&pThis->fMutex);
0304 ++pThis->fIdleThreads;
0305
0306 pThis->DbgLog("Done Running the task");
0307 }
0308
0309 TLockGuard lock(&pThis->fMutex);
0310 pThis->fThreadAvailable->Broadcast();
0311 }
0312
0313 pThis->DbgLog("**** DONE ***");
0314 return NULL;
0315 }
0316
0317 static void *JoinHelper(void *arg) {
0318 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
0319 threads_array_t::const_iterator iter = pThis->fThreads.begin();
0320 threads_array_t::const_iterator iter_end = pThis->fThreads.end();
0321 for (; iter != iter_end; ++iter)
0322 (*iter)->Join();
0323
0324 return NULL;
0325 }
0326
0327 static bool IsThreadActive(TThread *pThread) {
0328
0329 return (pThread->GetState() == TThread::kRunningState);
0330 }
0331
0332 void DbgLog(const std::string &msg) {
0333 if (fSilent)
0334 return;
0335 TLockGuard lock(&fDbgOutputMutex);
0336 std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
0337 }
0338
0339 private:
0340 taskqueue_t fTasks;
0341 TMutex fMutex;
0342 TCondition *fThreadNeeded;
0343 TCondition *fThreadAvailable;
0344 TMutex fMutexAllTasksDone;
0345 TCondition *fAllTasksDone;
0346 threads_array_t fThreads;
0347 TThread *fThreadJoinHelper;
0348 TThread *fThreadMonitor;
0349 volatile bool fStopped;
0350 size_t fSuccessfulTasks;
0351 size_t fTasksCount;
0352 size_t fIdleThreads;
0353 TMutex fDbgOutputMutex;
0354 bool fSilent;
0355 };
0356
0357 #endif