#ifndef __TBB_task_arena_H
#define __TBB_task_arena_H

#include "detail/_config.h"

#include "detail/_aligned_space.h"
#include "detail/_attach.h"
#include "detail/_exception.h"
#include "detail/_namespace_injection.h"
#include "detail/_small_object_pool.h"
#include "detail/_task.h"

#include "detail/_task_handle.h"

#if __TBB_ARENA_BINDING
#include "info.h"
#endif

namespace tbb {
namespace detail {

namespace d1 {

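// Wraps a user callable as a delegate_base so the runtime can invoke it inside
// an arena (or inside an isolation region) and the caller can pick up the
// result afterwards through consume_result().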
template<typename F, typename R>
class task_arena_function : public delegate_base {
    F &my_func;
    aligned_space<R> my_return_storage;
    bool my_constructed{false};

    bool operator()() const override {
        new (my_return_storage.begin()) R(my_func());
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}

    R consume_result() {
        my_constructed = true;
        return std::move(*(my_return_storage.begin()));
    }
    ~task_arena_function() override {
        if (my_constructed) {
            my_return_storage.begin()->~R();
        }
    }
};

template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func;
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    void consume_result() const {}

    friend class task_arena_base;
};

class task_arena_base;
class task_scheduler_observer;
} // namespace d1

// Entry points exported by the TBB runtime library.
namespace r1 {
class arena;
struct task_arena_impl;

TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);

TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
} // namespace r1

namespace d2 {
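// Transfers ownership of a preallocated task_handle to the runtime and enqueues
// its task, reusing the task_group_context carried by the handle, into the given
// arena (or into the arena of the calling thread when ta is nullptr).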
inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
    __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle");

    auto& ctx = task_handle_accessor::ctx_of(th);

    // th must not be accessed after release() transfers ownership to the runtime.
    r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
}
} // namespace d2

namespace d1 {

static constexpr unsigned num_priority_levels = 3;
static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1);

// Settings and state shared between task_arena and the runtime library. The
// field layout is part of the binary interface (see the static_assert below).
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    // The three supported priorities are spaced evenly across the positive int range.
    enum class priority : int {
        low    = 1 * priority_stride,
        normal = 2 * priority_stride,
        high   = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif
protected:
    // Version and behavioral trait bits checked by the runtime (see the flags enum below).
    intptr_t my_version_and_traits;

    // One-time (lazy) initialization state of the arena.
    std::atomic<do_once_state> my_initialization_state;

    // Internal arena representation; nullptr until initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
                  "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    // Requested concurrency level (automatic by default).
    int my_max_concurrency;

    // Number of slots reserved for application ("master") threads.
    unsigned my_num_reserved_slots;

    // Arena priority.
    priority my_priority;

    // NUMA node the arena is bound to (automatic when unconstrained).
    numa_node_id my_numa_id;

    // Core type constraint (automatic when unconstrained).
    core_type_id my_core_type;

    // Maximum number of threads to place on a single core (automatic when unconstrained).
    int my_max_threads_per_core;

    // Fall back to automatic when the stored traits do not advertise core type support.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
    {}

#if __TBB_ARENA_BINDING
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
    {}
#endif
public:
    // Requests the default value chosen by the library.
    static const int automatic = -1;
    // Returned by current_thread_index() when the calling thread is not in an arena.
    static const int not_initialized = -2;
};

// Runs the functor inside an isolation region of the current arena and returns its result.
template<typename R, typename F>
R isolate_impl(F& f) {
    task_arena_function<F, R> func(f);
    r1::isolate_within_arena(func, 0);
    return func.consume_result();
}

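// enqueue_task owns a copy (or move) of the user functor; it deletes itself via
// the small object pool after running. enqueue_impl allocates such a task and
// hands it to the runtime for asynchronous execution in the target arena.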
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    const F m_func;

    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        finalize(ed);
        return nullptr;
    }
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};

template<typename F>
void enqueue_impl(F&& f, task_arena_base* ta) {
    small_object_allocator alloc{};
    r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
}

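// A 1-to-1 proxy for a scheduler arena: a place where a group of threads shares
// a concurrency budget and executes the work submitted to it, separately from
// work submitted to other arenas. The underlying arena is created lazily on
// first use unless initialize() is called explicitly.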
class task_arena : public task_arena_base {

    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    // Creates a task_arena with the requested concurrency level, number of slots
    // reserved for application threads, and priority. The arena is not initialized
    // until first use or an explicit initialize() call.
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    // Creates a task_arena constrained to a NUMA node, core type, and/or number of
    // threads per core.
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    // Copies the settings of another task_arena; the copy starts uninitialized.
    task_arena(const task_arena &s)
        : task_arena_base(
              constraints{}
                  .set_numa_id(s.my_numa_id)
                  .set_max_concurrency(s.my_max_concurrency)
                  .set_core_type(s.my_core_type)
                  .set_max_threads_per_core(s.my_max_threads_per_core)
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    // Copies the settings of another task_arena; the copy starts uninitialized.
    task_arena(const task_arena& a)
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif

    // Tag type used to request attaching to the arena the calling thread is in.
    struct attach {};

    // Creates an instance attached to the arena of the calling thread, if any;
    // otherwise the object is left uninitialized.
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal)
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    // Same as above, but takes the common tbb::attach tag type.
    explicit task_arena(d1::attach)
        : task_arena(attach{})
    {}
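
    // Example (illustrative, not part of the original header): attaching to the
    // arena the calling thread currently works in. If the thread is not in an
    // arena, the object stays inactive until initialize() or first use.
    //
    //     tbb::task_arena a{tbb::attach{}};
    //     bool attached = a.is_active();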

    // Forces creation of the internal arena representation (no effect if already done).
    void initialize() {
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    // Overrides the concurrency settings and initializes the arena; ignored if the
    // arena is already active.
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    // Overrides the constraints and initializes the arena; ignored if the arena is
    // already active.
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif

    // Attaches to the arena of the calling thread, or creates a new arena if the
    // thread is not in one.
    void initialize(attach) {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    // Same as above, but takes the common tbb::attach tag type.
    void initialize(d1::attach) {
        initialize(attach{});
    }

    // Releases the reference to the internal arena representation; the object can be
    // re-initialized with different settings afterwards.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    // Terminates the arena on destruction.
    ~task_arena() {
        terminate();
    }

    // Returns true if the arena has been initialized.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    // Enqueues a functor for asynchronous ("fire-and-forget") execution in the arena.
    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    // Enqueues a preallocated task_handle for asynchronous execution in the arena.
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }

    // Joins the arena, executes the functor there, and blocks until it completes;
    // returns the functor's result.
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    // Returns the number of reserved slots (extra-debug builds only).
    int debug_reserved_slots() const {
        return my_num_reserved_slots;
    }

    // Returns the concurrency setting (extra-debug builds only).
    int debug_max_concurrency() const {
        return my_max_concurrency;
    }

    // Waits until the arena runs out of work (extra-debug builds only).
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif

    // Returns the maximal number of threads that can work inside the arena.
    int max_concurrency() const {
        // If an explicit limit greater than 1 was set, report it; otherwise ask the runtime.
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    // Submits a prepared task into an active arena, optionally as a critical task.
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};
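
// Example (illustrative sketch, not part of the original header): a typical
// task_arena workflow. The parallel loop body is a placeholder for application
// code, and completion of the enqueued lambda is not waited for here.
//
//     #include <oneapi/tbb/task_arena.h>
//     #include <oneapi/tbb/parallel_for.h>
//
//     int main() {
//         // At most 4 threads, 1 slot reserved for the calling thread, high priority.
//         tbb::task_arena arena(4, 1, tbb::task_arena::priority::high);
//         arena.execute([] {
//             tbb::parallel_for(0, 1000, [](int i) { /* process item i */ });
//         });
//         arena.enqueue([] { /* runs asynchronously inside the arena */ });
//         int limit = arena.max_concurrency();   // actual concurrency limit
//         (void)limit;
//     }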

// Executes the functor within an isolation region of the calling thread's arena:
// while inside, the thread does not pick up tasks submitted outside the region.
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}
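
// Example (illustrative sketch): isolating a nested parallel loop so that the
// calling thread does not steal unrelated outer tasks while it is blocked in it.
//
//     tbb::this_task_arena::isolate([] {
//         tbb::parallel_for(0, 1000, [](int) { /* nested work */ });
//     });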

// Returns the calling thread's slot index within its current arena, or
// task_arena_base::not_initialized if the thread is not in any arena.
inline int current_thread_index() {
    slot_id idx = r1::execution_slot(nullptr);
    return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
// Returns true when the calling thread is executing a TBB task.
inline bool is_inside_task() {
    return nullptr != current_context();
}
#endif

// Returns the concurrency level of the arena currently used by the calling thread.
inline int max_concurrency() {
    return r1::max_concurrency(nullptr);
}

// Enqueues a task_handle into the arena of the calling thread.
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}

// Enqueues a functor into the arena of the calling thread for asynchronous execution.
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}
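
// Example (illustrative sketch): fire-and-forget submission into the arena of the
// calling thread. The runtime picks the worker and the moment of execution, so the
// application must track completion itself (e.g. with a task_group or an atomic flag).
//
//     tbb::this_task_arena::enqueue([] { /* background work */ });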

using r1::submit;

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::task_arena;
using detail::d1::attach;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::is_inside_task;
#endif

namespace this_task_arena {
using detail::d1::current_thread_index;
using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;
} // namespace this_task_arena

} // namespace v1

} // namespace tbb
#endif // __TBB_task_arena_H