Back to home page

EIC code displayed by LXR

 
 

    


Warning: file /include/oneapi/tbb/task_arena.h was not indexed or has been modified since the last indexation; cross-reference links may therefore be missing, inaccurate, or erroneous.

0001 /*
0002     Copyright (c) 2005-2023 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_task_arena_H
0018 #define __TBB_task_arena_H
0019 
0020 #include "detail/_config.h"
0021 
0022 #include "detail/_aligned_space.h"
0023 #include "detail/_attach.h"
0024 #include "detail/_exception.h"
0025 #include "detail/_namespace_injection.h"
0026 #include "detail/_small_object_pool.h"
0027 #include "detail/_task.h"
0028 
0029 #include "detail/_task_handle.h"
0030 
0031 #if __TBB_ARENA_BINDING
0032 #include "info.h"
0033 #endif /*__TBB_ARENA_BINDING*/
0034 
0035 namespace tbb {
0036 namespace detail {
0037 
0038 namespace d1 {
0039 
//! Delegate that invokes a functor returning R and stores the result in-place.
/** Adapts a functor F to the delegate_base interface consumed by r1::execute
    and r1::isolate_within_arena. The return value is built with placement new
    inside uninitialized aligned storage and handed out by consume_result(). */
template<typename F, typename R>
class task_arena_function : public delegate_base {
    F &my_func;                        // reference to the user functor; must outlive this object
    aligned_space<R> my_return_storage; // raw storage for the result; constructed lazily in operator()
    // True once consume_result() ran; guards the explicit destructor call below.
    bool my_constructed{false};
    // The function should be called only once.
    bool operator()() const override {
        // Construct the result in-place; if my_func() throws, R is never constructed.
        new (my_return_storage.begin()) R(my_func());
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // The function can be called only after operator() and only once.
    R consume_result() {
        my_constructed = true;
        return std::move(*(my_return_storage.begin()));
    }
    ~task_arena_function() override {
        if (my_constructed) {
            // Manually destroy the placement-new'ed result.
            my_return_storage.begin()->~R();
        }
    }
};
0063 
//! Specialization for functors returning void: no result storage is required.
template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func; // reference to the user functor; must outlive this object
    // Invokes the wrapped functor; there is no result to capture.
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // No-op kept so generic callers can uniformly invoke consume_result().
    void consume_result() const {}

    friend class task_arena_base;
};
0077 
0078 class task_arena_base;
0079 class task_scheduler_observer;
0080 } // namespace d1
0081 
0082 namespace r1 {
0083 class arena;
0084 struct task_arena_impl;
0085 
0086 TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
0087 TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
0088 TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
0089 TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
0090 TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
0091 TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
0092 TBB_EXPORT int  __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
0093 TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);
0094 
0095 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
0096 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
0097 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
0098 } // namespace r1
0099 
0100 namespace d2 {
0101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
0102     __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle");
0103 
0104     auto& ctx = task_handle_accessor::ctx_of(th);
0105 
0106     // Do not access th after release
0107     r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
0108 }
0109 } //namespace d2
0110 
0111 namespace d1 {
0112 
//! Number of distinct arena priority levels (low, normal, high).
static constexpr unsigned num_priority_levels = 3;
//! Spacing between consecutive priority values; the +1 keeps high (3*stride) below INT_MAX.
static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1);
0115 
//! Settings holder shared (layout-wise) with the r1 library.
/** Only stores the requested configuration; the actual arena is created,
    attached and destroyed by the r1 entry points declared above
    (initialize, attach, terminate, ...). Member layout is part of the
    binary interface -- see the static_assert below. */
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    //! Arena priorities; values are equidistant multiples of priority_stride.
    enum class priority : int {
        low    = 1 * priority_stride,
        normal = 2 * priority_stride,
        high   = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings
    intptr_t my_version_and_traits;

    //! One-shot state driving lazy initialization (see atomic_do_once usage in task_arena).
    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
        "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks.
    // If core_type_support_flag is absent (object presumably built by an older
    // client), fall back to automatic instead of reading unset fields.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    //! Stores plain concurrency settings; no arena is created yet.
    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
        {}

#if __TBB_ARENA_BINDING
    //! Stores settings taken from a constraints descriptor (NUMA / core-type binding).
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
        {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Typedef for number of threads that is automatic.
    static const int automatic = -1;
    //! Sentinel returned by current_thread_index() when the thread has no arena slot.
    static const int not_initialized = -2;
};
0200 
0201 template<typename R, typename F>
0202 R isolate_impl(F& f) {
0203     task_arena_function<F, R> func(f);
0204     r1::isolate_within_arena(func, /*isolation*/ 0);
0205     return func.consume_result();
0206 }
0207 
//! One-shot task that runs a stored copy of a user functor and then frees itself.
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator; // allocator that created this object; used for self-deletion
    const F m_func;                     // the user functor, owned by value

    // Returns this task's memory to the allocator; `this` is dead afterwards.
    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        // finalize() destroys this object -- no member access after this call.
        finalize(ed);
        return nullptr;
    }
    // Enqueued tasks are not expected to be cancelled; reaching here is fatal.
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};
0229 
0230 template<typename F>
0231 void enqueue_impl(F&& f, task_arena_base* ta) {
0232     small_object_allocator alloc{};
0233     r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
0234 }
0235 /** 1-to-1 proxy representation class of scheduler's arena
0236  * Constructors set up settings only, real construction is deferred till the first method invocation
0237  * Destructor only removes one of the references to the inner arena representation.
0238  * Final destruction happens when all the references (and the work) are gone.
0239  */
class task_arena : public task_arena_base {

    //! Publishes the initialized state; my_arena must already have been set by r1.
    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        // Release store pairs with the acquire load in is_active().
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    //! Lazily initializes the arena, then runs f inside it via the r1 delegate mechanism.
    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates task_arena with certain concurrency limits
    /** Sets up settings only, real construction is deferred till the first method invocation
     *  @arg max_concurrency specifies total number of slots in arena where threads work
     *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
     *       Value of 1 is default and reflects behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    //! Creates task arena pinned to certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    //! Copies settings from another task_arena
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    //! Creates an instance of task_arena attached to the current arena of the thread
    // Overload for the library-wide d1::attach tag; delegates to the nested tag above.
    explicit task_arena(d1::attach)
        : task_arena(attach{})
    {}

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    // Thread-safe: atomic_do_once guarantees r1::initialize runs at most once.
    void initialize() {
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides concurrency level and forces initialization of internal representation
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    //! Overrides the binding constraints and forces initialization of internal representation
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                // No arena to attach to: fall back to creating one from the stored settings.
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Attaches this instance to the current arena of the thread
    // Overload for the library-wide d1::attach tag.
    void initialize(d1::attach) {
        initialize(attach{});
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread safe wrt concurrent invocations of other methods.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread safe wrt concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        // Acquire pairs with the release store in mark_initialized().
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena

    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }

    //! Joins the arena and executes a mutable functor, then returns
    //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
    //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Handle special cases inside the library
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Handle special cases inside the library
        return my_max_concurrency;
    }

    //! Wait for all work in the arena to be completed
    //! Even submitted by other application threads
    //! Joins arena if/when possible (in the same way as execute())
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    int max_concurrency() const {
        // Handle special cases inside the library
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    //! Internal hook: submits a task directly into this arena's representation.
    // Requires an already-active arena; forwards the raw arena pointer and a
    // criticality flag (1 = critical) to r1::submit.
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};
0441 
//! Executes a mutable functor in isolation within the current task arena.
//! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
// Forwards to isolate_impl, which takes the functor by lvalue reference --
// the forwarding reference here only widens what callers may pass.
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}
0448 
0449 //! Returns the index, aka slot number, of the calling thread in its current arena
0450 inline int current_thread_index() {
0451     slot_id idx = r1::execution_slot(nullptr);
0452     return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
0453 }
0454 
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Returns true when a task group context is associated with the caller,
//! i.e. the calling thread is executing inside a task.
inline bool is_inside_task() {
    return current_context() != nullptr;
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
0460 
//! Returns the maximal number of threads that can work inside the arena
// Delegates to the r1 entry point; nullptr means "no explicit task_arena".
inline int max_concurrency() {
    return r1::max_concurrency(nullptr);
}
0465 
//! Enqueues the task owned by the handle; exported as this_task_arena::enqueue.
// nullptr arena pointer: the r1 layer resolves the target arena itself.
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}
0469 
//! Enqueues a functor as a task; exported as this_task_arena::enqueue.
// nullptr arena pointer: the r1 layer resolves the target arena itself.
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}
0474 
0475 using r1::submit;
0476 
0477 } // namespace d1
0478 } // namespace detail
0479 
//! Public (versioned) API surface: re-exports the implementation-detail names.
inline namespace v1 {
using detail::d1::task_arena;
using detail::d1::attach;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::is_inside_task;
#endif

//! Operations that implicitly target the arena of the calling thread.
namespace this_task_arena {
using detail::d1::current_thread_index;
using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;
} // namespace this_task_arena

} // inline namespace v1
0497 
0498 } // namespace tbb
0499 #endif /* __TBB_task_arena_H */