File indexing completed on 2025-07-30 08:46:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_impl_H
0018 #define __TBB_flow_graph_impl_H
0019
0020
#include "_task.h"
#include "../task_group.h"
#include "../task_arena.h"
#include "../flow_graph_abstractions.h"

#include "../concurrent_priority_queue.h"

#include <cstddef>
#include <iterator>
#include <list>
0029
0030 namespace tbb {
0031 namespace detail {
0032
0033 namespace d1 {
0034
class graph_task;

//! Sentinel graph_task pointer obtained by casting -1; it is only ever compared against, never dereferenced.
/** NOTE(review): judging by the name it signals that a task was already handed to the scheduler - confirm with callers. */
static graph_task* const SUCCESSFULLY_ENQUEUED = reinterpret_cast<graph_task*>(-1);

//! Integral type used for graph_task::priority.
using node_priority_t = unsigned int;

//! Priority value of a task that has no priority; prioritize_task() passes such tasks through unchanged.
static const node_priority_t no_priority = node_priority_t(0);

class graph;
class graph_node;
0042
//! Iterator over the nodes of a graph container, declared with the forward-iterator category.
/** GraphContainerType is the (possibly const) container type and GraphNodeType the (possibly const)
    node type the iterator dereferences to. Dereferencing, construction and advancing
    (operator*, operator->, the private constructor and internal_forward()) are defined
    outside of this class body. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    using size_type = size_t;
    //! Required by std::iterator_traits; without it the traits of this iterator would be empty.
    using difference_type = std::ptrdiff_t;
    using value_type = GraphNodeType;
    using pointer = GraphNodeType*;
    using reference = GraphNodeType&;
    using const_reference = const GraphNodeType&;
    using iterator_category = std::forward_iterator_tag;

    //! Copy constructor: the copy refers to the same container and the same node.
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Copy assignment; self-assignment is a no-op.
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference (defined elsewhere).
    reference operator*() const;

    //! Member access (defined elsewhere).
    pointer operator->() const;

    //! Two iterators are equal when they refer to the same container and the same node.
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality, spelled out only when this macro reports C++20 comparisons as unavailable.
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment: advances via internal_forward() and returns this iterator.
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment: advances this iterator and returns a copy of its previous state.
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    //! The container this iterator walks over.
    GraphContainerType *my_graph;
    //! The node the iterator currently refers to.
    pointer current_node;

    //! Private constructor, reachable only from the friend classes (defined elsewhere).
    graph_iterator(GraphContainerType *g, bool begin);
    //! Moves the iterator to the next node (defined elsewhere).
    void internal_forward();
};
0108
0109
//! Bit flags accepted by graph::reset() and graph_node::reset_node().
/** rf_reset_protocol has no bits set and is the default argument of both functions;
    the remaining enumerators occupy one bit each and can be OR-ed together. */
enum reset_flags {
    rf_reset_protocol = 0x0,
    rf_reset_bodies   = 0x1,
    rf_clear_edges    = 0x2
};
0115
0116 void activate_graph(graph& g);
0117 void deactivate_graph(graph& g);
0118 bool is_graph_active(graph& g);
0119 graph_task* prioritize_task(graph& g, graph_task& arena_task);
0120 void spawn_in_graph_arena(graph& g, graph_task& arena_task);
0121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
0122
0123 class graph;
0124
0125
0126 class graph_task : public task {
0127 public:
0128 graph_task(graph& g, small_object_allocator& allocator
0129 , node_priority_t node_priority = no_priority
0130 )
0131 : my_graph(g)
0132 , priority(node_priority)
0133 , my_allocator(allocator)
0134 {}
0135 graph& my_graph;
0136
0137 node_priority_t priority;
0138 template <typename DerivedType>
0139 void destruct_and_deallocate(const execution_data& ed);
0140 protected:
0141 template <typename DerivedType>
0142 void finalize(const execution_data& ed);
0143 private:
0144
0145 graph_task* my_next{ nullptr };
0146 small_object_allocator my_allocator;
0147
0148 friend class graph_task_list;
0149 friend graph_task* prioritize_task(graph& g, graph_task& gt);
0150 };
0151
0152 struct graph_task_comparator {
0153 bool operator()(const graph_task* left, const graph_task* right) {
0154 return left->priority < right->priority;
0155 }
0156 };
0157
0158 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
0159
0160 class priority_task_selector : public task {
0161 public:
0162 priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
0163 : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
0164 task* execute(execution_data& ed) override {
0165 next_task();
0166 __TBB_ASSERT(my_task, nullptr);
0167 task* t_next = my_task->execute(ed);
0168 my_allocator.delete_object(this, ed);
0169 return t_next;
0170 }
0171 task* cancel(execution_data& ed) override {
0172 if (!my_task) {
0173 next_task();
0174 }
0175 __TBB_ASSERT(my_task, nullptr);
0176 task* t_next = my_task->cancel(ed);
0177 my_allocator.delete_object(this, ed);
0178 return t_next;
0179 }
0180 private:
0181 void next_task() {
0182
0183 bool result = my_priority_queue.try_pop(my_task);
0184 __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
0185 " in graph's priority queue mismatched");
0186 __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
0187 "Incorrect task submitted to graph priority queue");
0188 __TBB_ASSERT(my_task->priority != no_priority,
0189 "Tasks from graph's priority queue must have priority");
0190 }
0191
0192 graph_task_priority_queue_t& my_priority_queue;
0193 small_object_allocator my_allocator;
0194 graph_task* my_task;
0195 };
0196
// Forward declarations of task templates that are defined outside of this header section.
template <typename Receiver, typename Body>
class run_and_put_task;

template <typename Body>
class run_task;
0199
0200
0201
0202
0203
0204
0205 class graph_task_list : no_copy {
0206 private:
0207 graph_task* my_first;
0208 graph_task** my_next_ptr;
0209 public:
0210
0211 graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}
0212
0213
0214 bool empty() const { return !my_first; }
0215
0216
0217 void push_back(graph_task& task) {
0218 task.my_next = nullptr;
0219 *my_next_ptr = &task;
0220 my_next_ptr = &task.my_next;
0221 }
0222
0223
0224 graph_task& pop_front() {
0225 __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
0226 graph_task* result = my_first;
0227 my_first = result->my_next;
0228 if (!my_first) {
0229 my_next_ptr = &my_first;
0230 }
0231 return *result;
0232 }
0233 };
0234
0235
0236
0237 class graph : no_copy, public graph_proxy {
0238 friend class graph_node;
0239
0240 void prepare_task_arena(bool reinit = false) {
0241 if (reinit) {
0242 __TBB_ASSERT(my_task_arena, "task arena is nullptr");
0243 my_task_arena->terminate();
0244 my_task_arena->initialize(task_arena::attach());
0245 }
0246 else {
0247 __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
0248 my_task_arena = new task_arena(task_arena::attach());
0249 }
0250 if (!my_task_arena->is_active())
0251 my_task_arena->initialize();
0252 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
0253 }
0254
0255 public:
0256
0257 graph();
0258
0259
0260 explicit graph(task_group_context& use_this_context);
0261
0262
0263
0264 ~graph();
0265
0266
0267
0268
0269 void reserve_wait() override;
0270
0271
0272
0273
0274 void release_wait() override;
0275
0276
0277
0278
0279 void wait_for_all() {
0280 cancelled = false;
0281 caught_exception = false;
0282 try_call([this] {
0283 my_task_arena->execute([this] {
0284 wait(my_wait_context, *my_context);
0285 });
0286 cancelled = my_context->is_group_execution_cancelled();
0287 }).on_exception([this] {
0288 my_context->reset();
0289 caught_exception = true;
0290 cancelled = true;
0291 });
0292
0293
0294
0295 if (!(my_context->traits() & task_group_context::concurrent_wait)) {
0296 my_context->reset();
0297 }
0298 }
0299
0300
0301
0302
0303 template<typename C, typename N>
0304 friend class graph_iterator;
0305
0306
0307 typedef graph_iterator<graph, graph_node> iterator;
0308 typedef graph_iterator<const graph, const graph_node> const_iterator;
0309
0310
0311
0312 iterator begin();
0313
0314 iterator end();
0315
0316 const_iterator begin() const;
0317
0318 const_iterator end() const;
0319
0320 const_iterator cbegin() const;
0321
0322 const_iterator cend() const;
0323
0324
0325 void reset(reset_flags f = rf_reset_protocol);
0326
0327
0328 void cancel();
0329
0330
0331 bool is_cancelled() { return cancelled; }
0332 bool exception_thrown() { return caught_exception; }
0333
0334 private:
0335 wait_context my_wait_context;
0336 task_group_context *my_context;
0337 bool own_context;
0338 bool cancelled;
0339 bool caught_exception;
0340 bool my_is_active;
0341
0342 graph_node *my_nodes, *my_nodes_last;
0343
0344 tbb::spin_mutex nodelist_mutex;
0345 void register_node(graph_node *n);
0346 void remove_node(graph_node *n);
0347
0348 task_arena* my_task_arena;
0349
0350 graph_task_priority_queue_t my_priority_queue;
0351
0352 friend void activate_graph(graph& g);
0353 friend void deactivate_graph(graph& g);
0354 friend bool is_graph_active(graph& g);
0355 friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
0356 friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
0357 friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
0358
0359 friend class task_arena_base;
0360
0361 };
0362
0363 template<typename DerivedType>
0364 inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
0365 auto allocator = my_allocator;
0366
0367 this->~graph_task();
0368 allocator.deallocate(static_cast<DerivedType*>(this), ed);
0369 }
0370
0371 template<typename DerivedType>
0372 inline void graph_task::finalize(const execution_data& ed) {
0373 graph& g = my_graph;
0374 destruct_and_deallocate<DerivedType>(ed);
0375 g.release_wait();
0376 }
0377
0378
0379
0380
0381
0382
0383 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0384 class get_graph_helper;
0385 #endif
0386
0387
0388 class graph_node : no_copy {
0389 friend class graph;
0390 template<typename C, typename N>
0391 friend class graph_iterator;
0392
0393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0394 friend class get_graph_helper;
0395 #endif
0396
0397 protected:
0398 graph& my_graph;
0399 graph& graph_reference() const {
0400
0401 return my_graph;
0402 }
0403 graph_node* next = nullptr;
0404 graph_node* prev = nullptr;
0405 public:
0406 explicit graph_node(graph& g);
0407
0408 virtual ~graph_node();
0409
0410 protected:
0411
0412 virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
0413 };
0414
0415 inline void activate_graph(graph& g) {
0416 g.my_is_active = true;
0417 }
0418
0419 inline void deactivate_graph(graph& g) {
0420 g.my_is_active = false;
0421 }
0422
0423 inline bool is_graph_active(graph& g) {
0424 return g.my_is_active;
0425 }
0426
0427 inline graph_task* prioritize_task(graph& g, graph_task& gt) {
0428 if( no_priority == gt.priority )
0429 return >
0430
0431
0432
0433
0434
0435 task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
0436 __TBB_ASSERT( critical_task, "bad_alloc?" );
0437 g.my_priority_queue.push(>);
0438 using tbb::detail::d1::submit;
0439 submit( *critical_task, *g.my_task_arena, *g.my_context, true );
0440 return nullptr;
0441 }
0442
0443
0444 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
0445 if (is_graph_active(g)) {
0446 task* gt = prioritize_task(g, arena_task);
0447 if( !gt )
0448 return;
0449
0450 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
0451 submit( *gt, *g.my_task_arena, *g.my_context
0452 #if __TBB_PREVIEW_CRITICAL_TASKS
0453 , false
0454 #endif
0455 );
0456 }
0457 }
0458
0459
0460
0461
0462 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
0463 if (is_graph_active(g)) {
0464 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
0465
0466
0467 if( task* gt = prioritize_task(g, arena_task) )
0468 submit( *gt, *g.my_task_arena, *g.my_context, false);
0469 }
0470 }
0471
0472 }
0473 }
0474 }
0475
0476 #endif