File indexing completed on 2025-12-18 10:24:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_impl_H
0018 #define __TBB_flow_graph_impl_H
0019
0020
0021 #include "_task.h"
0022 #include "../task_group.h"
0023 #include "../task_arena.h"
0024 #include "../flow_graph_abstractions.h"
0025
0026 #include "../concurrent_priority_queue.h"
0027
0028 #include <list>
0029
0030 namespace tbb {
0031 namespace detail {
0032
0033 namespace d2 {
0034
class graph;
class graph_node;
class graph_task;

//! Sentinel task pointer (the integer -1 converted to a pointer).
/** It is only ever compared against in this header (see the assertion in
    priority_task_selector::next_task) and must never be dereferenced. */
static graph_task* const SUCCESSFULLY_ENQUEUED = reinterpret_cast<graph_task*>(-1);

//! Integral type used to express the priority of a node and of the tasks it creates.
using node_priority_t = unsigned int;

//! Priority value meaning "no priority": prioritize_task() returns such tasks unchanged.
static const node_priority_t no_priority = 0u;
0042
//! Forward iterator over the nodes registered in a graph container.
/** GraphContainerType is the container being walked and GraphNodeType the element type;
    the graph class instantiates it as <graph, graph_node> and <const graph, const graph_node>.
    Only graph and graph_node (friends) can create a begin/end iterator; user code may copy,
    compare, dereference and advance it. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;

public:
    using size_type = size_t;
    using value_type = GraphNodeType;
    using pointer = GraphNodeType*;
    using reference = GraphNodeType&;
    using const_reference = const GraphNodeType&;
    using iterator_category = std::forward_iterator_tag;

    //! Copy constructor: the copy refers to the same container and the same node.
    graph_iterator(const graph_iterator& other)
        : my_graph(other.my_graph)
        , current_node(other.current_node)
    {}

    //! Copy assignment; self-assignment is a no-op.
    graph_iterator& operator=(const graph_iterator& other) {
        if (this == &other) {
            return *this;
        }
        my_graph = other.my_graph;
        current_node = other.current_node;
        return *this;
    }

    //! Dereference (defined outside this class body).
    reference operator*() const;

    //! Member access (defined outside this class body).
    pointer operator->() const;

    //! Two iterators are equal when they refer to the same container and the same node.
    bool operator==(const graph_iterator& other) const {
        return my_graph == other.my_graph && current_node == other.current_node;
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality; only provided when __TBB_CPP20_COMPARISONS_PRESENT is not set.
    bool operator!=(const graph_iterator& other) const { return !(*this == other); }
#endif

    //! Pre-increment: advance to the next node and return this iterator.
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment: advance to the next node and return the previous position.
    graph_iterator operator++(int) {
        graph_iterator previous(*this);
        internal_forward();
        return previous;
    }

private:
    //! The container this iterator walks over.
    GraphContainerType* my_graph;

    //! The node the iterator currently refers to.
    pointer current_node;

    //! Private constructor used by the friend classes to build begin/end iterators.
    graph_iterator(GraphContainerType* g, bool begin);

    //! Moves current_node one step forward (defined outside this class body).
    void internal_forward();
};
0108
0109
//! Bit flags accepted by graph::reset() and graph_node::reset_node().
enum reset_flags {
    rf_reset_protocol = 0x0,  // no flag bit set; the default argument of reset()/reset_node()
    rf_reset_bodies   = 0x1,  // bit 0
    rf_clear_edges    = 0x2   // bit 1
};
0115
0116 void activate_graph(graph& g);
0117 void deactivate_graph(graph& g);
0118 bool is_graph_active(graph& g);
0119 graph_task* prioritize_task(graph& g, graph_task& arena_task);
0120 void spawn_in_graph_arena(graph& g, graph_task& arena_task);
0121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
0122
0123 class graph;
0124
0125
0126 class graph_task : public d1::task {
0127 public:
0128 graph_task(graph& g, d1::small_object_allocator& allocator,
0129 node_priority_t node_priority = no_priority);
0130
0131 graph& my_graph;
0132
0133 node_priority_t priority;
0134 template <typename DerivedType>
0135 void destruct_and_deallocate(const d1::execution_data& ed);
0136 protected:
0137 template <typename DerivedType>
0138 void finalize(const d1::execution_data& ed);
0139 private:
0140
0141 graph_task* my_next{ nullptr };
0142 d1::small_object_allocator my_allocator;
0143 d1::wait_tree_vertex_interface* my_reference_vertex;
0144
0145 friend class graph_task_list;
0146 friend graph_task* prioritize_task(graph& g, graph_task& gt);
0147 };
0148
0149 inline bool is_this_thread_in_graph_arena(graph& g);
0150
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! graph_task that additionally tracks a list of per-message wait-context vertices.
/** Besides the graph-level reference held by graph_task, the task keeps the list of
    message waiters it was created with and releases one reference per waiter in finalize(). */
class trackable_messages_graph_task : public graph_task {
public:
    //! Copies the waiter list and reserves one reference per waiter.
    /** For every waiter a reference vertex is chosen the same way graph_task chooses its own:
        the calling thread's reference vertex when the thread is inside the graph arena,
        otherwise the waiter's vertex itself. The chosen vertices are remembered in the same
        order as the waiters so that finalize() can release exactly those. */
    trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
                                  node_priority_t node_priority,
                                  const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(msg_waiters)
    {
        // forward_list can only insert after a position, so keep track of the current tail.
        auto tail = my_msg_reference_vertices.cbefore_begin();

        for (d1::wait_context_vertex* waiter : my_msg_wait_context_vertices) {
            d1::wait_tree_vertex_interface* ref_vertex = waiter;
            if (is_this_thread_in_graph_arena(g)) {
                ref_vertex = r1::get_thread_reference_vertex(waiter);
            }
            tail = my_msg_reference_vertices.emplace_after(tail, ref_vertex);
            ref_vertex->reserve(1);
        }
    }

    //! Takes over an existing waiter list without reserving anything.
    /** No reference vertices are recorded here, which is how finalize() recognizes this case.
        NOTE(review): presumably the references were already reserved by whoever owned the
        list before it was moved in - confirm against the callers. */
    trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
                                  node_priority_t node_priority,
                                  std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(std::move(msg_waiters))
    {
    }

    //! Returns a copy of the list of message waiters associated with the task.
    const std::forward_list<d1::wait_context_vertex*> get_msg_wait_context_vertices() const {
        return my_msg_wait_context_vertices;
    }

protected:
    //! Destroys the task and then releases one reference per tracked message waiter.
    template <typename DerivedType>
    void finalize(const d1::execution_data& ed) {
        // graph_task::finalize destroys *this, so move both lists to the stack beforehand.
        auto waiters = std::move(my_msg_wait_context_vertices);
        auto reference_vertices = std::move(my_msg_reference_vertices);
        graph_task::finalize<DerivedType>(ed);

        if (!reference_vertices.empty()) {
            // Task built by the copying constructor: release the vertices it reserved.
            for (auto& vertex : reference_vertices) {
                vertex->release(1);
            }
        } else {
            // No reference vertices were recorded (rvalue-taking constructor, or an empty
            // waiter list): release the wait-context vertices themselves.
            for (auto& waiter : waiters) {
                waiter->release(1);
            }
        }
    }

private:
    //! Wait-context vertices of the messages this task works on.
    std::forward_list<d1::wait_context_vertex*> my_msg_wait_context_vertices;

    //! Vertices actually reserved by the copying constructor; empty for the moving one.
    std::forward_list<d1::wait_tree_vertex_interface*> my_msg_reference_vertices;
};
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0217
0218 struct graph_task_comparator {
0219 bool operator()(const graph_task* left, const graph_task* right) {
0220 return left->priority < right->priority;
0221 }
0222 };
0223
0224 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
0225
0226 class priority_task_selector : public d1::task {
0227 public:
0228 priority_task_selector(graph_task_priority_queue_t& priority_queue, d1::small_object_allocator& allocator)
0229 : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
0230 task* execute(d1::execution_data& ed) override {
0231 next_task();
0232 __TBB_ASSERT(my_task, nullptr);
0233 task* t_next = my_task->execute(ed);
0234 my_allocator.delete_object(this, ed);
0235 return t_next;
0236 }
0237 task* cancel(d1::execution_data& ed) override {
0238 if (!my_task) {
0239 next_task();
0240 }
0241 __TBB_ASSERT(my_task, nullptr);
0242 task* t_next = my_task->cancel(ed);
0243 my_allocator.delete_object(this, ed);
0244 return t_next;
0245 }
0246 private:
0247 void next_task() {
0248
0249 bool result = my_priority_queue.try_pop(my_task);
0250 __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
0251 " in graph's priority queue mismatched");
0252 __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
0253 "Incorrect task submitted to graph priority queue");
0254 __TBB_ASSERT(my_task->priority != no_priority,
0255 "Tasks from graph's priority queue must have priority");
0256 }
0257
0258 graph_task_priority_queue_t& my_priority_queue;
0259 d1::small_object_allocator my_allocator;
0260 graph_task* my_task;
0261 };
0262
0263 template <typename Receiver, typename Body> class run_and_put_task;
0264 template <typename Body> class run_task;
0265
0266
0267
0268
0269
0270
0271 class graph_task_list : no_copy {
0272 private:
0273 graph_task* my_first;
0274 graph_task** my_next_ptr;
0275 public:
0276
0277 graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}
0278
0279
0280 bool empty() const { return !my_first; }
0281
0282
0283 void push_back(graph_task& task) {
0284 task.my_next = nullptr;
0285 *my_next_ptr = &task;
0286 my_next_ptr = &task.my_next;
0287 }
0288
0289
0290 graph_task& pop_front() {
0291 __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
0292 graph_task* result = my_first;
0293 my_first = result->my_next;
0294 if (!my_first) {
0295 my_next_ptr = &my_first;
0296 }
0297 return *result;
0298 }
0299 };
0300
0301
0302
0303 class graph : no_copy, public graph_proxy {
0304 friend class graph_node;
0305
0306 void prepare_task_arena(bool reinit = false) {
0307 if (reinit) {
0308 __TBB_ASSERT(my_task_arena, "task arena is nullptr");
0309 my_task_arena->terminate();
0310 my_task_arena->initialize(task_arena::attach());
0311 }
0312 else {
0313 __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
0314 my_task_arena = new task_arena(task_arena::attach());
0315 }
0316 if (!my_task_arena->is_active())
0317 my_task_arena->initialize();
0318 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
0319 }
0320
0321 public:
0322
0323 graph();
0324
0325
0326 explicit graph(task_group_context& use_this_context);
0327
0328
0329
0330 ~graph();
0331
0332
0333
0334
0335 void reserve_wait() override;
0336
0337
0338
0339
0340 void release_wait() override;
0341
0342
0343
0344
0345 void wait_for_all() {
0346 cancelled = false;
0347 caught_exception = false;
0348 try_call([this] {
0349 my_task_arena->execute([this] {
0350 wait(my_wait_context_vertex.get_context(), *my_context);
0351 });
0352 cancelled = my_context->is_group_execution_cancelled();
0353 }).on_exception([this] {
0354 my_context->reset();
0355 caught_exception = true;
0356 cancelled = true;
0357 });
0358
0359
0360
0361 if (!(my_context->traits() & task_group_context::concurrent_wait)) {
0362 my_context->reset();
0363 }
0364 }
0365
0366
0367
0368
0369 template<typename C, typename N>
0370 friend class graph_iterator;
0371
0372
0373 typedef graph_iterator<graph, graph_node> iterator;
0374 typedef graph_iterator<const graph, const graph_node> const_iterator;
0375
0376
0377
0378 iterator begin();
0379
0380 iterator end();
0381
0382 const_iterator begin() const;
0383
0384 const_iterator end() const;
0385
0386 const_iterator cbegin() const;
0387
0388 const_iterator cend() const;
0389
0390
0391 void reset(reset_flags f = rf_reset_protocol);
0392
0393
0394 void cancel();
0395
0396
0397 bool is_cancelled() { return cancelled; }
0398 bool exception_thrown() { return caught_exception; }
0399
0400 private:
0401 d1::wait_context_vertex my_wait_context_vertex;
0402 task_group_context *my_context;
0403 bool own_context;
0404 bool cancelled;
0405 bool caught_exception;
0406 bool my_is_active;
0407
0408 graph_node *my_nodes, *my_nodes_last;
0409
0410 tbb::spin_mutex nodelist_mutex;
0411 void register_node(graph_node *n);
0412 void remove_node(graph_node *n);
0413
0414 task_arena* my_task_arena;
0415
0416 graph_task_priority_queue_t my_priority_queue;
0417
0418 d1::wait_context_vertex& get_wait_context_vertex() { return my_wait_context_vertex; }
0419
0420 friend void activate_graph(graph& g);
0421 friend void deactivate_graph(graph& g);
0422 friend bool is_graph_active(graph& g);
0423 friend bool is_this_thread_in_graph_arena(graph& g);
0424 friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
0425 friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
0426 friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
0427
0428 friend class d1::task_arena_base;
0429 friend class graph_task;
0430
0431 template <typename T>
0432 friend class receiver;
0433 };
0434
0435 template<typename DerivedType>
0436 inline void graph_task::destruct_and_deallocate(const d1::execution_data& ed) {
0437 auto allocator = my_allocator;
0438
0439 this->~graph_task();
0440 allocator.deallocate(static_cast<DerivedType*>(this), ed);
0441 }
0442
0443 template<typename DerivedType>
0444 inline void graph_task::finalize(const d1::execution_data& ed) {
0445 d1::wait_tree_vertex_interface* reference_vertex = my_reference_vertex;
0446 destruct_and_deallocate<DerivedType>(ed);
0447 reference_vertex->release();
0448 }
0449
0450 inline graph_task::graph_task(graph& g, d1::small_object_allocator& allocator,
0451 node_priority_t node_priority)
0452 : my_graph(g)
0453 , priority(node_priority)
0454 , my_allocator(allocator)
0455 {
0456
0457
0458
0459
0460 d1::wait_context_vertex* graph_wait_context_vertex = &my_graph.get_wait_context_vertex();
0461 my_reference_vertex = is_this_thread_in_graph_arena(g) ? r1::get_thread_reference_vertex(graph_wait_context_vertex)
0462 : graph_wait_context_vertex;
0463 __TBB_ASSERT(my_reference_vertex, nullptr);
0464 my_reference_vertex->reserve();
0465 }
0466
0467
0468
0469
0470
0471
0472 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0473 class get_graph_helper;
0474 #endif
0475
0476
0477 class graph_node : no_copy {
0478 friend class graph;
0479 template<typename C, typename N>
0480 friend class graph_iterator;
0481
0482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0483 friend class get_graph_helper;
0484 #endif
0485
0486 protected:
0487 graph& my_graph;
0488 graph& graph_reference() const {
0489
0490 return my_graph;
0491 }
0492 graph_node* next = nullptr;
0493 graph_node* prev = nullptr;
0494 public:
0495 explicit graph_node(graph& g);
0496
0497 virtual ~graph_node();
0498
0499 protected:
0500
0501 virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
0502 };
0503
0504 inline void activate_graph(graph& g) {
0505 g.my_is_active = true;
0506 }
0507
0508 inline void deactivate_graph(graph& g) {
0509 g.my_is_active = false;
0510 }
0511
0512 inline bool is_graph_active(graph& g) {
0513 return g.my_is_active;
0514 }
0515
0516 inline bool is_this_thread_in_graph_arena(graph& g) {
0517 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
0518 return r1::execution_slot(*g.my_task_arena) != d1::slot_id(-1);
0519 }
0520
0521 inline graph_task* prioritize_task(graph& g, graph_task& gt) {
0522 if( no_priority == gt.priority )
0523 return >
0524
0525
0526
0527
0528
0529 d1::task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
0530 __TBB_ASSERT( critical_task, "bad_alloc?" );
0531 g.my_priority_queue.push(>);
0532 using tbb::detail::d1::submit;
0533 submit( *critical_task, *g.my_task_arena, *g.my_context, true );
0534 return nullptr;
0535 }
0536
0537
0538 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
0539 if (is_graph_active(g)) {
0540 d1::task* gt = prioritize_task(g, arena_task);
0541 if( !gt )
0542 return;
0543
0544 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
0545 submit( *gt, *g.my_task_arena, *g.my_context
0546 #if __TBB_PREVIEW_CRITICAL_TASKS
0547 , false
0548 #endif
0549 );
0550 }
0551 }
0552
0553
0554
0555
0556 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
0557 if (is_graph_active(g)) {
0558 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
0559
0560
0561 if( d1::task* gt = prioritize_task(g, arena_task) )
0562 submit( *gt, *g.my_task_arena, *g.my_context, false);
0563 }
0564 }
0565
0566 }
0567 }
0568 }
0569
0570 #endif