File indexing completed on 2025-12-18 10:24:25
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_H
0018 #define __TBB_flow_graph_H
0019
0020 #include <atomic>
0021 #include <memory>
0022 #include <type_traits>
0023
0024 #include "detail/_config.h"
0025 #include "detail/_namespace_injection.h"
0026 #include "spin_mutex.h"
0027 #include "null_mutex.h"
0028 #include "spin_rw_mutex.h"
0029 #include "null_rw_mutex.h"
0030 #include "detail/_pipeline_filters.h"
0031 #include "detail/_task.h"
0032 #include "detail/_small_object_pool.h"
0033 #include "cache_aligned_allocator.h"
0034 #include "detail/_exception.h"
0035 #include "detail/_template_helpers.h"
0036 #include "detail/_aggregator.h"
0037 #include "detail/_allocator_traits.h"
0038 #include "detail/_utils.h"
0039 #include "profiling.h"
0040 #include "task_arena.h"
0041
0042 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
0043 #if __INTEL_COMPILER
0044
0045 #pragma warning (push)
0046 #pragma warning( disable: 2196 )
0047 #endif
0048 #define __TBB_NOINLINE_SYM __attribute__((noinline))
0049 #else
0050 #define __TBB_NOINLINE_SYM
0051 #endif
0052
0053 #include <tuple>
0054 #include <list>
0055 #include <forward_list>
0056 #include <queue>
0057 #if __TBB_CPP20_CONCEPTS_PRESENT
0058 #include <concepts>
0059 #endif
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071 namespace tbb {
0072 namespace detail {
0073
0074 namespace d2 {
0075
0076
0077 enum concurrency { unlimited = 0, serial = 1 };
0078
0079
0080 struct null_type {};
0081
0082
0083 class continue_msg {};
0084
0085 }
0086
0087 #if __TBB_CPP20_CONCEPTS_PRESENT
0088 namespace d0 {
0089
0090 template <typename ReturnType, typename OutputType>
0091 concept node_body_return_type = std::same_as<OutputType, tbb::detail::d2::continue_msg> ||
0092 std::convertible_to<OutputType, ReturnType>;
0093
0094
0095 template <typename Body, typename Output>
0096 concept continue_node_body = std::copy_constructible<Body> &&
0097 requires( Body& body, const tbb::detail::d2::continue_msg& v ) {
0098 { body(v) } -> node_body_return_type<Output>;
0099 };
0100
0101 template <typename Body, typename Input, typename Output>
0102 concept function_node_body = std::copy_constructible<Body> &&
0103 std::invocable<Body&, const Input&> &&
0104 node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;
0105
0106 template <typename FunctionObject, typename Input, typename Key>
0107 concept join_node_function_object = std::copy_constructible<FunctionObject> &&
0108 std::invocable<FunctionObject&, const Input&> &&
0109 std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;
0110
0111 template <typename Body, typename Output>
0112 concept input_node_body = std::copy_constructible<Body> &&
0113 requires( Body& body, tbb::detail::d1::flow_control& fc ) {
0114 { body(fc) } -> adaptive_same_as<Output>;
0115 };
0116
0117 template <typename Body, typename Input, typename OutputPortsType>
0118 concept multifunction_node_body = std::copy_constructible<Body> &&
0119 std::invocable<Body&, const Input&, OutputPortsType&>;
0120
0121 template <typename Sequencer, typename Value>
0122 concept sequencer = std::copy_constructible<Sequencer> &&
0123 std::invocable<Sequencer&, const Value&> &&
0124 std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;
0125
0126 template <typename Body, typename Input, typename GatewayType>
0127 concept async_node_body = std::copy_constructible<Body> &&
0128 std::invocable<Body&, const Input&, GatewayType&>;
0129
0130 }
0131 #endif
0132
0133 namespace d2 {
0134
0135
0136 template< typename T > class sender;
0137 template< typename T > class receiver;
0138 class continue_receiver;
0139
0140 template< typename T, typename U > class limiter_node;
0141
0142 template<typename T, typename M> class successor_cache;
0143 template<typename T, typename M> class broadcast_cache;
0144 template<typename T, typename M> class round_robin_cache;
0145 template<typename T, typename M> class predecessor_cache;
0146 template<typename T, typename M> class reservable_predecessor_cache;
0147
0148 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0149 namespace order {
0150 struct following;
0151 struct preceding;
0152 }
0153 template<typename Order, typename... Args> struct node_set;
0154 #endif
0155
0156
0157 }
0158 }
0159 }
0160
0161
0162 #include "detail/_flow_graph_impl.h"
0163
0164 namespace tbb {
0165 namespace detail {
0166 namespace d2 {
0167
0168 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
0169 if (second->priority > first->priority)
0170 return std::make_pair(second, first);
0171 return std::make_pair(first, second);
0172 }
0173
0174
0175 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
0176
0177 if (right == nullptr) return left;
0178
0179 if (left == nullptr) return right;
0180 if (left == SUCCESSFULLY_ENQUEUED) return right;
0181
0182 if (right != SUCCESSFULLY_ENQUEUED) {
0183
0184 auto tasks_pair = order_tasks(left, right);
0185 spawn_in_graph_arena(g, *tasks_pair.first);
0186 return tasks_pair.second;
0187 }
0188 return left;
0189 }
0190
0191 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0192 class message_metainfo {
0193 public:
0194 using waiters_type = std::forward_list<d1::wait_context_vertex*>;
0195
0196 message_metainfo() = default;
0197
0198 message_metainfo(const waiters_type& waiters) : my_waiters(waiters) {}
0199 message_metainfo(waiters_type&& waiters) : my_waiters(std::move(waiters)) {}
0200
0201 const waiters_type& waiters() const & { return my_waiters; }
0202 waiters_type&& waiters() && { return std::move(my_waiters); }
0203
0204 bool empty() const { return my_waiters.empty(); }
0205
0206 void merge(const message_metainfo& other) {
0207
0208 my_waiters.insert_after(my_waiters.before_begin(),
0209 other.waiters().begin(),
0210 other.waiters().end());
0211 }
0212 private:
0213 waiters_type my_waiters;
0214 };
0215
0216 #define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo
0217
0218 #else
0219 #define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)
0220 #endif
0221
0222
0223 template< typename T >
0224 class sender {
0225 public:
0226 virtual ~sender() {}
0227
0228
0229 virtual bool try_get( T & ) { return false; }
0230
0231 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0232 virtual bool try_get( T &, message_metainfo& ) { return false; }
0233 #endif
0234
0235
0236 virtual bool try_reserve( T & ) { return false; }
0237
0238 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0239 virtual bool try_reserve( T &, message_metainfo& ) { return false; }
0240 #endif
0241
0242
0243 virtual bool try_release( ) { return false; }
0244
0245
0246 virtual bool try_consume( ) { return false; }
0247
0248 protected:
0249
0250 typedef T output_type;
0251
0252
0253 typedef receiver<T> successor_type;
0254
0255
0256 virtual bool register_successor( successor_type &r ) = 0;
0257
0258
0259 virtual bool remove_successor( successor_type &r ) = 0;
0260
0261 template<typename C>
0262 friend bool register_successor(sender<C>& s, receiver<C>& r);
0263
0264 template<typename C>
0265 friend bool remove_successor (sender<C>& s, receiver<C>& r);
0266 };
0267
0268 template<typename C>
0269 bool register_successor(sender<C>& s, receiver<C>& r) {
0270 return s.register_successor(r);
0271 }
0272
0273 template<typename C>
0274 bool remove_successor(sender<C>& s, receiver<C>& r) {
0275 return s.remove_successor(r);
0276 }
0277
0278
0279 template< typename T >
0280 class receiver {
0281 private:
0282 template <typename... TryPutTaskArgs>
0283 bool internal_try_put(const T& t, TryPutTaskArgs&&... args) {
0284 graph_task* res = try_put_task(t, std::forward<TryPutTaskArgs>(args)...);
0285 if (!res) return false;
0286 if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
0287 return true;
0288 }
0289
0290 public:
0291
0292 virtual ~receiver() {}
0293
0294
0295 bool try_put( const T& t ) {
0296 return internal_try_put(t);
0297 }
0298
0299 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0300
0301 bool try_put_and_wait( const T& t ) {
0302
0303 d1::wait_context_vertex msg_wait_vertex{};
0304
0305 bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_vertex}});
0306 if (res) {
0307 __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph");
0308 wait(msg_wait_vertex.get_context(), *graph_reference().my_context);
0309 }
0310 return res;
0311 }
0312 #endif
0313
0314
0315 protected:
0316
0317 typedef T input_type;
0318
0319
0320 typedef sender<T> predecessor_type;
0321
0322 template< typename R, typename B > friend class run_and_put_task;
0323 template< typename X, typename Y > friend class broadcast_cache;
0324 template< typename X, typename Y > friend class round_robin_cache;
0325 virtual graph_task *try_put_task(const T& t) = 0;
0326 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0327 virtual graph_task *try_put_task(const T& t, const message_metainfo&) = 0;
0328 #endif
0329 virtual graph& graph_reference() const = 0;
0330
0331 template<typename TT, typename M> friend class successor_cache;
0332 virtual bool is_continue_receiver() { return false; }
0333
0334
0335 virtual node_priority_t priority() const { return no_priority; }
0336
0337
0338 virtual bool register_predecessor( predecessor_type & ) { return false; }
0339
0340
0341 virtual bool remove_predecessor( predecessor_type & ) { return false; }
0342
0343 template <typename C>
0344 friend bool register_predecessor(receiver<C>& r, sender<C>& s);
0345 template <typename C>
0346 friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
0347 };
0348
0349 template <typename C>
0350 bool register_predecessor(receiver<C>& r, sender<C>& s) {
0351 return r.register_predecessor(s);
0352 }
0353
0354 template <typename C>
0355 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
0356 return r.remove_predecessor(s);
0357 }
0358
0359
0360
0361 class continue_receiver : public receiver< continue_msg > {
0362 protected:
0363
0364
0365 explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
0366 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
0367 my_current_count = 0;
0368 my_priority = a_priority;
0369 }
0370
0371
0372 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
0373 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
0374 my_current_count = 0;
0375 my_priority = src.my_priority;
0376 }
0377
0378
0379 bool register_predecessor( predecessor_type & ) override {
0380 spin_mutex::scoped_lock l(my_mutex);
0381 ++my_predecessor_count;
0382 return true;
0383 }
0384
0385
0386
0387
0388
0389 bool remove_predecessor( predecessor_type & ) override {
0390 spin_mutex::scoped_lock l(my_mutex);
0391 --my_predecessor_count;
0392 return true;
0393 }
0394
0395
0396 typedef continue_msg input_type;
0397
0398
0399 typedef receiver<input_type>::predecessor_type predecessor_type;
0400
0401 template< typename R, typename B > friend class run_and_put_task;
0402 template<typename X, typename Y> friend class broadcast_cache;
0403 template<typename X, typename Y> friend class round_robin_cache;
0404
0405 private:
0406
0407 graph_task* try_put_task_impl( const input_type& __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
0408 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0409 message_metainfo predecessor_metainfo;
0410 #endif
0411 {
0412 spin_mutex::scoped_lock l(my_mutex);
0413 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0414
0415 for (auto waiter : metainfo.waiters()) {
0416 waiter->reserve(1);
0417 }
0418 my_current_metainfo.merge(metainfo);
0419 #endif
0420 if ( ++my_current_count < my_predecessor_count )
0421 return SUCCESSFULLY_ENQUEUED;
0422 else {
0423 my_current_count = 0;
0424 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0425 predecessor_metainfo = my_current_metainfo;
0426 my_current_metainfo = message_metainfo{};
0427 #endif
0428 }
0429 }
0430 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0431 graph_task* res = execute(predecessor_metainfo);
0432 for (auto waiter : predecessor_metainfo.waiters()) {
0433 waiter->release(1);
0434 }
0435 #else
0436 graph_task* res = execute();
0437 #endif
0438 return res? res : SUCCESSFULLY_ENQUEUED;
0439 }
0440
0441 protected:
0442 graph_task* try_put_task( const input_type& input ) override {
0443 return try_put_task_impl(input __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0444 }
0445
0446 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0447 graph_task* try_put_task( const input_type& input, const message_metainfo& metainfo ) override {
0448 return try_put_task_impl(input, metainfo);
0449 }
0450 #endif
0451
0452 spin_mutex my_mutex;
0453 int my_predecessor_count;
0454 int my_current_count;
0455 int my_initial_predecessor_count;
0456 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0457 message_metainfo my_current_metainfo;
0458 #endif
0459 node_priority_t my_priority;
0460
0461
0462 template<typename U, typename V> friend class limiter_node;
0463
0464 virtual void reset_receiver( reset_flags f ) {
0465 my_current_count = 0;
0466 if (f & rf_clear_edges) {
0467 my_predecessor_count = my_initial_predecessor_count;
0468 }
0469 }
0470
0471
0472
0473
0474 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0475 virtual graph_task* execute(const message_metainfo& metainfo) = 0;
0476 #else
0477 virtual graph_task* execute() = 0;
0478 #endif
0479 template<typename TT, typename M> friend class successor_cache;
0480 bool is_continue_receiver() override { return true; }
0481
0482 node_priority_t priority() const override { return my_priority; }
0483 };
0484
0485 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
0486 template <typename K, typename T>
0487 K key_from_message( const T &t ) {
0488 return t.key();
0489 }
0490 #endif
0491
0492 }
0493 }
0494 }
0495
0496 #include "detail/_flow_graph_trace_impl.h"
0497 #include "detail/_hash_compare.h"
0498
0499 namespace tbb {
0500 namespace detail {
0501 namespace d2 {
0502
0503 #include "detail/_flow_graph_body_impl.h"
0504 #include "detail/_flow_graph_cache_impl.h"
0505 #include "detail/_flow_graph_types_impl.h"
0506
0507 using namespace graph_policy_namespace;
0508
0509 template <typename C, typename N>
0510 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
0511 {
0512 if (begin) current_node = my_graph->my_nodes;
0513
0514 }
0515
0516 template <typename C, typename N>
0517 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
0518 __TBB_ASSERT(current_node, "graph_iterator at end");
0519 return *operator->();
0520 }
0521
0522 template <typename C, typename N>
0523 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
0524 return current_node;
0525 }
0526
0527 template <typename C, typename N>
0528 void graph_iterator<C,N>::internal_forward() {
0529 if (current_node) current_node = current_node->next;
0530 }
0531
0532
0533 inline graph::graph() : my_wait_context_vertex(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0534 prepare_task_arena();
0535 own_context = true;
0536 cancelled = false;
0537 caught_exception = false;
0538 my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
0539 fgt_graph(this);
0540 my_is_active = true;
0541 }
0542
0543 inline graph::graph(task_group_context& use_this_context) :
0544 my_wait_context_vertex(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0545 prepare_task_arena();
0546 own_context = false;
0547 cancelled = false;
0548 caught_exception = false;
0549 fgt_graph(this);
0550 my_is_active = true;
0551 }
0552
0553 inline graph::~graph() {
0554 wait_for_all();
0555 if (own_context) {
0556 my_context->~task_group_context();
0557 r1::cache_aligned_deallocate(my_context);
0558 }
0559 delete my_task_arena;
0560 }
0561
0562 inline void graph::reserve_wait() {
0563 my_wait_context_vertex.reserve();
0564 fgt_reserve_wait(this);
0565 }
0566
0567 inline void graph::release_wait() {
0568 fgt_release_wait(this);
0569 my_wait_context_vertex.release();
0570 }
0571
0572 inline void graph::register_node(graph_node *n) {
0573 n->next = nullptr;
0574 {
0575 spin_mutex::scoped_lock lock(nodelist_mutex);
0576 n->prev = my_nodes_last;
0577 if (my_nodes_last) my_nodes_last->next = n;
0578 my_nodes_last = n;
0579 if (!my_nodes) my_nodes = n;
0580 }
0581 }
0582
0583 inline void graph::remove_node(graph_node *n) {
0584 {
0585 spin_mutex::scoped_lock lock(nodelist_mutex);
0586 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
0587 if (n->prev) n->prev->next = n->next;
0588 if (n->next) n->next->prev = n->prev;
0589 if (my_nodes_last == n) my_nodes_last = n->prev;
0590 if (my_nodes == n) my_nodes = n->next;
0591 }
0592 n->prev = n->next = nullptr;
0593 }
0594
0595 inline void graph::reset( reset_flags f ) {
0596
0597 deactivate_graph(*this);
0598
0599 my_context->reset();
0600 cancelled = false;
0601 caught_exception = false;
0602
0603 for(iterator ii = begin(); ii != end(); ++ii) {
0604 graph_node *my_p = &(*ii);
0605 my_p->reset_node(f);
0606 }
0607
0608
0609 prepare_task_arena( true );
0610 activate_graph(*this);
0611 }
0612
0613 inline void graph::cancel() {
0614 my_context->cancel_group_execution();
0615 }
0616
0617 inline graph::iterator graph::begin() { return iterator(this, true); }
0618
0619 inline graph::iterator graph::end() { return iterator(this, false); }
0620
0621 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
0622
0623 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
0624
0625 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
0626
0627 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
0628
0629 inline graph_node::graph_node(graph& g) : my_graph(g) {
0630 my_graph.register_node(this);
0631 }
0632
0633 inline graph_node::~graph_node() {
0634 my_graph.remove_node(this);
0635 }
0636
0637 #include "detail/_flow_graph_node_impl.h"
0638
0639
0640
0641
0642 template < typename Output >
0643 __TBB_requires(std::copyable<Output>)
0644 class input_node : public graph_node, public sender< Output > {
0645 public:
0646
0647 typedef Output output_type;
0648
0649
0650 typedef typename sender<output_type>::successor_type successor_type;
0651
0652
0653 typedef null_type input_type;
0654
0655
0656 template< typename Body >
0657 __TBB_requires(input_node_body<Body, Output>)
0658 __TBB_NOINLINE_SYM input_node( graph &g, Body body )
0659 : graph_node(g), my_active(false)
0660 , my_body( new input_body_leaf< output_type, Body>(body) )
0661 , my_init_body( new input_body_leaf< output_type, Body>(body) )
0662 , my_successors(this), my_reserved(false), my_has_cached_item(false)
0663 {
0664 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0665 static_cast<sender<output_type> *>(this), this->my_body);
0666 }
0667
0668 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0669 template <typename Body, typename... Successors>
0670 __TBB_requires(input_node_body<Body, Output>)
0671 input_node( const node_set<order::preceding, Successors...>& successors, Body body )
0672 : input_node(successors.graph_reference(), body)
0673 {
0674 make_edges(*this, successors);
0675 }
0676 #endif
0677
0678
0679 __TBB_NOINLINE_SYM input_node( const input_node& src )
0680 : graph_node(src.my_graph), sender<Output>()
0681 , my_active(false)
0682 , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
0683 , my_successors(this), my_reserved(false), my_has_cached_item(false)
0684 {
0685 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0686 static_cast<sender<output_type> *>(this), this->my_body);
0687 }
0688
0689
0690 ~input_node() { delete my_body; delete my_init_body; }
0691
0692
0693 bool register_successor( successor_type &r ) override {
0694 spin_mutex::scoped_lock lock(my_mutex);
0695 my_successors.register_successor(r);
0696 if ( my_active )
0697 spawn_put();
0698 return true;
0699 }
0700
0701
0702 bool remove_successor( successor_type &r ) override {
0703 spin_mutex::scoped_lock lock(my_mutex);
0704 my_successors.remove_successor(r);
0705 return true;
0706 }
0707
0708
0709 bool try_get( output_type &v ) override {
0710 spin_mutex::scoped_lock lock(my_mutex);
0711 if ( my_reserved )
0712 return false;
0713
0714 if ( my_has_cached_item ) {
0715 v = my_cached_item;
0716 my_has_cached_item = false;
0717 return true;
0718 }
0719
0720
0721 if ( my_active )
0722 spawn_put();
0723 return false;
0724 }
0725
0726
0727 bool try_reserve( output_type &v ) override {
0728 spin_mutex::scoped_lock lock(my_mutex);
0729 if ( my_reserved ) {
0730 return false;
0731 }
0732
0733 if ( my_has_cached_item ) {
0734 v = my_cached_item;
0735 my_reserved = true;
0736 return true;
0737 } else {
0738 return false;
0739 }
0740 }
0741
0742 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0743 private:
0744 bool try_reserve( output_type& v, message_metainfo& ) override {
0745 return try_reserve(v);
0746 }
0747
0748 bool try_get( output_type& v, message_metainfo& ) override {
0749 return try_get(v);
0750 }
0751 public:
0752 #endif
0753
0754
0755
0756 bool try_release( ) override {
0757 spin_mutex::scoped_lock lock(my_mutex);
0758 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
0759 my_reserved = false;
0760 if(!my_successors.empty())
0761 spawn_put();
0762 return true;
0763 }
0764
0765
0766 bool try_consume( ) override {
0767 spin_mutex::scoped_lock lock(my_mutex);
0768 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
0769 my_reserved = false;
0770 my_has_cached_item = false;
0771 if ( !my_successors.empty() ) {
0772 spawn_put();
0773 }
0774 return true;
0775 }
0776
0777
0778 void activate() {
0779 spin_mutex::scoped_lock lock(my_mutex);
0780 my_active = true;
0781 if (!my_successors.empty())
0782 spawn_put();
0783 }
0784
0785 template<typename Body>
0786 Body copy_function_object() {
0787 input_body<output_type> &body_ref = *this->my_body;
0788 return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
0789 }
0790
0791 protected:
0792
0793
0794 void reset_node( reset_flags f) override {
0795 my_active = false;
0796 my_reserved = false;
0797 my_has_cached_item = false;
0798
0799 if(f & rf_clear_edges) my_successors.clear();
0800 if(f & rf_reset_bodies) {
0801 input_body<output_type> *tmp = my_init_body->clone();
0802 delete my_body;
0803 my_body = tmp;
0804 }
0805 }
0806
0807 private:
0808 spin_mutex my_mutex;
0809 bool my_active;
0810 input_body<output_type> *my_body;
0811 input_body<output_type> *my_init_body;
0812 broadcast_cache< output_type > my_successors;
0813 bool my_reserved;
0814 bool my_has_cached_item;
0815 output_type my_cached_item;
0816
0817
0818 bool try_reserve_apply_body(output_type &v) {
0819 spin_mutex::scoped_lock lock(my_mutex);
0820 if ( my_reserved ) {
0821 return false;
0822 }
0823 if ( !my_has_cached_item ) {
0824 d1::flow_control control;
0825
0826 fgt_begin_body( my_body );
0827
0828 my_cached_item = (*my_body)(control);
0829 my_has_cached_item = !control.is_pipeline_stopped;
0830
0831 fgt_end_body( my_body );
0832 }
0833 if ( my_has_cached_item ) {
0834 v = my_cached_item;
0835 my_reserved = true;
0836 return true;
0837 } else {
0838 return false;
0839 }
0840 }
0841
0842 graph_task* create_put_task() {
0843 d1::small_object_allocator allocator{};
0844 typedef input_node_task_bypass< input_node<output_type> > task_type;
0845 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
0846 return t;
0847 }
0848
0849
0850 void spawn_put( ) {
0851 if(is_graph_active(this->my_graph)) {
0852 spawn_in_graph_arena(this->my_graph, *create_put_task());
0853 }
0854 }
0855
0856 friend class input_node_task_bypass< input_node<output_type> >;
0857
0858 graph_task* apply_body_bypass( ) {
0859 output_type v;
0860 if ( !try_reserve_apply_body(v) )
0861 return nullptr;
0862
0863 graph_task *last_task = my_successors.try_put_task(v);
0864 if ( last_task )
0865 try_consume();
0866 else
0867 try_release();
0868 return last_task;
0869 }
0870 };
0871
0872
0873 template<typename Input, typename Output = continue_msg, typename Policy = queueing>
0874 __TBB_requires(std::default_initializable<Input> &&
0875 std::copy_constructible<Input> &&
0876 std::copy_constructible<Output>)
0877 class function_node
0878 : public graph_node
0879 , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
0880 , public function_output<Output>
0881 {
0882 typedef cache_aligned_allocator<Input> internals_allocator;
0883
0884 public:
0885 typedef Input input_type;
0886 typedef Output output_type;
0887 typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
0888 typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0889 typedef function_output<output_type> fOutput_type;
0890 typedef typename input_impl_type::predecessor_type predecessor_type;
0891 typedef typename fOutput_type::successor_type successor_type;
0892
0893 using input_impl_type::my_predecessors;
0894
0895
0896
0897
0898
0899 template< typename Body >
0900 __TBB_requires(function_node_body<Body, Input, Output>)
0901 __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
0902 Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
0903 : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
0904 fOutput_type(g) {
0905 fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0906 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0907 }
0908
0909 template <typename Body>
0910 __TBB_requires(function_node_body<Body, Input, Output>)
0911 function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
0912 : function_node(g, concurrency, body, Policy(), a_priority) {}
0913
0914 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0915 template <typename Body, typename... Args>
0916 __TBB_requires(function_node_body<Body, Input, Output>)
0917 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
0918 Policy p = Policy(), node_priority_t a_priority = no_priority )
0919 : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
0920 make_edges_in_order(nodes, *this);
0921 }
0922
0923 template <typename Body, typename... Args>
0924 __TBB_requires(function_node_body<Body, Input, Output>)
0925 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
0926 : function_node(nodes, concurrency, body, Policy(), a_priority) {}
0927 #endif
0928
0929
0930 __TBB_NOINLINE_SYM function_node( const function_node& src ) :
0931 graph_node(src.my_graph),
0932 input_impl_type(src),
0933 fOutput_type(src.my_graph) {
0934 fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0935 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0936 }
0937
0938 protected:
0939 template< typename R, typename B > friend class run_and_put_task;
0940 template<typename X, typename Y> friend class broadcast_cache;
0941 template<typename X, typename Y> friend class round_robin_cache;
0942 using input_impl_type::try_put_task;
0943
0944 broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
0945
0946 void reset_node(reset_flags f) override {
0947 input_impl_type::reset_function_input(f);
0948
0949 if(f & rf_clear_edges) {
0950 successors().clear();
0951 my_predecessors.clear();
0952 }
0953 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
0954 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
0955 }
0956
0957 };
0958
0959
0960
0961 template<typename Input, typename Output, typename Policy = queueing>
0962 __TBB_requires(std::default_initializable<Input> &&
0963 std::copy_constructible<Input>)
0964 class multifunction_node :
0965 public graph_node,
0966 public multifunction_input
0967 <
0968 Input,
0969 typename wrap_tuple_elements<
0970 std::tuple_size<Output>::value,
0971 multifunction_output,
0972 Output
0973 >::type,
0974 Policy,
0975 cache_aligned_allocator<Input>
0976 >
0977 {
0978 typedef cache_aligned_allocator<Input> internals_allocator;
0979
0980 protected:
0981 static const int N = std::tuple_size<Output>::value;
0982 public:
0983 typedef Input input_type;
0984 typedef null_type output_type;
0985 typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
0986 typedef multifunction_input<
0987 input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
0988 typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0989 private:
0990 using input_impl_type::my_predecessors;
0991 public:
0992 template<typename Body>
0993 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0994 __TBB_NOINLINE_SYM multifunction_node(
0995 graph &g, size_t concurrency,
0996 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
0997 ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
0998 fgt_multioutput_node_with_body<N>(
0999 CODEPTR(), FLOW_MULTIFUNCTION_NODE,
1000 &this->my_graph, static_cast<receiver<input_type> *>(this),
1001 this->output_ports(), this->my_body
1002 );
1003 }
1004
1005 template <typename Body>
1006 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
1007 __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
1008 : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
1009
1010 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1011 template <typename Body, typename... Args>
1012 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
1013 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1014 Policy p = Policy(), node_priority_t a_priority = no_priority)
1015 : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
1016 make_edges_in_order(nodes, *this);
1017 }
1018
1019 template <typename Body, typename... Args>
1020 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
1021 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
1022 : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
1023 #endif
1024
1025 __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
1026 graph_node(other.my_graph), input_impl_type(other) {
1027 fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
1028 &this->my_graph, static_cast<receiver<input_type> *>(this),
1029 this->output_ports(), this->my_body );
1030 }
1031
1032
1033 protected:
1034 void reset_node(reset_flags f) override { input_impl_type::reset(f); }
1035 };
1036
1037
1038
1039 template<typename TupleType>
1040 class split_node : public graph_node, public receiver<TupleType> {
1041 static const int N = std::tuple_size<TupleType>::value;
1042 typedef receiver<TupleType> base_type;
1043 public:
1044 typedef TupleType input_type;
1045 typedef typename wrap_tuple_elements<
1046 N,
1047 multifunction_output,
1048 TupleType
1049 >::type output_ports_type;
1050
1051 __TBB_NOINLINE_SYM explicit split_node(graph &g)
1052 : graph_node(g),
1053 my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
1054 {
1055 fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
1056 static_cast<receiver<input_type> *>(this), this->output_ports());
1057 }
1058
1059 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1060 template <typename... Args>
1061 __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1062 make_edges_in_order(nodes, *this);
1063 }
1064 #endif
1065
1066 __TBB_NOINLINE_SYM split_node(const split_node& other)
1067 : graph_node(other.my_graph), base_type(other),
1068 my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1069 {
1070 fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
1071 static_cast<receiver<input_type> *>(this), this->output_ports());
1072 }
1073
1074 output_ports_type &output_ports() { return my_output_ports; }
1075
1076 protected:
1077 graph_task *try_put_task(const TupleType& t) override {
1078
1079
1080 return emit_element<N>::emit_this(this->my_graph, t, output_ports());
1081 }
1082 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1083 graph_task* try_put_task(const TupleType& t, const message_metainfo& metainfo) override {
1084
1085
1086 return emit_element<N>::emit_this(this->my_graph, t, output_ports(), metainfo);
1087 }
1088 #endif
1089
1090 void reset_node(reset_flags f) override {
1091 if (f & rf_clear_edges)
1092 clear_element<N>::clear_this(my_output_ports);
1093
1094 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1095 }
1096 graph& graph_reference() const override {
1097 return my_graph;
1098 }
1099
1100 private:
1101 output_ports_type my_output_ports;
1102 };
1103
1104
1105 template <typename Output, typename Policy = Policy<void> >
1106 __TBB_requires(std::copy_constructible<Output>)
1107 class continue_node : public graph_node, public continue_input<Output, Policy>,
1108 public function_output<Output> {
1109 public:
1110 typedef continue_msg input_type;
1111 typedef Output output_type;
1112 typedef continue_input<Output, Policy> input_impl_type;
1113 typedef function_output<output_type> fOutput_type;
1114 typedef typename input_impl_type::predecessor_type predecessor_type;
1115 typedef typename fOutput_type::successor_type successor_type;
1116
1117
1118 template <typename Body >
1119 __TBB_requires(continue_node_body<Body, Output>)
1120 __TBB_NOINLINE_SYM continue_node(
1121 graph &g,
1122 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1123 ) : graph_node(g), input_impl_type( g, body, a_priority ),
1124 fOutput_type(g) {
1125 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1126
1127 static_cast<receiver<input_type> *>(this),
1128 static_cast<sender<output_type> *>(this), this->my_body );
1129 }
1130
1131 template <typename Body>
1132 __TBB_requires(continue_node_body<Body, Output>)
1133 continue_node( graph& g, Body body, node_priority_t a_priority )
1134 : continue_node(g, body, Policy(), a_priority) {}
1135
1136 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1137 template <typename Body, typename... Args>
1138 __TBB_requires(continue_node_body<Body, Output>)
1139 continue_node( const node_set<Args...>& nodes, Body body,
1140 Policy p = Policy(), node_priority_t a_priority = no_priority )
1141 : continue_node(nodes.graph_reference(), body, p, a_priority ) {
1142 make_edges_in_order(nodes, *this);
1143 }
1144 template <typename Body, typename... Args>
1145 __TBB_requires(continue_node_body<Body, Output>)
1146 continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
1147 : continue_node(nodes, body, Policy(), a_priority) {}
1148 #endif
1149
1150
1151 template <typename Body >
1152 __TBB_requires(continue_node_body<Body, Output>)
1153 __TBB_NOINLINE_SYM continue_node(
1154 graph &g, int number_of_predecessors,
1155 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1156 ) : graph_node(g)
1157 , input_impl_type(g, number_of_predecessors, body, a_priority),
1158 fOutput_type(g) {
1159 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1160 static_cast<receiver<input_type> *>(this),
1161 static_cast<sender<output_type> *>(this), this->my_body );
1162 }
1163
1164 template <typename Body>
1165 __TBB_requires(continue_node_body<Body, Output>)
1166 continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
1167 : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
1168
1169 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1170 template <typename Body, typename... Args>
1171 __TBB_requires(continue_node_body<Body, Output>)
1172 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1173 Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
1174 : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
1175 make_edges_in_order(nodes, *this);
1176 }
1177
1178 template <typename Body, typename... Args>
1179 __TBB_requires(continue_node_body<Body, Output>)
1180 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1181 Body body, node_priority_t a_priority )
1182 : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
1183 #endif
1184
1185
1186 __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1187 graph_node(src.my_graph), input_impl_type(src),
1188 function_output<Output>(src.my_graph) {
1189 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1190 static_cast<receiver<input_type> *>(this),
1191 static_cast<sender<output_type> *>(this), this->my_body );
1192 }
1193
1194 protected:
1195 template< typename R, typename B > friend class run_and_put_task;
1196 template<typename X, typename Y> friend class broadcast_cache;
1197 template<typename X, typename Y> friend class round_robin_cache;
1198 using input_impl_type::try_put_task;
1199 broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
1200
1201 void reset_node(reset_flags f) override {
1202 input_impl_type::reset_receiver(f);
1203 if(f & rf_clear_edges)successors().clear();
1204 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1205 }
1206 };
1207
1208
1209 template <typename T>
1210 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1211 public:
1212 typedef T input_type;
1213 typedef T output_type;
1214 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1215 typedef typename sender<output_type>::successor_type successor_type;
1216 private:
1217 broadcast_cache<input_type> my_successors;
1218 public:
1219
1220 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1221 fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1222 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1223 }
1224
1225 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1226 template <typename... Args>
1227 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1228 make_edges_in_order(nodes, *this);
1229 }
1230 #endif
1231
1232
1233 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1234
1235
1236 bool register_successor( successor_type &r ) override {
1237 my_successors.register_successor( r );
1238 return true;
1239 }
1240
1241
1242 bool remove_successor( successor_type &r ) override {
1243 my_successors.remove_successor( r );
1244 return true;
1245 }
1246
1247 private:
1248 graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
1249 graph_task* new_task = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1250 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1251 return new_task;
1252 }
1253
1254 protected:
1255 template< typename R, typename B > friend class run_and_put_task;
1256 template<typename X, typename Y> friend class broadcast_cache;
1257 template<typename X, typename Y> friend class round_robin_cache;
1258
1259 graph_task* try_put_task(const T& t) override {
1260 return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
1261 }
1262
1263 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1264 graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
1265 return try_put_task_impl(t, metainfo);
1266 }
1267 #endif
1268
1269 graph& graph_reference() const override {
1270 return my_graph;
1271 }
1272
1273 void reset_node(reset_flags f) override {
1274 if (f&rf_clear_edges) {
1275 my_successors.clear();
1276 }
1277 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1278 }
1279 };
1280
1281
1282 template <typename T>
1283 class buffer_node
1284 : public graph_node
1285 , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1286 , public receiver<T>, public sender<T>
1287 {
1288 typedef cache_aligned_allocator<T> internals_allocator;
1289
1290 public:
1291 typedef T input_type;
1292 typedef T output_type;
1293 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1294 typedef typename sender<output_type>::successor_type successor_type;
1295 typedef buffer_node<T> class_type;
1296
1297 protected:
1298 typedef size_t size_type;
1299 round_robin_cache< T, null_rw_mutex > my_successors;
1300
1301 friend class forward_task_bypass< class_type >;
1302
1303 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1304 };
1305
1306
1307 class buffer_operation : public d1::aggregated_operation< buffer_operation > {
1308 public:
1309 char type;
1310 T* elem;
1311 graph_task* ltask;
1312 successor_type *r;
1313 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1314 message_metainfo* metainfo{ nullptr };
1315 #endif
1316
1317 buffer_operation(const T& e, op_type t) : type(char(t))
1318 , elem(const_cast<T*>(&e)) , ltask(nullptr)
1319 , r(nullptr)
1320 {}
1321
1322 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1323 buffer_operation(const T& e, op_type t, const message_metainfo& info)
1324 : type(char(t)), elem(const_cast<T*>(&e)), ltask(nullptr), r(nullptr)
1325 , metainfo(const_cast<message_metainfo*>(&info))
1326 {}
1327
1328 buffer_operation(op_type t, message_metainfo& info)
1329 : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr), metainfo(&info) {}
1330 #endif
1331 buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
1332 };
1333
1334 bool forwarder_busy;
1335 typedef d1::aggregating_functor<class_type, buffer_operation> handler_type;
1336 friend class d1::aggregating_functor<class_type, buffer_operation>;
1337 d1::aggregator< handler_type, buffer_operation> my_aggregator;
1338
1339 virtual void handle_operations(buffer_operation *op_list) {
1340 handle_operations_impl(op_list, this);
1341 }
1342
1343 template<typename derived_type>
1344 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1345 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1346
1347 buffer_operation *tmp = nullptr;
1348 bool try_forwarding = false;
1349 while (op_list) {
1350 tmp = op_list;
1351 op_list = op_list->next;
1352 switch (tmp->type) {
1353 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1354 case rem_succ: internal_rem_succ(tmp); break;
1355 case req_item: internal_pop(tmp); break;
1356 case res_item: internal_reserve(tmp); break;
1357 case rel_res: internal_release(tmp); try_forwarding = true; break;
1358 case con_res: internal_consume(tmp); try_forwarding = true; break;
1359 case put_item: try_forwarding = internal_push(tmp); break;
1360 case try_fwd_task: internal_forward_task(tmp); break;
1361 }
1362 }
1363
1364 derived->order();
1365
1366 if (try_forwarding && !forwarder_busy) {
1367 if(is_graph_active(this->my_graph)) {
1368 forwarder_busy = true;
1369 typedef forward_task_bypass<class_type> task_type;
1370 d1::small_object_allocator allocator{};
1371 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1372
1373
1374
1375
1376
1377
1378 graph_task *z = tmp->ltask;
1379 graph &g = this->my_graph;
1380 tmp->ltask = combine_tasks(g, z, new_task);
1381 }
1382 }
1383 }
1384
1385 inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1386 return op_data.ltask;
1387 }
1388
1389 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1390 graph_task *ft = grab_forwarding_task(op_data);
1391 if(ft) {
1392 spawn_in_graph_arena(graph_reference(), *ft);
1393 return true;
1394 }
1395 return false;
1396 }
1397
1398
1399 virtual graph_task *forward_task() {
1400 buffer_operation op_data(try_fwd_task);
1401 graph_task *last_task = nullptr;
1402 do {
1403 op_data.status = WAIT;
1404 op_data.ltask = nullptr;
1405 my_aggregator.execute(&op_data);
1406
1407
1408 graph_task *xtask = op_data.ltask;
1409 graph& g = this->my_graph;
1410 last_task = combine_tasks(g, last_task, xtask);
1411 } while (op_data.status ==SUCCEEDED);
1412 return last_task;
1413 }
1414
1415
1416 virtual void internal_reg_succ(buffer_operation *op) {
1417 __TBB_ASSERT(op->r, nullptr);
1418 my_successors.register_successor(*(op->r));
1419 op->status.store(SUCCEEDED, std::memory_order_release);
1420 }
1421
1422
1423 virtual void internal_rem_succ(buffer_operation *op) {
1424 __TBB_ASSERT(op->r, nullptr);
1425 my_successors.remove_successor(*(op->r));
1426 op->status.store(SUCCEEDED, std::memory_order_release);
1427 }
1428
1429 private:
1430 void order() {}
1431
1432 bool is_item_valid() {
1433 return this->my_item_valid(this->my_tail - 1);
1434 }
1435
1436 void try_put_and_add_task(graph_task*& last_task) {
1437 graph_task* new_task = my_successors.try_put_task(this->back()
1438 __TBB_FLOW_GRAPH_METAINFO_ARG(this->back_metainfo()));
1439 if (new_task) {
1440
1441 graph& g = this->my_graph;
1442 last_task = combine_tasks(g, last_task, new_task);
1443 this->destroy_back();
1444 }
1445 }
1446
1447 protected:
1448
1449 virtual void internal_forward_task(buffer_operation *op) {
1450 internal_forward_task_impl(op, this);
1451 }
1452
1453 template<typename derived_type>
1454 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1455 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1456
1457 if (this->my_reserved || !derived->is_item_valid()) {
1458 op->status.store(FAILED, std::memory_order_release);
1459 this->forwarder_busy = false;
1460 return;
1461 }
1462
1463 graph_task* last_task = nullptr;
1464 size_type counter = my_successors.size();
1465 for (; counter > 0 && derived->is_item_valid(); --counter)
1466 derived->try_put_and_add_task(last_task);
1467
1468 op->ltask = last_task;
1469 if (last_task && !counter) {
1470 op->status.store(SUCCEEDED, std::memory_order_release);
1471 }
1472 else {
1473 op->status.store(FAILED, std::memory_order_release);
1474 forwarder_busy = false;
1475 }
1476 }
1477
1478 virtual bool internal_push(buffer_operation *op) {
1479 __TBB_ASSERT(op->elem, nullptr);
1480 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1481 __TBB_ASSERT(op->metainfo, nullptr);
1482 this->push_back(*(op->elem), (*op->metainfo));
1483 #else
1484 this->push_back(*(op->elem));
1485 #endif
1486 op->status.store(SUCCEEDED, std::memory_order_release);
1487 return true;
1488 }
1489
1490 virtual void internal_pop(buffer_operation *op) {
1491 __TBB_ASSERT(op->elem, nullptr);
1492 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1493 bool pop_result = op->metainfo ? this->pop_back(*(op->elem), *(op->metainfo))
1494 : this->pop_back(*(op->elem));
1495 #else
1496 bool pop_result = this->pop_back(*(op->elem));
1497 #endif
1498 if (pop_result) {
1499 op->status.store(SUCCEEDED, std::memory_order_release);
1500 }
1501 else {
1502 op->status.store(FAILED, std::memory_order_release);
1503 }
1504 }
1505
1506 virtual void internal_reserve(buffer_operation *op) {
1507 __TBB_ASSERT(op->elem, nullptr);
1508 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1509 bool reserve_result = op->metainfo ? this->reserve_front(*(op->elem), *(op->metainfo))
1510 : this->reserve_front(*(op->elem));
1511 #else
1512 bool reserve_result = this->reserve_front(*(op->elem));
1513 #endif
1514 if (reserve_result) {
1515 op->status.store(SUCCEEDED, std::memory_order_release);
1516 }
1517 else {
1518 op->status.store(FAILED, std::memory_order_release);
1519 }
1520 }
1521
1522 virtual void internal_consume(buffer_operation *op) {
1523 this->consume_front();
1524 op->status.store(SUCCEEDED, std::memory_order_release);
1525 }
1526
1527 virtual void internal_release(buffer_operation *op) {
1528 this->release_front();
1529 op->status.store(SUCCEEDED, std::memory_order_release);
1530 }
1531
1532 public:
1533
1534 __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1535 : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1536 sender<T>(), my_successors(this), forwarder_busy(false)
1537 {
1538 my_aggregator.initialize_handler(handler_type(this));
1539 fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1540 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1541 }
1542
1543 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1544 template <typename... Args>
1545 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1546 make_edges_in_order(nodes, *this);
1547 }
1548 #endif
1549
1550
1551 __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1552
1553
1554
1555
1556
1557
1558
1559 bool register_successor( successor_type &r ) override {
1560 buffer_operation op_data(reg_succ);
1561 op_data.r = &r;
1562 my_aggregator.execute(&op_data);
1563 (void)enqueue_forwarding_task(op_data);
1564 return true;
1565 }
1566
1567
1568
1569
1570 bool remove_successor( successor_type &r ) override {
1571
1572 tbb::detail::d2::remove_predecessor(r, *this);
1573 buffer_operation op_data(rem_succ);
1574 op_data.r = &r;
1575 my_aggregator.execute(&op_data);
1576
1577
1578
1579 (void)enqueue_forwarding_task(op_data);
1580 return true;
1581 }
1582
1583
1584
1585
1586 bool try_get( T &v ) override {
1587 buffer_operation op_data(req_item);
1588 op_data.elem = &v;
1589 my_aggregator.execute(&op_data);
1590 (void)enqueue_forwarding_task(op_data);
1591 return (op_data.status==SUCCEEDED);
1592 }
1593
1594 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1595 bool try_get( T &v, message_metainfo& metainfo ) override {
1596 buffer_operation op_data(req_item, metainfo);
1597 op_data.elem = &v;
1598 my_aggregator.execute(&op_data);
1599 (void)enqueue_forwarding_task(op_data);
1600 return (op_data.status==SUCCEEDED);
1601 }
1602 #endif
1603
1604
1605
1606
1607 bool try_reserve( T &v ) override {
1608 buffer_operation op_data(res_item);
1609 op_data.elem = &v;
1610 my_aggregator.execute(&op_data);
1611 (void)enqueue_forwarding_task(op_data);
1612 return (op_data.status==SUCCEEDED);
1613 }
1614
1615 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1616 bool try_reserve( output_type& v, message_metainfo& metainfo ) override {
1617 buffer_operation op_data(res_item, metainfo);
1618 op_data.elem = &v;
1619 my_aggregator.execute(&op_data);
1620 (void)enqueue_forwarding_task(op_data);
1621 return op_data.status==SUCCEEDED;
1622 }
1623 #endif
1624
1625
1626
1627 bool try_release() override {
1628 buffer_operation op_data(rel_res);
1629 my_aggregator.execute(&op_data);
1630 (void)enqueue_forwarding_task(op_data);
1631 return true;
1632 }
1633
1634
1635
1636 bool try_consume() override {
1637 buffer_operation op_data(con_res);
1638 my_aggregator.execute(&op_data);
1639 (void)enqueue_forwarding_task(op_data);
1640 return true;
1641 }
1642
1643 private:
1644 graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
1645 buffer_operation op_data(t, put_item __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1646 my_aggregator.execute(&op_data);
1647 graph_task *ft = grab_forwarding_task(op_data);
1648
1649
1650
1651 if(ft && op_data.status ==FAILED) {
1652
1653
1654
1655 spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
1656 }
1657 else if(!ft && op_data.status ==SUCCEEDED) {
1658 ft = SUCCESSFULLY_ENQUEUED;
1659 }
1660 return ft;
1661 }
1662
1663 protected:
1664
1665 template< typename R, typename B > friend class run_and_put_task;
1666 template<typename X, typename Y> friend class broadcast_cache;
1667 template<typename X, typename Y> friend class round_robin_cache;
1668
1669 graph_task *try_put_task(const T &t) override {
1670 return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
1671 }
1672
1673 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1674 graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
1675 return try_put_task_impl(t, metainfo);
1676 }
1677 #endif
1678
1679 graph& graph_reference() const override {
1680 return my_graph;
1681 }
1682
1683 protected:
1684 void reset_node( reset_flags f) override {
1685 reservable_item_buffer<T, internals_allocator>::reset();
1686
1687 if (f&rf_clear_edges) {
1688 my_successors.clear();
1689 }
1690 forwarder_busy = false;
1691 }
1692 };
1693
1694
1695 template <typename T>
1696 class queue_node : public buffer_node<T> {
1697 protected:
1698 typedef buffer_node<T> base_type;
1699 typedef typename base_type::size_type size_type;
1700 typedef typename base_type::buffer_operation queue_operation;
1701 typedef queue_node class_type;
1702
1703 private:
1704 template<typename> friend class buffer_node;
1705
1706 bool is_item_valid() {
1707 return this->my_item_valid(this->my_head);
1708 }
1709
1710 void try_put_and_add_task(graph_task*& last_task) {
1711 graph_task* new_task = this->my_successors.try_put_task(this->front()
1712 __TBB_FLOW_GRAPH_METAINFO_ARG(this->front_metainfo()));
1713
1714 if (new_task) {
1715
1716 graph& graph_ref = this->graph_reference();
1717 last_task = combine_tasks(graph_ref, last_task, new_task);
1718 this->destroy_front();
1719 }
1720 }
1721
1722 protected:
1723 void internal_forward_task(queue_operation *op) override {
1724 this->internal_forward_task_impl(op, this);
1725 }
1726
1727 void internal_pop(queue_operation *op) override {
1728 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1729 op->status.store(FAILED, std::memory_order_release);
1730 }
1731 else {
1732 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1733 if (op->metainfo) {
1734 this->pop_front(*(op->elem), *(op->metainfo));
1735 } else
1736 #endif
1737 {
1738 this->pop_front(*(op->elem));
1739 }
1740 op->status.store(SUCCEEDED, std::memory_order_release);
1741 }
1742 }
1743 void internal_reserve(queue_operation *op) override {
1744 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1745 op->status.store(FAILED, std::memory_order_release);
1746 }
1747 else {
1748 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1749 if (op->metainfo) {
1750 this->reserve_front(*(op->elem), *(op->metainfo));
1751 }
1752 else
1753 #endif
1754 {
1755 this->reserve_front(*(op->elem));
1756 }
1757 op->status.store(SUCCEEDED, std::memory_order_release);
1758 }
1759 }
1760 void internal_consume(queue_operation *op) override {
1761 this->consume_front();
1762 op->status.store(SUCCEEDED, std::memory_order_release);
1763 }
1764
1765 public:
1766 typedef T input_type;
1767 typedef T output_type;
1768 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1769 typedef typename sender<output_type>::successor_type successor_type;
1770
1771
1772 __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1773 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1774 static_cast<receiver<input_type> *>(this),
1775 static_cast<sender<output_type> *>(this) );
1776 }
1777
1778 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1779 template <typename... Args>
1780 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1781 make_edges_in_order(nodes, *this);
1782 }
1783 #endif
1784
1785
1786 __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1787 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1788 static_cast<receiver<input_type> *>(this),
1789 static_cast<sender<output_type> *>(this) );
1790 }
1791
1792
1793 protected:
1794 void reset_node( reset_flags f) override {
1795 base_type::reset_node(f);
1796 }
1797 };
1798
1799
1800 template <typename T>
1801 __TBB_requires(std::copyable<T>)
1802 class sequencer_node : public queue_node<T> {
1803 function_body< T, size_t > *my_sequencer;
1804
1805
1806 public:
1807 typedef T input_type;
1808 typedef T output_type;
1809 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1810 typedef typename sender<output_type>::successor_type successor_type;
1811
1812
1813 template< typename Sequencer >
1814 __TBB_requires(sequencer<Sequencer, T>)
1815 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
1816 my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
1817 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1818 static_cast<receiver<input_type> *>(this),
1819 static_cast<sender<output_type> *>(this) );
1820 }
1821
1822 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1823 template <typename Sequencer, typename... Args>
1824 __TBB_requires(sequencer<Sequencer, T>)
1825 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
1826 : sequencer_node(nodes.graph_reference(), s) {
1827 make_edges_in_order(nodes, *this);
1828 }
1829 #endif
1830
1831
1832 __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
1833 my_sequencer( src.my_sequencer->clone() ) {
1834 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1835 static_cast<receiver<input_type> *>(this),
1836 static_cast<sender<output_type> *>(this) );
1837 }
1838
1839
1840 ~sequencer_node() { delete my_sequencer; }
1841
1842 protected:
1843 typedef typename buffer_node<T>::size_type size_type;
1844 typedef typename buffer_node<T>::buffer_operation sequencer_operation;
1845
1846 private:
1847 bool internal_push(sequencer_operation *op) override {
1848 size_type tag = (*my_sequencer)(*(op->elem));
1849 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
1850 if (tag < this->my_head) {
1851
1852 op->status.store(FAILED, std::memory_order_release);
1853 return false;
1854 }
1855 #endif
1856
1857 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1858
1859 if (this->size(new_tail) > this->capacity()) {
1860 this->grow_my_array(this->size(new_tail));
1861 }
1862 this->my_tail = new_tail;
1863
1864 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1865 __TBB_ASSERT(op->metainfo, nullptr);
1866 bool place_item_result = this->place_item(tag, *(op->elem), *(op->metainfo));
1867 const op_stat res = place_item_result ? SUCCEEDED : FAILED;
1868 #else
1869 const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
1870 #endif
1871 op->status.store(res, std::memory_order_release);
1872 return res ==SUCCEEDED;
1873 }
1874 };
1875
1876
1877 template<typename T, typename Compare = std::less<T>>
1878 class priority_queue_node : public buffer_node<T> {
1879 public:
1880 typedef T input_type;
1881 typedef T output_type;
1882 typedef buffer_node<T> base_type;
1883 typedef priority_queue_node class_type;
1884 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1885 typedef typename sender<output_type>::successor_type successor_type;
1886
1887
1888 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1889 : buffer_node<T>(g), compare(comp), mark(0) {
1890 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1891 static_cast<receiver<input_type> *>(this),
1892 static_cast<sender<output_type> *>(this) );
1893 }
1894
1895 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1896 template <typename... Args>
1897 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1898 : priority_queue_node(nodes.graph_reference(), comp) {
1899 make_edges_in_order(nodes, *this);
1900 }
1901 #endif
1902
1903
1904 __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1905 : buffer_node<T>(src), mark(0)
1906 {
1907 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1908 static_cast<receiver<input_type> *>(this),
1909 static_cast<sender<output_type> *>(this) );
1910 }
1911
1912 protected:
1913
1914 void reset_node( reset_flags f) override {
1915 mark = 0;
1916 base_type::reset_node(f);
1917 }
1918
1919 typedef typename buffer_node<T>::size_type size_type;
1920 typedef typename buffer_node<T>::item_type item_type;
1921 typedef typename buffer_node<T>::buffer_operation prio_operation;
1922
1923
1924 void internal_forward_task(prio_operation *op) override {
1925 this->internal_forward_task_impl(op, this);
1926 }
1927
1928 void handle_operations(prio_operation *op_list) override {
1929 this->handle_operations_impl(op_list, this);
1930 }
1931
1932 bool internal_push(prio_operation *op) override {
1933 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1934 __TBB_ASSERT(op->metainfo, nullptr);
1935 prio_push(*(op->elem), *(op->metainfo));
1936 #else
1937 prio_push(*(op->elem));
1938 #endif
1939 op->status.store(SUCCEEDED, std::memory_order_release);
1940 return true;
1941 }
1942
1943 void internal_pop(prio_operation *op) override {
1944
1945 if ( this->my_reserved == true || this->my_tail == 0 ) {
1946 op->status.store(FAILED, std::memory_order_release);
1947 return;
1948 }
1949
1950 *(op->elem) = prio();
1951 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1952 if (op->metainfo) {
1953 *(op->metainfo) = std::move(prio_metainfo());
1954 }
1955 #endif
1956 op->status.store(SUCCEEDED, std::memory_order_release);
1957 prio_pop();
1958
1959 }
1960
1961
1962 void internal_reserve(prio_operation *op) override {
1963 if (this->my_reserved == true || this->my_tail == 0) {
1964 op->status.store(FAILED, std::memory_order_release);
1965 return;
1966 }
1967 this->my_reserved = true;
1968 *(op->elem) = prio();
1969 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1970 if (op->metainfo) {
1971 *(op->metainfo) = std::move(prio_metainfo());
1972 reserved_metainfo = *(op->metainfo);
1973 }
1974 #endif
1975 reserved_item = *(op->elem);
1976 op->status.store(SUCCEEDED, std::memory_order_release);
1977 prio_pop();
1978 }
1979
1980 void internal_consume(prio_operation *op) override {
1981 op->status.store(SUCCEEDED, std::memory_order_release);
1982 this->my_reserved = false;
1983 reserved_item = input_type();
1984 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1985 for (auto waiter : reserved_metainfo.waiters()) {
1986 waiter->release(1);
1987 }
1988
1989 reserved_metainfo = message_metainfo{};
1990 #endif
1991 }
1992
1993 void internal_release(prio_operation *op) override {
1994 op->status.store(SUCCEEDED, std::memory_order_release);
1995 prio_push(reserved_item __TBB_FLOW_GRAPH_METAINFO_ARG(reserved_metainfo));
1996 this->my_reserved = false;
1997 reserved_item = input_type();
1998 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1999 for (auto waiter : reserved_metainfo.waiters()) {
2000 waiter->release(1);
2001 }
2002
2003 reserved_metainfo = message_metainfo{};
2004 #endif
2005 }
2006
2007 private:
2008 template<typename> friend class buffer_node;
2009
2010 void order() {
2011 if (mark < this->my_tail) heapify();
2012 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2013 }
2014
2015 bool is_item_valid() {
2016 return this->my_tail > 0;
2017 }
2018
2019 void try_put_and_add_task(graph_task*& last_task) {
2020 graph_task* new_task = this->my_successors.try_put_task(this->prio()
2021 __TBB_FLOW_GRAPH_METAINFO_ARG(this->prio_metainfo()));
2022 if (new_task) {
2023
2024 graph& graph_ref = this->graph_reference();
2025 last_task = combine_tasks(graph_ref, last_task, new_task);
2026 prio_pop();
2027 }
2028 }
2029
2030 private:
2031 Compare compare;
2032 size_type mark;
2033
2034 input_type reserved_item;
2035 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2036 message_metainfo reserved_metainfo;
2037 #endif
2038
2039
2040 bool prio_use_tail() {
2041 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2042 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2043 }
2044
2045
2046 void prio_push(const T &src __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
2047 if ( this->my_tail >= this->my_array_size )
2048 this->grow_my_array( this->my_tail + 1 );
2049 (void) this->place_item(this->my_tail, src __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
2050 ++(this->my_tail);
2051 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2052 }
2053
2054
2055
2056
2057 void prio_pop() {
2058 if (prio_use_tail()) {
2059
2060
2061 this->destroy_item(this->my_tail-1);
2062 --(this->my_tail);
2063 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2064 return;
2065 }
2066 this->destroy_item(0);
2067 if(this->my_tail > 1) {
2068
2069 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
2070 this->move_item(0,this->my_tail - 1);
2071 }
2072 --(this->my_tail);
2073 if(mark > this->my_tail) --mark;
2074 if (this->my_tail > 1)
2075 reheap();
2076 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2077 }
2078
2079 const T& prio() {
2080 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2081 }
2082
2083 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2084 message_metainfo& prio_metainfo() {
2085 return this->get_my_metainfo(prio_use_tail() ? this->my_tail-1 : 0);
2086 }
2087 #endif
2088
2089
2090 void heapify() {
2091 if(this->my_tail == 0) {
2092 mark = 0;
2093 return;
2094 }
2095 if (!mark) mark = 1;
2096 for (; mark<this->my_tail; ++mark) {
2097 size_type cur_pos = mark;
2098 input_type to_place;
2099 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2100 message_metainfo metainfo;
2101 #endif
2102 this->fetch_item(mark, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
2103 do {
2104 size_type parent = (cur_pos-1)>>1;
2105 if (!compare(this->get_my_item(parent), to_place))
2106 break;
2107 this->move_item(cur_pos, parent);
2108 cur_pos = parent;
2109 } while( cur_pos );
2110 this->place_item(cur_pos, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
2111 }
2112 }
2113
2114
2115 void reheap() {
2116 size_type cur_pos=0, child=1;
2117 while (child < mark) {
2118 size_type target = child;
2119 if (child+1<mark &&
2120 compare(this->get_my_item(child),
2121 this->get_my_item(child+1)))
2122 ++target;
2123
2124 if (compare(this->get_my_item(target),
2125 this->get_my_item(cur_pos)))
2126 break;
2127
2128 this->swap_items(cur_pos, target);
2129 cur_pos = target;
2130 child = (cur_pos<<1)+1;
2131 }
2132 }
2133 };
2134
2135
2136
2137
2138
2139 template< typename T, typename DecrementType=continue_msg >
2140 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2141 public:
2142 typedef T input_type;
2143 typedef T output_type;
2144 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2145 typedef typename sender<output_type>::successor_type successor_type;
2146
2147
2148 private:
2149 size_t my_threshold;
2150 size_t my_count;
2151 size_t my_tries;
2152 size_t my_future_decrement;
2153 reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2154 spin_mutex my_mutex;
2155 broadcast_cache< T > my_successors;
2156
2157
2158 threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
2159
2160 graph_task* decrement_counter( long long delta ) {
2161 if ( delta > 0 && size_t(delta) > my_threshold ) {
2162 delta = my_threshold;
2163 }
2164
2165 {
2166 spin_mutex::scoped_lock lock(my_mutex);
2167 if ( delta > 0 && size_t(delta) > my_count ) {
2168 if( my_tries > 0 ) {
2169 my_future_decrement += (size_t(delta) - my_count);
2170 }
2171 my_count = 0;
2172 }
2173 else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
2174 my_count = my_threshold;
2175 }
2176 else {
2177 my_count -= size_t(delta);
2178 }
2179 __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
2180 }
2181 return forward_task();
2182 }
2183
2184
2185 friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
2186
2187 friend class forward_task_bypass< limiter_node<T,DecrementType> >;
2188
2189 bool check_conditions() {
2190 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2191 }
2192
2193
2194 graph_task* forward_task() {
2195 input_type v;
2196 graph_task* rval = nullptr;
2197 bool reserved = false;
2198
2199 {
2200 spin_mutex::scoped_lock lock(my_mutex);
2201 if ( check_conditions() )
2202 ++my_tries;
2203 else
2204 return nullptr;
2205 }
2206
2207
2208
2209
2210 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2211 message_metainfo metainfo;
2212 #endif
2213 if ( (my_predecessors.try_reserve(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) == true ) {
2214 reserved = true;
2215 if ( (rval = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) != nullptr ) {
2216 {
2217 spin_mutex::scoped_lock lock(my_mutex);
2218 ++my_count;
2219 if ( my_future_decrement ) {
2220 if ( my_count > my_future_decrement ) {
2221 my_count -= my_future_decrement;
2222 my_future_decrement = 0;
2223 }
2224 else {
2225 my_future_decrement -= my_count;
2226 my_count = 0;
2227 }
2228 }
2229 --my_tries;
2230 my_predecessors.try_consume();
2231 if ( check_conditions() ) {
2232 if ( is_graph_active(this->my_graph) ) {
2233 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2234 d1::small_object_allocator allocator{};
2235 graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
2236 spawn_in_graph_arena(graph_reference(), *rtask);
2237 }
2238 }
2239 }
2240 return rval;
2241 }
2242 }
2243
2244
2245
2246 {
2247 spin_mutex::scoped_lock lock(my_mutex);
2248 --my_tries;
2249 if (reserved) my_predecessors.try_release();
2250 if ( check_conditions() ) {
2251 if ( is_graph_active(this->my_graph) ) {
2252 d1::small_object_allocator allocator{};
2253 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2254 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2255 __TBB_ASSERT(!rval, "Have two tasks to handle");
2256 return t;
2257 }
2258 }
2259 return rval;
2260 }
2261 }
2262
2263 void initialize() {
2264 fgt_node(
2265 CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2266 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2267 static_cast<sender<output_type> *>(this)
2268 );
2269 }
2270
2271 public:
2272
2273 limiter_node(graph &g, size_t threshold)
2274 : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2275 my_predecessors(this), my_successors(this), decrement(this)
2276 {
2277 initialize();
2278 }
2279
2280 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2281 template <typename... Args>
2282 limiter_node(const node_set<Args...>& nodes, size_t threshold)
2283 : limiter_node(nodes.graph_reference(), threshold) {
2284 make_edges_in_order(nodes, *this);
2285 }
2286 #endif
2287
2288
2289 limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2290
2291
2292 receiver<DecrementType>& decrementer() { return decrement; }
2293
2294
2295 bool register_successor( successor_type &r ) override {
2296 spin_mutex::scoped_lock lock(my_mutex);
2297 bool was_empty = my_successors.empty();
2298 my_successors.register_successor(r);
2299
2300 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2301 if ( is_graph_active(this->my_graph) ) {
2302 d1::small_object_allocator allocator{};
2303 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2304 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2305 spawn_in_graph_arena(graph_reference(), *t);
2306 }
2307 }
2308 return true;
2309 }
2310
2311
2312
2313 bool remove_successor( successor_type &r ) override {
2314
2315 tbb::detail::d2::remove_predecessor(r, *this);
2316 my_successors.remove_successor(r);
2317 return true;
2318 }
2319
2320
2321 bool register_predecessor( predecessor_type &src ) override {
2322 spin_mutex::scoped_lock lock(my_mutex);
2323 my_predecessors.add( src );
2324 if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2325 d1::small_object_allocator allocator{};
2326 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2327 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2328 spawn_in_graph_arena(graph_reference(), *t);
2329 }
2330 return true;
2331 }
2332
2333
2334 bool remove_predecessor( predecessor_type &src ) override {
2335 my_predecessors.remove( src );
2336 return true;
2337 }
2338
2339 protected:
2340
2341 template< typename R, typename B > friend class run_and_put_task;
2342 template<typename X, typename Y> friend class broadcast_cache;
2343 template<typename X, typename Y> friend class round_robin_cache;
2344
2345 private:
2346
2347 graph_task* try_put_task_impl( const T &t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
2348 {
2349 spin_mutex::scoped_lock lock(my_mutex);
2350 if ( my_count + my_tries >= my_threshold )
2351 return nullptr;
2352 else
2353 ++my_tries;
2354 }
2355
2356 graph_task* rtask = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
2357 if ( !rtask ) {
2358 spin_mutex::scoped_lock lock(my_mutex);
2359 --my_tries;
2360 if (check_conditions() && is_graph_active(this->my_graph)) {
2361 d1::small_object_allocator allocator{};
2362 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2363 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2364 }
2365 }
2366 else {
2367 spin_mutex::scoped_lock lock(my_mutex);
2368 ++my_count;
2369 if ( my_future_decrement ) {
2370 if ( my_count > my_future_decrement ) {
2371 my_count -= my_future_decrement;
2372 my_future_decrement = 0;
2373 }
2374 else {
2375 my_future_decrement -= my_count;
2376 my_count = 0;
2377 }
2378 }
2379 --my_tries;
2380 }
2381 return rtask;
2382 }
2383
2384 protected:
2385 graph_task* try_put_task(const T& t) override {
2386 return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
2387 }
2388 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2389 graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
2390 return try_put_task_impl(t, metainfo);
2391 }
2392 #endif
2393
2394 graph& graph_reference() const override { return my_graph; }
2395
2396 void reset_node( reset_flags f ) override {
2397 my_count = 0;
2398 if ( f & rf_clear_edges ) {
2399 my_predecessors.clear();
2400 my_successors.clear();
2401 }
2402 else {
2403 my_predecessors.reset();
2404 }
2405 decrement.reset_receiver(f);
2406 }
2407 };
2408
2409 #include "detail/_flow_graph_join_impl.h"
2410
2411 template<typename OutputTuple, typename JP=queueing> class join_node;
2412
2413 template<typename OutputTuple>
2414 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2415 private:
2416 static const int N = std::tuple_size<OutputTuple>::value;
2417 typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2418 public:
2419 typedef OutputTuple output_type;
2420 typedef typename unfolded_type::input_ports_type input_ports_type;
2421 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2422 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2423 this->input_ports(), static_cast< sender< output_type > *>(this) );
2424 }
2425
2426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2427 template <typename... Args>
2428 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2429 make_edges_in_order(nodes, *this);
2430 }
2431 #endif
2432
2433 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2434 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2435 this->input_ports(), static_cast< sender< output_type > *>(this) );
2436 }
2437
2438 };
2439
2440 template<typename OutputTuple>
2441 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2442 private:
2443 static const int N = std::tuple_size<OutputTuple>::value;
2444 typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2445 public:
2446 typedef OutputTuple output_type;
2447 typedef typename unfolded_type::input_ports_type input_ports_type;
2448 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2449 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2450 this->input_ports(), static_cast< sender< output_type > *>(this) );
2451 }
2452
2453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2454 template <typename... Args>
2455 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2456 make_edges_in_order(nodes, *this);
2457 }
2458 #endif
2459
2460 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2461 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2462 this->input_ports(), static_cast< sender< output_type > *>(this) );
2463 }
2464
2465 };
2466
2467 #if __TBB_CPP20_CONCEPTS_PRESENT
2468
2469
2470 template <typename OutputTuple, typename K,
2471 typename... Functions, std::size_t... Idx>
2472 void join_node_function_objects_helper( std::index_sequence<Idx...> )
2473 requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
2474 (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);
2475
2476 template <typename OutputTuple, typename K, typename... Functions>
2477 concept join_node_functions = requires {
2478 join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
2479 };
2480
2481 #endif
2482
2483
2484
2485 template<typename OutputTuple, typename K, typename KHash>
2486 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2487 key_matching_port, OutputTuple, key_matching<K,KHash> > {
2488 private:
2489 static const int N = std::tuple_size<OutputTuple>::value;
2490 typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2491 public:
2492 typedef OutputTuple output_type;
2493 typedef typename unfolded_type::input_ports_type input_ports_type;
2494
2495 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2496 join_node(graph &g) : unfolded_type(g) {}
2497 #endif
2498
2499 template<typename __TBB_B0, typename __TBB_B1>
2500 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2501 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2502 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2503 this->input_ports(), static_cast< sender< output_type > *>(this) );
2504 }
2505 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2506 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2507 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2508 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2509 this->input_ports(), static_cast< sender< output_type > *>(this) );
2510 }
2511 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2512 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
2513 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2514 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2515 this->input_ports(), static_cast< sender< output_type > *>(this) );
2516 }
2517 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2518 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
2519 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2520 unfolded_type(g, b0, b1, b2, b3, b4) {
2521 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2522 this->input_ports(), static_cast< sender< output_type > *>(this) );
2523 }
2524 #if __TBB_VARIADIC_MAX >= 6
2525 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2526 typename __TBB_B5>
2527 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
2528 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2529 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2530 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2531 this->input_ports(), static_cast< sender< output_type > *>(this) );
2532 }
2533 #endif
2534 #if __TBB_VARIADIC_MAX >= 7
2535 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2536 typename __TBB_B5, typename __TBB_B6>
2537 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
2538 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2539 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2540 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2541 this->input_ports(), static_cast< sender< output_type > *>(this) );
2542 }
2543 #endif
2544 #if __TBB_VARIADIC_MAX >= 8
2545 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2546 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2547 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
2548 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2549 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2550 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2551 this->input_ports(), static_cast< sender< output_type > *>(this) );
2552 }
2553 #endif
2554 #if __TBB_VARIADIC_MAX >= 9
2555 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2556 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2557 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
2558 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2559 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2560 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2561 this->input_ports(), static_cast< sender< output_type > *>(this) );
2562 }
2563 #endif
2564 #if __TBB_VARIADIC_MAX >= 10
2565 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2566 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2567 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
2568 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2569 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2570 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2571 this->input_ports(), static_cast< sender< output_type > *>(this) );
2572 }
2573 #endif
2574
2575 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2576 template <
2577 #if (__clang_major__ == 3 && __clang_minor__ == 4)
2578
2579 template<typename...> class node_set,
2580 #endif
2581 typename... Args, typename... Bodies
2582 >
2583 __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
2584 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
2585 : join_node(nodes.graph_reference(), bodies...) {
2586 make_edges_in_order(nodes, *this);
2587 }
2588 #endif
2589
2590 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2591 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2592 this->input_ports(), static_cast< sender< output_type > *>(this) );
2593 }
2594
2595 };
2596
2597
2598 #include "detail/_flow_graph_indexer_impl.h"
2599
2600
2601 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2602 typename T4=null_type, typename T5=null_type, typename T6=null_type,
2603 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2604
2605
2606 template<typename T0>
2607 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2608 private:
2609 static const int N = 1;
2610 public:
2611 typedef std::tuple<T0> InputTuple;
2612 typedef tagged_msg<size_t, T0> output_type;
2613 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2614 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2615 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2616 this->input_ports(), static_cast< sender< output_type > *>(this) );
2617 }
2618
2619 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2620 template <typename... Args>
2621 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2622 make_edges_in_order(nodes, *this);
2623 }
2624 #endif
2625
2626
2627 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2628 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2629 this->input_ports(), static_cast< sender< output_type > *>(this) );
2630 }
2631 };
2632
2633 template<typename T0, typename T1>
2634 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2635 private:
2636 static const int N = 2;
2637 public:
2638 typedef std::tuple<T0, T1> InputTuple;
2639 typedef tagged_msg<size_t, T0, T1> output_type;
2640 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2641 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2642 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2643 this->input_ports(), static_cast< sender< output_type > *>(this) );
2644 }
2645
2646 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2647 template <typename... Args>
2648 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2649 make_edges_in_order(nodes, *this);
2650 }
2651 #endif
2652
2653
2654 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2655 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2656 this->input_ports(), static_cast< sender< output_type > *>(this) );
2657 }
2658
2659 };
2660
2661 template<typename T0, typename T1, typename T2>
2662 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2663 private:
2664 static const int N = 3;
2665 public:
2666 typedef std::tuple<T0, T1, T2> InputTuple;
2667 typedef tagged_msg<size_t, T0, T1, T2> output_type;
2668 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2669 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2670 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2671 this->input_ports(), static_cast< sender< output_type > *>(this) );
2672 }
2673
2674 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2675 template <typename... Args>
2676 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2677 make_edges_in_order(nodes, *this);
2678 }
2679 #endif
2680
2681
2682 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2683 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2684 this->input_ports(), static_cast< sender< output_type > *>(this) );
2685 }
2686
2687 };
2688
2689 template<typename T0, typename T1, typename T2, typename T3>
2690 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2691 private:
2692 static const int N = 4;
2693 public:
2694 typedef std::tuple<T0, T1, T2, T3> InputTuple;
2695 typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2696 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2697 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2698 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2699 this->input_ports(), static_cast< sender< output_type > *>(this) );
2700 }
2701
2702 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2703 template <typename... Args>
2704 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2705 make_edges_in_order(nodes, *this);
2706 }
2707 #endif
2708
2709
2710 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2711 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2712 this->input_ports(), static_cast< sender< output_type > *>(this) );
2713 }
2714
2715 };
2716
2717 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2718 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2719 private:
2720 static const int N = 5;
2721 public:
2722 typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2723 typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2724 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2725 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2726 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2727 this->input_ports(), static_cast< sender< output_type > *>(this) );
2728 }
2729
2730 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2731 template <typename... Args>
2732 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2733 make_edges_in_order(nodes, *this);
2734 }
2735 #endif
2736
2737
2738 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2739 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2740 this->input_ports(), static_cast< sender< output_type > *>(this) );
2741 }
2742
2743 };
2744
2745 #if __TBB_VARIADIC_MAX >= 6
2746 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2747 class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
2748 private:
2749 static const int N = 6;
2750 public:
2751 typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2752 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
2753 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2754 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2755 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2756 this->input_ports(), static_cast< sender< output_type > *>(this) );
2757 }
2758
2759 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2760 template <typename... Args>
2761 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2762 make_edges_in_order(nodes, *this);
2763 }
2764 #endif
2765
2766
2767 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2768 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2769 this->input_ports(), static_cast< sender< output_type > *>(this) );
2770 }
2771
2772 };
2773 #endif
2774
2775 #if __TBB_VARIADIC_MAX >= 7
2776 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2777 typename T6>
2778 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
2779 private:
2780 static const int N = 7;
2781 public:
2782 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
2783 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
2784 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2785 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2786 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2787 this->input_ports(), static_cast< sender< output_type > *>(this) );
2788 }
2789
2790 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2791 template <typename... Args>
2792 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2793 make_edges_in_order(nodes, *this);
2794 }
2795 #endif
2796
2797
2798 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2799 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2800 this->input_ports(), static_cast< sender< output_type > *>(this) );
2801 }
2802
2803 };
2804 #endif
2805
2806 #if __TBB_VARIADIC_MAX >= 8
2807 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2808 typename T6, typename T7>
2809 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
2810 private:
2811 static const int N = 8;
2812 public:
2813 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
2814 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
2815 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2816 indexer_node(graph& g) : unfolded_type(g) {
2817 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2818 this->input_ports(), static_cast< sender< output_type > *>(this) );
2819 }
2820
2821 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2822 template <typename... Args>
2823 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2824 make_edges_in_order(nodes, *this);
2825 }
2826 #endif
2827
2828
2829 indexer_node( const indexer_node& other ) : unfolded_type(other) {
2830 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2831 this->input_ports(), static_cast< sender< output_type > *>(this) );
2832 }
2833
2834 };
2835 #endif
2836
2837 #if __TBB_VARIADIC_MAX >= 9
2838 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2839 typename T6, typename T7, typename T8>
2840 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
2841 private:
2842 static const int N = 9;
2843 public:
2844 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
2845 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
2846 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2847 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2848 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2849 this->input_ports(), static_cast< sender< output_type > *>(this) );
2850 }
2851
2852 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2853 template <typename... Args>
2854 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2855 make_edges_in_order(nodes, *this);
2856 }
2857 #endif
2858
2859
2860 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2861 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2862 this->input_ports(), static_cast< sender< output_type > *>(this) );
2863 }
2864
2865 };
2866 #endif
2867
2868 #if __TBB_VARIADIC_MAX >= 10
2869 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2870 typename T6, typename T7, typename T8, typename T9>
2871 class indexer_node : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
2872 private:
2873 static const int N = 10;
2874 public:
2875 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
2876 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
2877 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2878 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2879 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2880 this->input_ports(), static_cast< sender< output_type > *>(this) );
2881 }
2882
2883 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2884 template <typename... Args>
2885 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2886 make_edges_in_order(nodes, *this);
2887 }
2888 #endif
2889
2890
2891 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2892 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2893 this->input_ports(), static_cast< sender< output_type > *>(this) );
2894 }
2895
2896 };
2897 #endif
2898
2899 template< typename T >
2900 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2901 register_successor(p, s);
2902 fgt_make_edge( &p, &s );
2903 }
2904
2905
2906 template< typename T >
2907 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2908 internal_make_edge( p, s );
2909 }
2910
2911
2912 template< typename T, typename V,
2913 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
2914 inline void make_edge( T& output, V& input) {
2915 make_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
2916 }
2917
2918
2919 template< typename T, typename R,
2920 typename = typename T::output_ports_type >
2921 inline void make_edge( T& output, receiver<R>& input) {
2922 make_edge(std::get<0>(output.output_ports()), input);
2923 }
2924
2925
2926 template< typename S, typename V,
2927 typename = typename V::input_ports_type >
2928 inline void make_edge( sender<S>& output, V& input) {
2929 make_edge(output, std::get<0>(input.input_ports()));
2930 }
2931
2932 template< typename T >
2933 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2934 remove_successor( p, s );
2935 fgt_remove_edge( &p, &s );
2936 }
2937
2938
2939 template< typename T >
2940 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2941 internal_remove_edge( p, s );
2942 }
2943
2944
2945 template< typename T, typename V,
2946 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
2947 inline void remove_edge( T& output, V& input) {
2948 remove_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
2949 }
2950
2951
2952 template< typename T, typename R,
2953 typename = typename T::output_ports_type >
2954 inline void remove_edge( T& output, receiver<R>& input) {
2955 remove_edge(std::get<0>(output.output_ports()), input);
2956 }
2957
2958 template< typename S, typename V,
2959 typename = typename V::input_ports_type >
2960 inline void remove_edge( sender<S>& output, V& input) {
2961 remove_edge(output, std::get<0>(input.input_ports()));
2962 }
2963
2964
2965 template< typename Body, typename Node >
2966 Body copy_body( Node &n ) {
2967 return n.template copy_function_object<Body>();
2968 }
2969
2970
2971 template< typename InputTuple, typename OutputTuple > class composite_node;
2972
2973 template< typename... InputTypes, typename... OutputTypes>
2974 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2975
2976 public:
2977 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2978 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2979
2980 private:
2981 std::unique_ptr<input_ports_type> my_input_ports;
2982 std::unique_ptr<output_ports_type> my_output_ports;
2983
2984 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2985 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2986
2987 protected:
2988 void reset_node(reset_flags) override {}
2989
2990 public:
2991 composite_node( graph &g ) : graph_node(g) {
2992 fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2993 }
2994
2995 template<typename T1, typename T2>
2996 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2997 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2998 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2999
3000 fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
3001 fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
3002
3003 my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
3004 my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
3005 }
3006
3007 template< typename... NodeTypes >
3008 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
3009
3010 template< typename... NodeTypes >
3011 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
3012
3013
3014 input_ports_type& input_ports() {
3015 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3016 return *my_input_ports;
3017 }
3018
3019 output_ports_type& output_ports() {
3020 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3021 return *my_output_ports;
3022 }
3023 };
3024
3025
3026 template< typename... InputTypes>
3027 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
3028 public:
3029 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
3030
3031 private:
3032 std::unique_ptr<input_ports_type> my_input_ports;
3033 static const size_t NUM_INPUTS = sizeof...(InputTypes);
3034
3035 protected:
3036 void reset_node(reset_flags) override {}
3037
3038 public:
3039 composite_node( graph &g ) : graph_node(g) {
3040 fgt_composite( CODEPTR(), this, &g );
3041 }
3042
3043 template<typename T>
3044 void set_external_ports(T&& input_ports_tuple) {
3045 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3046
3047 fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
3048
3049 my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
3050 }
3051
3052 template< typename... NodeTypes >
3053 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
3054
3055 template< typename... NodeTypes >
3056 void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
3057
3058
3059 input_ports_type& input_ports() {
3060 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3061 return *my_input_ports;
3062 }
3063
3064 };
3065
3066
3067 template<typename... OutputTypes>
3068 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
3069 public:
3070 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
3071
3072 private:
3073 std::unique_ptr<output_ports_type> my_output_ports;
3074 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3075
3076 protected:
3077 void reset_node(reset_flags) override {}
3078
3079 public:
3080 __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
3081 fgt_composite( CODEPTR(), this, &g );
3082 }
3083
3084 template<typename T>
3085 void set_external_ports(T&& output_ports_tuple) {
3086 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3087
3088 fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
3089
3090 my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
3091 }
3092
3093 template<typename... NodeTypes >
3094 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
3095
3096 template<typename... NodeTypes >
3097 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
3098
3099
3100 output_ports_type& output_ports() {
3101 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3102 return *my_output_ports;
3103 }
3104
3105 };
3106
3107 template<typename Gateway>
3108 class async_body_base: no_assign {
3109 public:
3110 typedef Gateway gateway_type;
3111
3112 async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3113 void set_gateway(gateway_type *gateway) {
3114 my_gateway = gateway;
3115 }
3116
3117 protected:
3118 gateway_type *my_gateway;
3119 };
3120
3121 template<typename Input, typename Ports, typename Gateway, typename Body>
3122 class async_body: public async_body_base<Gateway> {
3123 private:
3124 Body my_body;
3125
3126 public:
3127 typedef async_body_base<Gateway> base_type;
3128 typedef Gateway gateway_type;
3129
3130 async_body(const Body &body, gateway_type *gateway)
3131 : base_type(gateway), my_body(body) { }
3132
3133 void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
3134 tbb::detail::invoke(my_body, v, *this->my_gateway);
3135 }
3136
3137 Body get_body() { return my_body; }
3138 };
3139
3140
3141 template < typename Input, typename Output,
3142 typename Policy = queueing_lightweight >
3143 __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
3144 class async_node
3145 : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
3146 {
3147 typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
3148 typedef multifunction_input<
3149 Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
3150
3151 public:
3152 typedef Input input_type;
3153 typedef Output output_type;
3154 typedef receiver<input_type> receiver_type;
3155 typedef receiver<output_type> successor_type;
3156 typedef sender<input_type> predecessor_type;
3157 typedef receiver_gateway<output_type> gateway_type;
3158 typedef async_body_base<gateway_type> async_body_base_type;
3159 typedef typename base_type::output_ports_type output_ports_type;
3160
3161 private:
3162 class receiver_gateway_impl: public receiver_gateway<Output> {
3163 public:
3164 receiver_gateway_impl(async_node* node): my_node(node) {}
3165 void reserve_wait() override {
3166 fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3167 my_node->my_graph.reserve_wait();
3168 }
3169
3170 void release_wait() override {
3171 async_node* n = my_node;
3172 graph* g = &n->my_graph;
3173 g->release_wait();
3174 fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
3175 }
3176
3177
3178 bool try_put(const Output &i) override {
3179 return my_node->try_put_impl(i);
3180 }
3181
3182 private:
3183 async_node* my_node;
3184 } my_gateway;
3185
3186
3187 async_node* self() { return this; }
3188
3189
3190 bool try_put_impl(const Output &i) {
3191 multifunction_output<Output> &port_0 = output_port<0>(*this);
3192 broadcast_cache<output_type>& port_successors = port_0.successors();
3193 fgt_async_try_put_begin(this, &port_0);
3194
3195 graph_task_list tasks;
3196 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
3197 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
3198 "Return status is inconsistent with the method operation." );
3199
3200 while( !tasks.empty() ) {
3201 enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
3202 }
3203 fgt_async_try_put_end(this, &port_0);
3204 return is_at_least_one_put_successful;
3205 }
3206
3207 public:
3208 template<typename Body>
3209 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
3210 __TBB_NOINLINE_SYM async_node(
3211 graph &g, size_t concurrency,
3212 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
3213 ) : base_type(
3214 g, concurrency,
3215 async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3216 (body, &my_gateway), a_priority ), my_gateway(self()) {
3217 fgt_multioutput_node_with_body<1>(
3218 CODEPTR(), FLOW_ASYNC_NODE,
3219 &this->my_graph, static_cast<receiver<input_type> *>(this),
3220 this->output_ports(), this->my_body
3221 );
3222 }
3223
3224 template <typename Body>
3225 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
3226 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
3227 : async_node(g, concurrency, body, Policy(), a_priority) {}
3228
3229 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3230 template <typename Body, typename... Args>
3231 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
3232 __TBB_NOINLINE_SYM async_node(
3233 const node_set<Args...>& nodes, size_t concurrency, Body body,
3234 Policy = Policy(), node_priority_t a_priority = no_priority )
3235 : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
3236 make_edges_in_order(nodes, *this);
3237 }
3238
3239 template <typename Body, typename... Args>
3240 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
3241 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
3242 : async_node(nodes, concurrency, body, Policy(), a_priority) {}
3243 #endif
3244
3245 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3246 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3247 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3248
3249 fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
3250 &this->my_graph, static_cast<receiver<input_type> *>(this),
3251 this->output_ports(), this->my_body );
3252 }
3253
3254 gateway_type& gateway() {
3255 return my_gateway;
3256 }
3257
3258
3259
3260
3261 bool register_successor(successor_type&) override {
3262 __TBB_ASSERT(false, "Successors must be registered only via ports");
3263 return false;
3264 }
3265
3266
3267 bool remove_successor(successor_type&) override {
3268 __TBB_ASSERT(false, "Successors must be removed only via ports");
3269 return false;
3270 }
3271
3272 template<typename Body>
3273 Body copy_function_object() {
3274 typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
3275 typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
3276 mfn_body_type &body_ref = *this->my_body;
3277 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3278 return ab.get_body();
3279 }
3280
3281 protected:
3282
3283 void reset_node( reset_flags f) override {
3284 base_type::reset_node(f);
3285 }
3286 };
3287
3288 #include "detail/_flow_graph_node_set_impl.h"
3289
3290 template< typename T >
3291 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3292 public:
3293 typedef T input_type;
3294 typedef T output_type;
3295 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3296 typedef typename sender<output_type>::successor_type successor_type;
3297
3298 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
3299 : graph_node(g), my_successors(this), my_buffer_is_valid(false)
3300 {
3301 fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
3302 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3303 }
3304
3305 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3306 template <typename... Args>
3307 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
3308 make_edges_in_order(nodes, *this);
3309 }
3310 #endif
3311
3312
3313 __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
3314
3315 ~overwrite_node() {}
3316
3317 bool register_successor( successor_type &s ) override {
3318 spin_mutex::scoped_lock l( my_mutex );
3319 if (my_buffer_is_valid && is_graph_active( my_graph )) {
3320
3321 bool ret = s.try_put( my_buffer );
3322 if ( ret ) {
3323
3324 my_successors.register_successor( s );
3325 } else {
3326
3327
3328
3329
3330 d1::small_object_allocator allocator{};
3331 typedef register_predecessor_task task_type;
3332 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
3333 spawn_in_graph_arena( my_graph, *t );
3334 }
3335 } else {
3336
3337 my_successors.register_successor( s );
3338 }
3339 return true;
3340 }
3341
3342 bool remove_successor( successor_type &s ) override {
3343 spin_mutex::scoped_lock l( my_mutex );
3344 my_successors.remove_successor(s);
3345 return true;
3346 }
3347
3348 bool try_get( input_type &v ) override {
3349 spin_mutex::scoped_lock l( my_mutex );
3350 if ( my_buffer_is_valid ) {
3351 v = my_buffer;
3352 return true;
3353 }
3354 return false;
3355 }
3356
3357 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3358 bool try_get( input_type &v, message_metainfo& metainfo ) override {
3359 spin_mutex::scoped_lock l( my_mutex );
3360 if (my_buffer_is_valid) {
3361 v = my_buffer;
3362 metainfo = my_buffered_metainfo;
3363
3364
3365
3366
3367
3368 for (auto msg_waiter : metainfo.waiters()) {
3369 msg_waiter->reserve(1);
3370 }
3371 return true;
3372 }
3373 return false;
3374 }
3375 #endif
3376
3377
3378 bool try_reserve( T &v ) override {
3379 return try_get(v);
3380 }
3381
3382 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3383 private:
3384 bool try_reserve(T& v, message_metainfo& metainfo) override {
3385 spin_mutex::scoped_lock l( my_mutex );
3386 if (my_buffer_is_valid) {
3387 v = my_buffer;
3388 metainfo = my_buffered_metainfo;
3389 return true;
3390 }
3391 return false;
3392 }
3393 public:
3394 #endif
3395
3396
3397 bool try_release() override { return true; }
3398
3399
3400 bool try_consume() override { return true; }
3401
3402 bool is_valid() {
3403 spin_mutex::scoped_lock l( my_mutex );
3404 return my_buffer_is_valid;
3405 }
3406
3407 void clear() {
3408 spin_mutex::scoped_lock l( my_mutex );
3409 my_buffer_is_valid = false;
3410 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3411 for (auto msg_waiter : my_buffered_metainfo.waiters()) {
3412 msg_waiter->release(1);
3413 }
3414 my_buffered_metainfo = message_metainfo{};
3415 #endif
3416 }
3417
3418 protected:
3419
3420 template< typename R, typename B > friend class run_and_put_task;
3421 template<typename X, typename Y> friend class broadcast_cache;
3422 template<typename X, typename Y> friend class round_robin_cache;
3423 graph_task* try_put_task( const input_type &v ) override {
3424 spin_mutex::scoped_lock l( my_mutex );
3425 return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
3426 }
3427
3428 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3429 graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
3430 spin_mutex::scoped_lock l( my_mutex );
3431 return try_put_task_impl(v, metainfo);
3432 }
3433 #endif
3434
3435 graph_task * try_put_task_impl(const input_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
3436 my_buffer = v;
3437 my_buffer_is_valid = true;
3438 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3439
3440 for (auto msg_waiter : metainfo.waiters()) {
3441 msg_waiter->reserve(1);
3442 }
3443
3444
3445 for (auto msg_waiter : my_buffered_metainfo.waiters()) {
3446 msg_waiter->release(1);
3447 }
3448
3449 my_buffered_metainfo = metainfo;
3450 #endif
3451 graph_task* rtask = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(my_buffered_metainfo) );
3452 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3453 return rtask;
3454 }
3455
3456 graph& graph_reference() const override {
3457 return my_graph;
3458 }
3459
3460
3461 struct register_predecessor_task : public graph_task {
3462 register_predecessor_task(
3463 graph& g, d1::small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
3464 : graph_task(g, allocator), o(owner), s(succ) {};
3465
3466 d1::task* execute(d1::execution_data& ed) override {
3467
3468 using tbb::detail::d2::register_predecessor;
3469 using tbb::detail::d2::register_successor;
3470 if ( !register_predecessor(s, o) ) {
3471 register_successor(o, s);
3472 }
3473 finalize<register_predecessor_task>(ed);
3474 return nullptr;
3475 }
3476
3477 d1::task* cancel(d1::execution_data& ed) override {
3478 finalize<register_predecessor_task>(ed);
3479 return nullptr;
3480 }
3481
3482 predecessor_type& o;
3483 successor_type& s;
3484 };
3485
3486 spin_mutex my_mutex;
3487 broadcast_cache< input_type, null_rw_mutex > my_successors;
3488 input_type my_buffer;
3489 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3490 message_metainfo my_buffered_metainfo;
3491 #endif
3492 bool my_buffer_is_valid;
3493
3494 void reset_node( reset_flags f) override {
3495 my_buffer_is_valid = false;
3496 if (f&rf_clear_edges) {
3497 my_successors.clear();
3498 }
3499 }
3500 };
3501
3502 template< typename T >
3503 class write_once_node : public overwrite_node<T> {
3504 public:
3505 typedef T input_type;
3506 typedef T output_type;
3507 typedef overwrite_node<T> base_type;
3508 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3509 typedef typename sender<output_type>::successor_type successor_type;
3510
3511
3512 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3513 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3514 static_cast<receiver<input_type> *>(this),
3515 static_cast<sender<output_type> *>(this) );
3516 }
3517
3518 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3519 template <typename... Args>
3520 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3521 make_edges_in_order(nodes, *this);
3522 }
3523 #endif
3524
3525
3526 __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3527 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3528 static_cast<receiver<input_type> *>(this),
3529 static_cast<sender<output_type> *>(this) );
3530 }
3531
3532 protected:
3533 template< typename R, typename B > friend class run_and_put_task;
3534 template<typename X, typename Y> friend class broadcast_cache;
3535 template<typename X, typename Y> friend class round_robin_cache;
3536 graph_task *try_put_task( const T &v ) override {
3537 spin_mutex::scoped_lock l( this->my_mutex );
3538 return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
3539 }
3540
3541 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
3542 graph_task* try_put_task(const T& v, const message_metainfo& metainfo) override {
3543 spin_mutex::scoped_lock l( this->my_mutex );
3544 return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v, metainfo);
3545 }
3546 #endif
3547 };
3548
3549 inline void set_name(const graph& g, const char *name) {
3550 fgt_graph_desc(&g, name);
3551 }
3552
3553 template <typename Output>
3554 inline void set_name(const input_node<Output>& node, const char *name) {
3555 fgt_node_desc(&node, name);
3556 }
3557
3558 template <typename Input, typename Output, typename Policy>
3559 inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
3560 fgt_node_desc(&node, name);
3561 }
3562
3563 template <typename Output, typename Policy>
3564 inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
3565 fgt_node_desc(&node, name);
3566 }
3567
3568 template <typename T>
3569 inline void set_name(const broadcast_node<T>& node, const char *name) {
3570 fgt_node_desc(&node, name);
3571 }
3572
3573 template <typename T>
3574 inline void set_name(const buffer_node<T>& node, const char *name) {
3575 fgt_node_desc(&node, name);
3576 }
3577
3578 template <typename T>
3579 inline void set_name(const queue_node<T>& node, const char *name) {
3580 fgt_node_desc(&node, name);
3581 }
3582
3583 template <typename T>
3584 inline void set_name(const sequencer_node<T>& node, const char *name) {
3585 fgt_node_desc(&node, name);
3586 }
3587
3588 template <typename T, typename Compare>
3589 inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
3590 fgt_node_desc(&node, name);
3591 }
3592
3593 template <typename T, typename DecrementType>
3594 inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
3595 fgt_node_desc(&node, name);
3596 }
3597
3598 template <typename OutputTuple, typename JP>
3599 inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
3600 fgt_node_desc(&node, name);
3601 }
3602
3603 template <typename... Types>
3604 inline void set_name(const indexer_node<Types...>& node, const char *name) {
3605 fgt_node_desc(&node, name);
3606 }
3607
3608 template <typename T>
3609 inline void set_name(const overwrite_node<T>& node, const char *name) {
3610 fgt_node_desc(&node, name);
3611 }
3612
3613 template <typename T>
3614 inline void set_name(const write_once_node<T>& node, const char *name) {
3615 fgt_node_desc(&node, name);
3616 }
3617
3618 template<typename Input, typename Output, typename Policy>
3619 inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
3620 fgt_multioutput_node_desc(&node, name);
3621 }
3622
3623 template<typename TupleType>
3624 inline void set_name(const split_node<TupleType>& node, const char *name) {
3625 fgt_multioutput_node_desc(&node, name);
3626 }
3627
3628 template< typename InputTuple, typename OutputTuple >
3629 inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
3630 fgt_multiinput_multioutput_node_desc(&node, name);
3631 }
3632
3633 template<typename Input, typename Output, typename Policy>
3634 inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
3635 {
3636 fgt_multioutput_node_desc(&node, name);
3637 }
3638 }
3639 }
3640 }
3641
3642
3643
3644 #include "detail/_flow_graph_nodes_deduction.h"
3645
3646 namespace tbb {
3647 namespace flow {
3648 inline namespace v1 {
3649 using detail::d2::receiver;
3650 using detail::d2::sender;
3651
3652 using detail::d2::serial;
3653 using detail::d2::unlimited;
3654
3655 using detail::d2::reset_flags;
3656 using detail::d2::rf_reset_protocol;
3657 using detail::d2::rf_reset_bodies;
3658 using detail::d2::rf_clear_edges;
3659
3660 using detail::d2::graph;
3661 using detail::d2::graph_node;
3662 using detail::d2::continue_msg;
3663
3664 using detail::d2::input_node;
3665 using detail::d2::function_node;
3666 using detail::d2::multifunction_node;
3667 using detail::d2::split_node;
3668 using detail::d2::output_port;
3669 using detail::d2::indexer_node;
3670 using detail::d2::tagged_msg;
3671 using detail::d2::cast_to;
3672 using detail::d2::is_a;
3673 using detail::d2::continue_node;
3674 using detail::d2::overwrite_node;
3675 using detail::d2::write_once_node;
3676 using detail::d2::broadcast_node;
3677 using detail::d2::buffer_node;
3678 using detail::d2::queue_node;
3679 using detail::d2::sequencer_node;
3680 using detail::d2::priority_queue_node;
3681 using detail::d2::limiter_node;
3682 using namespace detail::d2::graph_policy_namespace;
3683 using detail::d2::join_node;
3684 using detail::d2::input_port;
3685 using detail::d2::copy_body;
3686 using detail::d2::make_edge;
3687 using detail::d2::remove_edge;
3688 using detail::d2::tag_value;
3689 using detail::d2::composite_node;
3690 using detail::d2::async_node;
3691 using detail::d2::node_priority_t;
3692 using detail::d2::no_priority;
3693
3694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3695 using detail::d2::follows;
3696 using detail::d2::precedes;
3697 using detail::d2::make_node_set;
3698 using detail::d2::make_edges;
3699 #endif
3700
3701 }
3702 }
3703
3704 using detail::d1::flow_control;
3705
3706 namespace profiling {
3707 using detail::d2::set_name;
3708 }
3709
3710 }
3711
3712
3713 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
3714
3715 #undef __TBB_NOINLINE_SYM
3716 #endif
3717
3718 #endif