File indexing completed on 2025-10-25 08:44:29
0001 
0002 
0003 
0004 
0005 
0006 
0007 
0008 
0009 
0010 
0011 
0012 
0013 
0014 
0015 
0016 
0017 
0018 
0019 
0020 
0021 
0022 
0023 
0024 
0025 
0026 
0027 
0028 
0029 
0030 
0031 #pragma once
0032 
0033 #include "PTL/AutoLock.hh"
0034 #ifndef G4GMAKE
0035 #include "PTL/Config.hh"
0036 #endif
0037 #include "PTL/ThreadData.hh"
0038 #include "PTL/Threading.hh"
0039 #include "PTL/Types.hh"
0040 #include "PTL/VTask.hh"
0041 #include "PTL/VUserTaskQueue.hh"
0042 
0043 #if defined(PTL_USE_TBB)
0044 #    if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
0045 #        define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
0046 #    endif
0047 #    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
0048 #        define TBB_PREVIEW_GLOBAL_CONTROL 1
0049 #    endif
0050 #    include <tbb/global_control.h>
0051 #    include <tbb/task_arena.h>
0052 #    include <tbb/task_group.h>
0053 #endif
0054 
0055 #include <algorithm>
0056 #include <atomic>
0057 #include <chrono>
0058 #include <cstdint>
0059 #include <cstdlib>
0060 #include <deque>
0061 #include <functional>
0062 #include <iostream>
0063 #include <map>
0064 #include <memory>
0065 #include <mutex>  // IWYU pragma: keep
0066 #include <set>
0067 #include <thread>
0068 #include <type_traits>  // IWYU pragma: keep
0069 #include <unordered_map>
0070 #include <utility>
0071 #include <vector>
0072 
0073 namespace PTL
0074 {
namespace thread_pool
{
namespace state
{
// Numeric state codes for a thread pool.
// NOTE(review): presumably these are the values held by the atomic short that
// ThreadPool::state() exposes -- confirm against the implementation file.
static constexpr short STARTED = 0;
static constexpr short PARTIAL = 1;
static constexpr short STOPPED = 2;
static constexpr short NONINIT = 3;

}  // namespace state
}  // namespace thread_pool
0086 
0087 class ThreadPool
0088 {
0089 public:
0090     template <typename KeyT, typename MappedT, typename HashT = KeyT>
0091     using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
0092 
0093     
0094     using size_type        = size_t;
0095     using task_count_type  = std::shared_ptr<std::atomic_uintmax_t>;
0096     using atomic_int_type  = std::shared_ptr<std::atomic_uintmax_t>;
0097     using pool_state_type  = std::shared_ptr<std::atomic_short>;
0098     using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
0099     
0100     using task_type    = VTask;
0101     using lock_t       = std::shared_ptr<Mutex>;
0102     using condition_t  = std::shared_ptr<Condition>;
0103     using task_pointer = std::shared_ptr<task_type>;
0104     using task_queue_t = VUserTaskQueue;
0105     
0106     using thread_list_t      = std::deque<ThreadId>;
0107     using bool_list_t        = std::vector<bool>;
0108     using thread_id_map_t    = std::map<ThreadId, uintmax_t>;
0109     using thread_index_map_t = std::map<uintmax_t, ThreadId>;
0110     using thread_vec_t       = std::vector<Thread>;
0111     using thread_data_t      = std::vector<std::shared_ptr<ThreadData>>;
0112     
0113     using initialize_func_t = std::function<void()>;
0114     using finalize_func_t   = std::function<void()>;
0115     using affinity_func_t   = std::function<intmax_t(intmax_t)>;
0116 
0117     static affinity_func_t& affinity_functor()
0118     {
0119         static affinity_func_t _v = [](intmax_t) {
0120             static std::atomic<intmax_t> assigned;
0121             intmax_t                     _assign = assigned++;
0122             return _assign % Thread::hardware_concurrency();
0123         };
0124         return _v;
0125     }
0126 
0127     static initialize_func_t& initialization_functor()
0128     {
0129         static initialize_func_t _v = []() {};
0130         return _v;
0131     }
0132 
0133     static finalize_func_t& finalization_functor()
0134     {
0135         static finalize_func_t _v = []() {};
0136         return _v;
0137     }
0138 
0139     struct Config
0140     {
0141         bool              init         = true;
0142         bool              use_tbb      = false;
0143         bool              use_affinity = false;
0144         int               verbose      = 0;
0145         int               priority     = 0;
0146         size_type         pool_size    = f_default_pool_size();
0147         VUserTaskQueue*   task_queue   = nullptr;
0148         affinity_func_t   set_affinity = affinity_functor();
0149         initialize_func_t initializer  = initialization_functor();
0150         finalize_func_t   finalizer    = finalization_functor();
0151     };
0152 
0153 public:
0154     
0155     explicit ThreadPool(const Config&);
0156     ~ThreadPool();
0157     ThreadPool(const ThreadPool&) = delete;
0158     ThreadPool(ThreadPool&&)      = default;
0159     ThreadPool& operator=(const ThreadPool&) = delete;
0160     ThreadPool& operator=(ThreadPool&&) = default;
0161 
0162 public:
0163     
0164     size_type initialize_threadpool(size_type);  
0165     size_type destroy_threadpool();              
0166     size_type stop_thread();
0167 
0168     template <typename FuncT>
0169     void execute_on_all_threads(FuncT&& _func);
0170 
0171     template <typename FuncT>
0172     void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
0173                                      FuncT&&                          _func);
0174 
0175     task_queue_t*  get_queue() const { return m_task_queue; }
0176     task_queue_t*& get_valid_queue(task_queue_t*&) const;
0177 
0178     bool is_tbb_threadpool() const { return m_tbb_tp; }
0179 
0180 public:
0181     
0182     static void set_default_size(size_type _v) { f_default_pool_size() = _v; }
0183 
0184     
0185     static size_type get_default_size() { return f_default_pool_size(); }
0186 
0187 public:
0188     
0189     size_type add_task(task_pointer&& task, int bin = -1);
0190     
0191     
0192     template <typename ListT>
0193     size_type add_tasks(ListT&);
0194 
0195     Thread* get_thread(size_type _n) const;
0196     Thread* get_thread(std::thread::id id) const;
0197 
0198     
0199     static tbb_global_control_t*& tbb_global_control();
0200 
0201     void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
0202     void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }
0203 
0204     void reset_initialization()
0205     {
0206         m_init_func = []() {};
0207     }
0208     void reset_finalization()
0209     {
0210         m_fini_func = []() {};
0211     }
0212 
0213 public:
0214     
0215     const pool_state_type& state() const { return m_pool_state; }
0216     
0217     size_type size() const { return m_pool_size; }
0218     
0219     void resize(size_type _n);
0220     
0221     bool using_affinity() const { return m_use_affinity; }
0222     bool is_alive() { return m_alive_flag->load(); }
0223     void notify();
0224     void notify_all();
0225     void notify(size_type);
0226     bool is_initialized() const;
0227     int  get_active_threads_count() const { return (int)m_thread_awake->load(); }
0228 
0229     void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
0230     void set_affinity(intmax_t i, Thread&) const;
0231     void set_priority(int _prio, Thread&) const;
0232 
0233     void set_verbose(int n) { m_verbose = n; }
0234     int  get_verbose() const { return m_verbose; }
0235     bool is_main() const { return ThisThread::get_id() == m_main_tid; }
0236 
0237     tbb_task_arena_t* get_task_arena();
0238 
0239 public:
0240     
0241     static const thread_id_map_t& get_thread_ids();
0242     static uintmax_t              get_thread_id(ThreadId);
0243     static uintmax_t              get_this_thread_id();
0244     static uintmax_t              add_thread_id(ThreadId = ThisThread::get_id());
0245 
0246 private:
0247     void execute_thread(VUserTaskQueue*);  
0248     int  insert(task_pointer&&, int = -1);
0249     int  run_on_this(task_pointer&&);
0250 
0251 private:
0252     
0253     static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
0254 
0255     void record_entry();
0256     void record_exit();
0257 
0258 private:
0259     
0260     
0261     bool             m_use_affinity      = false;
0262     bool             m_tbb_tp            = false;
0263     bool             m_delete_task_queue = false;
0264     int              m_verbose           = 0;
0265     int              m_priority          = 0;
0266     size_type        m_pool_size         = 0;
0267     ThreadId         m_main_tid          = ThisThread::get_id();
0268     atomic_bool_type m_alive_flag        = std::make_shared<std::atomic_bool>(false);
0269     pool_state_type  m_pool_state        = std::make_shared<std::atomic_short>(0);
0270     atomic_int_type  m_thread_awake      = std::make_shared<std::atomic_uintmax_t>(0);
0271     atomic_int_type  m_thread_active     = std::make_shared<std::atomic_uintmax_t>(0);
0272 
0273     
0274     lock_t m_task_lock = std::make_shared<Mutex>();
0275     
0276     condition_t m_task_cond = std::make_shared<Condition>();
0277 
0278     
0279     bool_list_t   m_is_joined    = {};  
0280     bool_list_t   m_is_stopped   = {};  
0281     thread_list_t m_main_threads = {};  
0282     thread_list_t m_stop_threads = {};  
0283     thread_vec_t  m_threads      = {};
0284     thread_data_t m_thread_data  = {};
0285 
0286     
0287     task_queue_t*     m_task_queue     = nullptr;
0288     tbb_task_arena_t* m_tbb_task_arena = nullptr;
0289     tbb_task_group_t* m_tbb_task_group = nullptr;
0290 
0291     
0292     initialize_func_t m_init_func     = initialization_functor();
0293     finalize_func_t   m_fini_func     = finalization_functor();
0294     affinity_func_t   m_affinity_func = affinity_functor();
0295 
0296 private:
0297     static size_type&       f_default_pool_size();
0298     static thread_id_map_t& f_thread_ids();
0299 };
0300 
0301 
0302 inline void
0303 ThreadPool::notify()
0304 {
0305     
0306     if(m_thread_awake->load() < m_pool_size)
0307     {
0308         AutoLock l(*m_task_lock);
0309         m_task_cond->notify_one();
0310     }
0311 }
0312 
0313 inline void
0314 ThreadPool::notify_all()
0315 {
0316     
0317     AutoLock l(*m_task_lock);
0318     m_task_cond->notify_all();
0319 }
0320 
0321 inline void
0322 ThreadPool::notify(size_type ntasks)
0323 {
0324     if(ntasks == 0)
0325         return;
0326 
0327     
0328     if(m_thread_awake->load() < m_pool_size)
0329     {
0330         AutoLock l(*m_task_lock);
0331         if(ntasks < this->size())
0332         {
0333             for(size_type i = 0; i < ntasks; ++i)
0334                 m_task_cond->notify_one();
0335         }
0336         else
0337         {
0338             m_task_cond->notify_all();
0339         }
0340     }
0341 }
0342 
0343 
0344 inline tbb_global_control_t*&
0345 ThreadPool::tbb_global_control()
0346 {
0347     static thread_local tbb_global_control_t* _instance = nullptr;
0348     return _instance;
0349 }
0350 
0351 
0352 inline tbb_task_arena_t*
0353 ThreadPool::get_task_arena()
0354 {
0355 #if defined(PTL_USE_TBB)
0356     
0357     if(!m_tbb_task_arena)
0358     {
0359         auto _sz = (tbb_global_control())
0360                        ? tbb_global_control()->active_value(
0361                              tbb::global_control::max_allowed_parallelism)
0362                        : size();
0363         m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
0364         m_tbb_task_arena->initialize(_sz, 1);
0365     }
0366 #else
0367     if(!m_tbb_task_arena)
0368         m_tbb_task_arena = new tbb_task_arena_t{};
0369 #endif
0370     return m_tbb_task_arena;
0371 }
0372 
0373 inline void
0374 ThreadPool::resize(size_type _n)
0375 {
0376     initialize_threadpool(_n);
0377     if(m_task_queue)
0378         m_task_queue->resize(static_cast<intmax_t>(_n));
0379 }
0380 
0381 inline int
0382 ThreadPool::run_on_this(task_pointer&& _task)
0383 {
0384     auto&& _func = [_task]() { (*_task)(); };
0385 
0386     if(m_tbb_tp && m_tbb_task_group)
0387     {
0388         auto* _arena = get_task_arena();
0389         _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
0390     }
0391     else
0392     {
0393         _func();
0394     }
0395     
0396     return 0;
0397 }
0398 
0399 inline int
0400 ThreadPool::insert(task_pointer&& task, int bin)
0401 {
0402     static thread_local ThreadData* _data = ThreadData::GetInstance();
0403 
0404     
0405     auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
0406     notify();
0407     return (int)ibin;
0408 }
0409 
0410 inline ThreadPool::size_type
0411 ThreadPool::add_task(task_pointer&& task, int bin)
0412 {
0413     
0414     if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
0415         return static_cast<size_type>(run_on_this(std::move(task)));
0416 
0417     return static_cast<size_type>(insert(std::move(task), bin));
0418 }
0419 
0420 template <typename ListT>
0421 inline ThreadPool::size_type
0422 ThreadPool::add_tasks(ListT& c)
0423 {
0424     if(!m_alive_flag)  
0425     {
0426         for(auto& itr : c)
0427             run(itr);
0428         c.clear();
0429         return 0;
0430     }
0431 
0432     
0433     auto c_size = c.size();
0434     for(auto& itr : c)
0435     {
0436         if(!itr->is_native_task())
0437             --c_size;
0438         else
0439         {
0440             
0441             get_valid_queue(m_task_queue)->InsertTask(itr);
0442         }
0443     }
0444     c.clear();
0445 
0446     
0447     notify(c_size);
0448 
0449     return c_size;
0450 }
0451 
0452 template <typename FuncT>
0453 inline void
0454 ThreadPool::execute_on_all_threads(FuncT&& _func)
0455 {
0456     if(m_tbb_tp && m_tbb_task_group)
0457     {
0458 #if defined(PTL_USE_TBB)
0459         
0460         
0461         
0462         
0463         std::set<std::thread::id> _first{};
0464         Mutex                     _mutex{};
0465         
0466         auto _init = [&]() {
0467             int _once = 0;
0468             _mutex.lock();
0469             if(_first.find(std::this_thread::get_id()) == _first.end())
0470             {
0471                 
0472                 
0473                 _once = 1;
0474                 _first.insert(std::this_thread::get_id());
0475             }
0476             _mutex.unlock();
0477             if(_once != 0)
0478             {
0479                 _func();
0480                 return 1;
0481             }
0482             return 0;
0483         };
0484         
0485         
0486         std::atomic<size_t> _total_init{ 0 };
0487         
0488         size_t _maxp = tbb_global_control()->active_value(
0489             tbb::global_control::max_allowed_parallelism);
0490         
0491         auto* _arena = get_task_arena();
0492         
0493         size_t _sz = size();
0494         
0495         size_t _ncore = GetNumberOfCores();
0496         
0497         size_t _dmax = std::max<size_t>(_ncore, 8);
0498         
0499         size_t _num = std::min(_maxp, std::min(_sz, _ncore));
0500         
0501         std::function<void()> _init_task;
0502         _init_task = [&]() {
0503             add_thread_id();
0504             static thread_local size_type _depth = 0;
0505             int                           _ret   = 0;
0506             
0507             if(!is_main())
0508             {
0509                 
0510                 _ret = _init();
0511                 
0512                 _total_init += _ret;
0513             }
0514             
0515             
0516             ++_depth;
0517             if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
0518             {
0519                 tbb::task_group tg{};
0520                 tg.run([&]() { _init_task(); });
0521                 tg.run([&]() { _init_task(); });
0522                 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0523                 tg.wait();
0524             }
0525             --_depth;
0526         };
0527 
0528         
0529         size_t nitr        = 0;
0530         auto   _fname      = __FUNCTION__;
0531         auto   _write_info = [&]() {
0532             std::cout << "[" << _fname << "]> Total initialized: " << _total_init
0533                       << ", expected: " << _num << ", max-parallel: " << _maxp
0534                       << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
0535         };
0536         while(_total_init < _num)
0537         {
0538             auto _n = 2 * _num;
0539             while(--_n > 0)
0540             {
0541                 _arena->execute(
0542                     [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
0543             }
0544             _arena->execute([&]() { m_tbb_task_group->wait(); });
0545             
0546             if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
0547             {
0548                 _write_info();
0549                 break;
0550             }
0551             
0552             if(nitr > 4 * (_ncore + 1))
0553             {
0554                 _write_info();
0555                 break;
0556             }
0557         }
0558         if(get_verbose() > 3)
0559             _write_info();
0560 #endif
0561     }
0562     else if(get_queue())
0563     {
0564         get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
0565     }
0566 }
0567 
0568 
0569 
0570 template <typename FuncT>
0571 inline void
0572 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
0573                                         FuncT&&                          _func)
0574 {
0575     if(m_tbb_tp && m_tbb_task_group)
0576     {
0577 #if defined(PTL_USE_TBB)
0578         
0579         
0580         
0581         
0582         std::set<std::thread::id> _first{};
0583         Mutex                     _mutex{};
0584         
0585         auto _exec = [&]() {
0586             int _once = 0;
0587             _mutex.lock();
0588             if(_first.find(std::this_thread::get_id()) == _first.end())
0589             {
0590                 
0591                 
0592                 _once = 1;
0593                 _first.insert(std::this_thread::get_id());
0594             }
0595             _mutex.unlock();
0596             if(_once != 0)
0597             {
0598                 _func();
0599                 return 1;
0600             }
0601             return 0;
0602         };
0603         
0604         
0605         std::atomic<size_t> _total_exec{ 0 };
0606         
0607         size_t _ncore = GetNumberOfCores();
0608         
0609         size_t _dmax = std::max<size_t>(_ncore, 8);
0610         
0611         size_t _num = _tids.size();
0612         
0613         auto* _arena = get_task_arena();
0614         
0615         std::function<void()> _exec_task;
0616         _exec_task = [&]() {
0617             add_thread_id();
0618             static thread_local size_type _depth    = 0;
0619             int                           _ret      = 0;
0620             auto                          _this_tid = std::this_thread::get_id();
0621             
0622             if(_tids.count(_this_tid) > 0)
0623             {
0624                 
0625                 _ret = _exec();
0626                 
0627                 _total_exec += _ret;
0628             }
0629             
0630             
0631             ++_depth;
0632             if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
0633             {
0634                 tbb::task_group tg{};
0635                 tg.run([&]() { _exec_task(); });
0636                 tg.run([&]() { _exec_task(); });
0637                 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0638                 tg.wait();
0639             }
0640             --_depth;
0641         };
0642 
0643         
0644         size_t nitr        = 0;
0645         auto   _fname      = __FUNCTION__;
0646         auto   _write_info = [&]() {
0647             std::cout << "[" << _fname << "]> Total executed: " << _total_exec
0648                       << ", expected: " << _num << ", size: " << size() << std::endl;
0649         };
0650         while(_total_exec < _num)
0651         {
0652             auto _n = 2 * _num;
0653             while(--_n > 0)
0654             {
0655                 _arena->execute(
0656                     [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
0657             }
0658             _arena->execute([&]() { m_tbb_task_group->wait(); });
0659             
0660             if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
0661             {
0662                 _write_info();
0663                 break;
0664             }
0665             
0666             if(nitr > 8 * (_num + 1))
0667             {
0668                 _write_info();
0669                 break;
0670             }
0671         }
0672         if(get_verbose() > 3)
0673             _write_info();
0674 #endif
0675     }
0676     else if(get_queue())
0677     {
0678         get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
0679     }
0680 }
0681 
0682 
0683 
0684 }