File indexing completed on 2025-09-16 08:55:37
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032 #pragma once
0033
0034 #include "PTL/AutoLock.hh"
0035 #ifndef G4GMAKE
0036 #include "PTL/Config.hh"
0037 #endif
0038 #include "PTL/JoinFunction.hh"
0039 #include "PTL/ScopeDestructor.hh"
0040 #include "PTL/Task.hh"
0041 #include "PTL/ThreadData.hh"
0042 #include "PTL/ThreadPool.hh"
0043 #include "PTL/Types.hh"
0044 #include "PTL/VTask.hh"
0045 #include "PTL/VUserTaskQueue.hh"
0046 #include "PTL/detail/CxxBackports.hh"
0047
0048 #include <atomic>
0049 #include <chrono>
0050 #include <cstdint>
0051 #include <cstdio>
0052 #include <functional>
0053 #include <future>
0054 #include <iostream>
0055 #include <memory>
0056 #include <mutex>
0057 #include <sstream> // IWYU pragma: keep
0058 #include <stdexcept>
0059 #include <thread>
0060 #include <type_traits>
0061 #include <utility>
0062 #include <vector>
0063
0064 #if defined(PTL_USE_TBB)
0065 # include <tbb/task_group.h> // IWYU pragma: keep
0066 #endif
0067
0068 namespace PTL
0069 {
0070 namespace internal
0071 {
0072 std::atomic_uintmax_t&
0073 task_group_counter();
0074
0075 ThreadPool*
0076 get_default_threadpool();
0077
0078 intmax_t
0079 get_task_depth();
0080 }
0081
0082 template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0>
0083 class TaskGroup
0084 {
0085 public:
0086
0087 template <typename Up>
0088 using container_type = std::vector<Up>;
0089
0090 using tid_type = std::thread::id;
0091 using size_type = uintmax_t;
0092 using lock_t = Mutex;
0093 using atomic_int = std::atomic_intmax_t;
0094 using atomic_uint = std::atomic_uintmax_t;
0095 using condition_t = Condition;
0096 using ArgTp = decay_t<Arg>;
0097 using result_type = Tp;
0098 using task_pointer = std::shared_ptr<TaskFuture<ArgTp>>;
0099 using task_list_t = container_type<task_pointer>;
0100 using this_type = TaskGroup<Tp, Arg, MaxDepth>;
0101 using promise_type = std::promise<ArgTp>;
0102 using future_type = std::future<ArgTp>;
0103 using packaged_task_type = std::packaged_task<ArgTp()>;
0104 using future_list_t = container_type<future_type>;
0105 using join_type = typename JoinFunction<Tp, Arg>::Type;
0106 using iterator = typename future_list_t::iterator;
0107 using reverse_iterator = typename future_list_t::reverse_iterator;
0108 using const_iterator = typename future_list_t::const_iterator;
0109 using const_reverse_iterator = typename future_list_t::const_reverse_iterator;
0110
0111 template <typename... Args>
0112 using task_type = Task<ArgTp, decay_t<Args>...>;
0113
0114
0115 public:
0116
0117 template <typename Func>
0118 TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool());
0119
0120 template <typename Up = Tp>
0121 TaskGroup(ThreadPool* _tp = internal::get_default_threadpool(),
0122 enable_if_t<std::is_void<Up>::value, int> = 0);
0123
0124
0125 ~TaskGroup();
0126
0127
0128 TaskGroup(const this_type&) = delete;
0129
0130
0131 TaskGroup(this_type&& rhs) = default;
0132
0133 TaskGroup& operator=(const this_type& rhs) = delete;
0134
0135
0136 TaskGroup& operator=(this_type&& rhs) = default;
0137
0138 public:
0139 template <typename Up>
0140 std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task);
0141
0142
0143 void wait();
0144
0145
0146 intmax_t operator++() { return ++(m_tot_task_count); }
0147 intmax_t operator++(int) { return (m_tot_task_count)++; }
0148 intmax_t operator--() { return --(m_tot_task_count); }
0149 intmax_t operator--(int) { return (m_tot_task_count)--; }
0150
0151
0152 intmax_t size() const { return m_tot_task_count.load(); }
0153
0154
0155 lock_t& task_lock() { return m_task_lock; }
0156 condition_t& task_cond() { return m_task_cond; }
0157
0158
0159 uintmax_t id() const { return m_id; }
0160
0161
0162 void set_pool(ThreadPool* tp) { m_pool = tp; }
0163 ThreadPool*& pool() { return m_pool; }
0164 ThreadPool* pool() const { return m_pool; }
0165
0166 bool is_native_task_group() const { return (m_tbb_task_group) == nullptr; }
0167 bool is_main() const { return this_tid() == m_main_tid; }
0168
0169
0170 intmax_t pending() { return m_tot_task_count.load(); }
0171
0172 static void set_verbose(int level) { f_verbose = level; }
0173
0174 ScopeDestructor get_scope_destructor();
0175
0176 void notify();
0177 void notify_all();
0178
0179 void reserve(size_t _n)
0180 {
0181 m_task_list.reserve(_n);
0182 m_future_list.reserve(_n);
0183 }
0184
0185 public:
0186 template <typename Func, typename... Args>
0187 std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args)
0188 {
0189 return operator+=(std::make_shared<task_type<Args...>>(
0190 is_native_task_group(), m_depth, std::move(func), std::move(args)...));
0191 }
0192
0193 template <typename Func, typename... Args, typename Up = Tp>
0194 enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args);
0195
0196 template <typename Func, typename... Args, typename Up = Tp>
0197 enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args);
0198
0199 template <typename Func, typename... Args>
0200 void run(Func func, Args... args)
0201 {
0202 exec(std::move(func), std::move(args)...);
0203 }
0204
0205 protected:
0206 template <typename Up, typename Func, typename... Args>
0207 enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args);
0208
0209 template <typename Up, typename Func, typename... Args>
0210 enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args);
0211
0212
0213 using itr_t = iterator;
0214 using citr_t = const_iterator;
0215 using ritr_t = reverse_iterator;
0216 using critr_t = const_reverse_iterator;
0217
0218 public:
0219
0220
0221
0222 future_list_t& get_tasks() { return m_future_list; }
0223 const future_list_t& get_tasks() const { return m_future_list; }
0224
0225
0226
0227
0228 itr_t begin() { return m_future_list.begin(); }
0229 itr_t end() { return m_future_list.end(); }
0230 citr_t begin() const { return m_future_list.begin(); }
0231 citr_t end() const { return m_future_list.end(); }
0232 citr_t cbegin() const { return m_future_list.begin(); }
0233 citr_t cend() const { return m_future_list.end(); }
0234 ritr_t rbegin() { return m_future_list.rbegin(); }
0235 ritr_t rend() { return m_future_list.rend(); }
0236 critr_t rbegin() const { return m_future_list.rbegin(); }
0237 critr_t rend() const { return m_future_list.rend(); }
0238
0239
0240
0241 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0>
0242 inline Up join(Up accum = {});
0243
0244
0245 template <typename Up = Tp, typename Rp = Arg,
0246 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0>
0247 inline void join();
0248
0249
0250 template <typename Up = Tp, typename Rp = Arg,
0251 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0>
0252 inline void join();
0253
0254
0255 void clear();
0256
0257 protected:
0258
0259
0260 static tid_type this_tid() { return std::this_thread::get_id(); }
0261
0262
0263
0264 atomic_int& task_count() { return m_tot_task_count; }
0265 const atomic_int& task_count() const { return m_tot_task_count; }
0266
0267 protected:
0268 static int f_verbose;
0269
0270 uintmax_t m_id = internal::task_group_counter()++;
0271 intmax_t m_depth = internal::get_task_depth();
0272 tid_type m_main_tid = std::this_thread::get_id();
0273 atomic_int m_tot_task_count{ 0 };
0274 lock_t m_task_lock = {};
0275 condition_t m_task_cond = {};
0276 join_type m_join{};
0277 ThreadPool* m_pool = internal::get_default_threadpool();
0278 tbb_task_group_t* m_tbb_task_group = nullptr;
0279 task_list_t m_task_list = {};
0280 future_list_t m_future_list = {};
0281
0282 private:
0283 void internal_update();
0284 };
0285
0286 }
0287 namespace PTL
0288 {
0289 template <typename Tp, typename Arg, intmax_t MaxDepth>
0290 template <typename Func>
0291 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp)
0292 : m_join{ std::forward<Func>(_join) }
0293 , m_pool{ _tp }
0294 {
0295 internal_update();
0296 }
0297
0298 template <typename Tp, typename Arg, intmax_t MaxDepth>
0299 template <typename Up>
0300 TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp,
0301 enable_if_t<std::is_void<Up>::value, int>)
0302 : m_join{ []() {} }
0303 , m_pool{ _tp }
0304 {
0305 internal_update();
0306 }
0307
0308
0309 template <typename Tp, typename Arg, intmax_t MaxDepth>
0310 TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup()
0311 {
0312 {
0313
0314
0315
0316 AutoLock _lk{ m_task_lock, std::defer_lock };
0317 if(!_lk.owns_lock())
0318 _lk.lock();
0319 }
0320
0321 if(m_tbb_task_group)
0322 {
0323 auto* _arena = m_pool->get_task_arena();
0324 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
0325 }
0326 delete m_tbb_task_group;
0327 this->clear();
0328 }
0329
0330 template <typename Tp, typename Arg, intmax_t MaxDepth>
0331 template <typename Up>
0332 std::shared_ptr<Up>
0333 TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task)
0334 {
0335
0336 operator++();
0337
0338 m_task_list.push_back(_task);
0339
0340 return std::move(_task);
0341 }
0342
// Blocks the calling thread until the outstanding-task counter reaches zero or
// the thread pool's state becomes STOPPED.
//  - Returns immediately when the thread has no ThreadData or no pool exists.
//  - A thread that is inside a task and is not the main thread (or the pool
//    has fewer than two threads) helps by executing tasks from the queue.
//  - The main thread sleeps on the task condition variable.
//  - For a TBB-backed group the TBB wait is issued by the scope destructor
//    created at the top (presumably when it goes out of scope -- see
//    ScopeDestructor).
0343 template <typename Tp, typename Arg, intmax_t MaxDepth>
0344 void
0345 TaskGroup<Tp, Arg, MaxDepth>::wait()
0346 {
// callback that waits on the TBB task group from inside the pool's task arena
0347 auto _dtor = ScopeDestructor{ [&]() {
0348 if(m_tbb_task_group)
0349 {
0350 auto* _arena = m_pool->get_task_arena();
0351 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
0352 }
0353 } };
0354
// without thread-local data there is nothing this thread can wait on
0355 ThreadData* data = ThreadData::GetInstance();
0356 if(!data)
0357 return;
0358
0359
// no pool assigned yet: fall back to the default pool
0360 if(!m_pool)
0361 {
0362
0363 m_pool = internal::get_default_threadpool();
0364
0365
// still no pool: optionally warn (once via fprintf, once via std::cerr) and give up
0366 if(!m_pool)
0367 {
0368 if(f_verbose > 0)
0369 {
0370 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
0371 __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
0372 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
0373 << "nullptr to thread pool!" << std::endl;
0374 }
0375 return;
0376 }
0377 }
0378
// m_pool is non-null at this point, so tpool is always m_pool and taskq is
// always tpool->get_queue(); the ThreadData fallbacks are not taken here
0379 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
0380 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;
0381
0382 bool _is_main = data->is_main;
0383 bool _within_task = data->within_task;
0384
// true while the pool has not been stopped
0385 auto is_active_state = [&]() {
0386 return (tpool->state()->load(std::memory_order_relaxed) !=
0387 thread_pool::state::STOPPED);
0388 };
0389
// lets a thread that is already inside a task work through queued tasks
// instead of blocking
0390 auto execute_this_threads_tasks = [&]() {
0391 if(!taskq)
0392 return;
0393
0394
// only for non-main threads (or pools with fewer than two threads) that are
// currently executing a task
0395 if((!_is_main || tpool->size() < 2) && _within_task)
0396 {
0397 int bin = static_cast<int>(taskq->GetThreadBin());
0398
0399
// loops until this group has no pending tasks; while the queue is empty
// this spins without sleeping
0400 while(this->pending() > 0)
0401 {
0402 if(!taskq->empty())
0403 {
0404 auto _task = taskq->GetTask(bin);
0405 if(_task)
0406 (*_task)();
0407 }
0408 }
0409 }
0410 };
0411
0412
// TBB-backed group: only the main thread of a pool with at least two threads
// continues into the wait loop below
0413 if(!is_native_task_group())
0414 {
0415
0416 if(!_is_main || tpool->size() < 2)
0417 return;
0418 }
// native group with verbosity enabled: diagnostics only, no control-flow change
0419 else if(f_verbose > 0)
0420 {
0421 if(!tpool || !taskq)
0422 {
0423
0424 fprintf(stderr,
0425 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
0426 "(%p)\n",
0427 __FUNCTION__, __LINE__, static_cast<void*>(tpool),
0428 static_cast<void*>(taskq));
0429 }
0430
0431 else if(is_native_task_group() && !tpool->is_alive())
0432 {
0433 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
0434 __FUNCTION__, __LINE__);
0435 }
0436 else if(!is_active_state())
0437 {
0438 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
0439 __FUNCTION__, __LINE__);
0440 }
0441 }
0442
// pending-count threshold that selects between an untimed and a timed wait
0443 intmax_t wake_size = 2;
// lock is created unlocked; it is only taken around the condition wait
0444 AutoLock _lock(m_task_lock, std::defer_lock);
0445
0446 while(is_active_state())
0447 {
0448 execute_this_threads_tasks();
0449
0450
// main thread: sleep on the condition variable while tasks remain
0451 while(_is_main && pending() > 0 && is_active_state())
0452 {
0453
0454
0455
0456
0457
0458 if(!_lock.owns_lock())
0459 _lock.lock();
0460
0461
0462
0463
// with wake_size or more tasks pending, wait until notified; with fewer,
// use a 100 microsecond timed wait (presumably to avoid blocking forever
// on a missed final notification)
0464 if(pending() >= wake_size)
0465 {
0466 m_task_cond.wait(_lock);
0467 }
0468 else
0469 {
0470 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
0471 }
0472
0473 if(_lock.owns_lock())
0474 _lock.unlock();
0475 }
0476
0477
// all tasks done: leave the outer loop
0478 if(pending() <= 0)
0479 break;
0480 }
0481
0482 if(_lock.owns_lock())
0483 _lock.unlock();
0484
// tasks still outstanding after the loop: warn and wait again
// NOTE(review): if the loop exited because the pool state is STOPPED while
// tasks are still pending, this recursive call looks unbounded -- confirm
0485 intmax_t ntask = this->task_count().load();
0486 if(ntask > 0)
0487 {
0488 std::stringstream ss;
0489 ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
0490 << "are running!" << std::endl;
0491 std::cerr << ss.str();
0492 this->wait();
0493 }
0494 }
0495
0496 template <typename Tp, typename Arg, intmax_t MaxDepth>
0497 ScopeDestructor
0498 TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor()
0499 {
0500 auto& _counter = m_tot_task_count;
0501 auto& _task_cond = task_cond();
0502 auto& _task_lock = task_lock();
0503 return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
0504 auto _count = --(_counter);
0505 if(_count < 1)
0506 {
0507 AutoLock _lk{ _task_lock };
0508 _task_cond.notify_all();
0509 }
0510 } };
0511 }
0512
0513 template <typename Tp, typename Arg, intmax_t MaxDepth>
0514 void
0515 TaskGroup<Tp, Arg, MaxDepth>::notify()
0516 {
0517 AutoLock _lk{ m_task_lock };
0518 m_task_cond.notify_one();
0519 }
0520
0521 template <typename Tp, typename Arg, intmax_t MaxDepth>
0522 void
0523 TaskGroup<Tp, Arg, MaxDepth>::notify_all()
0524 {
0525 AutoLock _lk{ m_task_lock };
0526 m_task_cond.notify_all();
0527 }
0528
0529 template <typename Tp, typename Arg, intmax_t MaxDepth>
0530 template <typename Func, typename... Args, typename Up>
0531 enable_if_t<std::is_void<Up>::value, void>
0532 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
0533 {
0534 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
0535 ThreadData::GetInstance()->task_depth > MaxDepth)
0536 {
0537 local_exec<Tp>(std::move(func), std::move(args)...);
0538 }
0539 else
0540 {
0541 auto& _counter = m_tot_task_count;
0542 auto& _task_cond = task_cond();
0543 auto& _task_lock = task_lock();
0544 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
0545 auto* _tdata = ThreadData::GetInstance();
0546 if(_tdata)
0547 ++(_tdata->task_depth);
0548 func(args...);
0549 auto _count = --(_counter);
0550 if(_tdata)
0551 --(_tdata->task_depth);
0552 if(_count < 1)
0553 {
0554 AutoLock _lk{ _task_lock };
0555 _task_cond.notify_all();
0556 }
0557 });
0558
0559 if(m_tbb_task_group)
0560 {
0561 auto* _arena = m_pool->get_task_arena();
0562 auto* _tbb_task_group = m_tbb_task_group;
0563 auto* _ptask = _task.get();
0564 _arena->execute([_tbb_task_group, _ptask]() {
0565 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
0566 });
0567 }
0568 else
0569 {
0570 m_pool->add_task(std::move(_task));
0571 }
0572 }
0573 }
0574 template <typename Tp, typename Arg, intmax_t MaxDepth>
0575 template <typename Func, typename... Args, typename Up>
0576 enable_if_t<!std::is_void<Up>::value, void>
0577 TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
0578 {
0579 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
0580 ThreadData::GetInstance()->task_depth > MaxDepth)
0581 {
0582 local_exec<Tp>(std::move(func), std::move(args)...);
0583 }
0584 else
0585 {
0586 auto& _counter = m_tot_task_count;
0587 auto& _task_cond = task_cond();
0588 auto& _task_lock = task_lock();
0589 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
0590 auto* _tdata = ThreadData::GetInstance();
0591 if(_tdata)
0592 ++(_tdata->task_depth);
0593 auto&& _ret = func(args...);
0594 auto _count = --(_counter);
0595 if(_tdata)
0596 --(_tdata->task_depth);
0597 if(_count < 1)
0598 {
0599 AutoLock _lk{ _task_lock };
0600 _task_cond.notify_all();
0601 }
0602 return std::forward<decltype(_ret)>(_ret);
0603 });
0604
0605 if(m_tbb_task_group)
0606 {
0607 auto* _arena = m_pool->get_task_arena();
0608 auto* _tbb_task_group = m_tbb_task_group;
0609 auto* _ptask = _task.get();
0610 _arena->execute([_tbb_task_group, _ptask]() {
0611 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
0612 });
0613 }
0614 else
0615 {
0616 m_pool->add_task(std::move(_task));
0617 }
0618 }
0619 }
0620
0621 template <typename Tp, typename Arg, intmax_t MaxDepth>
0622 template <typename Up, typename Func, typename... Args>
0623 enable_if_t<std::is_void<Up>::value, void>
0624 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
0625 {
0626 auto* _tdata = ThreadData::GetInstance();
0627 if(_tdata)
0628 ++(_tdata->task_depth);
0629 promise_type _p{};
0630 m_future_list.emplace_back(_p.get_future());
0631 func(args...);
0632 _p.set_value();
0633 if(_tdata)
0634 --(_tdata->task_depth);
0635 }
0636
0637 template <typename Tp, typename Arg, intmax_t MaxDepth>
0638 template <typename Up, typename Func, typename... Args>
0639 enable_if_t<!std::is_void<Up>::value, void>
0640 TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
0641 {
0642 auto* _tdata = ThreadData::GetInstance();
0643 if(_tdata)
0644 ++(_tdata->task_depth);
0645 promise_type _p{};
0646 m_future_list.emplace_back(_p.get_future());
0647 _p.set_value(func(args...));
0648 if(_tdata)
0649 --(_tdata->task_depth);
0650 }
0651
0652 template <typename Tp, typename Arg, intmax_t MaxDepth>
0653 template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
0654 inline Up
0655 TaskGroup<Tp, Arg, MaxDepth>::join(Up accum)
0656 {
0657 this->wait();
0658 for(auto& itr : m_task_list)
0659 {
0660 using RetT = decay_t<decltype(itr->get())>;
0661 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
0662 }
0663 for(auto& itr : m_future_list)
0664 {
0665 using RetT = decay_t<decltype(itr.get())>;
0666 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
0667 }
0668 this->clear();
0669 return accum;
0670 }
0671
0672 template <typename Tp, typename Arg, intmax_t MaxDepth>
0673 template <typename Up, typename Rp,
0674 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
0675 inline void
0676 TaskGroup<Tp, Arg, MaxDepth>::join()
0677 {
0678 this->wait();
0679 for(auto& itr : m_task_list)
0680 itr->get();
0681 for(auto& itr : m_future_list)
0682 itr.get();
0683 m_join();
0684 this->clear();
0685 }
0686
0687 template <typename Tp, typename Arg, intmax_t MaxDepth>
0688 template <typename Up, typename Rp,
0689 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
0690 inline void
0691 TaskGroup<Tp, Arg, MaxDepth>::join()
0692 {
0693 this->wait();
0694 for(auto& itr : m_task_list)
0695 {
0696 using RetT = decay_t<decltype(itr->get())>;
0697 m_join(std::forward<RetT>(itr->get()));
0698 }
0699 for(auto& itr : m_future_list)
0700 {
0701 using RetT = decay_t<decltype(itr.get())>;
0702 m_join(std::forward<RetT>(itr.get()));
0703 }
0704 this->clear();
0705 }
0706
0707 template <typename Tp, typename Arg, intmax_t MaxDepth>
0708 void
0709 TaskGroup<Tp, Arg, MaxDepth>::clear()
0710 {
0711 m_future_list.clear();
0712 m_task_list.clear();
0713 }
0714
0715 template <typename Tp, typename Arg, intmax_t MaxDepth>
0716 void
0717 TaskGroup<Tp, Arg, MaxDepth>::internal_update()
0718 {
0719 if(!m_pool)
0720 m_pool = internal::get_default_threadpool();
0721
0722 if(!m_pool)
0723 {
0724 std::stringstream ss{};
0725 ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
0726 << " :: nullptr to thread pool";
0727 throw std::runtime_error(ss.str());
0728 }
0729
0730 if(m_pool->is_tbb_threadpool())
0731 {
0732 m_tbb_task_group = new tbb_task_group_t{};
0733 }
0734 }
0735
0736 template <typename Tp, typename Arg, intmax_t MaxDepth>
0737 int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = 0;
0738
0739 }