File indexing completed on 2025-01-18 10:12:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_impl_H
0018 #define __TBB_flow_graph_impl_H
0019
0020 #include "../tbb_stddef.h"
0021 #include "../task.h"
0022 #include "../task_arena.h"
0023 #include "../flow_graph_abstractions.h"
0024
0025 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0026 #include "../concurrent_priority_queue.h"
0027 #endif
0028
     #include <cstddef>
     #include <iterator>
0029 #include <list>
0030
0031 #if TBB_DEPRECATED_FLOW_ENQUEUE
     // FLOW_SPAWN(a) expands to a call of tbb::task::enqueue when
     // TBB_DEPRECATED_FLOW_ENQUEUE is set, and of tbb::task::spawn otherwise.
     // The argument is wrapped in an extra pair of parentheses in both forms.
0032 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
0033 #else
0034 #define FLOW_SPAWN(a) tbb::task::spawn((a))
0035 #endif
0036
0037 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
     // Helpers for splicing priority-related tokens into code only when the
     // priorities preview feature is enabled:
     //   EXPR(expr)           -> expr
     //   ARG0(priority)       -> , priority      (note the leading comma)
     //   ARG1(arg1, priority) -> arg1, priority
0038 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
0039 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
0040 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
0041 #else
     // Feature disabled: EXPR and ARG0 expand to nothing, ARG1 keeps only arg1.
0042 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
0043 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
0044 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
0045 #endif
0046
0047 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
     // Token-selection helpers keyed on TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR.
     // When it is set:
     //   EXPR(expr)         -> expr
     //   ARG2(arg1, arg2)   -> arg1, arg2
     //   ARG4(arg1..arg4)   -> arg1, arg3, arg4   (arg2 is dropped)
0048 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
0049 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
0050 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
0051 #else
     // When it is not set: EXPR expands to nothing, ARG2 keeps only arg1 and
     // ARG4 keeps arg1, arg2 (arg3 and arg4 are dropped).
0052 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
0053 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
0054 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
0055 #endif
0056
0057 namespace tbb {
0058 namespace flow {
0059
0060 namespace internal {
     // Sentinel task pointer (the value -1 cast to task*). It is not a real task:
     // run_and_put_task::execute() in this file converts it to NULL before
     // returning, and priority_task_selector asserts it never pops this value.
0061 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
0062 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
     // Priority value attached to graph tasks. no_priority (0) marks a task as
     // unprioritized; prioritize_task() leaves such tasks untouched.
0063 typedef unsigned int node_priority_t;
0064 static const node_priority_t no_priority = node_priority_t(0);
0065 #endif
0066 }
0067
0068 namespace interface10 {
     // Forward declaration: the interface11 declarations that follow name
     // interface10::graph before the class itself is defined further down.
0069 class graph;
0070 }
0071
0072 namespace interface11 {
0073
0074 using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
0075
0076 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
     // With the priorities preview enabled, graph_task is a tbb::task that also
     // carries a priority; otherwise (the #else branch) it is a plain alias of task.
0077 using tbb::flow::internal::node_priority_t;
0078 using tbb::flow::internal::no_priority;
0079
0080 struct graph_task : public task {
     // Stores the given priority; defaults to no_priority when omitted.
0081 graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
     // Read by graph_task_comparator and prioritize_task() later in this file.
0082 node_priority_t priority;
0083 };
0084 #else
0085 typedef task graph_task;
0086 #endif
0087
0088 class graph_node;
0089
     //! Forward iterator over the nodes registered with a graph.
     /** GraphContainerType is the (possibly const) graph type and GraphNodeType the
         (possibly const) node type; class graph instantiates both the mutable and
         the const combination as its iterator / const_iterator. */
0090 template <typename GraphContainerType, typename GraphNodeType>
0091 class graph_iterator {
0092 friend class tbb::flow::interface10::graph;
0093 friend class graph_node;
0094 public:
0095 typedef size_t size_type;
0096 typedef GraphNodeType value_type;
0097 typedef GraphNodeType* pointer;
0098 typedef GraphNodeType& reference;
0099 typedef const GraphNodeType& const_reference;
0100 typedef std::forward_iterator_tag iterator_category;
     // std::iterator_traits requires difference_type next to value_type, pointer,
     // reference and iterator_category; without it the traits are unusable.
     typedef std::ptrdiff_t difference_type;
0101
0102
     //! Default constructor: no graph and no current node.
0103 graph_iterator() : my_graph(NULL), current_node(NULL) {}
0104
0105
     //! Copy constructor: refers to the same graph and node as other.
0106 graph_iterator(const graph_iterator& other) :
0107 my_graph(other.my_graph), current_node(other.current_node)
0108 {}
0109
0110
     //! Assignment: copies graph and node pointers; self-assignment is a no-op.
0111 graph_iterator& operator=(const graph_iterator& other) {
0112 if (this != &other) {
0113 my_graph = other.my_graph;
0114 current_node = other.current_node;
0115 }
0116 return *this;
0117 }
0118
0119
     //! Dereference (declared here, defined outside this listing).
0120 reference operator*() const;
0121
0122
     //! Member access (declared here, defined outside this listing).
0123 pointer operator->() const;
0124
0125
     //! Two iterators are equal when both the graph and the current node match.
0126 bool operator==(const graph_iterator& other) const {
0127 return ((my_graph == other.my_graph) && (current_node == other.current_node));
0128 }
0129
0130
     //! Negation of operator==.
0131 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
0132
0133
     //! Pre-increment: advances via internal_forward() and returns *this.
0134 graph_iterator& operator++() {
0135 internal_forward();
0136 return *this;
0137 }
0138
0139
     //! Post-increment: advances and returns a copy of the previous position.
0140 graph_iterator operator++(int) {
0141 graph_iterator result = *this;
0142 operator++();
0143 return result;
0144 }
0145
0146 private:
0147
     // Graph being traversed.
0148 GraphContainerType *my_graph;
0149
     // Node the iterator currently refers to.
0150 pointer current_node;
0151
0152
     // Private constructor, reachable only by the friends above; defined outside
     // this listing. NOTE(review): `begin` presumably selects first-node vs. end
     // position -- confirm against the definition.
0153 graph_iterator(GraphContainerType *g, bool begin);
     // Advances current_node; defined outside this listing.
0154 void internal_forward();
0155 };
0156
0157
0157 enum reset_flags {
     // Flags accepted by graph::reset() and graph_node::reset_node(); both default
     // to rf_reset_protocol. The non-zero values occupy distinct bits (bit 0 and
     // bit 1). NOTE(review): presumably meant to be OR-combined -- confirm with callers.
0158 rf_reset_protocol = 0,
0159 rf_reset_bodies = 1 << 0,
0160 rf_clear_edges = 1 << 1
0161 };
0163
0164 namespace internal {
0165
     // Forward declarations of the graph helper functions. They are named as
     // friends inside class graph and defined inline near the end of this file,
     // after the class is complete.
0165 void activate_graph(tbb::flow::interface10::graph& g);
0166 void deactivate_graph(tbb::flow::interface10::graph& g);
0167 bool is_graph_active(tbb::flow::interface10::graph& g);
0168 tbb::task& prioritize_task(tbb::flow::interface10::graph& g, tbb::task& arena_task);
0169 void spawn_in_graph_arena(tbb::flow::interface10::graph& g, tbb::task& arena_task);
0170 void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task& arena_task);
0171 void add_task_to_graph_reset_list(tbb::flow::interface10::graph& g, tbb::task *tp);
0173
0174 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0174 struct graph_task_comparator {
     // Orders graph tasks by their priority field (strict "less than").
     // The functor has no state, so the call operator is const and can be
     // invoked through a const comparator object as well.
0175 bool operator()(const graph_task* left, const graph_task* right) const {
0176 return left->priority < right->priority;
0177 }
0178 };
0179
     // Priority queue of graph_task pointers ordered by graph_task_comparator.
0180 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
0182
0182 class priority_task_selector : public task {
     // Proxy task created by prioritize_task(): when executed it pops one task
     // from the graph's priority queue and runs that task in its own place.
0183 public:
     // Keeps a reference to the queue; the queue must outlive this task.
0184 priority_task_selector(graph_task_priority_queue_t& priority_queue)
0185 : my_priority_queue(priority_queue) {}
0186 task* execute() __TBB_override {
0187 graph_task* t = NULL;
     // A selector is created for every task pushed by prioritize_task(), so the
     // pop is expected to succeed; the assertions below check that expectation.
0188 bool result = my_priority_queue.try_pop(t);
0189 __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
0190 " in graph's priority queue mismatched" );
0191 __TBB_ASSERT( t && t != SUCCESSFULLY_ENQUEUED,
0192 "Incorrect task submitted to graph priority queue" );
0193 __TBB_ASSERT( t->priority != tbb::flow::internal::no_priority,
0194 "Tasks from graph's priority queue must have priority" );
     // Run the popped task directly, destroy it, and hand whatever its execute()
     // returned back to the caller of this execute().
0195 task* t_next = t->execute();
0196 task::destroy(*t);
0197 return t_next;
0198 }
0199 private:
0200 graph_task_priority_queue_t& my_priority_queue;
0201 };
0203 #endif
0204
0205 }
0206
0207 }
0208 namespace interface10 {
0209
0210
0211 class graph : tbb::internal::no_copy, public tbb::flow::graph_proxy {
     //! The flow graph object.
     /** Holds the root task, the task-group context, the task arena in which graph
         tasks are run, and the head/tail pointers of the registered graph nodes.
         Non-copyable (derives from no_copy). */
0212 friend class tbb::flow::interface11::graph_node;
0213
     //! Task that calls a stored copy of a nullary body; execute() returns NULL.
     /** With the priorities preview enabled the constructor also takes a node
         priority (default no_priority) and forwards it to graph_task. */
0214 template< typename Body >
0215 class run_task : public tbb::flow::interface11::graph_task {
0216 public:
0217 run_task(Body& body
0218 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0219 , tbb::flow::interface11::node_priority_t node_priority = tbb::flow::interface11::no_priority
0220 ) : tbb::flow::interface11::graph_task(node_priority),
0221 #else
0222 ) :
0223 #endif
0224 my_body(body) { }
0225 tbb::task *execute() __TBB_override {
0226 my_body();
0227 return NULL;
0228 }
0229 private:
0230 Body my_body;
0231 };
0232
     //! Task that calls the body and passes its result to receiver.try_put_task().
     /** The receiver is held by reference, the body by value. */
0233 template< typename Receiver, typename Body >
0234 class run_and_put_task : public tbb::flow::interface11::graph_task {
0235 public:
0236 run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
0237 tbb::task *execute() __TBB_override {
0238 tbb::task *res = my_receiver.try_put_task(my_body());
     // SUCCESSFULLY_ENQUEUED is a sentinel, not a task; return NULL in its place.
0239 if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
0240 return res;
0241 }
0242 private:
0243 Receiver &my_receiver;
0244 Body my_body;
0245 };
     // Container type of my_reset_task_list.
0246 typedef std::list<tbb::task *> task_list_type;
0247
     //! Functor that calls wait_for_all() on the given task; run inside the arena
     //! by graph::wait_for_all().
0248 class wait_functor {
0249 tbb::task* graph_root_task;
0250 public:
0251 wait_functor(tbb::task* t) : graph_root_task(t) {}
0252 void operator()() const { graph_root_task->wait_for_all(); }
0253 };
0254
0255
     //! Functor that submits the referenced task through FLOW_SPAWN; run inside
     //! the arena by run() and spawn_in_graph_arena().
0256 class spawn_functor : tbb::internal::no_assign {
0257 tbb::task& spawn_task;
0258 public:
0259 spawn_functor(tbb::task& t) : spawn_task(t) {}
0260 void operator()() const {
0261 FLOW_SPAWN(spawn_task);
0262 }
0263 };
0264
     //! Creates the graph's task arena, or re-attaches an existing one.
     /** reinit == true: the arena must already exist; it is terminated and
         initialized again with task_arena::attach().
         reinit == false: the arena must not exist yet; a new one is allocated with
         task_arena::attach().
         In both cases, if the arena is still not active afterwards it is
         initialized with default arguments, and activity is asserted. */
0265 void prepare_task_arena(bool reinit = false) {
0266 if (reinit) {
0267 __TBB_ASSERT(my_task_arena, "task arena is NULL");
0268 my_task_arena->terminate();
0269 my_task_arena->initialize(tbb::task_arena::attach());
0270 }
0271 else {
0272 __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
0273 my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
0274 }
0275 if (!my_task_arena->is_active())
0276 my_task_arena->initialize();
0277 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
0278 }
0279
0280 public:
0281
     //! Default constructor (defined outside this listing).
0282 graph();
0283
0284
     //! Constructs a graph that uses the supplied task-group context
     //! (defined outside this listing).
0285 explicit graph(tbb::task_group_context& use_this_context);
0286
0287
0288
     //! Destructor (defined outside this listing).
0289 ~graph();
0290
0291 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0292 void set_name(const char *name);
0293 #endif
0294
     //! Deprecated alias: forwards to reserve_wait().
0295 __TBB_DEPRECATED void increment_wait_count() {
0296 reserve_wait();
0297 }
0298
     //! Deprecated alias: forwards to release_wait().
0299 __TBB_DEPRECATED void decrement_wait_count() {
0300 release_wait();
0301 }
0302
0303
0304
0305
     //! Overrides graph_proxy::reserve_wait (defined outside this listing).
0306 void reserve_wait() __TBB_override;
0307
0308
0309
0310
     //! Overrides graph_proxy::release_wait (defined outside this listing).
0311 void release_wait() __TBB_override;
0312
0313
0314
0315
     //! Deprecated. If the graph is active, allocates a run_and_put_task as an
     //! additional child of the root task and spawns it inside the graph's arena.
     //! Does nothing when the graph is not active.
0316 template< typename Receiver, typename Body >
0317 __TBB_DEPRECATED void run(Receiver &r, Body body) {
0318 if (tbb::flow::interface11::internal::is_graph_active(*this)) {
0319 task* rtask = new (task::allocate_additional_child_of(*root_task()))
0320 run_and_put_task< Receiver, Body >(r, body);
0321 my_task_arena->execute(spawn_functor(*rtask));
0322 }
0323 }
0324
0325
0326
0327
     //! Deprecated. Same as above, but with a run_task that only calls the body.
0328 template< typename Body >
0329 __TBB_DEPRECATED void run(Body body) {
0330 if (tbb::flow::interface11::internal::is_graph_active(*this)) {
0331 task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
0332 my_task_arena->execute(spawn_functor(*rtask));
0333 }
0334 }
0335
0336
0337
     //! Waits, inside the graph's arena, for the root task's children to finish.
     /** Clears the cancelled / caught_exception flags first. Nothing else happens
         when there is no root task. With task-group-context support, `cancelled`
         records whether the context's group execution was cancelled.
         If an exception escapes the wait, the root ref count is set back to 1, the
         context is reset, both flags are set to true, and the exception is rethrown.
         On normal completion the context is reset and the root ref count set back
         to 1, but (with context support) only when the context does not have the
         concurrent_wait trait; without context support the ref count is always
         set back to 1. */
0338 void wait_for_all() {
0339 cancelled = false;
0340 caught_exception = false;
0341 if (my_root_task) {
0342 #if TBB_USE_EXCEPTIONS
0343 try {
0344 #endif
0345 my_task_arena->execute(wait_functor(my_root_task));
0346 #if __TBB_TASK_GROUP_CONTEXT
0347 cancelled = my_context->is_group_execution_cancelled();
0348 #endif
0349 #if TBB_USE_EXCEPTIONS
0350 }
0351 catch (...) {
0352 my_root_task->set_ref_count(1);
0353 my_context->reset();
0354 caught_exception = true;
0355 cancelled = true;
0356 throw;
0357 }
0358 #endif
0359 #if __TBB_TASK_GROUP_CONTEXT
0360
0361
0362
0363 if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
0364 my_context->reset();
0365 #endif
0366 my_root_task->set_ref_count(1);
0367 #if __TBB_TASK_GROUP_CONTEXT
0368 }
0369 #endif
0370 }
0371 }
0372
0373
     //! Deprecated. Returns the root task pointer (may be NULL; wait_for_all checks for that).
0374 __TBB_DEPRECATED tbb::task * root_task() {
0375 return my_root_task;
0376 }
0377
0378
0379 template<typename C, typename N>
0380 friend class tbb::flow::interface11::graph_iterator;
0381
0382
     // Iterator types over the graph's nodes (mutable and const flavours).
0383 typedef tbb::flow::interface11::graph_iterator<graph, tbb::flow::interface11::graph_node> iterator;
0384 typedef tbb::flow::interface11::graph_iterator<const graph, const tbb::flow::interface11::graph_node> const_iterator;
0385
0386
0387
     // Range accessors; all are defined outside this listing.
0388 iterator begin();
0389
0390 iterator end();
0391
0392 const_iterator begin() const;
0393
0394 const_iterator end() const;
0395
0396 const_iterator cbegin() const;
0397
0398 const_iterator cend() const;
0399
0400
     //! Returns the `cancelled` flag as last set by wait_for_all().
     //! Read-only, hence const so it can be queried on a const graph.
0401 bool is_cancelled() const { return cancelled; }
     //! Returns true if the last wait_for_all() left through its catch block.
     //! Read-only, hence const so it can be queried on a const graph.
0402 bool exception_thrown() const { return caught_exception; }
0403
0404
     //! Resets the graph (defined outside this listing); f defaults to rf_reset_protocol.
0405 void reset(tbb::flow::interface11::reset_flags f = tbb::flow::interface11::rf_reset_protocol);
0406
0407 private:
     // Root task; graph tasks are allocated as its additional children (see run()).
0408 tbb::task *my_root_task;
0409 #if __TBB_TASK_GROUP_CONTEXT
0410 tbb::task_group_context *my_context;
0411 #endif
     // NOTE(review): presumably records whether the graph created my_context
     // itself -- confirm against the constructors/destructor.
0412 bool own_context;
     // State reported by is_cancelled() / exception_thrown().
0413 bool cancelled;
0414 bool caught_exception;
     // Toggled by internal::activate_graph / deactivate_graph.
0415 bool my_is_active;
     // Filled by internal::add_task_to_graph_reset_list().
0416 task_list_type my_reset_task_list;
0417
     // NOTE(review): presumably head and tail of the node list linked through
     // graph_node::next / prev -- confirm against register_node().
0418 tbb::flow::interface11::graph_node *my_nodes, *my_nodes_last;
0419
     // NOTE(review): presumably guards the node list -- confirm.
0420 tbb::spin_mutex nodelist_mutex;
0421 void register_node(tbb::flow::interface11::graph_node *n);
0422 void remove_node(tbb::flow::interface11::graph_node *n);
0423
     // Arena in which the graph's tasks are spawned/enqueued; set up by
     // prepare_task_arena().
0424 tbb::task_arena* my_task_arena;
0425
0426 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
     // Prioritized tasks waiting to be picked up by a priority_task_selector.
0427 tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue;
0428 #endif
0429
     // The inline helpers near the end of this file access private members directly.
0430 friend void tbb::flow::interface11::internal::activate_graph(graph& g);
0431 friend void tbb::flow::interface11::internal::deactivate_graph(graph& g);
0432 friend bool tbb::flow::interface11::internal::is_graph_active(graph& g);
0433 friend tbb::task& tbb::flow::interface11::internal::prioritize_task(graph& g, tbb::task& arena_task);
0434 friend void tbb::flow::interface11::internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
0435 friend void tbb::flow::interface11::internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
0436 friend void tbb::flow::interface11::internal::add_task_to_graph_reset_list(graph& g, tbb::task *tp);
0437
0438 friend class tbb::interface7::internal::task_arena_base;
0439
0440 };
0441 }
0442
0443 namespace interface11 {
0444
0445 using tbb::flow::interface10::graph;
0446
0447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Forward declaration so graph_node can name this helper as a friend when
     // the node-set preview feature is enabled.
0448 namespace internal{
0449 class get_graph_helper;
0450 }
0451 #endif
0452
0453
0454 class graph_node : tbb::internal::no_copy {
     //! Abstract base class of flow graph nodes.
     /** Each node keeps a reference to its graph plus next/prev links. The class is
         non-copyable and has pure virtual members, so it cannot be instantiated
         directly. */
0455 friend class graph;
0456 template<typename C, typename N>
0457 friend class graph_iterator;
0458
0459 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0460 friend class internal::get_graph_helper;
0461 #endif
0462
0463 protected:
     // Graph this node was constructed with.
0464 graph& my_graph;
     // Links to neighbouring nodes. NOTE(review): presumably maintained by
     // graph::register_node / remove_node -- confirm.
0465 graph_node *next, *prev;
0466 public:
     //! Constructs a node belonging to graph g (defined outside this listing).
0467 explicit graph_node(graph& g);
0468
     //! Virtual destructor (defined outside this listing).
0469 virtual ~graph_node();
0470
0471 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0472 virtual void set_name(const char *name) = 0;
0473 #endif
0474
0475 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0476 virtual void extract() = 0;
0477 #endif
0478
0479 protected:
0480
     //! Must be implemented by derived nodes; f defaults to rf_reset_protocol.
0481 virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
0482 };
0483
0484 namespace internal {
0485
0486 inline void activate_graph(graph& g) {
     // Marks g as active; is_graph_active(g) returns true from now on.
0487 g.my_is_active = true;
0488 }
0489
0490 inline void deactivate_graph(graph& g) {
     // Marks g as inactive; graph::run(), spawn_in_graph_arena() and
     // enqueue_in_graph_arena() do nothing while the flag is false.
0491 g.my_is_active = false;
0492 }
0493
0494 inline bool is_graph_active(graph& g) {
     // Reports the flag written by activate_graph() / deactivate_graph().
0495 return g.my_is_active;
0496 }
0497
0498 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0499 inline tbb::task& prioritize_task(graph& g, tbb::task& t) {
     // Returns the task that should actually be submitted for t.
     // t is treated as a graph_task. Without a priority it is returned unchanged.
     // With a priority, t is pushed to the graph's priority queue and a critical
     // priority_task_selector, allocated as t's continuation, is returned instead.
0500 graph_task* const prioritized = static_cast<graph_task*>(&t);
0501
0502 if( prioritized->priority == no_priority )
0503 return t;
0504
0505
0506
0507
0508 task* const selector = new( prioritized->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
0509 tbb::internal::make_critical( *selector );
0510 g.my_priority_queue.push(prioritized);
0511 return *selector;
0512
0513 }
0514 #else
0515 inline tbb::task& prioritize_task(graph&, tbb::task& t) {
     // Priorities disabled: the task is passed through untouched.
0516 return t;
0517 }
0518 #endif
0519
0520
0521 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
     // Spawns arena_task (or its priority selector, see prioritize_task) from
     // inside the graph's arena. Nothing happens when the graph is not active.
0522 if (!is_graph_active(g)) return;
0523 graph::spawn_functor spawner(prioritize_task(g, arena_task));
0524 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
0525 g.my_task_arena->execute(spawner);
0526
0527 }
0528
0529
0530 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
     // Enqueues arena_task (or its priority selector, see prioritize_task) into
     // the graph's arena. Nothing happens when the graph is not active.
0531 if (!is_graph_active(g)) return;
0532 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
0533 task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
0534
0535 }
0536
0537 inline void add_task_to_graph_reset_list(graph& g, tbb::task *tp) {
     // Appends tp to g.my_reset_task_list. No locking is performed here.
0538 g.my_reset_task_list.push_back(tp);
0539 }
0540
0541 }
0542
0543 }
0544 }
0545 }
0546
0547 #endif