File indexing completed on 2024-11-15 09:38:49
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031 #pragma once
0032
0033 #include "PTL/AutoLock.hh"
0034 #ifndef G4GMAKE
0035 #include "PTL/Config.hh"
0036 #endif
0037 #include "PTL/ThreadData.hh"
0038 #include "PTL/Threading.hh"
0039 #include "PTL/Types.hh"
0040 #include "PTL/VTask.hh"
0041 #include "PTL/VUserTaskQueue.hh"
0042
0043 #if defined(PTL_USE_TBB)
0044 # if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
0045 # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
0046 # endif
0047 # if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
0048 # define TBB_PREVIEW_GLOBAL_CONTROL 1
0049 # endif
0050 # include <tbb/global_control.h>
0051 # include <tbb/task_arena.h>
0052 # include <tbb/task_group.h>
0053 #endif
0054
0055 #include <algorithm>
0056 #include <atomic>
0057 #include <chrono>
0058 #include <cstdint>
0059 #include <cstdlib>
0060 #include <deque>
0061 #include <functional>
0062 #include <iostream>
0063 #include <map>
0064 #include <memory>
0065 #include <mutex> // IWYU pragma: keep
0066 #include <set>
0067 #include <thread>
0068 #include <type_traits> // IWYU pragma: keep
0069 #include <unordered_map>
0070 #include <utility>
0071 #include <vector>
0072
0073 namespace PTL
0074 {
namespace thread_pool
{
namespace state
{
// Symbolic values for a pool "state" stored as a short.
// NOTE(review): presumably these are the values held by the atomic returned
// from ThreadPool::state() -- confirm against the implementation file.
static constexpr short STARTED = 0;
static constexpr short PARTIAL = 1;
static constexpr short STOPPED = 2;
static constexpr short NONINIT = 3;
}  // namespace state
}  // namespace thread_pool
0086
0087 class ThreadPool
0088 {
0089 public:
0090 template <typename KeyT, typename MappedT, typename HashT = KeyT>
0091 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
0092
0093
0094 using size_type = size_t;
0095 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>;
0096 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>;
0097 using pool_state_type = std::shared_ptr<std::atomic_short>;
0098 using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
0099
0100 using task_type = VTask;
0101 using lock_t = std::shared_ptr<Mutex>;
0102 using condition_t = std::shared_ptr<Condition>;
0103 using task_pointer = std::shared_ptr<task_type>;
0104 using task_queue_t = VUserTaskQueue;
0105
0106 using thread_list_t = std::deque<ThreadId>;
0107 using bool_list_t = std::vector<bool>;
0108 using thread_id_map_t = std::map<ThreadId, uintmax_t>;
0109 using thread_index_map_t = std::map<uintmax_t, ThreadId>;
0110 using thread_vec_t = std::vector<Thread>;
0111 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
0112
0113 using initialize_func_t = std::function<void()>;
0114 using finalize_func_t = std::function<void()>;
0115 using affinity_func_t = std::function<intmax_t(intmax_t)>;
0116
0117 static affinity_func_t& affinity_functor()
0118 {
0119 static affinity_func_t _v = [](intmax_t) {
0120 static std::atomic<intmax_t> assigned;
0121 intmax_t _assign = assigned++;
0122 return _assign % Thread::hardware_concurrency();
0123 };
0124 return _v;
0125 }
0126
0127 static initialize_func_t& initialization_functor()
0128 {
0129 static initialize_func_t _v = []() {};
0130 return _v;
0131 }
0132
0133 static finalize_func_t& finalization_functor()
0134 {
0135 static finalize_func_t _v = []() {};
0136 return _v;
0137 }
0138
0139 struct Config
0140 {
0141 PTL_DEFAULT_OBJECT(Config)
0142
0143 Config(bool, bool, bool, int, int, size_type, VUserTaskQueue*, affinity_func_t,
0144 initialize_func_t, finalize_func_t);
0145
0146 bool init = true;
0147 bool use_tbb = f_use_tbb();
0148 bool use_affinity = f_use_cpu_affinity();
0149 int verbose = f_verbose();
0150 int priority = f_thread_priority();
0151 size_type pool_size = f_default_pool_size();
0152 VUserTaskQueue* task_queue = nullptr;
0153 affinity_func_t set_affinity = affinity_functor();
0154 initialize_func_t initializer = initialization_functor();
0155 finalize_func_t finalizer = finalization_functor();
0156 };
0157
0158 public:
0159
0160 explicit ThreadPool(const Config&);
0161 ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
0162 bool _use_affinity = f_use_cpu_affinity(),
0163 affinity_func_t = affinity_functor(),
0164 initialize_func_t = initialization_functor(),
0165 finalize_func_t = finalization_functor());
0166 ThreadPool(const size_type& pool_size, initialize_func_t, finalize_func_t,
0167 bool _use_affinity = f_use_cpu_affinity(),
0168 affinity_func_t = affinity_functor(),
0169 VUserTaskQueue* task_queue = nullptr);
0170 virtual ~ThreadPool();
0171 ThreadPool(const ThreadPool&) = delete;
0172 ThreadPool(ThreadPool&&) = default;
0173 ThreadPool& operator=(const ThreadPool&) = delete;
0174 ThreadPool& operator=(ThreadPool&&) = default;
0175
0176 public:
0177
0178 size_type initialize_threadpool(size_type);
0179 size_type destroy_threadpool();
0180 size_type stop_thread();
0181
0182 template <typename FuncT>
0183 void execute_on_all_threads(FuncT&& _func);
0184
0185 template <typename FuncT>
0186 void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
0187 FuncT&& _func);
0188
0189 task_queue_t* get_queue() const { return m_task_queue; }
0190 task_queue_t*& get_valid_queue(task_queue_t*&) const;
0191
0192 bool is_tbb_threadpool() const { return m_tbb_tp; }
0193
0194 public:
0195
0196 static bool using_tbb();
0197
0198 static void set_use_tbb(bool _v);
0199
0200
0201 static void set_default_use_tbb(bool _v) { set_use_tbb(_v); }
0202
0203 static void set_default_use_cpu_affinity(bool _v);
0204
0205 static void set_default_scheduling_priority(int _v) { f_thread_priority() = _v; }
0206
0207 static void set_default_verbose(int _v) { f_verbose() = _v; }
0208
0209 static void set_default_size(size_type _v) { f_default_pool_size() = _v; }
0210
0211
0212 static bool get_default_use_tbb() { return f_use_tbb(); }
0213
0214 static bool get_default_use_cpu_affinity() { return f_use_cpu_affinity(); }
0215
0216 static int get_default_scheduling_priority() { return f_thread_priority(); }
0217
0218 static int get_default_verbose() { return f_verbose(); }
0219
0220 static size_type get_default_size() { return f_default_pool_size(); }
0221
0222 public:
0223
0224 size_type add_task(task_pointer&& task, int bin = -1);
0225
0226
0227 template <typename ListT>
0228 size_type add_tasks(ListT&);
0229
0230 Thread* get_thread(size_type _n) const;
0231 Thread* get_thread(std::thread::id id) const;
0232
0233
0234 static tbb_global_control_t*& tbb_global_control();
0235
0236 void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
0237 void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }
0238
0239 void reset_initialization()
0240 {
0241 m_init_func = []() {};
0242 }
0243 void reset_finalization()
0244 {
0245 m_fini_func = []() {};
0246 }
0247
0248 public:
0249
0250 const pool_state_type& state() const { return m_pool_state; }
0251
0252 size_type size() const { return m_pool_size; }
0253
0254 void resize(size_type _n);
0255
0256 bool using_affinity() const { return m_use_affinity; }
0257 bool is_alive() { return m_alive_flag->load(); }
0258 void notify();
0259 void notify_all();
0260 void notify(size_type);
0261 bool is_initialized() const;
0262 int get_active_threads_count() const { return (int)m_thread_awake->load(); }
0263
0264 void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
0265 void set_affinity(intmax_t i, Thread&) const;
0266 void set_priority(int _prio, Thread&) const;
0267
0268 void set_verbose(int n) { m_verbose = n; }
0269 int get_verbose() const { return m_verbose; }
0270 bool is_main() const { return ThisThread::get_id() == m_main_tid; }
0271
0272 tbb_task_arena_t* get_task_arena();
0273
0274 public:
0275
0276 static const thread_id_map_t& get_thread_ids();
0277 static uintmax_t get_thread_id(ThreadId);
0278 static uintmax_t get_this_thread_id();
0279 static uintmax_t add_thread_id(ThreadId = ThisThread::get_id());
0280
0281 protected:
0282 void execute_thread(VUserTaskQueue*);
0283 int insert(task_pointer&&, int = -1);
0284 int run_on_this(task_pointer&&);
0285
0286 protected:
0287
0288 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
0289
0290 void record_entry();
0291 void record_exit();
0292
0293 private:
0294
0295
0296 bool m_use_affinity = false;
0297 bool m_tbb_tp = false;
0298 bool m_delete_task_queue = false;
0299 int m_verbose = f_verbose();
0300 int m_priority = f_thread_priority();
0301 size_type m_pool_size = 0;
0302 ThreadId m_main_tid = ThisThread::get_id();
0303 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false);
0304 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0);
0305 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0);
0306 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);
0307
0308
0309 lock_t m_task_lock = std::make_shared<Mutex>();
0310
0311 condition_t m_task_cond = std::make_shared<Condition>();
0312
0313
0314 bool_list_t m_is_joined = {};
0315 bool_list_t m_is_stopped = {};
0316 thread_list_t m_main_threads = {};
0317 thread_list_t m_stop_threads = {};
0318 thread_vec_t m_threads = {};
0319 thread_data_t m_thread_data = {};
0320
0321
0322 task_queue_t* m_task_queue = nullptr;
0323 tbb_task_arena_t* m_tbb_task_arena = nullptr;
0324 tbb_task_group_t* m_tbb_task_group = nullptr;
0325
0326
0327 initialize_func_t m_init_func = initialization_functor();
0328 finalize_func_t m_fini_func = finalization_functor();
0329 affinity_func_t m_affinity_func = affinity_functor();
0330
0331 private:
0332 static bool& f_use_tbb();
0333 static bool& f_use_cpu_affinity();
0334 static int& f_thread_priority();
0335 static int& f_verbose();
0336 static size_type& f_default_pool_size();
0337 static thread_id_map_t& f_thread_ids();
0338 };
0339
0340
0341 inline void
0342 ThreadPool::notify()
0343 {
0344
0345 if(m_thread_awake->load() < m_pool_size)
0346 {
0347 AutoLock l(*m_task_lock);
0348 m_task_cond->notify_one();
0349 }
0350 }
0351
0352 inline void
0353 ThreadPool::notify_all()
0354 {
0355
0356 AutoLock l(*m_task_lock);
0357 m_task_cond->notify_all();
0358 }
0359
0360 inline void
0361 ThreadPool::notify(size_type ntasks)
0362 {
0363 if(ntasks == 0)
0364 return;
0365
0366
0367 if(m_thread_awake->load() < m_pool_size)
0368 {
0369 AutoLock l(*m_task_lock);
0370 if(ntasks < this->size())
0371 {
0372 for(size_type i = 0; i < ntasks; ++i)
0373 m_task_cond->notify_one();
0374 }
0375 else
0376 {
0377 m_task_cond->notify_all();
0378 }
0379 }
0380 }
0381
0382
0383 inline tbb_global_control_t*&
0384 ThreadPool::tbb_global_control()
0385 {
0386 static thread_local tbb_global_control_t* _instance = nullptr;
0387 return _instance;
0388 }
0389
0390
0391 inline tbb_task_arena_t*
0392 ThreadPool::get_task_arena()
0393 {
0394 #if defined(PTL_USE_TBB)
0395
0396 if(!m_tbb_task_arena)
0397 {
0398 auto _sz = (tbb_global_control())
0399 ? tbb_global_control()->active_value(
0400 tbb::global_control::max_allowed_parallelism)
0401 : size();
0402 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
0403 m_tbb_task_arena->initialize(_sz, 1);
0404 }
0405 #else
0406 if(!m_tbb_task_arena)
0407 m_tbb_task_arena = new tbb_task_arena_t{};
0408 #endif
0409 return m_tbb_task_arena;
0410 }
0411
0412 inline void
0413 ThreadPool::resize(size_type _n)
0414 {
0415 initialize_threadpool(_n);
0416 if(m_task_queue)
0417 m_task_queue->resize(static_cast<intmax_t>(_n));
0418 }
0419
0420 inline int
0421 ThreadPool::run_on_this(task_pointer&& _task)
0422 {
0423 auto&& _func = [_task]() { (*_task)(); };
0424
0425 if(m_tbb_tp && m_tbb_task_group)
0426 {
0427 auto* _arena = get_task_arena();
0428 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
0429 }
0430 else
0431 {
0432 _func();
0433 }
0434
0435 return 0;
0436 }
0437
0438 inline int
0439 ThreadPool::insert(task_pointer&& task, int bin)
0440 {
0441 static thread_local ThreadData* _data = ThreadData::GetInstance();
0442
0443
0444 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
0445 notify();
0446 return (int)ibin;
0447 }
0448
0449 inline ThreadPool::size_type
0450 ThreadPool::add_task(task_pointer&& task, int bin)
0451 {
0452
0453 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
0454 return static_cast<size_type>(run_on_this(std::move(task)));
0455
0456 return static_cast<size_type>(insert(std::move(task), bin));
0457 }
0458
0459 template <typename ListT>
0460 inline ThreadPool::size_type
0461 ThreadPool::add_tasks(ListT& c)
0462 {
0463 if(!m_alive_flag)
0464 {
0465 for(auto& itr : c)
0466 run(itr);
0467 c.clear();
0468 return 0;
0469 }
0470
0471
0472 auto c_size = c.size();
0473 for(auto& itr : c)
0474 {
0475 if(!itr->is_native_task())
0476 --c_size;
0477 else
0478 {
0479
0480 get_valid_queue(m_task_queue)->InsertTask(itr);
0481 }
0482 }
0483 c.clear();
0484
0485
0486 notify(c_size);
0487
0488 return c_size;
0489 }
0490
0491 template <typename FuncT>
0492 inline void
0493 ThreadPool::execute_on_all_threads(FuncT&& _func)
0494 {
0495 if(m_tbb_tp && m_tbb_task_group)
0496 {
0497 #if defined(PTL_USE_TBB)
0498
0499
0500
0501
0502 std::set<std::thread::id> _first{};
0503 Mutex _mutex{};
0504
0505 auto _init = [&]() {
0506 int _once = 0;
0507 _mutex.lock();
0508 if(_first.find(std::this_thread::get_id()) == _first.end())
0509 {
0510
0511
0512 _once = 1;
0513 _first.insert(std::this_thread::get_id());
0514 }
0515 _mutex.unlock();
0516 if(_once != 0)
0517 {
0518 _func();
0519 return 1;
0520 }
0521 return 0;
0522 };
0523
0524
0525 std::atomic<size_t> _total_init{ 0 };
0526
0527 size_t _maxp = tbb_global_control()->active_value(
0528 tbb::global_control::max_allowed_parallelism);
0529
0530 auto* _arena = get_task_arena();
0531
0532 size_t _sz = size();
0533
0534 size_t _ncore = Threading::GetNumberOfCores();
0535
0536 size_t _dmax = std::max<size_t>(_ncore, 8);
0537
0538 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
0539
0540 std::function<void()> _init_task;
0541 _init_task = [&]() {
0542 add_thread_id();
0543 static thread_local size_type _depth = 0;
0544 int _ret = 0;
0545
0546 if(!is_main())
0547 {
0548
0549 _ret = _init();
0550
0551 _total_init += _ret;
0552 }
0553
0554
0555 ++_depth;
0556 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
0557 {
0558 tbb::task_group tg{};
0559 tg.run([&]() { _init_task(); });
0560 tg.run([&]() { _init_task(); });
0561 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0562 tg.wait();
0563 }
0564 --_depth;
0565 };
0566
0567
0568 size_t nitr = 0;
0569 auto _fname = __FUNCTION__;
0570 auto _write_info = [&]() {
0571 std::cout << "[" << _fname << "]> Total initialized: " << _total_init
0572 << ", expected: " << _num << ", max-parallel: " << _maxp
0573 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
0574 };
0575 while(_total_init < _num)
0576 {
0577 auto _n = 2 * _num;
0578 while(--_n > 0)
0579 {
0580 _arena->execute(
0581 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
0582 }
0583 _arena->execute([&]() { m_tbb_task_group->wait(); });
0584
0585 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
0586 {
0587 _write_info();
0588 break;
0589 }
0590
0591 if(nitr > 4 * (_ncore + 1))
0592 {
0593 _write_info();
0594 break;
0595 }
0596 }
0597 if(get_verbose() > 3)
0598 _write_info();
0599 #endif
0600 }
0601 else if(get_queue())
0602 {
0603 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
0604 }
0605 }
0606
0607
0608
0609 template <typename FuncT>
0610 inline void
0611 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
0612 FuncT&& _func)
0613 {
0614 if(m_tbb_tp && m_tbb_task_group)
0615 {
0616 #if defined(PTL_USE_TBB)
0617
0618
0619
0620
0621 std::set<std::thread::id> _first{};
0622 Mutex _mutex{};
0623
0624 auto _exec = [&]() {
0625 int _once = 0;
0626 _mutex.lock();
0627 if(_first.find(std::this_thread::get_id()) == _first.end())
0628 {
0629
0630
0631 _once = 1;
0632 _first.insert(std::this_thread::get_id());
0633 }
0634 _mutex.unlock();
0635 if(_once != 0)
0636 {
0637 _func();
0638 return 1;
0639 }
0640 return 0;
0641 };
0642
0643
0644 std::atomic<size_t> _total_exec{ 0 };
0645
0646 size_t _ncore = Threading::GetNumberOfCores();
0647
0648 size_t _dmax = std::max<size_t>(_ncore, 8);
0649
0650 size_t _num = _tids.size();
0651
0652 auto* _arena = get_task_arena();
0653
0654 std::function<void()> _exec_task;
0655 _exec_task = [&]() {
0656 add_thread_id();
0657 static thread_local size_type _depth = 0;
0658 int _ret = 0;
0659 auto _this_tid = std::this_thread::get_id();
0660
0661 if(_tids.count(_this_tid) > 0)
0662 {
0663
0664 _ret = _exec();
0665
0666 _total_exec += _ret;
0667 }
0668
0669
0670 ++_depth;
0671 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
0672 {
0673 tbb::task_group tg{};
0674 tg.run([&]() { _exec_task(); });
0675 tg.run([&]() { _exec_task(); });
0676 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0677 tg.wait();
0678 }
0679 --_depth;
0680 };
0681
0682
0683 size_t nitr = 0;
0684 auto _fname = __FUNCTION__;
0685 auto _write_info = [&]() {
0686 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
0687 << ", expected: " << _num << ", size: " << size() << std::endl;
0688 };
0689 while(_total_exec < _num)
0690 {
0691 auto _n = 2 * _num;
0692 while(--_n > 0)
0693 {
0694 _arena->execute(
0695 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
0696 }
0697 _arena->execute([&]() { m_tbb_task_group->wait(); });
0698
0699 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
0700 {
0701 _write_info();
0702 break;
0703 }
0704
0705 if(nitr > 8 * (_num + 1))
0706 {
0707 _write_info();
0708 break;
0709 }
0710 }
0711 if(get_verbose() > 3)
0712 _write_info();
0713 #endif
0714 }
0715 else if(get_queue())
0716 {
0717 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
0718 }
0719 }
0720
0721
0722
0723 }