Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-11-15 09:38:49

0001 //
0002 // MIT License
0003 // Copyright (c) 2020 Jonathan R. Madsen
0004 // Permission is hereby granted, free of charge, to any person obtaining a copy
0005 // of this software and associated documentation files (the "Software"), to deal
0006 // in the Software without restriction, including without limitation the rights
0007 // to use, copy, modify, merge, publish, distribute, sublicense, and
0008 // copies of the Software, and to permit persons to whom the Software is
0009 // furnished to do so, subject to the following conditions:
0010 // The above copyright notice and this permission notice shall be included in
0011 // all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
0012 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
0013 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
0014 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
0015 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
0016 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
0017 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
0018 //
0019 // ---------------------------------------------------------------
0020 // Tasking class header file
0021 //
0022 // Class Description:
0023 //
0024 // This file creates a class for an efficient thread-pool that
0025 // accepts work in the form of tasks.
0026 //
0027 // ---------------------------------------------------------------
0028 // Author: Jonathan Madsen (Feb 13th 2018)
0029 // ---------------------------------------------------------------
0030 
0031 #pragma once
0032 
0033 #include "PTL/AutoLock.hh"
0034 #ifndef G4GMAKE
0035 #include "PTL/Config.hh"
0036 #endif
0037 #include "PTL/ThreadData.hh"
0038 #include "PTL/Threading.hh"
0039 #include "PTL/Types.hh"
0040 #include "PTL/VTask.hh"
0041 #include "PTL/VUserTaskQueue.hh"
0042 
0043 #if defined(PTL_USE_TBB)
0044 #    if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
0045 #        define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
0046 #    endif
0047 #    if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
0048 #        define TBB_PREVIEW_GLOBAL_CONTROL 1
0049 #    endif
0050 #    include <tbb/global_control.h>
0051 #    include <tbb/task_arena.h>
0052 #    include <tbb/task_group.h>
0053 #endif
0054 
0055 #include <algorithm>
0056 #include <atomic>
0057 #include <chrono>
0058 #include <cstdint>
0059 #include <cstdlib>
0060 #include <deque>
0061 #include <functional>
0062 #include <iostream>
0063 #include <map>
0064 #include <memory>
0065 #include <mutex>  // IWYU pragma: keep
0066 #include <set>
0067 #include <thread>
0068 #include <type_traits>  // IWYU pragma: keep
0069 #include <unordered_map>
0070 #include <utility>
0071 #include <vector>
0072 
0073 namespace PTL
0074 {
namespace thread_pool
{
/// Named integer codes for the state of a thread-pool. They are `short` values,
/// matching the std::atomic_short used by ThreadPool::pool_state_type.
namespace state
{
static constexpr short STARTED = 0;
static constexpr short PARTIAL = 1;
static constexpr short STOPPED = 2;
static constexpr short NONINIT = 3;

}  // namespace state
}  // namespace thread_pool
0086 
0087 class ThreadPool
0088 {
0089 public:
0090     template <typename KeyT, typename MappedT, typename HashT = KeyT>
0091     using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
0092 
0093     // pod-types
0094     using size_type        = size_t;
0095     using task_count_type  = std::shared_ptr<std::atomic_uintmax_t>;
0096     using atomic_int_type  = std::shared_ptr<std::atomic_uintmax_t>;
0097     using pool_state_type  = std::shared_ptr<std::atomic_short>;
0098     using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
0099     // objects
0100     using task_type    = VTask;
0101     using lock_t       = std::shared_ptr<Mutex>;
0102     using condition_t  = std::shared_ptr<Condition>;
0103     using task_pointer = std::shared_ptr<task_type>;
0104     using task_queue_t = VUserTaskQueue;
0105     // containers
0106     using thread_list_t      = std::deque<ThreadId>;
0107     using bool_list_t        = std::vector<bool>;
0108     using thread_id_map_t    = std::map<ThreadId, uintmax_t>;
0109     using thread_index_map_t = std::map<uintmax_t, ThreadId>;
0110     using thread_vec_t       = std::vector<Thread>;
0111     using thread_data_t      = std::vector<std::shared_ptr<ThreadData>>;
0112     // functions
0113     using initialize_func_t = std::function<void()>;
0114     using finalize_func_t   = std::function<void()>;
0115     using affinity_func_t   = std::function<intmax_t(intmax_t)>;
0116 
0117     static affinity_func_t& affinity_functor()
0118     {
0119         static affinity_func_t _v = [](intmax_t) {
0120             static std::atomic<intmax_t> assigned;
0121             intmax_t                     _assign = assigned++;
0122             return _assign % Thread::hardware_concurrency();
0123         };
0124         return _v;
0125     }
0126 
0127     static initialize_func_t& initialization_functor()
0128     {
0129         static initialize_func_t _v = []() {};
0130         return _v;
0131     }
0132 
0133     static finalize_func_t& finalization_functor()
0134     {
0135         static finalize_func_t _v = []() {};
0136         return _v;
0137     }
0138 
0139     struct Config
0140     {
0141         PTL_DEFAULT_OBJECT(Config)
0142 
0143         Config(bool, bool, bool, int, int, size_type, VUserTaskQueue*, affinity_func_t,
0144                initialize_func_t, finalize_func_t);
0145 
0146         bool              init         = true;
0147         bool              use_tbb      = f_use_tbb();
0148         bool              use_affinity = f_use_cpu_affinity();
0149         int               verbose      = f_verbose();
0150         int               priority     = f_thread_priority();
0151         size_type         pool_size    = f_default_pool_size();
0152         VUserTaskQueue*   task_queue   = nullptr;
0153         affinity_func_t   set_affinity = affinity_functor();
0154         initialize_func_t initializer  = initialization_functor();
0155         finalize_func_t   finalizer    = finalization_functor();
0156     };
0157 
0158 public:
0159     // Constructor and Destructors
0160     explicit ThreadPool(const Config&);
0161     ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
0162                bool _use_affinity = f_use_cpu_affinity(),
0163                affinity_func_t    = affinity_functor(),
0164                initialize_func_t  = initialization_functor(),
0165                finalize_func_t    = finalization_functor());
0166     ThreadPool(const size_type& pool_size, initialize_func_t, finalize_func_t,
0167                bool             _use_affinity = f_use_cpu_affinity(),
0168                affinity_func_t                = affinity_functor(),
0169                VUserTaskQueue* task_queue     = nullptr);
0170     virtual ~ThreadPool();
0171     ThreadPool(const ThreadPool&) = delete;
0172     ThreadPool(ThreadPool&&)      = default;
0173     ThreadPool& operator=(const ThreadPool&) = delete;
0174     ThreadPool& operator=(ThreadPool&&) = default;
0175 
0176 public:
0177     // Public functions
0178     size_type initialize_threadpool(size_type);  // start the threads
0179     size_type destroy_threadpool();              // destroy the threads
0180     size_type stop_thread();
0181 
0182     template <typename FuncT>
0183     void execute_on_all_threads(FuncT&& _func);
0184 
0185     template <typename FuncT>
0186     void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
0187                                      FuncT&&                          _func);
0188 
0189     task_queue_t*  get_queue() const { return m_task_queue; }
0190     task_queue_t*& get_valid_queue(task_queue_t*&) const;
0191 
0192     bool is_tbb_threadpool() const { return m_tbb_tp; }
0193 
0194 public:
0195     // Public functions related to TBB
0196     static bool using_tbb();
0197     // enable using TBB if available - semi-deprecated
0198     static void set_use_tbb(bool _v);
0199 
0200     /// set the default use of tbb
0201     static void set_default_use_tbb(bool _v) { set_use_tbb(_v); }
0202     /// set the default use of cpu affinity
0203     static void set_default_use_cpu_affinity(bool _v);
0204     /// set the default scheduling priority of threads in thread-pool
0205     static void set_default_scheduling_priority(int _v) { f_thread_priority() = _v; }
0206     /// set the default verbosity
0207     static void set_default_verbose(int _v) { f_verbose() = _v; }
0208     /// set the default pool size
0209     static void set_default_size(size_type _v) { f_default_pool_size() = _v; }
0210 
0211     /// get the default use of tbb
0212     static bool get_default_use_tbb() { return f_use_tbb(); }
0213     /// get the default use of cpu affinity
0214     static bool get_default_use_cpu_affinity() { return f_use_cpu_affinity(); }
0215     /// get the default scheduling priority of threads in thread-pool
0216     static int get_default_scheduling_priority() { return f_thread_priority(); }
0217     /// get the default verbosity
0218     static int get_default_verbose() { return f_verbose(); }
0219     /// get the default pool size
0220     static size_type get_default_size() { return f_default_pool_size(); }
0221 
0222 public:
0223     // add tasks for threads to process
0224     size_type add_task(task_pointer&& task, int bin = -1);
0225     // size_type add_thread_task(ThreadId id, task_pointer&& task);
0226     // add a generic container with iterator
0227     template <typename ListT>
0228     size_type add_tasks(ListT&);
0229 
0230     Thread* get_thread(size_type _n) const;
0231     Thread* get_thread(std::thread::id id) const;
0232 
0233     // only relevant when compiled with PTL_USE_TBB
0234     static tbb_global_control_t*& tbb_global_control();
0235 
0236     void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
0237     void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }
0238 
0239     void reset_initialization()
0240     {
0241         m_init_func = []() {};
0242     }
0243     void reset_finalization()
0244     {
0245         m_fini_func = []() {};
0246     }
0247 
0248 public:
0249     // get the pool state
0250     const pool_state_type& state() const { return m_pool_state; }
0251     // see how many main task threads there are
0252     size_type size() const { return m_pool_size; }
0253     // set the thread pool size
0254     void resize(size_type _n);
0255     // affinity assigns threads to cores, assignment at constructor
0256     bool using_affinity() const { return m_use_affinity; }
0257     bool is_alive() { return m_alive_flag->load(); }
0258     void notify();
0259     void notify_all();
0260     void notify(size_type);
0261     bool is_initialized() const;
0262     int  get_active_threads_count() const { return (int)m_thread_awake->load(); }
0263 
0264     void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
0265     void set_affinity(intmax_t i, Thread&) const;
0266     void set_priority(int _prio, Thread&) const;
0267 
0268     void set_verbose(int n) { m_verbose = n; }
0269     int  get_verbose() const { return m_verbose; }
0270     bool is_main() const { return ThisThread::get_id() == m_main_tid; }
0271 
0272     tbb_task_arena_t* get_task_arena();
0273 
0274 public:
0275     // read FORCE_NUM_THREADS environment variable
0276     static const thread_id_map_t& get_thread_ids();
0277     static uintmax_t              get_thread_id(ThreadId);
0278     static uintmax_t              get_this_thread_id();
0279     static uintmax_t              add_thread_id(ThreadId = ThisThread::get_id());
0280 
0281 protected:
0282     void execute_thread(VUserTaskQueue*);  // function thread sits in
0283     int  insert(task_pointer&&, int = -1);
0284     int  run_on_this(task_pointer&&);
0285 
0286 protected:
0287     // called in THREAD INIT
0288     static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
0289 
0290     void record_entry();
0291     void record_exit();
0292 
0293 private:
0294     // Private variables
0295     // random
0296     bool             m_use_affinity      = false;
0297     bool             m_tbb_tp            = false;
0298     bool             m_delete_task_queue = false;
0299     int              m_verbose           = f_verbose();
0300     int              m_priority          = f_thread_priority();
0301     size_type        m_pool_size         = 0;
0302     ThreadId         m_main_tid          = ThisThread::get_id();
0303     atomic_bool_type m_alive_flag        = std::make_shared<std::atomic_bool>(false);
0304     pool_state_type  m_pool_state        = std::make_shared<std::atomic_short>(0);
0305     atomic_int_type  m_thread_awake      = std::make_shared<std::atomic_uintmax_t>(0);
0306     atomic_int_type  m_thread_active     = std::make_shared<std::atomic_uintmax_t>(0);
0307 
0308     // locks
0309     lock_t m_task_lock = std::make_shared<Mutex>();
0310     // conditions
0311     condition_t m_task_cond = std::make_shared<Condition>();
0312 
0313     // containers
0314     bool_list_t   m_is_joined    = {};  // join list
0315     bool_list_t   m_is_stopped   = {};  // lets thread know to stop
0316     thread_list_t m_main_threads = {};  // storage for active threads
0317     thread_list_t m_stop_threads = {};  // storage for stopped threads
0318     thread_vec_t  m_threads      = {};
0319     thread_data_t m_thread_data  = {};
0320 
0321     // task queue
0322     task_queue_t*     m_task_queue     = nullptr;
0323     tbb_task_arena_t* m_tbb_task_arena = nullptr;
0324     tbb_task_group_t* m_tbb_task_group = nullptr;
0325 
0326     // functions
0327     initialize_func_t m_init_func     = initialization_functor();
0328     finalize_func_t   m_fini_func     = finalization_functor();
0329     affinity_func_t   m_affinity_func = affinity_functor();
0330 
0331 private:
0332     static bool&            f_use_tbb();
0333     static bool&            f_use_cpu_affinity();
0334     static int&             f_thread_priority();
0335     static int&             f_verbose();
0336     static size_type&       f_default_pool_size();
0337     static thread_id_map_t& f_thread_ids();
0338 };
0339 
0340 //--------------------------------------------------------------------------------------//
0341 inline void
0342 ThreadPool::notify()
0343 {
0344     // wake up one thread that is waiting for a task to be available
0345     if(m_thread_awake->load() < m_pool_size)
0346     {
0347         AutoLock l(*m_task_lock);
0348         m_task_cond->notify_one();
0349     }
0350 }
0351 //--------------------------------------------------------------------------------------//
0352 inline void
0353 ThreadPool::notify_all()
0354 {
0355     // wake all threads
0356     AutoLock l(*m_task_lock);
0357     m_task_cond->notify_all();
0358 }
0359 //--------------------------------------------------------------------------------------//
0360 inline void
0361 ThreadPool::notify(size_type ntasks)
0362 {
0363     if(ntasks == 0)
0364         return;
0365 
0366     // wake up as many threads that tasks just added
0367     if(m_thread_awake->load() < m_pool_size)
0368     {
0369         AutoLock l(*m_task_lock);
0370         if(ntasks < this->size())
0371         {
0372             for(size_type i = 0; i < ntasks; ++i)
0373                 m_task_cond->notify_one();
0374         }
0375         else
0376         {
0377             m_task_cond->notify_all();
0378         }
0379     }
0380 }
0381 //--------------------------------------------------------------------------------------//
0382 // local function for getting the tbb task scheduler
0383 inline tbb_global_control_t*&
0384 ThreadPool::tbb_global_control()
0385 {
0386     static thread_local tbb_global_control_t* _instance = nullptr;
0387     return _instance;
0388 }
0389 //--------------------------------------------------------------------------------------//
0390 // task arena
0391 inline tbb_task_arena_t*
0392 ThreadPool::get_task_arena()
0393 {
0394 #if defined(PTL_USE_TBB)
0395     // create a task arena
0396     if(!m_tbb_task_arena)
0397     {
0398         auto _sz = (tbb_global_control())
0399                        ? tbb_global_control()->active_value(
0400                              tbb::global_control::max_allowed_parallelism)
0401                        : size();
0402         m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
0403         m_tbb_task_arena->initialize(_sz, 1);
0404     }
0405 #else
0406     if(!m_tbb_task_arena)
0407         m_tbb_task_arena = new tbb_task_arena_t{};
0408 #endif
0409     return m_tbb_task_arena;
0410 }
0411 //--------------------------------------------------------------------------------------//
0412 inline void
0413 ThreadPool::resize(size_type _n)
0414 {
0415     initialize_threadpool(_n);
0416     if(m_task_queue)
0417         m_task_queue->resize(static_cast<intmax_t>(_n));
0418 }
0419 //--------------------------------------------------------------------------------------//
0420 inline int
0421 ThreadPool::run_on_this(task_pointer&& _task)
0422 {
0423     auto&& _func = [_task]() { (*_task)(); };
0424 
0425     if(m_tbb_tp && m_tbb_task_group)
0426     {
0427         auto* _arena = get_task_arena();
0428         _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
0429     }
0430     else
0431     {
0432         _func();
0433     }
0434     // return the number of tasks added to task-list
0435     return 0;
0436 }
0437 //--------------------------------------------------------------------------------------//
0438 inline int
0439 ThreadPool::insert(task_pointer&& task, int bin)
0440 {
0441     static thread_local ThreadData* _data = ThreadData::GetInstance();
0442 
0443     // pass the task to the queue
0444     auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
0445     notify();
0446     return (int)ibin;
0447 }
0448 //--------------------------------------------------------------------------------------//
0449 inline ThreadPool::size_type
0450 ThreadPool::add_task(task_pointer&& task, int bin)
0451 {
0452     // if not native (i.e. TBB) or we haven't built thread-pool, just execute
0453     if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
0454         return static_cast<size_type>(run_on_this(std::move(task)));
0455 
0456     return static_cast<size_type>(insert(std::move(task), bin));
0457 }
0458 //--------------------------------------------------------------------------------------//
0459 template <typename ListT>
0460 inline ThreadPool::size_type
0461 ThreadPool::add_tasks(ListT& c)
0462 {
0463     if(!m_alive_flag)  // if we haven't built thread-pool, just execute
0464     {
0465         for(auto& itr : c)
0466             run(itr);
0467         c.clear();
0468         return 0;
0469     }
0470 
0471     // TODO: put a limit on how many tasks can be added at most
0472     auto c_size = c.size();
0473     for(auto& itr : c)
0474     {
0475         if(!itr->is_native_task())
0476             --c_size;
0477         else
0478         {
0479             //++(m_task_queue);
0480             get_valid_queue(m_task_queue)->InsertTask(itr);
0481         }
0482     }
0483     c.clear();
0484 
0485     // notify sleeping threads
0486     notify(c_size);
0487 
0488     return c_size;
0489 }
0490 //--------------------------------------------------------------------------------------//
0491 template <typename FuncT>
0492 inline void
0493 ThreadPool::execute_on_all_threads(FuncT&& _func)
0494 {
0495     if(m_tbb_tp && m_tbb_task_group)
0496     {
0497 #if defined(PTL_USE_TBB)
0498         // TBB lazily activates threads to process tasks and the main thread
0499         // participates in processing the tasks so getting a specific
0500         // function to execute only on the worker threads requires some trickery
0501         //
0502         std::set<std::thread::id> _first{};
0503         Mutex                     _mutex{};
0504         // init function which executes function and returns 1 only once
0505         auto _init = [&]() {
0506             int _once = 0;
0507             _mutex.lock();
0508             if(_first.find(std::this_thread::get_id()) == _first.end())
0509             {
0510                 // we need to reset this thread-local static for multiple invocations
0511                 // of the same template instantiation
0512                 _once = 1;
0513                 _first.insert(std::this_thread::get_id());
0514             }
0515             _mutex.unlock();
0516             if(_once != 0)
0517             {
0518                 _func();
0519                 return 1;
0520             }
0521             return 0;
0522         };
0523         // this will collect the number of threads which have
0524         // executed the _init function above
0525         std::atomic<size_t> _total_init{ 0 };
0526         // max parallelism by TBB
0527         size_t _maxp = tbb_global_control()->active_value(
0528             tbb::global_control::max_allowed_parallelism);
0529         // create a task arean
0530         auto* _arena = get_task_arena();
0531         // size of the thread-pool
0532         size_t _sz = size();
0533         // number of cores
0534         size_t _ncore = Threading::GetNumberOfCores();
0535         // maximum depth for recursion
0536         size_t _dmax = std::max<size_t>(_ncore, 8);
0537         // how many threads we need to initialize
0538         size_t _num = std::min(_maxp, std::min(_sz, _ncore));
0539         // this is the task passed to the task-group
0540         std::function<void()> _init_task;
0541         _init_task = [&]() {
0542             add_thread_id();
0543             static thread_local size_type _depth = 0;
0544             int                           _ret   = 0;
0545             // don't let the main thread execute the function
0546             if(!is_main())
0547             {
0548                 // execute the function
0549                 _ret = _init();
0550                 // add the result
0551                 _total_init += _ret;
0552             }
0553             // if the function did not return anything, recursively execute
0554             // two more tasks
0555             ++_depth;
0556             if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
0557             {
0558                 tbb::task_group tg{};
0559                 tg.run([&]() { _init_task(); });
0560                 tg.run([&]() { _init_task(); });
0561                 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0562                 tg.wait();
0563             }
0564             --_depth;
0565         };
0566 
0567         // TBB won't oversubscribe so we need to limit by ncores - 1
0568         size_t nitr        = 0;
0569         auto   _fname      = __FUNCTION__;
0570         auto   _write_info = [&]() {
0571             std::cout << "[" << _fname << "]> Total initialized: " << _total_init
0572                       << ", expected: " << _num << ", max-parallel: " << _maxp
0573                       << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
0574         };
0575         while(_total_init < _num)
0576         {
0577             auto _n = 2 * _num;
0578             while(--_n > 0)
0579             {
0580                 _arena->execute(
0581                     [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
0582             }
0583             _arena->execute([&]() { m_tbb_task_group->wait(); });
0584             // don't loop infinitely but use a strict condition
0585             if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
0586             {
0587                 _write_info();
0588                 break;
0589             }
0590             // at this point we need to exit
0591             if(nitr > 4 * (_ncore + 1))
0592             {
0593                 _write_info();
0594                 break;
0595             }
0596         }
0597         if(get_verbose() > 3)
0598             _write_info();
0599 #endif
0600     }
0601     else if(get_queue())
0602     {
0603         get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
0604     }
0605 }
0606 
0607 //--------------------------------------------------------------------------------------//
0608 
0609 template <typename FuncT>
0610 inline void
0611 ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
0612                                         FuncT&&                          _func)
0613 {
0614     if(m_tbb_tp && m_tbb_task_group)
0615     {
0616 #if defined(PTL_USE_TBB)
0617         // TBB lazily activates threads to process tasks and the main thread
0618         // participates in processing the tasks so getting a specific
0619         // function to execute only on the worker threads requires some trickery
0620         //
0621         std::set<std::thread::id> _first{};
0622         Mutex                     _mutex{};
0623         // init function which executes function and returns 1 only once
0624         auto _exec = [&]() {
0625             int _once = 0;
0626             _mutex.lock();
0627             if(_first.find(std::this_thread::get_id()) == _first.end())
0628             {
0629                 // we need to reset this thread-local static for multiple invocations
0630                 // of the same template instantiation
0631                 _once = 1;
0632                 _first.insert(std::this_thread::get_id());
0633             }
0634             _mutex.unlock();
0635             if(_once != 0)
0636             {
0637                 _func();
0638                 return 1;
0639             }
0640             return 0;
0641         };
0642         // this will collect the number of threads which have
0643         // executed the _exec function above
0644         std::atomic<size_t> _total_exec{ 0 };
0645         // number of cores
0646         size_t _ncore = Threading::GetNumberOfCores();
0647         // maximum depth for recursion
0648         size_t _dmax = std::max<size_t>(_ncore, 8);
0649         // how many threads we need to initialize
0650         size_t _num = _tids.size();
0651         // create a task arena
0652         auto* _arena = get_task_arena();
0653         // this is the task passed to the task-group
0654         std::function<void()> _exec_task;
0655         _exec_task = [&]() {
0656             add_thread_id();
0657             static thread_local size_type _depth    = 0;
0658             int                           _ret      = 0;
0659             auto                          _this_tid = std::this_thread::get_id();
0660             // don't let the main thread execute the function
0661             if(_tids.count(_this_tid) > 0)
0662             {
0663                 // execute the function
0664                 _ret = _exec();
0665                 // add the result
0666                 _total_exec += _ret;
0667             }
0668             // if the function did not return anything, recursively execute
0669             // two more tasks
0670             ++_depth;
0671             if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
0672             {
0673                 tbb::task_group tg{};
0674                 tg.run([&]() { _exec_task(); });
0675                 tg.run([&]() { _exec_task(); });
0676                 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
0677                 tg.wait();
0678             }
0679             --_depth;
0680         };
0681 
0682         // TBB won't oversubscribe so we need to limit by ncores - 1
0683         size_t nitr        = 0;
0684         auto   _fname      = __FUNCTION__;
0685         auto   _write_info = [&]() {
0686             std::cout << "[" << _fname << "]> Total executed: " << _total_exec
0687                       << ", expected: " << _num << ", size: " << size() << std::endl;
0688         };
0689         while(_total_exec < _num)
0690         {
0691             auto _n = 2 * _num;
0692             while(--_n > 0)
0693             {
0694                 _arena->execute(
0695                     [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
0696             }
0697             _arena->execute([&]() { m_tbb_task_group->wait(); });
0698             // don't loop infinitely but use a strict condition
0699             if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
0700             {
0701                 _write_info();
0702                 break;
0703             }
0704             // at this point we need to exit
0705             if(nitr > 8 * (_num + 1))
0706             {
0707                 _write_info();
0708                 break;
0709             }
0710         }
0711         if(get_verbose() > 3)
0712             _write_info();
0713 #endif
0714     }
0715     else if(get_queue())
0716     {
0717         get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
0718     }
0719 }
0720 
0721 //======================================================================================//
0722 
0723 }  // namespace PTL