File indexing completed on 2025-01-18 09:57:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032 #pragma once
0033
0034 #include "PTL/AutoLock.hh"
0035 #ifndef G4GMAKE
0036 #include "PTL/Config.hh"
0037 #endif
0038 #include "PTL/Globals.hh"
0039 #include "PTL/JoinFunction.hh"
0040 #include "PTL/Task.hh"
0041 #include "PTL/ThreadData.hh"
0042 #include "PTL/ThreadPool.hh"
0043 #include "PTL/Threading.hh"
0044 #include "PTL/Utility.hh"
0045 #include "PTL/VTask.hh"
0046 #include "PTL/VUserTaskQueue.hh"
0047
0048 #include <atomic>
0049 #include <chrono>
0050 #include <cstdint>
0051 #include <cstdio>
0052 #include <functional>
0053 #include <future>
0054 #include <iostream>
0055 #include <memory>
0056 #include <mutex>
0057 #include <stdexcept>
0058 #include <thread>
0059 #include <type_traits>
0060 #include <utility>
0061 #include <vector>
0062
0063 #if defined(PTL_USE_TBB)
0064 # include <tbb/task_group.h>
0065 #endif
0066
0067 namespace PTL
0068 {
0069 namespace internal
0070 {
0071 std::atomic_uintmax_t&
0072 task_group_counter();
0073
0074 ThreadPool*
0075 get_default_threadpool();
0076
0077 intmax_t
0078 get_task_depth();
0079 }
0080
0081 template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0>
0082 class TaskGroup
0083 {
0084 public:
0085
0086 template <typename Up>
0087 using container_type = std::vector<Up>;
0088
0089 using tid_type = std::thread::id;
0090 using size_type = uintmax_t;
0091 using lock_t = Mutex;
0092 using atomic_int = std::atomic_intmax_t;
0093 using atomic_uint = std::atomic_uintmax_t;
0094 using condition_t = Condition;
0095 using ArgTp = decay_t<Arg>;
0096 using result_type = Tp;
0097 using task_pointer = std::shared_ptr<TaskFuture<ArgTp>>;
0098 using task_list_t = container_type<task_pointer>;
0099 using this_type = TaskGroup<Tp, Arg, MaxDepth>;
0100 using promise_type = std::promise<ArgTp>;
0101 using future_type = std::future<ArgTp>;
0102 using packaged_task_type = std::packaged_task<ArgTp()>;
0103 using future_list_t = container_type<future_type>;
0104 using join_type = typename JoinFunction<Tp, Arg>::Type;
0105 using iterator = typename future_list_t::iterator;
0106 using reverse_iterator = typename future_list_t::reverse_iterator;
0107 using const_iterator = typename future_list_t::const_iterator;
0108 using const_reverse_iterator = typename future_list_t::const_reverse_iterator;
0109
0110 template <typename... Args>
0111 using task_type = Task<ArgTp, decay_t<Args>...>;
0112
0113
0114 public:
0115
0116 template <typename Func>
0117 TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool());
0118
0119 template <typename Up = Tp>
0120 TaskGroup(ThreadPool* _tp = internal::get_default_threadpool(),
0121 enable_if_t<std::is_void<Up>::value, int> = 0);
0122
0123
0124 ~TaskGroup();
0125
0126
0127 TaskGroup(const this_type&) = delete;
0128
0129
0130 TaskGroup(this_type&& rhs) = default;
0131
0132 TaskGroup& operator=(const this_type& rhs) = delete;
0133
0134
0135 TaskGroup& operator=(this_type&& rhs) = default;
0136
0137 public:
0138 template <typename Up>
0139 std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task);
0140
0141
0142 void wait();
0143
0144
0145 intmax_t operator++() { return ++(m_tot_task_count); }
0146 intmax_t operator++(int) { return (m_tot_task_count)++; }
0147 intmax_t operator--() { return --(m_tot_task_count); }
0148 intmax_t operator--(int) { return (m_tot_task_count)--; }
0149
0150
0151 intmax_t size() const { return m_tot_task_count.load(); }
0152
0153
0154 lock_t& task_lock() { return m_task_lock; }
0155 condition_t& task_cond() { return m_task_cond; }
0156
0157
0158 uintmax_t id() const { return m_id; }
0159
0160
0161 void set_pool(ThreadPool* tp) { m_pool = tp; }
0162 ThreadPool*& pool() { return m_pool; }
0163 ThreadPool* pool() const { return m_pool; }
0164
0165 bool is_native_task_group() const { return (m_tbb_task_group) == nullptr; }
0166 bool is_main() const { return this_tid() == m_main_tid; }
0167
0168
0169 intmax_t pending() { return m_tot_task_count.load(); }
0170
0171 static void set_verbose(int level) { f_verbose = level; }
0172
0173 ScopeDestructor get_scope_destructor();
0174
0175 void notify();
0176 void notify_all();
0177
0178 void reserve(size_t _n)
0179 {
0180 m_task_list.reserve(_n);
0181 m_future_list.reserve(_n);
0182 }
0183
0184 public:
0185 template <typename Func, typename... Args>
0186 std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args)
0187 {
0188 return operator+=(std::make_shared<task_type<Args...>>(
0189 is_native_task_group(), m_depth, std::move(func), std::move(args)...));
0190 }
0191
0192 template <typename Func, typename... Args, typename Up = Tp>
0193 enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args);
0194
0195 template <typename Func, typename... Args, typename Up = Tp>
0196 enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args);
0197
0198 template <typename Func, typename... Args>
0199 void run(Func func, Args... args)
0200 {
0201 exec(std::move(func), std::move(args)...);
0202 }
0203
0204 protected:
0205 template <typename Up, typename Func, typename... Args>
0206 enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args);
0207
0208 template <typename Up, typename Func, typename... Args>
0209 enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args);
0210
0211
0212 using itr_t = iterator;
0213 using citr_t = const_iterator;
0214 using ritr_t = reverse_iterator;
0215 using critr_t = const_reverse_iterator;
0216
0217 public:
0218
0219
0220
0221 future_list_t& get_tasks() { return m_future_list; }
0222 const future_list_t& get_tasks() const { return m_future_list; }
0223
0224
0225
0226
0227 itr_t begin() { return m_future_list.begin(); }
0228 itr_t end() { return m_future_list.end(); }
0229 citr_t begin() const { return m_future_list.begin(); }
0230 citr_t end() const { return m_future_list.end(); }
0231 citr_t cbegin() const { return m_future_list.begin(); }
0232 citr_t cend() const { return m_future_list.end(); }
0233 ritr_t rbegin() { return m_future_list.rbegin(); }
0234 ritr_t rend() { return m_future_list.rend(); }
0235 critr_t rbegin() const { return m_future_list.rbegin(); }
0236 critr_t rend() const { return m_future_list.rend(); }
0237
0238
0239
0240 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0>
0241 inline Up join(Up accum = {});
0242
0243
0244 template <typename Up = Tp, typename Rp = Arg,
0245 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0>
0246 inline void join();
0247
0248
0249 template <typename Up = Tp, typename Rp = Arg,
0250 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0>
0251 inline void join();
0252
0253
0254 void clear();
0255
0256 protected:
0257
0258
0259 static tid_type this_tid() { return std::this_thread::get_id(); }
0260
0261
0262
0263 atomic_int& task_count() { return m_tot_task_count; }
0264 const atomic_int& task_count() const { return m_tot_task_count; }
0265
0266 protected:
0267 static int f_verbose;
0268
0269 uintmax_t m_id = internal::task_group_counter()++;
0270 intmax_t m_depth = internal::get_task_depth();
0271 tid_type m_main_tid = std::this_thread::get_id();
0272 atomic_int m_tot_task_count{ 0 };
0273 lock_t m_task_lock = {};
0274 condition_t m_task_cond = {};
0275 join_type m_join{};
0276 ThreadPool* m_pool = internal::get_default_threadpool();
0277 tbb_task_group_t* m_tbb_task_group = nullptr;
0278 task_list_t m_task_list = {};
0279 future_list_t m_future_list = {};
0280
0281 private:
0282 void internal_update();
0283 };
0284
0285 }
0286 namespace PTL
0287 {
0288 template <typename Tp, typename Arg, intmax_t MaxDepth>
0289 template <typename Func>
0290 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp)
0291 : m_join{ std::forward<Func>(_join) }
0292 , m_pool{ _tp }
0293 {
0294 internal_update();
0295 }
0296
0297 template <typename Tp, typename Arg, intmax_t MaxDepth>
0298 template <typename Up>
0299 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp,
0300 enable_if_t<std::is_void<Up>::value, int>)
0301 : m_join{ []() {} }
0302 , m_pool{ _tp }
0303 {
0304 internal_update();
0305 }
0306
0307
0308 template <typename Tp, typename Arg, intmax_t MaxDepth>
0309 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup()
0310 {
0311 {
0312
0313
0314
0315 AutoLock _lk{ m_task_lock, std::defer_lock };
0316 if(!_lk.owns_lock())
0317 _lk.lock();
0318 }
0319
0320 if(m_tbb_task_group)
0321 {
0322 auto* _arena = m_pool->get_task_arena();
0323 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
0324 }
0325 delete m_tbb_task_group;
0326 this->clear();
0327 }
0328
0329 template <typename Tp, typename Arg, intmax_t MaxDepth>
0330 template <typename Up>
0331 std::shared_ptr<Up>
0332 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task)
0333 {
0334
0335 operator++();
0336
0337 m_task_list.push_back(_task);
0338
0339 return std::move(_task);
0340 }
0341
// Waits until this group's pending-task counter (pending()) drops to zero or
// the pool's state becomes thread_pool::state::STOPPED.
//
// Summary of what the body does:
//  - Returns at once when ThreadData::GetInstance() is null or when no thread
//    pool can be found (m_pool and the default pool are both null).
//  - For a group that owns a TBB task group, only a thread flagged is_main in
//    its ThreadData, with a pool of at least two threads, reaches the wait
//    loop; any other caller returns early.
//  - A thread that is inside a task (within_task) and is either not main or
//    runs on a pool with fewer than two threads executes queued tasks itself
//    while tasks are pending.
//  - The main thread blocks on m_task_cond between checks of pending().
//  - If the counter is still positive afterwards, a warning is written to
//    std::cerr and wait() calls itself again.
0342 template <typename Tp, typename Arg, intmax_t MaxDepth>
0343 void
0344 TaskGroup<Tp, Arg, MaxDepth>::wait()
0345 {
// Callback that waits on the TBB task group (inside the pool's task arena)
// when one exists. NOTE(review): presumably ScopeDestructor invokes it when
// _dtor goes out of scope, i.e. on every return path below -- confirm.
0346 auto _dtor = ScopeDestructor{ [&]() {
0347 if(m_tbb_task_group)
0348 {
0349 auto* _arena = m_pool->get_task_arena();
0350 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
0351 }
0352 } };
0353
// Without per-thread data there is nothing this function can do.
0354 ThreadData* data = ThreadData::GetInstance();
0355 if(!data)
0356 return;
0357
0358
// No pool assigned to the group: try the default pool before giving up.
0359 if(!m_pool)
0360 {
0361
0362 m_pool = internal::get_default_threadpool();
0363
0364
// Still no pool: optionally report it (both messages describe the same
// condition) and return without waiting.
0365 if(!m_pool)
0366 {
0367 if(f_verbose > 0)
0368 {
0369 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
0370 __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
0371 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
0372 << "nullptr to thread pool!" << std::endl;
0373 }
0374 return;
0375 }
0376 }
0377
// m_pool is non-null from here on, so tpool is always m_pool and taskq is
// that pool's queue; the data-> fallbacks are never selected at this point.
0378 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
0379 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;
0380
// Snapshot of the per-thread flags; they are not re-read inside the loops.
0381 bool _is_main = data->is_main;
0382 bool _within_task = data->within_task;
0383
// True while the pool state is anything other than STOPPED (relaxed load).
0384 auto is_active_state = [&]() {
0385 return (tpool->state()->load(std::memory_order_relaxed) !=
0386 thread_pool::state::STOPPED);
0387 };
0388
// Lets a thread that is itself running a task help with queued work instead
// of blocking. It only acts when the caller is within a task and is either
// not main or the pool has fewer than two threads.
0389 auto execute_this_threads_tasks = [&]() {
0390 if(!taskq)
0391 return;
0392
0393
0394 if((!_is_main || tpool->size() < 2) && _within_task)
0395 {
0396 int bin = static_cast<int>(taskq->GetThreadBin());
0397
0398
// Loops (without sleeping or yielding) until the group has no pending tasks,
// running a task from this thread's bin whenever the queue is not empty.
0399 while(this->pending() > 0)
0400 {
0401 if(!taskq->empty())
0402 {
0403 auto _task = taskq->GetTask(bin);
0404 if(_task)
0405 (*_task)();
0406 }
0407 }
0408 }
0409 };
0410
0411
// Group backed by a TBB task group: unless this is the main thread of a pool
// with at least two threads, return now (the TBB wait is left to _dtor).
0412 if(!is_native_task_group())
0413 {
0414
0415 if(!_is_main || tpool->size() < 2)
0416 return;
0417 }
// Native group with verbose output enabled: diagnostics only, execution
// continues afterwards in every case.
// NOTE(review): tpool cannot be null here (see above), and
// is_native_task_group() is always true inside this branch.
0418 else if(f_verbose > 0)
0419 {
0420 if(!tpool || !taskq)
0421 {
0422
0423 fprintf(stderr,
0424 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
0425 "(%p)\n",
0426 __FUNCTION__, __LINE__, static_cast<void*>(tpool),
0427 static_cast<void*>(taskq));
0428 }
0429
0430 else if(is_native_task_group() && !tpool->is_alive())
0431 {
0432 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
0433 __FUNCTION__, __LINE__);
0434 }
0435 else if(!is_active_state())
0436 {
0437 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
0438 __FUNCTION__, __LINE__);
0439 }
0440 }
0441
// Threshold on pending(): at or above it the main thread waits without a
// timeout, below it the wait is limited to 100 microseconds.
0442 intmax_t wake_size = 2;
// Created unlocked; it is only held around the condition-variable waits.
0443 AutoLock _lock(m_task_lock, std::defer_lock);
0444
// Outer loop: repeats while the pool is not STOPPED and tasks are pending.
// A thread that is neither main nor within a task does no work in either
// helper below and simply re-checks pending() until it reaches zero.
0445 while(is_active_state())
0446 {
0447 execute_this_threads_tasks();
0448
0449
// Main thread only: sleep on the condition variable until the counter
// reaches zero or the pool stops.
0450 while(_is_main && pending() > 0 && is_active_state())
0451 {
0452
0453
0454
0455
0456
// Finishing tasks take m_task_lock before notifying (see exec()), so the
// counter check below and the wait happen under the same lock.
0457 if(!_lock.owns_lock())
0458 _lock.lock();
0459
0460
0461
0462
// Tasks only notify once the counter drops below one, so with several tasks
// outstanding an untimed wait is used; near the end a short timed wait
// re-checks the counter regularly.
0463 if(pending() >= wake_size)
0464 {
0465 m_task_cond.wait(_lock);
0466 }
0467 else
0468 {
0469 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
0470 }
0471
// Release the lock before re-evaluating the loop condition.
0472 if(_lock.owns_lock())
0473 _lock.unlock();
0474 }
0475
0476
// All tasks accounted for: leave the outer loop.
0477 if(pending() <= 0)
0478 break;
0479 }
0480
0481 if(_lock.owns_lock())
0482 _lock.unlock();
0483
// The outer loop also ends when the pool is STOPPED, so the counter may still
// be positive here. In that case warn and wait again.
// NOTE(review): for a native group this recursion appears not to terminate
// if the pool stays STOPPED while tasks remain pending (the loop above is
// skipped and this branch is taken again) -- confirm the intended behaviour.
0484 intmax_t ntask = this->task_count().load();
0485 if(ntask > 0)
0486 {
0487 std::stringstream ss;
0488 ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
0489 << "are running!" << std::endl;
0490 std::cerr << ss.str();
0491 this->wait();
0492 }
0493 }
0494
0495 template <typename Tp, typename Arg, intmax_t MaxDepth>
0496 ScopeDestructor
0497 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor()
0498 {
0499 auto& _counter = m_tot_task_count;
0500 auto& _task_cond = task_cond();
0501 auto& _task_lock = task_lock();
0502 return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
0503 auto _count = --(_counter);
0504 if(_count < 1)
0505 {
0506 AutoLock _lk{ _task_lock };
0507 _task_cond.notify_all();
0508 }
0509 } };
0510 }
0511
0512 template <typename Tp, typename Arg, intmax_t MaxDepth>
0513 void
0514 TaskGroup<Tp, Arg, MaxDepth>::notify()
0515 {
0516 AutoLock _lk{ m_task_lock };
0517 m_task_cond.notify_one();
0518 }
0519
0520 template <typename Tp, typename Arg, intmax_t MaxDepth>
0521 void
0522 TaskGroup<Tp, Arg, MaxDepth>::notify_all()
0523 {
0524 AutoLock _lk{ m_task_lock };
0525 m_task_cond.notify_all();
0526 }
0527
0528 template <typename Tp, typename Arg, intmax_t MaxDepth>
0529 template <typename Func, typename... Args, typename Up>
0530 enable_if_t<std::is_void<Up>::value, void>
0531 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
0532 {
0533 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
0534 ThreadData::GetInstance()->task_depth > MaxDepth)
0535 {
0536 local_exec<Tp>(std::move(func), std::move(args)...);
0537 }
0538 else
0539 {
0540 auto& _counter = m_tot_task_count;
0541 auto& _task_cond = task_cond();
0542 auto& _task_lock = task_lock();
0543 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
0544 auto* _tdata = ThreadData::GetInstance();
0545 if(_tdata)
0546 ++(_tdata->task_depth);
0547 func(args...);
0548 auto _count = --(_counter);
0549 if(_tdata)
0550 --(_tdata->task_depth);
0551 if(_count < 1)
0552 {
0553 AutoLock _lk{ _task_lock };
0554 _task_cond.notify_all();
0555 }
0556 });
0557
0558 if(m_tbb_task_group)
0559 {
0560 auto* _arena = m_pool->get_task_arena();
0561 auto* _tbb_task_group = m_tbb_task_group;
0562 auto* _ptask = _task.get();
0563 _arena->execute([_tbb_task_group, _ptask]() {
0564 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
0565 });
0566 }
0567 else
0568 {
0569 m_pool->add_task(std::move(_task));
0570 }
0571 }
0572 }
0573 template <typename Tp, typename Arg, intmax_t MaxDepth>
0574 template <typename Func, typename... Args, typename Up>
0575 enable_if_t<!std::is_void<Up>::value, void>
0576 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
0577 {
0578 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
0579 ThreadData::GetInstance()->task_depth > MaxDepth)
0580 {
0581 local_exec<Tp>(std::move(func), std::move(args)...);
0582 }
0583 else
0584 {
0585 auto& _counter = m_tot_task_count;
0586 auto& _task_cond = task_cond();
0587 auto& _task_lock = task_lock();
0588 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
0589 auto* _tdata = ThreadData::GetInstance();
0590 if(_tdata)
0591 ++(_tdata->task_depth);
0592 auto&& _ret = func(args...);
0593 auto _count = --(_counter);
0594 if(_tdata)
0595 --(_tdata->task_depth);
0596 if(_count < 1)
0597 {
0598 AutoLock _lk{ _task_lock };
0599 _task_cond.notify_all();
0600 }
0601 return std::forward<decltype(_ret)>(_ret);
0602 });
0603
0604 if(m_tbb_task_group)
0605 {
0606 auto* _arena = m_pool->get_task_arena();
0607 auto* _tbb_task_group = m_tbb_task_group;
0608 auto* _ptask = _task.get();
0609 _arena->execute([_tbb_task_group, _ptask]() {
0610 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
0611 });
0612 }
0613 else
0614 {
0615 m_pool->add_task(std::move(_task));
0616 }
0617 }
0618 }
0619
0620 template <typename Tp, typename Arg, intmax_t MaxDepth>
0621 template <typename Up, typename Func, typename... Args>
0622 enable_if_t<std::is_void<Up>::value, void>
0623 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
0624 {
0625 auto* _tdata = ThreadData::GetInstance();
0626 if(_tdata)
0627 ++(_tdata->task_depth);
0628 promise_type _p{};
0629 m_future_list.emplace_back(_p.get_future());
0630 func(args...);
0631 _p.set_value();
0632 if(_tdata)
0633 --(_tdata->task_depth);
0634 }
0635
0636 template <typename Tp, typename Arg, intmax_t MaxDepth>
0637 template <typename Up, typename Func, typename... Args>
0638 enable_if_t<!std::is_void<Up>::value, void>
0639 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
0640 {
0641 auto* _tdata = ThreadData::GetInstance();
0642 if(_tdata)
0643 ++(_tdata->task_depth);
0644 promise_type _p{};
0645 m_future_list.emplace_back(_p.get_future());
0646 _p.set_value(func(args...));
0647 if(_tdata)
0648 --(_tdata->task_depth);
0649 }
0650
0651 template <typename Tp, typename Arg, intmax_t MaxDepth>
0652 template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
0653 inline Up
0654 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum)
0655 {
0656 this->wait();
0657 for(auto& itr : m_task_list)
0658 {
0659 using RetT = decay_t<decltype(itr->get())>;
0660 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
0661 }
0662 for(auto& itr : m_future_list)
0663 {
0664 using RetT = decay_t<decltype(itr.get())>;
0665 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
0666 }
0667 this->clear();
0668 return accum;
0669 }
0670
0671 template <typename Tp, typename Arg, intmax_t MaxDepth>
0672 template <typename Up, typename Rp,
0673 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
0674 inline void
0675 TaskGroup<Tp, Arg, MaxDepth>::join()
0676 {
0677 this->wait();
0678 for(auto& itr : m_task_list)
0679 itr->get();
0680 for(auto& itr : m_future_list)
0681 itr.get();
0682 m_join();
0683 this->clear();
0684 }
0685
0686 template <typename Tp, typename Arg, intmax_t MaxDepth>
0687 template <typename Up, typename Rp,
0688 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
0689 inline void
0690 TaskGroup<Tp, Arg, MaxDepth>::join()
0691 {
0692 this->wait();
0693 for(auto& itr : m_task_list)
0694 {
0695 using RetT = decay_t<decltype(itr->get())>;
0696 m_join(std::forward<RetT>(itr->get()));
0697 }
0698 for(auto& itr : m_future_list)
0699 {
0700 using RetT = decay_t<decltype(itr.get())>;
0701 m_join(std::forward<RetT>(itr.get()));
0702 }
0703 this->clear();
0704 }
0705
0706 template <typename Tp, typename Arg, intmax_t MaxDepth>
0707 void
0708 TaskGroup<Tp, Arg, MaxDepth>::clear()
0709 {
0710 m_future_list.clear();
0711 m_task_list.clear();
0712 }
0713
0714 template <typename Tp, typename Arg, intmax_t MaxDepth>
0715 void
0716 TaskGroup<Tp, Arg, MaxDepth>::internal_update()
0717 {
0718 if(!m_pool)
0719 m_pool = internal::get_default_threadpool();
0720
0721 if(!m_pool)
0722 {
0723 std::stringstream ss{};
0724 ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
0725 << " :: nullptr to thread pool";
0726 throw std::runtime_error(ss.str());
0727 }
0728
0729 if(m_pool->is_tbb_threadpool())
0730 {
0731 m_tbb_task_group = new tbb_task_group_t{};
0732 }
0733 }
0734
0735 template <typename Tp, typename Arg, intmax_t MaxDepth>
0736 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>("PTL_VERBOSE", 0);
0737
0738 }