File indexing completed on 2025-09-18 09:13:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031 #pragma once
0032
0033 #include "PTL/AutoLock.hh"
0034 #ifndef G4GMAKE
0035 #include "PTL/Config.hh"
0036 #endif
0037 #include "PTL/ThreadData.hh"
0038 #include "PTL/Threading.hh"
0039 #include "PTL/Types.hh"
0040 #include "PTL/VTask.hh"
0041 #include "PTL/VUserTaskQueue.hh"
0042
0043 #if defined(PTL_USE_TBB)
0044 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
0045 # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
0046 # endif
0047 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
0048 # define TBB_PREVIEW_GLOBAL_CONTROL 1
0049 # endif
0050 # include <tbb/global_control.h>
0051 # include <tbb/task_arena.h>
0052 # include <tbb/task_group.h>
0053 #endif
0054
0055 #include <algorithm>
0056 #include <atomic>
0057 #include <chrono>
0058 #include <cstdint>
0059 #include <cstdlib>
0060 #include <deque>
0061 #include <functional>
0062 #include <iostream>
0063 #include <map>
0064 #include <memory>
0065 #include <mutex> // IWYU pragma: keep
0066 #include <set>
0067 #include <thread>
0068 #include <type_traits> // IWYU pragma: keep
0069 #include <unordered_map>
0070 #include <utility>
0071 #include <vector>
0072
0073 namespace PTL
0074 {
namespace thread_pool
{
/// Integer codes stored in the pool's atomic state flag.
/// NOTE(review): the meaning of each code is inferred from its name only;
/// confirm against the ThreadPool implementation file.
namespace state
{
static const short STARTED{ 0 };  // presumably: pool is running
static const short PARTIAL{ 1 };  // presumably: pool is partially stopped
static const short STOPPED{ 2 };  // presumably: pool has been stopped
static const short NONINIT{ 3 };  // presumably: pool was never initialized

}  // namespace state
}  // namespace thread_pool
0086
0087 class ThreadPool
0088 {
0089 public:
0090 template <typename KeyT, typename MappedT, typename HashT = KeyT>
0091 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
0092
0093
0094 using size_type = size_t;
0095 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>;
0096 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>;
0097 using pool_state_type = std::shared_ptr<std::atomic_short>;
0098 using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
0099
0100 using task_type = VTask;
0101 using lock_t = std::shared_ptr<Mutex>;
0102 using condition_t = std::shared_ptr<Condition>;
0103 using task_pointer = std::shared_ptr<task_type>;
0104 using task_queue_t = VUserTaskQueue;
0105
0106 using thread_list_t = std::deque<ThreadId>;
0107 using bool_list_t = std::vector<bool>;
0108 using thread_id_map_t = std::map<ThreadId, uintmax_t>;
0109 using thread_index_map_t = std::map<uintmax_t, ThreadId>;
0110 using thread_vec_t = std::vector<Thread>;
0111 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
0112
0113 using initialize_func_t = std::function<void()>;
0114 using finalize_func_t = std::function<void()>;
0115 using affinity_func_t = std::function<intmax_t(intmax_t)>;
0116
0117 static affinity_func_t& affinity_functor()
0118 {
0119 static affinity_func_t _v = [](intmax_t) {
0120 static std::atomic<intmax_t> assigned;
0121 intmax_t _assign = assigned++;
0122 return _assign % Thread::hardware_concurrency();
0123 };
0124 return _v;
0125 }
0126
0127 static initialize_func_t& initialization_functor()
0128 {
0129 static initialize_func_t _v = []() {};
0130 return _v;
0131 }
0132
0133 static finalize_func_t& finalization_functor()
0134 {
0135 static finalize_func_t _v = []() {};
0136 return _v;
0137 }
0138
0139 struct Config
0140 {
0141 bool init = true;
0142 bool use_tbb = false;
0143 bool use_affinity = false;
0144 int verbose = 0;
0145 int priority = 0;
0146 size_type pool_size = f_default_pool_size();
0147 VUserTaskQueue* task_queue = nullptr;
0148 affinity_func_t set_affinity = affinity_functor();
0149 initialize_func_t initializer = initialization_functor();
0150 finalize_func_t finalizer = finalization_functor();
0151 };
0152
0153 public:
0154
0155 explicit ThreadPool(const Config&);
0156 ~ThreadPool();
0157 ThreadPool(const ThreadPool&) = delete;
0158 ThreadPool(ThreadPool&&) = default;
0159 ThreadPool& operator=(const ThreadPool&) = delete;
0160 ThreadPool& operator=(ThreadPool&&) = default;
0161
0162 public:
0163
0164 size_type initialize_threadpool(size_type);
0165 size_type destroy_threadpool();
0166 size_type stop_thread();
0167
0168 template <typename FuncT>
0169 void execute_on_all_threads(FuncT&& _func);
0170
0171 template <typename FuncT>
0172 void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
0173 FuncT&& _func);
0174
0175 task_queue_t* get_queue() const { return m_task_queue; }
0176 task_queue_t*& get_valid_queue(task_queue_t*&) const;
0177
0178 bool is_tbb_threadpool() const { return m_tbb_tp; }
0179
0180 public:
0181
0182 static void set_default_size(size_type _v) { f_default_pool_size() = _v; }
0183
0184
0185 static size_type get_default_size() { return f_default_pool_size(); }
0186
0187 public:
0188
0189 size_type add_task(task_pointer&& task, int bin = -1);
0190
0191
0192 template <typename ListT>
0193 size_type add_tasks(ListT&);
0194
0195 Thread* get_thread(size_type _n) const;
0196 Thread* get_thread(std::thread::id id) const;
0197
0198
0199 static tbb_global_control_t*& tbb_global_control();
0200
0201 void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
0202 void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }
0203
0204 void reset_initialization()
0205 {
0206 m_init_func = []() {};
0207 }
0208 void reset_finalization()
0209 {
0210 m_fini_func = []() {};
0211 }
0212
0213 public:
0214
0215 const pool_state_type& state() const { return m_pool_state; }
0216
0217 size_type size() const { return m_pool_size; }
0218
0219 void resize(size_type _n);
0220
0221 bool using_affinity() const { return m_use_affinity; }
0222 bool is_alive() { return m_alive_flag->load(); }
0223 void notify();
0224 void notify_all();
0225 void notify(size_type);
0226 bool is_initialized() const;
0227 int get_active_threads_count() const { return (int)m_thread_awake->load(); }
0228
0229 void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
0230 void set_affinity(intmax_t i, Thread&) const;
0231 void set_priority(int _prio, Thread&) const;
0232
0233 void set_verbose(int n) { m_verbose = n; }
0234 int get_verbose() const { return m_verbose; }
0235 bool is_main() const { return ThisThread::get_id() == m_main_tid; }
0236
0237 tbb_task_arena_t* get_task_arena();
0238
0239 public:
0240
0241 static const thread_id_map_t& get_thread_ids();
0242 static uintmax_t get_thread_id(ThreadId);
0243 static uintmax_t get_this_thread_id();
0244 static uintmax_t add_thread_id(ThreadId = ThisThread::get_id());
0245
0246 private:
0247 void execute_thread(VUserTaskQueue*);
0248 int insert(task_pointer&&, int = -1);
0249 int run_on_this(task_pointer&&);
0250
0251 private:
0252
0253 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
0254
0255 void record_entry();
0256 void record_exit();
0257
0258 private:
0259
0260
0261 bool m_use_affinity = false;
0262 bool m_tbb_tp = false;
0263 bool m_delete_task_queue = false;
0264 int m_verbose = 0;
0265 int m_priority = 0;
0266 size_type m_pool_size = 0;
0267 ThreadId m_main_tid = ThisThread::get_id();
0268 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false);
0269 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0);
0270 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0);
0271 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);
0272
0273
0274 lock_t m_task_lock = std::make_shared<Mutex>();
0275
0276 condition_t m_task_cond = std::make_shared<Condition>();
0277
0278
0279 bool_list_t m_is_joined = {};
0280 bool_list_t m_is_stopped = {};
0281 thread_list_t m_main_threads = {};
0282 thread_list_t m_stop_threads = {};
0283 thread_vec_t m_threads = {};
0284 thread_data_t m_thread_data = {};
0285
0286
0287 task_queue_t* m_task_queue = nullptr;
0288 tbb_task_arena_t* m_tbb_task_arena = nullptr;
0289 tbb_task_group_t* m_tbb_task_group = nullptr;
0290
0291
0292 initialize_func_t m_init_func = initialization_functor();
0293 finalize_func_t m_fini_func = finalization_functor();
0294 affinity_func_t m_affinity_func = affinity_functor();
0295
0296 private:
0297 static size_type& f_default_pool_size();
0298 static thread_id_map_t& f_thread_ids();
0299 };
0300
0301
0302 inline void
0303 ThreadPool::notify()
0304 {
0305
0306 if(m_thread_awake->load() < m_pool_size)
0307 {
0308 AutoLock l(*m_task_lock);
0309 m_task_cond->notify_one();
0310 }
0311 }
0312
0313 inline void
0314 ThreadPool::notify_all()
0315 {
0316
0317 AutoLock l(*m_task_lock);
0318 m_task_cond->notify_all();
0319 }
0320
0321 inline void
0322 ThreadPool::notify(size_type ntasks)
0323 {
0324 if(ntasks == 0)
0325 return;
0326
0327
0328 if(m_thread_awake->load() < m_pool_size)
0329 {
0330 AutoLock l(*m_task_lock);
0331 if(ntasks < this->size())
0332 {
0333 for(size_type i = 0; i < ntasks; ++i)
0334 m_task_cond->notify_one();
0335 }
0336 else
0337 {
0338 m_task_cond->notify_all();
0339 }
0340 }
0341 }
0342
0343
0344 inline tbb_global_control_t*&
0345 ThreadPool::tbb_global_control()
0346 {
0347 static thread_local tbb_global_control_t* _instance = nullptr;
0348 return _instance;
0349 }
0350
0351
0352 inline tbb_task_arena_t*
0353 ThreadPool::get_task_arena()
0354 {
0355 #if defined(PTL_USE_TBB)
0356
0357 if(!m_tbb_task_arena)
0358 {
0359 auto _sz = (tbb_global_control())
0360 ? tbb_global_control()->active_value(
0361 tbb::global_control::max_allowed_parallelism)
0362 : size();
0363 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
0364 m_tbb_task_arena->initialize(_sz, 1);
0365 }
0366 #else
0367 if(!m_tbb_task_arena)
0368 m_tbb_task_arena = new tbb_task_arena_t{};
0369 #endif
0370 return m_tbb_task_arena;
0371 }
0372
0373 inline void
0374 ThreadPool::resize(size_type _n)
0375 {
0376 initialize_threadpool(_n);
0377 if(m_task_queue)
0378 m_task_queue->resize(static_cast<intmax_t>(_n));
0379 }
0380
0381 inline int
0382 ThreadPool::run_on_this(task_pointer&& _task)
0383 {
0384 auto&& _func = [_task]() { (*_task)(); };
0385
0386 if(m_tbb_tp && m_tbb_task_group)
0387 {
0388 auto* _arena = get_task_arena();
0389 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
0390 }
0391 else
0392 {
0393 _func();
0394 }
0395
0396 return 0;
0397 }
0398
0399 inline int
0400 ThreadPool::insert(task_pointer&& task, int bin)
0401 {
0402 static thread_local ThreadData* _data = ThreadData::GetInstance();
0403
0404
0405 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
0406 notify();
0407 return (int)ibin;
0408 }
0409
0410 inline ThreadPool::size_type
0411 ThreadPool::add_task(task_pointer&& task, int bin)
0412 {
0413
0414 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
0415 return static_cast<size_type>(run_on_this(std::move(task)));
0416
0417 return static_cast<size_type>(insert(std::move(task), bin));
0418 }
0419
0420 template <typename ListT>
0421 inline ThreadPool::size_type
0422 ThreadPool::add_tasks(ListT& c)
0423 {
0424 if(!m_alive_flag)
0425 {
0426 for(auto& itr : c)
0427 run(itr);
0428 c.clear();
0429 return 0;
0430 }
0431
0432
0433 auto c_size = c.size();
0434 for(auto& itr : c)
0435 {
0436 if(!itr->is_native_task())
0437 --c_size;
0438 else
0439 {
0440
0441 get_valid_queue(m_task_queue)->InsertTask(itr);
0442 }
0443 }
0444 c.clear();
0445
0446
0447 notify(c_size);
0448
0449 return c_size;
0450 }
0451
0452 template <typename FuncT>
0453 inline void
0454 ThreadPool::execute_on_all_threads(FuncT&& _func)
0455 {
0456 if(m_tbb_tp && m_tbb_task_group)
0457 {
0458 #if defined(PTL_USE_TBB)
0459
0460
0461
0462
0463 std::set<std::thread::id> _first{};
0464 Mutex _mutex{};
0465
0466 auto _init = [&]() {
0467 int _once = 0;
0468 _mutex.lock();
0469 if(_first.find(std::this_thread::get_id()) == _first.end())
0470 {
0471
0472
0473 _once = 1;
0474 _first.insert(std::this_thread::get_id());
0475 }
0476 _mutex.unlock();
0477 if(_once != 0)
0478 {
0479 _func();
0480 return 1;
0481 }
0482 return 0;
0483 };
0484
0485
0486 std::atomic<size_t> _total_init{ 0 };
0487
0488 size_t _maxp = tbb_global_control()->active_value(
0489 tbb::global_control::max_allowed_parallelism);
0490
0491 auto* _arena = get_task_arena();
0492
0493 size_t _sz = size();
0494
0495 size_t _ncore = GetNumberOfCores();
0496
0497 size_t _dmax = std::max<size_t>(_ncore, 8);
0498
0499 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
0500
0501 std::function<void()> _init_task;
0502 _init_task = [&]() {
0503 add_thread_id();
0504 static thread_local size_type _depth = 0;
0505 int _ret = 0;
0506
0507 if(!is_main())
0508 {
0509
0510 _ret = _init();
0511
0512 _total_init += _ret;
0513 }
0514
0515
0516 ++_depth;
0517 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
0518 {
0519 tbb::task_group tg{};
0520 tg.run([&]() { _init_task(); });
0521 tg.run([&]() { _init_task(); });
0522 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0523 tg.wait();
0524 }
0525 --_depth;
0526 };
0527
0528
0529 size_t nitr = 0;
0530 auto _fname = __FUNCTION__;
0531 auto _write_info = [&]() {
0532 std::cout << "[" << _fname << "]> Total initialized: " << _total_init
0533 << ", expected: " << _num << ", max-parallel: " << _maxp
0534 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
0535 };
0536 while(_total_init < _num)
0537 {
0538 auto _n = 2 * _num;
0539 while(--_n > 0)
0540 {
0541 _arena->execute(
0542 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
0543 }
0544 _arena->execute([&]() { m_tbb_task_group->wait(); });
0545
0546 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
0547 {
0548 _write_info();
0549 break;
0550 }
0551
0552 if(nitr > 4 * (_ncore + 1))
0553 {
0554 _write_info();
0555 break;
0556 }
0557 }
0558 if(get_verbose() > 3)
0559 _write_info();
0560 #endif
0561 }
0562 else if(get_queue())
0563 {
0564 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
0565 }
0566 }
0567
0568
0569
0570 template <typename FuncT>
0571 inline void
0572 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
0573 FuncT&& _func)
0574 {
0575 if(m_tbb_tp && m_tbb_task_group)
0576 {
0577 #if defined(PTL_USE_TBB)
0578
0579
0580
0581
0582 std::set<std::thread::id> _first{};
0583 Mutex _mutex{};
0584
0585 auto _exec = [&]() {
0586 int _once = 0;
0587 _mutex.lock();
0588 if(_first.find(std::this_thread::get_id()) == _first.end())
0589 {
0590
0591
0592 _once = 1;
0593 _first.insert(std::this_thread::get_id());
0594 }
0595 _mutex.unlock();
0596 if(_once != 0)
0597 {
0598 _func();
0599 return 1;
0600 }
0601 return 0;
0602 };
0603
0604
0605 std::atomic<size_t> _total_exec{ 0 };
0606
0607 size_t _ncore = GetNumberOfCores();
0608
0609 size_t _dmax = std::max<size_t>(_ncore, 8);
0610
0611 size_t _num = _tids.size();
0612
0613 auto* _arena = get_task_arena();
0614
0615 std::function<void()> _exec_task;
0616 _exec_task = [&]() {
0617 add_thread_id();
0618 static thread_local size_type _depth = 0;
0619 int _ret = 0;
0620 auto _this_tid = std::this_thread::get_id();
0621
0622 if(_tids.count(_this_tid) > 0)
0623 {
0624
0625 _ret = _exec();
0626
0627 _total_exec += _ret;
0628 }
0629
0630
0631 ++_depth;
0632 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
0633 {
0634 tbb::task_group tg{};
0635 tg.run([&]() { _exec_task(); });
0636 tg.run([&]() { _exec_task(); });
0637 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0638 tg.wait();
0639 }
0640 --_depth;
0641 };
0642
0643
0644 size_t nitr = 0;
0645 auto _fname = __FUNCTION__;
0646 auto _write_info = [&]() {
0647 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
0648 << ", expected: " << _num << ", size: " << size() << std::endl;
0649 };
0650 while(_total_exec < _num)
0651 {
0652 auto _n = 2 * _num;
0653 while(--_n > 0)
0654 {
0655 _arena->execute(
0656 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
0657 }
0658 _arena->execute([&]() { m_tbb_task_group->wait(); });
0659
0660 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
0661 {
0662 _write_info();
0663 break;
0664 }
0665
0666 if(nitr > 8 * (_num + 1))
0667 {
0668 _write_info();
0669 break;
0670 }
0671 }
0672 if(get_verbose() > 3)
0673 _write_info();
0674 #endif
0675 }
0676 else if(get_queue())
0677 {
0678 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
0679 }
0680 }
0681
0682
0683
0684 }