Warning, file /include/oneapi/tbb/flow_graph.h was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_H
0018 #define __TBB_flow_graph_H
0019
0020 #include <atomic>
0021 #include <memory>
0022 #include <type_traits>
0023
0024 #include "detail/_config.h"
0025 #include "detail/_namespace_injection.h"
0026 #include "spin_mutex.h"
0027 #include "null_mutex.h"
0028 #include "spin_rw_mutex.h"
0029 #include "null_rw_mutex.h"
0030 #include "detail/_pipeline_filters.h"
0031 #include "detail/_task.h"
0032 #include "detail/_small_object_pool.h"
0033 #include "cache_aligned_allocator.h"
0034 #include "detail/_exception.h"
0035 #include "detail/_template_helpers.h"
0036 #include "detail/_aggregator.h"
0037 #include "detail/_allocator_traits.h"
0038 #include "detail/_utils.h"
0039 #include "profiling.h"
0040 #include "task_arena.h"
0041
0042 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
0043 #if __INTEL_COMPILER
0044
0045 #pragma warning (push)
0046 #pragma warning( disable: 2196 )
0047 #endif
0048 #define __TBB_NOINLINE_SYM __attribute__((noinline))
0049 #else
0050 #define __TBB_NOINLINE_SYM
0051 #endif
0052
0053 #include <tuple>
0054 #include <list>
0055 #include <queue>
0056 #if __TBB_CPP20_CONCEPTS_PRESENT
0057 #include <concepts>
0058 #endif
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070 namespace tbb {
0071 namespace detail {
0072
namespace d1 {

//! Named values for the two concurrency limits declared here.
enum concurrency {
    unlimited = 0,
    serial = 1
};

//! Empty tag type; node classes later in this file use it as a placeholder input/output type.
struct null_type {};

//! Empty message type; it is the input type of continue_receiver.
class continue_msg {};

} // namespace d1
0085
// Concepts that constrain the user-supplied callables of the flow graph nodes.
// They are only compiled when __TBB_CPP20_CONCEPTS_PRESENT is set.
0086 #if __TBB_CPP20_CONCEPTS_PRESENT
0087 namespace d0 {
0088
// Satisfied when OutputType is continue_msg, or when
// std::convertible_to<OutputType, ReturnType> holds.
// NOTE(review): at both use sites below ReturnType is the body's result type and
// OutputType is the node's Output, so this checks Output -> result rather than
// result -> Output. Confirm that this conversion direction is intended.
0089 template <typename ReturnType, typename OutputType>
0090 concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
0091 std::convertible_to<OutputType, ReturnType>;
0092
0093
// Body must be copy-constructible and callable on an lvalue with a const continue_msg&;
// the result type is checked with node_body_return_type<Output>.
0094 template <typename Body, typename Output>
0095 concept continue_node_body = std::copy_constructible<Body> &&
0096 requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
0097 { body(v) } -> node_body_return_type<Output>;
0098 };
0099
// Body must be copy-constructible and invocable (as Body&) with a const Input&;
// its invoke result is checked with node_body_return_type against Output.
0100 template <typename Body, typename Input, typename Output>
0101 concept function_node_body = std::copy_constructible<Body> &&
0102 std::invocable<Body&, const Input&> &&
0103 node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;
0104
// FunctionObject must be copy-constructible, invocable with a const Input&,
// and its result must be convertible to Key.
0105 template <typename FunctionObject, typename Input, typename Key>
0106 concept join_node_function_object = std::copy_constructible<FunctionObject> &&
0107 std::invocable<FunctionObject&, const Input&> &&
0108 std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;
0109
// Body must be copy-constructible and callable with a flow_control&; the result is
// checked with adaptive_same_as<Output> (declared outside this section).
0110 template <typename Body, typename Output>
0111 concept input_node_body = std::copy_constructible<Body> &&
0112 requires( Body& body, tbb::detail::d1::flow_control& fc ) {
0113 { body(fc) } -> adaptive_same_as<Output>;
0114 };
0115
// Body must be copy-constructible and invocable with (const Input&, OutputPortsType&).
// No requirement is placed on the result type.
0116 template <typename Body, typename Input, typename OutputPortsType>
0117 concept multifunction_node_body = std::copy_constructible<Body> &&
0118 std::invocable<Body&, const Input&, OutputPortsType&>;
0119
// Sequencer must be copy-constructible, invocable with a const Value&,
// and its result must be convertible to std::size_t.
0120 template <typename Sequencer, typename Value>
0121 concept sequencer = std::copy_constructible<Sequencer> &&
0122 std::invocable<Sequencer&, const Value&> &&
0123 std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;
0124
// Body must be copy-constructible and invocable with (const Input&, GatewayType&).
// No requirement is placed on the result type.
0125 template <typename Body, typename Input, typename GatewayType>
0126 concept async_node_body = std::copy_constructible<Body> &&
0127 std::invocable<Body&, const Input&, GatewayType&>;
0128
0129 }
0130 #endif
0131
// Forward declarations of class templates that are referenced before their definitions.
0132 namespace d1 {
0133
0134
// The two ends of an edge, and the continue_msg specialisation of the receiving end.
0135 template< typename T > class sender;
0136 template< typename T > class receiver;
0137 class continue_receiver;
0138
// Named as a friend by continue_receiver further down in this file.
0139 template< typename T, typename U > class limiter_node;
0140
// Cache class templates; their definitions are not part of this section.
0141 template<typename T, typename M> class successor_cache;
0142 template<typename T, typename M> class broadcast_cache;
0143 template<typename T, typename M> class round_robin_cache;
0144 template<typename T, typename M> class predecessor_cache;
0145 template<typename T, typename M> class reservable_predecessor_cache;
0146
// Only declared when the node_set preview feature is enabled.
0147 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0148 namespace order {
0149 struct following;
0150 struct preceding;
0151 }
0152 template<typename Order, typename... Args> struct node_set;
0153 #endif
0154
0155
0156 }
0157 }
0158 }
0159
0160
0161 #include "detail/_flow_graph_impl.h"
0162
0163 namespace tbb {
0164 namespace detail {
0165 namespace d1 {
0166
0167 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
0168 if (second->priority > first->priority)
0169 return std::make_pair(second, first);
0170 return std::make_pair(first, second);
0171 }
0172
0173
0174 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
0175
0176 if (right == nullptr) return left;
0177
0178 if (left == nullptr) return right;
0179 if (left == SUCCESSFULLY_ENQUEUED) return right;
0180
0181 if (right != SUCCESSFULLY_ENQUEUED) {
0182
0183 auto tasks_pair = order_tasks(left, right);
0184 spawn_in_graph_arena(g, *tasks_pair.first);
0185 return tasks_pair.second;
0186 }
0187 return left;
0188 }
0189
0190
0191 template< typename T >
0192 class sender {
0193 public:
0194 virtual ~sender() {}
0195
0196
0197 virtual bool try_get( T & ) { return false; }
0198
0199
0200 virtual bool try_reserve( T & ) { return false; }
0201
0202
0203 virtual bool try_release( ) { return false; }
0204
0205
0206 virtual bool try_consume( ) { return false; }
0207
0208 protected:
0209
0210 typedef T output_type;
0211
0212
0213 typedef receiver<T> successor_type;
0214
0215
0216 virtual bool register_successor( successor_type &r ) = 0;
0217
0218
0219 virtual bool remove_successor( successor_type &r ) = 0;
0220
0221 template<typename C>
0222 friend bool register_successor(sender<C>& s, receiver<C>& r);
0223
0224 template<typename C>
0225 friend bool remove_successor (sender<C>& s, receiver<C>& r);
0226 };
0227
0228 template<typename C>
0229 bool register_successor(sender<C>& s, receiver<C>& r) {
0230 return s.register_successor(r);
0231 }
0232
0233 template<typename C>
0234 bool remove_successor(sender<C>& s, receiver<C>& r) {
0235 return s.remove_successor(r);
0236 }
0237
0238
0239 template< typename T >
0240 class receiver {
0241 public:
0242
0243 virtual ~receiver() {}
0244
0245
0246 bool try_put( const T& t ) {
0247 graph_task *res = try_put_task(t);
0248 if (!res) return false;
0249 if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
0250 return true;
0251 }
0252
0253
0254 protected:
0255
0256 typedef T input_type;
0257
0258
0259 typedef sender<T> predecessor_type;
0260
0261 template< typename R, typename B > friend class run_and_put_task;
0262 template< typename X, typename Y > friend class broadcast_cache;
0263 template< typename X, typename Y > friend class round_robin_cache;
0264 virtual graph_task *try_put_task(const T& t) = 0;
0265 virtual graph& graph_reference() const = 0;
0266
0267 template<typename TT, typename M> friend class successor_cache;
0268 virtual bool is_continue_receiver() { return false; }
0269
0270
0271 virtual node_priority_t priority() const { return no_priority; }
0272
0273
0274 virtual bool register_predecessor( predecessor_type & ) { return false; }
0275
0276
0277 virtual bool remove_predecessor( predecessor_type & ) { return false; }
0278
0279 template <typename C>
0280 friend bool register_predecessor(receiver<C>& r, sender<C>& s);
0281 template <typename C>
0282 friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
0283 };
0284
0285 template <typename C>
0286 bool register_predecessor(receiver<C>& r, sender<C>& s) {
0287 return r.register_predecessor(s);
0288 }
0289
0290 template <typename C>
0291 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
0292 return r.remove_predecessor(s);
0293 }
0294
0295
0296
0297 class continue_receiver : public receiver< continue_msg > {
0298 protected:
0299
0300
0301 explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
0302 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
0303 my_current_count = 0;
0304 my_priority = a_priority;
0305 }
0306
0307
0308 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
0309 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
0310 my_current_count = 0;
0311 my_priority = src.my_priority;
0312 }
0313
0314
0315 bool register_predecessor( predecessor_type & ) override {
0316 spin_mutex::scoped_lock l(my_mutex);
0317 ++my_predecessor_count;
0318 return true;
0319 }
0320
0321
0322
0323
0324
0325 bool remove_predecessor( predecessor_type & ) override {
0326 spin_mutex::scoped_lock l(my_mutex);
0327 --my_predecessor_count;
0328 return true;
0329 }
0330
0331
0332 typedef continue_msg input_type;
0333
0334
0335 typedef receiver<input_type>::predecessor_type predecessor_type;
0336
0337 template< typename R, typename B > friend class run_and_put_task;
0338 template<typename X, typename Y> friend class broadcast_cache;
0339 template<typename X, typename Y> friend class round_robin_cache;
0340
0341 graph_task* try_put_task( const input_type & ) override {
0342 {
0343 spin_mutex::scoped_lock l(my_mutex);
0344 if ( ++my_current_count < my_predecessor_count )
0345 return SUCCESSFULLY_ENQUEUED;
0346 else
0347 my_current_count = 0;
0348 }
0349 graph_task* res = execute();
0350 return res? res : SUCCESSFULLY_ENQUEUED;
0351 }
0352
0353 spin_mutex my_mutex;
0354 int my_predecessor_count;
0355 int my_current_count;
0356 int my_initial_predecessor_count;
0357 node_priority_t my_priority;
0358
0359
0360 template<typename U, typename V> friend class limiter_node;
0361
0362 virtual void reset_receiver( reset_flags f ) {
0363 my_current_count = 0;
0364 if (f & rf_clear_edges) {
0365 my_predecessor_count = my_initial_predecessor_count;
0366 }
0367 }
0368
0369
0370
0371
0372 virtual graph_task* execute() = 0;
0373 template<typename TT, typename M> friend class successor_cache;
0374 bool is_continue_receiver() override { return true; }
0375
0376 node_priority_t priority() const override { return my_priority; }
0377 };
0378
0379 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Obtains a key of type K from message t by calling t.key().
/** The result of t.key() is converted to K by the return statement. */
template <typename K, typename T>
K key_from_message( const T &t )
{
    return t.key();
}
0384 #endif
0385
0386 }
0387 }
0388 }
0389
0390 #include "detail/_flow_graph_trace_impl.h"
0391 #include "detail/_hash_compare.h"
0392
0393 namespace tbb {
0394 namespace detail {
0395 namespace d1 {
0396
0397 #include "detail/_flow_graph_body_impl.h"
0398 #include "detail/_flow_graph_cache_impl.h"
0399 #include "detail/_flow_graph_types_impl.h"
0400
0401 using namespace graph_policy_namespace;
0402
0403 template <typename C, typename N>
0404 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
0405 {
0406 if (begin) current_node = my_graph->my_nodes;
0407
0408 }
0409
0410 template <typename C, typename N>
0411 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
0412 __TBB_ASSERT(current_node, "graph_iterator at end");
0413 return *operator->();
0414 }
0415
0416 template <typename C, typename N>
0417 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
0418 return current_node;
0419 }
0420
0421 template <typename C, typename N>
0422 void graph_iterator<C,N>::internal_forward() {
0423 if (current_node) current_node = current_node->next;
0424 }
0425
0426
0427 inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0428 prepare_task_arena();
0429 own_context = true;
0430 cancelled = false;
0431 caught_exception = false;
0432 my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
0433 fgt_graph(this);
0434 my_is_active = true;
0435 }
0436
0437 inline graph::graph(task_group_context& use_this_context) :
0438 my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0439 prepare_task_arena();
0440 own_context = false;
0441 cancelled = false;
0442 caught_exception = false;
0443 fgt_graph(this);
0444 my_is_active = true;
0445 }
0446
0447 inline graph::~graph() {
0448 wait_for_all();
0449 if (own_context) {
0450 my_context->~task_group_context();
0451 r1::cache_aligned_deallocate(my_context);
0452 }
0453 delete my_task_arena;
0454 }
0455
0456 inline void graph::reserve_wait() {
0457 my_wait_context.reserve();
0458 fgt_reserve_wait(this);
0459 }
0460
0461 inline void graph::release_wait() {
0462 fgt_release_wait(this);
0463 my_wait_context.release();
0464 }
0465
0466 inline void graph::register_node(graph_node *n) {
0467 n->next = nullptr;
0468 {
0469 spin_mutex::scoped_lock lock(nodelist_mutex);
0470 n->prev = my_nodes_last;
0471 if (my_nodes_last) my_nodes_last->next = n;
0472 my_nodes_last = n;
0473 if (!my_nodes) my_nodes = n;
0474 }
0475 }
0476
0477 inline void graph::remove_node(graph_node *n) {
0478 {
0479 spin_mutex::scoped_lock lock(nodelist_mutex);
0480 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
0481 if (n->prev) n->prev->next = n->next;
0482 if (n->next) n->next->prev = n->prev;
0483 if (my_nodes_last == n) my_nodes_last = n->prev;
0484 if (my_nodes == n) my_nodes = n->next;
0485 }
0486 n->prev = n->next = nullptr;
0487 }
0488
0489 inline void graph::reset( reset_flags f ) {
0490
0491 deactivate_graph(*this);
0492
0493 my_context->reset();
0494 cancelled = false;
0495 caught_exception = false;
0496
0497 for(iterator ii = begin(); ii != end(); ++ii) {
0498 graph_node *my_p = &(*ii);
0499 my_p->reset_node(f);
0500 }
0501
0502
0503 prepare_task_arena( true );
0504 activate_graph(*this);
0505 }
0506
0507 inline void graph::cancel() {
0508 my_context->cancel_group_execution();
0509 }
0510
0511 inline graph::iterator graph::begin() { return iterator(this, true); }
0512
0513 inline graph::iterator graph::end() { return iterator(this, false); }
0514
0515 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
0516
0517 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
0518
0519 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
0520
0521 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
0522
0523 inline graph_node::graph_node(graph& g) : my_graph(g) {
0524 my_graph.register_node(this);
0525 }
0526
0527 inline graph_node::~graph_node() {
0528 my_graph.remove_node(this);
0529 }
0530
0531 #include "detail/_flow_graph_node_impl.h"
0532
0533
0534
0535
0536 template < typename Output >
0537 __TBB_requires(std::copyable<Output>)
0538 class input_node : public graph_node, public sender< Output > {
0539 public:
0540
0541 typedef Output output_type;
0542
0543
0544 typedef typename sender<output_type>::successor_type successor_type;
0545
0546
0547 typedef null_type input_type;
0548
0549
0550 template< typename Body >
0551 __TBB_requires(input_node_body<Body, Output>)
0552 __TBB_NOINLINE_SYM input_node( graph &g, Body body )
0553 : graph_node(g), my_active(false)
0554 , my_body( new input_body_leaf< output_type, Body>(body) )
0555 , my_init_body( new input_body_leaf< output_type, Body>(body) )
0556 , my_successors(this), my_reserved(false), my_has_cached_item(false)
0557 {
0558 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0559 static_cast<sender<output_type> *>(this), this->my_body);
0560 }
0561
0562 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0563 template <typename Body, typename... Successors>
0564 __TBB_requires(input_node_body<Body, Output>)
0565 input_node( const node_set<order::preceding, Successors...>& successors, Body body )
0566 : input_node(successors.graph_reference(), body)
0567 {
0568 make_edges(*this, successors);
0569 }
0570 #endif
0571
0572
0573 __TBB_NOINLINE_SYM input_node( const input_node& src )
0574 : graph_node(src.my_graph), sender<Output>()
0575 , my_active(false)
0576 , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
0577 , my_successors(this), my_reserved(false), my_has_cached_item(false)
0578 {
0579 fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0580 static_cast<sender<output_type> *>(this), this->my_body);
0581 }
0582
0583
0584 ~input_node() { delete my_body; delete my_init_body; }
0585
0586
0587 bool register_successor( successor_type &r ) override {
0588 spin_mutex::scoped_lock lock(my_mutex);
0589 my_successors.register_successor(r);
0590 if ( my_active )
0591 spawn_put();
0592 return true;
0593 }
0594
0595
0596 bool remove_successor( successor_type &r ) override {
0597 spin_mutex::scoped_lock lock(my_mutex);
0598 my_successors.remove_successor(r);
0599 return true;
0600 }
0601
0602
0603 bool try_get( output_type &v ) override {
0604 spin_mutex::scoped_lock lock(my_mutex);
0605 if ( my_reserved )
0606 return false;
0607
0608 if ( my_has_cached_item ) {
0609 v = my_cached_item;
0610 my_has_cached_item = false;
0611 return true;
0612 }
0613
0614
0615 if ( my_active )
0616 spawn_put();
0617 return false;
0618 }
0619
0620
0621 bool try_reserve( output_type &v ) override {
0622 spin_mutex::scoped_lock lock(my_mutex);
0623 if ( my_reserved ) {
0624 return false;
0625 }
0626
0627 if ( my_has_cached_item ) {
0628 v = my_cached_item;
0629 my_reserved = true;
0630 return true;
0631 } else {
0632 return false;
0633 }
0634 }
0635
0636
0637
0638 bool try_release( ) override {
0639 spin_mutex::scoped_lock lock(my_mutex);
0640 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
0641 my_reserved = false;
0642 if(!my_successors.empty())
0643 spawn_put();
0644 return true;
0645 }
0646
0647
0648 bool try_consume( ) override {
0649 spin_mutex::scoped_lock lock(my_mutex);
0650 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
0651 my_reserved = false;
0652 my_has_cached_item = false;
0653 if ( !my_successors.empty() ) {
0654 spawn_put();
0655 }
0656 return true;
0657 }
0658
0659
0660 void activate() {
0661 spin_mutex::scoped_lock lock(my_mutex);
0662 my_active = true;
0663 if (!my_successors.empty())
0664 spawn_put();
0665 }
0666
0667 template<typename Body>
0668 Body copy_function_object() {
0669 input_body<output_type> &body_ref = *this->my_body;
0670 return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
0671 }
0672
0673 protected:
0674
0675
0676 void reset_node( reset_flags f) override {
0677 my_active = false;
0678 my_reserved = false;
0679 my_has_cached_item = false;
0680
0681 if(f & rf_clear_edges) my_successors.clear();
0682 if(f & rf_reset_bodies) {
0683 input_body<output_type> *tmp = my_init_body->clone();
0684 delete my_body;
0685 my_body = tmp;
0686 }
0687 }
0688
0689 private:
0690 spin_mutex my_mutex;
0691 bool my_active;
0692 input_body<output_type> *my_body;
0693 input_body<output_type> *my_init_body;
0694 broadcast_cache< output_type > my_successors;
0695 bool my_reserved;
0696 bool my_has_cached_item;
0697 output_type my_cached_item;
0698
0699
0700 bool try_reserve_apply_body(output_type &v) {
0701 spin_mutex::scoped_lock lock(my_mutex);
0702 if ( my_reserved ) {
0703 return false;
0704 }
0705 if ( !my_has_cached_item ) {
0706 flow_control control;
0707
0708 fgt_begin_body( my_body );
0709
0710 my_cached_item = (*my_body)(control);
0711 my_has_cached_item = !control.is_pipeline_stopped;
0712
0713 fgt_end_body( my_body );
0714 }
0715 if ( my_has_cached_item ) {
0716 v = my_cached_item;
0717 my_reserved = true;
0718 return true;
0719 } else {
0720 return false;
0721 }
0722 }
0723
0724 graph_task* create_put_task() {
0725 small_object_allocator allocator{};
0726 typedef input_node_task_bypass< input_node<output_type> > task_type;
0727 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
0728 my_graph.reserve_wait();
0729 return t;
0730 }
0731
0732
0733 void spawn_put( ) {
0734 if(is_graph_active(this->my_graph)) {
0735 spawn_in_graph_arena(this->my_graph, *create_put_task());
0736 }
0737 }
0738
0739 friend class input_node_task_bypass< input_node<output_type> >;
0740
0741 graph_task* apply_body_bypass( ) {
0742 output_type v;
0743 if ( !try_reserve_apply_body(v) )
0744 return nullptr;
0745
0746 graph_task *last_task = my_successors.try_put_task(v);
0747 if ( last_task )
0748 try_consume();
0749 else
0750 try_release();
0751 return last_task;
0752 }
0753 };
0754
0755
0756 template<typename Input, typename Output = continue_msg, typename Policy = queueing>
0757 __TBB_requires(std::default_initializable<Input> &&
0758 std::copy_constructible<Input> &&
0759 std::copy_constructible<Output>)
0760 class function_node
0761 : public graph_node
0762 , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
0763 , public function_output<Output>
0764 {
0765 typedef cache_aligned_allocator<Input> internals_allocator;
0766
0767 public:
0768 typedef Input input_type;
0769 typedef Output output_type;
0770 typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
0771 typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0772 typedef function_output<output_type> fOutput_type;
0773 typedef typename input_impl_type::predecessor_type predecessor_type;
0774 typedef typename fOutput_type::successor_type successor_type;
0775
0776 using input_impl_type::my_predecessors;
0777
0778
0779
0780
0781
0782 template< typename Body >
0783 __TBB_requires(function_node_body<Body, Input, Output>)
0784 __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
0785 Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
0786 : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
0787 fOutput_type(g) {
0788 fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0789 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0790 }
0791
0792 template <typename Body>
0793 __TBB_requires(function_node_body<Body, Input, Output>)
0794 function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
0795 : function_node(g, concurrency, body, Policy(), a_priority) {}
0796
0797 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0798 template <typename Body, typename... Args>
0799 __TBB_requires(function_node_body<Body, Input, Output>)
0800 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
0801 Policy p = Policy(), node_priority_t a_priority = no_priority )
0802 : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
0803 make_edges_in_order(nodes, *this);
0804 }
0805
0806 template <typename Body, typename... Args>
0807 __TBB_requires(function_node_body<Body, Input, Output>)
0808 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
0809 : function_node(nodes, concurrency, body, Policy(), a_priority) {}
0810 #endif
0811
0812
0813 __TBB_NOINLINE_SYM function_node( const function_node& src ) :
0814 graph_node(src.my_graph),
0815 input_impl_type(src),
0816 fOutput_type(src.my_graph) {
0817 fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0818 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0819 }
0820
0821 protected:
0822 template< typename R, typename B > friend class run_and_put_task;
0823 template<typename X, typename Y> friend class broadcast_cache;
0824 template<typename X, typename Y> friend class round_robin_cache;
0825 using input_impl_type::try_put_task;
0826
0827 broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
0828
0829 void reset_node(reset_flags f) override {
0830 input_impl_type::reset_function_input(f);
0831
0832 if(f & rf_clear_edges) {
0833 successors().clear();
0834 my_predecessors.clear();
0835 }
0836 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
0837 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
0838 }
0839
0840 };
0841
0842
0843
0844 template<typename Input, typename Output, typename Policy = queueing>
0845 __TBB_requires(std::default_initializable<Input> &&
0846 std::copy_constructible<Input>)
0847 class multifunction_node :
0848 public graph_node,
0849 public multifunction_input
0850 <
0851 Input,
0852 typename wrap_tuple_elements<
0853 std::tuple_size<Output>::value,
0854 multifunction_output,
0855 Output
0856 >::type,
0857 Policy,
0858 cache_aligned_allocator<Input>
0859 >
0860 {
0861 typedef cache_aligned_allocator<Input> internals_allocator;
0862
0863 protected:
0864 static const int N = std::tuple_size<Output>::value;
0865 public:
0866 typedef Input input_type;
0867 typedef null_type output_type;
0868 typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
0869 typedef multifunction_input<
0870 input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
0871 typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0872 private:
0873 using input_impl_type::my_predecessors;
0874 public:
0875 template<typename Body>
0876 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0877 __TBB_NOINLINE_SYM multifunction_node(
0878 graph &g, size_t concurrency,
0879 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
0880 ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
0881 fgt_multioutput_node_with_body<N>(
0882 CODEPTR(), FLOW_MULTIFUNCTION_NODE,
0883 &this->my_graph, static_cast<receiver<input_type> *>(this),
0884 this->output_ports(), this->my_body
0885 );
0886 }
0887
0888 template <typename Body>
0889 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0890 __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
0891 : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
0892
0893 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0894 template <typename Body, typename... Args>
0895 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0896 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
0897 Policy p = Policy(), node_priority_t a_priority = no_priority)
0898 : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
0899 make_edges_in_order(nodes, *this);
0900 }
0901
0902 template <typename Body, typename... Args>
0903 __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0904 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
0905 : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
0906 #endif
0907
0908 __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
0909 graph_node(other.my_graph), input_impl_type(other) {
0910 fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
0911 &this->my_graph, static_cast<receiver<input_type> *>(this),
0912 this->output_ports(), this->my_body );
0913 }
0914
0915
0916 protected:
0917 void reset_node(reset_flags f) override { input_impl_type::reset(f); }
0918 };
0919
0920
0921
0922 template<typename TupleType>
0923 class split_node : public graph_node, public receiver<TupleType> {
0924 static const int N = std::tuple_size<TupleType>::value;
0925 typedef receiver<TupleType> base_type;
0926 public:
0927 typedef TupleType input_type;
0928 typedef typename wrap_tuple_elements<
0929 N,
0930 multifunction_output,
0931 TupleType
0932 >::type output_ports_type;
0933
0934 __TBB_NOINLINE_SYM explicit split_node(graph &g)
0935 : graph_node(g),
0936 my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
0937 {
0938 fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
0939 static_cast<receiver<input_type> *>(this), this->output_ports());
0940 }
0941
0942 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0943 template <typename... Args>
0944 __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
0945 make_edges_in_order(nodes, *this);
0946 }
0947 #endif
0948
0949 __TBB_NOINLINE_SYM split_node(const split_node& other)
0950 : graph_node(other.my_graph), base_type(other),
0951 my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
0952 {
0953 fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
0954 static_cast<receiver<input_type> *>(this), this->output_ports());
0955 }
0956
0957 output_ports_type &output_ports() { return my_output_ports; }
0958
0959 protected:
0960 graph_task *try_put_task(const TupleType& t) override {
0961
0962
0963 return emit_element<N>::emit_this(this->my_graph, t, output_ports());
0964 }
0965 void reset_node(reset_flags f) override {
0966 if (f & rf_clear_edges)
0967 clear_element<N>::clear_this(my_output_ports);
0968
0969 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
0970 }
0971 graph& graph_reference() const override {
0972 return my_graph;
0973 }
0974
0975 private:
0976 output_ports_type my_output_ports;
0977 };
0978
0979
0980 template <typename Output, typename Policy = Policy<void> >
0981 __TBB_requires(std::copy_constructible<Output>)
0982 class continue_node : public graph_node, public continue_input<Output, Policy>,
0983 public function_output<Output> {
0984 public:
0985 typedef continue_msg input_type;
0986 typedef Output output_type;
0987 typedef continue_input<Output, Policy> input_impl_type;
0988 typedef function_output<output_type> fOutput_type;
0989 typedef typename input_impl_type::predecessor_type predecessor_type;
0990 typedef typename fOutput_type::successor_type successor_type;
0991
0992
0993 template <typename Body >
0994 __TBB_requires(continue_node_body<Body, Output>)
0995 __TBB_NOINLINE_SYM continue_node(
0996 graph &g,
0997 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
0998 ) : graph_node(g), input_impl_type( g, body, a_priority ),
0999 fOutput_type(g) {
1000 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1001
1002 static_cast<receiver<input_type> *>(this),
1003 static_cast<sender<output_type> *>(this), this->my_body );
1004 }
1005
1006 template <typename Body>
1007 __TBB_requires(continue_node_body<Body, Output>)
1008 continue_node( graph& g, Body body, node_priority_t a_priority )
1009 : continue_node(g, body, Policy(), a_priority) {}
1010
1011 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1012 template <typename Body, typename... Args>
1013 __TBB_requires(continue_node_body<Body, Output>)
1014 continue_node( const node_set<Args...>& nodes, Body body,
1015 Policy p = Policy(), node_priority_t a_priority = no_priority )
1016 : continue_node(nodes.graph_reference(), body, p, a_priority ) {
1017 make_edges_in_order(nodes, *this);
1018 }
1019 template <typename Body, typename... Args>
1020 __TBB_requires(continue_node_body<Body, Output>)
1021 continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
1022 : continue_node(nodes, body, Policy(), a_priority) {}
1023 #endif
1024
1025
1026 template <typename Body >
1027 __TBB_requires(continue_node_body<Body, Output>)
1028 __TBB_NOINLINE_SYM continue_node(
1029 graph &g, int number_of_predecessors,
1030 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1031 ) : graph_node(g)
1032 , input_impl_type(g, number_of_predecessors, body, a_priority),
1033 fOutput_type(g) {
1034 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1035 static_cast<receiver<input_type> *>(this),
1036 static_cast<sender<output_type> *>(this), this->my_body );
1037 }
1038
1039 template <typename Body>
1040 __TBB_requires(continue_node_body<Body, Output>)
1041 continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
1042 : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
1043
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Preview constructor with an explicit predecessor count, connected to a node_set.
/** Constructs the node in nodes.graph_reference() and then calls
    make_edges_in_order(nodes, *this). */
template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
continue_node(const node_set<Args...>& nodes, int number_of_predecessors,
              Body body, Policy p = Policy(), node_priority_t a_priority = no_priority)
    : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority)
{
    make_edges_in_order(nodes, *this);
}

//! Preview constructor with predecessor count and priority, default Policy.
/** Delegates to the node_set constructor above with a default-constructed Policy. */
template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
continue_node(const node_set<Args...>& nodes, int number_of_predecessors,
              Body body, node_priority_t a_priority)
    : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority)
{}
#endif
1059
1060
1061 __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1062 graph_node(src.my_graph), input_impl_type(src),
1063 function_output<Output>(src.my_graph) {
1064 fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1065 static_cast<receiver<input_type> *>(this),
1066 static_cast<sender<output_type> *>(this), this->my_body );
1067 }
1068
1069 protected:
1070 template< typename R, typename B > friend class run_and_put_task;
1071 template<typename X, typename Y> friend class broadcast_cache;
1072 template<typename X, typename Y> friend class round_robin_cache;
1073 using input_impl_type::try_put_task;
1074 broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
1075
1076 void reset_node(reset_flags f) override {
1077 input_impl_type::reset_receiver(f);
1078 if(f & rf_clear_edges)successors().clear();
1079 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1080 }
1081 };
1082
1083
1084 template <typename T>
1085 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1086 public:
1087 typedef T input_type;
1088 typedef T output_type;
1089 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1090 typedef typename sender<output_type>::successor_type successor_type;
1091 private:
1092 broadcast_cache<input_type> my_successors;
1093 public:
1094
1095 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1096 fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1097 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1098 }
1099
1100 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1101 template <typename... Args>
1102 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1103 make_edges_in_order(nodes, *this);
1104 }
1105 #endif
1106
1107
1108 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1109
1110
1111 bool register_successor( successor_type &r ) override {
1112 my_successors.register_successor( r );
1113 return true;
1114 }
1115
1116
1117 bool remove_successor( successor_type &r ) override {
1118 my_successors.remove_successor( r );
1119 return true;
1120 }
1121
1122 protected:
1123 template< typename R, typename B > friend class run_and_put_task;
1124 template<typename X, typename Y> friend class broadcast_cache;
1125 template<typename X, typename Y> friend class round_robin_cache;
1126
1127 graph_task *try_put_task(const T& t) override {
1128 graph_task *new_task = my_successors.try_put_task(t);
1129 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1130 return new_task;
1131 }
1132
1133 graph& graph_reference() const override {
1134 return my_graph;
1135 }
1136
1137 void reset_node(reset_flags f) override {
1138 if (f&rf_clear_edges) {
1139 my_successors.clear();
1140 }
1141 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1142 }
1143 };
1144
1145
1146 template <typename T>
1147 class buffer_node
1148 : public graph_node
1149 , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1150 , public receiver<T>, public sender<T>
1151 {
1152 typedef cache_aligned_allocator<T> internals_allocator;
1153
1154 public:
1155 typedef T input_type;
1156 typedef T output_type;
1157 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1158 typedef typename sender<output_type>::successor_type successor_type;
1159 typedef buffer_node<T> class_type;
1160
1161 protected:
1162 typedef size_t size_type;
1163 round_robin_cache< T, null_rw_mutex > my_successors;
1164
1165 friend class forward_task_bypass< class_type >;
1166
1167 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1168 };
1169
1170
1171 class buffer_operation : public aggregated_operation< buffer_operation > {
1172 public:
1173 char type;
1174 T* elem;
1175 graph_task* ltask;
1176 successor_type *r;
1177
1178 buffer_operation(const T& e, op_type t) : type(char(t))
1179 , elem(const_cast<T*>(&e)) , ltask(nullptr)
1180 , r(nullptr)
1181 {}
1182 buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
1183 };
1184
1185 bool forwarder_busy;
1186 typedef aggregating_functor<class_type, buffer_operation> handler_type;
1187 friend class aggregating_functor<class_type, buffer_operation>;
1188 aggregator< handler_type, buffer_operation> my_aggregator;
1189
1190 virtual void handle_operations(buffer_operation *op_list) {
1191 handle_operations_impl(op_list, this);
1192 }
1193
1194 template<typename derived_type>
1195 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1196 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1197
1198 buffer_operation *tmp = nullptr;
1199 bool try_forwarding = false;
1200 while (op_list) {
1201 tmp = op_list;
1202 op_list = op_list->next;
1203 switch (tmp->type) {
1204 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1205 case rem_succ: internal_rem_succ(tmp); break;
1206 case req_item: internal_pop(tmp); break;
1207 case res_item: internal_reserve(tmp); break;
1208 case rel_res: internal_release(tmp); try_forwarding = true; break;
1209 case con_res: internal_consume(tmp); try_forwarding = true; break;
1210 case put_item: try_forwarding = internal_push(tmp); break;
1211 case try_fwd_task: internal_forward_task(tmp); break;
1212 }
1213 }
1214
1215 derived->order();
1216
1217 if (try_forwarding && !forwarder_busy) {
1218 if(is_graph_active(this->my_graph)) {
1219 forwarder_busy = true;
1220 typedef forward_task_bypass<class_type> task_type;
1221 small_object_allocator allocator{};
1222 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1223 my_graph.reserve_wait();
1224
1225
1226
1227
1228
1229
1230 graph_task *z = tmp->ltask;
1231 graph &g = this->my_graph;
1232 tmp->ltask = combine_tasks(g, z, new_task);
1233 }
1234 }
1235 }
1236
1237 inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1238 return op_data.ltask;
1239 }
1240
1241 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1242 graph_task *ft = grab_forwarding_task(op_data);
1243 if(ft) {
1244 spawn_in_graph_arena(graph_reference(), *ft);
1245 return true;
1246 }
1247 return false;
1248 }
1249
1250
1251 virtual graph_task *forward_task() {
1252 buffer_operation op_data(try_fwd_task);
1253 graph_task *last_task = nullptr;
1254 do {
1255 op_data.status = WAIT;
1256 op_data.ltask = nullptr;
1257 my_aggregator.execute(&op_data);
1258
1259
1260 graph_task *xtask = op_data.ltask;
1261 graph& g = this->my_graph;
1262 last_task = combine_tasks(g, last_task, xtask);
1263 } while (op_data.status ==SUCCEEDED);
1264 return last_task;
1265 }
1266
1267
1268 virtual void internal_reg_succ(buffer_operation *op) {
1269 __TBB_ASSERT(op->r, nullptr);
1270 my_successors.register_successor(*(op->r));
1271 op->status.store(SUCCEEDED, std::memory_order_release);
1272 }
1273
1274
1275 virtual void internal_rem_succ(buffer_operation *op) {
1276 __TBB_ASSERT(op->r, nullptr);
1277 my_successors.remove_successor(*(op->r));
1278 op->status.store(SUCCEEDED, std::memory_order_release);
1279 }
1280
1281 private:
1282 void order() {}
1283
1284 bool is_item_valid() {
1285 return this->my_item_valid(this->my_tail - 1);
1286 }
1287
1288 void try_put_and_add_task(graph_task*& last_task) {
1289 graph_task *new_task = my_successors.try_put_task(this->back());
1290 if (new_task) {
1291
1292 graph& g = this->my_graph;
1293 last_task = combine_tasks(g, last_task, new_task);
1294 this->destroy_back();
1295 }
1296 }
1297
1298 protected:
1299
1300 virtual void internal_forward_task(buffer_operation *op) {
1301 internal_forward_task_impl(op, this);
1302 }
1303
1304 template<typename derived_type>
1305 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1306 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1307
1308 if (this->my_reserved || !derived->is_item_valid()) {
1309 op->status.store(FAILED, std::memory_order_release);
1310 this->forwarder_busy = false;
1311 return;
1312 }
1313
1314 graph_task* last_task = nullptr;
1315 size_type counter = my_successors.size();
1316 for (; counter > 0 && derived->is_item_valid(); --counter)
1317 derived->try_put_and_add_task(last_task);
1318
1319 op->ltask = last_task;
1320 if (last_task && !counter) {
1321 op->status.store(SUCCEEDED, std::memory_order_release);
1322 }
1323 else {
1324 op->status.store(FAILED, std::memory_order_release);
1325 forwarder_busy = false;
1326 }
1327 }
1328
1329 virtual bool internal_push(buffer_operation *op) {
1330 __TBB_ASSERT(op->elem, nullptr);
1331 this->push_back(*(op->elem));
1332 op->status.store(SUCCEEDED, std::memory_order_release);
1333 return true;
1334 }
1335
1336 virtual void internal_pop(buffer_operation *op) {
1337 __TBB_ASSERT(op->elem, nullptr);
1338 if(this->pop_back(*(op->elem))) {
1339 op->status.store(SUCCEEDED, std::memory_order_release);
1340 }
1341 else {
1342 op->status.store(FAILED, std::memory_order_release);
1343 }
1344 }
1345
1346 virtual void internal_reserve(buffer_operation *op) {
1347 __TBB_ASSERT(op->elem, nullptr);
1348 if(this->reserve_front(*(op->elem))) {
1349 op->status.store(SUCCEEDED, std::memory_order_release);
1350 }
1351 else {
1352 op->status.store(FAILED, std::memory_order_release);
1353 }
1354 }
1355
1356 virtual void internal_consume(buffer_operation *op) {
1357 this->consume_front();
1358 op->status.store(SUCCEEDED, std::memory_order_release);
1359 }
1360
1361 virtual void internal_release(buffer_operation *op) {
1362 this->release_front();
1363 op->status.store(SUCCEEDED, std::memory_order_release);
1364 }
1365
1366 public:
1367
1368 __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1369 : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1370 sender<T>(), my_successors(this), forwarder_busy(false)
1371 {
1372 my_aggregator.initialize_handler(handler_type(this));
1373 fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1374 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1375 }
1376
1377 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1378 template <typename... Args>
1379 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1380 make_edges_in_order(nodes, *this);
1381 }
1382 #endif
1383
1384
1385 __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1386
1387
1388
1389
1390
1391
1392
1393 bool register_successor( successor_type &r ) override {
1394 buffer_operation op_data(reg_succ);
1395 op_data.r = &r;
1396 my_aggregator.execute(&op_data);
1397 (void)enqueue_forwarding_task(op_data);
1398 return true;
1399 }
1400
1401
1402
1403
1404 bool remove_successor( successor_type &r ) override {
1405
1406 tbb::detail::d1::remove_predecessor(r, *this);
1407 buffer_operation op_data(rem_succ);
1408 op_data.r = &r;
1409 my_aggregator.execute(&op_data);
1410
1411
1412
1413 (void)enqueue_forwarding_task(op_data);
1414 return true;
1415 }
1416
1417
1418
1419
1420 bool try_get( T &v ) override {
1421 buffer_operation op_data(req_item);
1422 op_data.elem = &v;
1423 my_aggregator.execute(&op_data);
1424 (void)enqueue_forwarding_task(op_data);
1425 return (op_data.status==SUCCEEDED);
1426 }
1427
1428
1429
1430
1431 bool try_reserve( T &v ) override {
1432 buffer_operation op_data(res_item);
1433 op_data.elem = &v;
1434 my_aggregator.execute(&op_data);
1435 (void)enqueue_forwarding_task(op_data);
1436 return (op_data.status==SUCCEEDED);
1437 }
1438
1439
1440
1441 bool try_release() override {
1442 buffer_operation op_data(rel_res);
1443 my_aggregator.execute(&op_data);
1444 (void)enqueue_forwarding_task(op_data);
1445 return true;
1446 }
1447
1448
1449
1450 bool try_consume() override {
1451 buffer_operation op_data(con_res);
1452 my_aggregator.execute(&op_data);
1453 (void)enqueue_forwarding_task(op_data);
1454 return true;
1455 }
1456
1457 protected:
1458
1459 template< typename R, typename B > friend class run_and_put_task;
1460 template<typename X, typename Y> friend class broadcast_cache;
1461 template<typename X, typename Y> friend class round_robin_cache;
1462
1463 graph_task *try_put_task(const T &t) override {
1464 buffer_operation op_data(t, put_item);
1465 my_aggregator.execute(&op_data);
1466 graph_task *ft = grab_forwarding_task(op_data);
1467
1468
1469
1470 if(ft && op_data.status ==FAILED) {
1471
1472
1473
1474 spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
1475 }
1476 else if(!ft && op_data.status ==SUCCEEDED) {
1477 ft = SUCCESSFULLY_ENQUEUED;
1478 }
1479 return ft;
1480 }
1481
1482 graph& graph_reference() const override {
1483 return my_graph;
1484 }
1485
1486 protected:
1487 void reset_node( reset_flags f) override {
1488 reservable_item_buffer<T, internals_allocator>::reset();
1489
1490 if (f&rf_clear_edges) {
1491 my_successors.clear();
1492 }
1493 forwarder_busy = false;
1494 }
1495 };
1496
1497
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501 typedef buffer_node<T> base_type;
1502 typedef typename base_type::size_type size_type;
1503 typedef typename base_type::buffer_operation queue_operation;
1504 typedef queue_node class_type;
1505
1506 private:
1507 template<typename> friend class buffer_node;
1508
1509 bool is_item_valid() {
1510 return this->my_item_valid(this->my_head);
1511 }
1512
1513 void try_put_and_add_task(graph_task*& last_task) {
1514 graph_task *new_task = this->my_successors.try_put_task(this->front());
1515 if (new_task) {
1516
1517 graph& graph_ref = this->graph_reference();
1518 last_task = combine_tasks(graph_ref, last_task, new_task);
1519 this->destroy_front();
1520 }
1521 }
1522
1523 protected:
1524 void internal_forward_task(queue_operation *op) override {
1525 this->internal_forward_task_impl(op, this);
1526 }
1527
1528 void internal_pop(queue_operation *op) override {
1529 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530 op->status.store(FAILED, std::memory_order_release);
1531 }
1532 else {
1533 this->pop_front(*(op->elem));
1534 op->status.store(SUCCEEDED, std::memory_order_release);
1535 }
1536 }
1537 void internal_reserve(queue_operation *op) override {
1538 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539 op->status.store(FAILED, std::memory_order_release);
1540 }
1541 else {
1542 this->reserve_front(*(op->elem));
1543 op->status.store(SUCCEEDED, std::memory_order_release);
1544 }
1545 }
1546 void internal_consume(queue_operation *op) override {
1547 this->consume_front();
1548 op->status.store(SUCCEEDED, std::memory_order_release);
1549 }
1550
1551 public:
1552 typedef T input_type;
1553 typedef T output_type;
1554 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555 typedef typename sender<output_type>::successor_type successor_type;
1556
1557
1558 __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560 static_cast<receiver<input_type> *>(this),
1561 static_cast<sender<output_type> *>(this) );
1562 }
1563
1564 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1565 template <typename... Args>
1566 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1567 make_edges_in_order(nodes, *this);
1568 }
1569 #endif
1570
1571
1572 __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573 fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574 static_cast<receiver<input_type> *>(this),
1575 static_cast<sender<output_type> *>(this) );
1576 }
1577
1578
1579 protected:
1580 void reset_node( reset_flags f) override {
1581 base_type::reset_node(f);
1582 }
1583 };
1584
1585
1586 template <typename T>
1587 __TBB_requires(std::copyable<T>)
1588 class sequencer_node : public queue_node<T> {
1589 function_body< T, size_t > *my_sequencer;
1590
1591
1592 public:
1593 typedef T input_type;
1594 typedef T output_type;
1595 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1596 typedef typename sender<output_type>::successor_type successor_type;
1597
1598
1599 template< typename Sequencer >
1600 __TBB_requires(sequencer<Sequencer, T>)
1601 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
1602 my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
1603 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1604 static_cast<receiver<input_type> *>(this),
1605 static_cast<sender<output_type> *>(this) );
1606 }
1607
1608 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1609 template <typename Sequencer, typename... Args>
1610 __TBB_requires(sequencer<Sequencer, T>)
1611 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
1612 : sequencer_node(nodes.graph_reference(), s) {
1613 make_edges_in_order(nodes, *this);
1614 }
1615 #endif
1616
1617
1618 __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
1619 my_sequencer( src.my_sequencer->clone() ) {
1620 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1621 static_cast<receiver<input_type> *>(this),
1622 static_cast<sender<output_type> *>(this) );
1623 }
1624
1625
1626 ~sequencer_node() { delete my_sequencer; }
1627
1628 protected:
1629 typedef typename buffer_node<T>::size_type size_type;
1630 typedef typename buffer_node<T>::buffer_operation sequencer_operation;
1631
1632 private:
1633 bool internal_push(sequencer_operation *op) override {
1634 size_type tag = (*my_sequencer)(*(op->elem));
1635 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
1636 if (tag < this->my_head) {
1637
1638 op->status.store(FAILED, std::memory_order_release);
1639 return false;
1640 }
1641 #endif
1642
1643 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1644
1645 if (this->size(new_tail) > this->capacity()) {
1646 this->grow_my_array(this->size(new_tail));
1647 }
1648 this->my_tail = new_tail;
1649
1650 const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
1651 op->status.store(res, std::memory_order_release);
1652 return res ==SUCCEEDED;
1653 }
1654 };
1655
1656
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660 typedef T input_type;
1661 typedef T output_type;
1662 typedef buffer_node<T> base_type;
1663 typedef priority_queue_node class_type;
1664 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665 typedef typename sender<output_type>::successor_type successor_type;
1666
1667
1668 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1669 : buffer_node<T>(g), compare(comp), mark(0) {
1670 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1671 static_cast<receiver<input_type> *>(this),
1672 static_cast<sender<output_type> *>(this) );
1673 }
1674
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1676 template <typename... Args>
1677 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1678 : priority_queue_node(nodes.graph_reference(), comp) {
1679 make_edges_in_order(nodes, *this);
1680 }
1681 #endif
1682
1683
1684 __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1685 : buffer_node<T>(src), mark(0)
1686 {
1687 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1688 static_cast<receiver<input_type> *>(this),
1689 static_cast<sender<output_type> *>(this) );
1690 }
1691
1692 protected:
1693
1694 void reset_node( reset_flags f) override {
1695 mark = 0;
1696 base_type::reset_node(f);
1697 }
1698
1699 typedef typename buffer_node<T>::size_type size_type;
1700 typedef typename buffer_node<T>::item_type item_type;
1701 typedef typename buffer_node<T>::buffer_operation prio_operation;
1702
1703
1704 void internal_forward_task(prio_operation *op) override {
1705 this->internal_forward_task_impl(op, this);
1706 }
1707
1708 void handle_operations(prio_operation *op_list) override {
1709 this->handle_operations_impl(op_list, this);
1710 }
1711
1712 bool internal_push(prio_operation *op) override {
1713 prio_push(*(op->elem));
1714 op->status.store(SUCCEEDED, std::memory_order_release);
1715 return true;
1716 }
1717
1718 void internal_pop(prio_operation *op) override {
1719
1720 if ( this->my_reserved == true || this->my_tail == 0 ) {
1721 op->status.store(FAILED, std::memory_order_release);
1722 return;
1723 }
1724
1725 *(op->elem) = prio();
1726 op->status.store(SUCCEEDED, std::memory_order_release);
1727 prio_pop();
1728
1729 }
1730
1731
1732 void internal_reserve(prio_operation *op) override {
1733 if (this->my_reserved == true || this->my_tail == 0) {
1734 op->status.store(FAILED, std::memory_order_release);
1735 return;
1736 }
1737 this->my_reserved = true;
1738 *(op->elem) = prio();
1739 reserved_item = *(op->elem);
1740 op->status.store(SUCCEEDED, std::memory_order_release);
1741 prio_pop();
1742 }
1743
1744 void internal_consume(prio_operation *op) override {
1745 op->status.store(SUCCEEDED, std::memory_order_release);
1746 this->my_reserved = false;
1747 reserved_item = input_type();
1748 }
1749
1750 void internal_release(prio_operation *op) override {
1751 op->status.store(SUCCEEDED, std::memory_order_release);
1752 prio_push(reserved_item);
1753 this->my_reserved = false;
1754 reserved_item = input_type();
1755 }
1756
1757 private:
1758 template<typename> friend class buffer_node;
1759
1760 void order() {
1761 if (mark < this->my_tail) heapify();
1762 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763 }
1764
1765 bool is_item_valid() {
1766 return this->my_tail > 0;
1767 }
1768
1769 void try_put_and_add_task(graph_task*& last_task) {
1770 graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771 if (new_task) {
1772
1773 graph& graph_ref = this->graph_reference();
1774 last_task = combine_tasks(graph_ref, last_task, new_task);
1775 prio_pop();
1776 }
1777 }
1778
1779 private:
1780 Compare compare;
1781 size_type mark;
1782
1783 input_type reserved_item;
1784
1785
1786 bool prio_use_tail() {
1787 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789 }
1790
1791
1792 void prio_push(const T &src) {
1793 if ( this->my_tail >= this->my_array_size )
1794 this->grow_my_array( this->my_tail + 1 );
1795 (void) this->place_item(this->my_tail, src);
1796 ++(this->my_tail);
1797 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798 }
1799
1800
1801
1802
1803 void prio_pop() {
1804 if (prio_use_tail()) {
1805
1806
1807 this->destroy_item(this->my_tail-1);
1808 --(this->my_tail);
1809 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810 return;
1811 }
1812 this->destroy_item(0);
1813 if(this->my_tail > 1) {
1814
1815 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
1816 this->move_item(0,this->my_tail - 1);
1817 }
1818 --(this->my_tail);
1819 if(mark > this->my_tail) --mark;
1820 if (this->my_tail > 1)
1821 reheap();
1822 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823 }
1824
1825 const T& prio() {
1826 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827 }
1828
1829
1830 void heapify() {
1831 if(this->my_tail == 0) {
1832 mark = 0;
1833 return;
1834 }
1835 if (!mark) mark = 1;
1836 for (; mark<this->my_tail; ++mark) {
1837 size_type cur_pos = mark;
1838 input_type to_place;
1839 this->fetch_item(mark,to_place);
1840 do {
1841 size_type parent = (cur_pos-1)>>1;
1842 if (!compare(this->get_my_item(parent), to_place))
1843 break;
1844 this->move_item(cur_pos, parent);
1845 cur_pos = parent;
1846 } while( cur_pos );
1847 (void) this->place_item(cur_pos, to_place);
1848 }
1849 }
1850
1851
1852 void reheap() {
1853 size_type cur_pos=0, child=1;
1854 while (child < mark) {
1855 size_type target = child;
1856 if (child+1<mark &&
1857 compare(this->get_my_item(child),
1858 this->get_my_item(child+1)))
1859 ++target;
1860
1861 if (compare(this->get_my_item(target),
1862 this->get_my_item(cur_pos)))
1863 break;
1864
1865 this->swap_items(cur_pos, target);
1866 cur_pos = target;
1867 child = (cur_pos<<1)+1;
1868 }
1869 }
1870 };
1871
1872
1873
1874
1875
1876 template< typename T, typename DecrementType=continue_msg >
1877 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1878 public:
1879 typedef T input_type;
1880 typedef T output_type;
1881 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1882 typedef typename sender<output_type>::successor_type successor_type;
1883
1884
1885 private:
1886 size_t my_threshold;
1887 size_t my_count;
1888 size_t my_tries;
1889 size_t my_future_decrement;
1890 reservable_predecessor_cache< T, spin_mutex > my_predecessors;
1891 spin_mutex my_mutex;
1892 broadcast_cache< T > my_successors;
1893
1894
1895 threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
1896
1897 graph_task* decrement_counter( long long delta ) {
1898 if ( delta > 0 && size_t(delta) > my_threshold ) {
1899 delta = my_threshold;
1900 }
1901
1902 {
1903 spin_mutex::scoped_lock lock(my_mutex);
1904 if ( delta > 0 && size_t(delta) > my_count ) {
1905 if( my_tries > 0 ) {
1906 my_future_decrement += (size_t(delta) - my_count);
1907 }
1908 my_count = 0;
1909 }
1910 else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
1911 my_count = my_threshold;
1912 }
1913 else {
1914 my_count -= size_t(delta);
1915 }
1916 __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
1917 }
1918 return forward_task();
1919 }
1920
1921
1922 friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
1923
1924 friend class forward_task_bypass< limiter_node<T,DecrementType> >;
1925
1926 bool check_conditions() {
1927 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
1928 }
1929
1930
1931 graph_task* forward_task() {
1932 input_type v;
1933 graph_task* rval = nullptr;
1934 bool reserved = false;
1935
1936 {
1937 spin_mutex::scoped_lock lock(my_mutex);
1938 if ( check_conditions() )
1939 ++my_tries;
1940 else
1941 return nullptr;
1942 }
1943
1944
1945
1946
1947 if ( (my_predecessors.try_reserve(v)) == true ) {
1948 reserved = true;
1949 if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
1950 {
1951 spin_mutex::scoped_lock lock(my_mutex);
1952 ++my_count;
1953 if ( my_future_decrement ) {
1954 if ( my_count > my_future_decrement ) {
1955 my_count -= my_future_decrement;
1956 my_future_decrement = 0;
1957 }
1958 else {
1959 my_future_decrement -= my_count;
1960 my_count = 0;
1961 }
1962 }
1963 --my_tries;
1964 my_predecessors.try_consume();
1965 if ( check_conditions() ) {
1966 if ( is_graph_active(this->my_graph) ) {
1967 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1968 small_object_allocator allocator{};
1969 graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
1970 my_graph.reserve_wait();
1971 spawn_in_graph_arena(graph_reference(), *rtask);
1972 }
1973 }
1974 }
1975 return rval;
1976 }
1977 }
1978
1979
1980
1981 {
1982 spin_mutex::scoped_lock lock(my_mutex);
1983 --my_tries;
1984 if (reserved) my_predecessors.try_release();
1985 if ( check_conditions() ) {
1986 if ( is_graph_active(this->my_graph) ) {
1987 small_object_allocator allocator{};
1988 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1989 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1990 my_graph.reserve_wait();
1991 __TBB_ASSERT(!rval, "Have two tasks to handle");
1992 return t;
1993 }
1994 }
1995 return rval;
1996 }
1997 }
1998
1999 void initialize() {
2000 fgt_node(
2001 CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2002 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2003 static_cast<sender<output_type> *>(this)
2004 );
2005 }
2006
2007 public:
2008
2009 limiter_node(graph &g, size_t threshold)
2010 : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2011 my_predecessors(this), my_successors(this), decrement(this)
2012 {
2013 initialize();
2014 }
2015
2016 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2017 template <typename... Args>
2018 limiter_node(const node_set<Args...>& nodes, size_t threshold)
2019 : limiter_node(nodes.graph_reference(), threshold) {
2020 make_edges_in_order(nodes, *this);
2021 }
2022 #endif
2023
2024
2025 limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2026
2027
2028 receiver<DecrementType>& decrementer() { return decrement; }
2029
2030
2031 bool register_successor( successor_type &r ) override {
2032 spin_mutex::scoped_lock lock(my_mutex);
2033 bool was_empty = my_successors.empty();
2034 my_successors.register_successor(r);
2035
2036 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2037 if ( is_graph_active(this->my_graph) ) {
2038 small_object_allocator allocator{};
2039 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2040 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2041 my_graph.reserve_wait();
2042 spawn_in_graph_arena(graph_reference(), *t);
2043 }
2044 }
2045 return true;
2046 }
2047
2048
2049
2050 bool remove_successor( successor_type &r ) override {
2051
2052 tbb::detail::d1::remove_predecessor(r, *this);
2053 my_successors.remove_successor(r);
2054 return true;
2055 }
2056
2057
2058 bool register_predecessor( predecessor_type &src ) override {
2059 spin_mutex::scoped_lock lock(my_mutex);
2060 my_predecessors.add( src );
2061 if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2062 small_object_allocator allocator{};
2063 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2064 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2065 my_graph.reserve_wait();
2066 spawn_in_graph_arena(graph_reference(), *t);
2067 }
2068 return true;
2069 }
2070
2071
2072 bool remove_predecessor( predecessor_type &src ) override {
2073 my_predecessors.remove( src );
2074 return true;
2075 }
2076
2077 protected:
2078
2079 template< typename R, typename B > friend class run_and_put_task;
2080 template<typename X, typename Y> friend class broadcast_cache;
2081 template<typename X, typename Y> friend class round_robin_cache;
2082
2083 graph_task* try_put_task( const T &t ) override {
2084 {
2085 spin_mutex::scoped_lock lock(my_mutex);
2086 if ( my_count + my_tries >= my_threshold )
2087 return nullptr;
2088 else
2089 ++my_tries;
2090 }
2091
2092 graph_task* rtask = my_successors.try_put_task(t);
2093 if ( !rtask ) {
2094 spin_mutex::scoped_lock lock(my_mutex);
2095 --my_tries;
2096 if (check_conditions() && is_graph_active(this->my_graph)) {
2097 small_object_allocator allocator{};
2098 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2099 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2100 my_graph.reserve_wait();
2101 }
2102 }
2103 else {
2104 spin_mutex::scoped_lock lock(my_mutex);
2105 ++my_count;
2106 if ( my_future_decrement ) {
2107 if ( my_count > my_future_decrement ) {
2108 my_count -= my_future_decrement;
2109 my_future_decrement = 0;
2110 }
2111 else {
2112 my_future_decrement -= my_count;
2113 my_count = 0;
2114 }
2115 }
2116 --my_tries;
2117 }
2118 return rtask;
2119 }
2120
2121 graph& graph_reference() const override { return my_graph; }
2122
2123 void reset_node( reset_flags f ) override {
2124 my_count = 0;
2125 if ( f & rf_clear_edges ) {
2126 my_predecessors.clear();
2127 my_successors.clear();
2128 }
2129 else {
2130 my_predecessors.reset();
2131 }
2132 decrement.reset_receiver(f);
2133 }
2134 };
2135
2136 #include "detail/_flow_graph_join_impl.h"
2137
2138 template<typename OutputTuple, typename JP=queueing> class join_node;
2139
2140 template<typename OutputTuple>
2141 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2142 private:
2143 static const int N = std::tuple_size<OutputTuple>::value;
2144 typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2145 public:
2146 typedef OutputTuple output_type;
2147 typedef typename unfolded_type::input_ports_type input_ports_type;
2148 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2149 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2150 this->input_ports(), static_cast< sender< output_type > *>(this) );
2151 }
2152
2153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2154 template <typename... Args>
2155 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2156 make_edges_in_order(nodes, *this);
2157 }
2158 #endif
2159
2160 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2161 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2162 this->input_ports(), static_cast< sender< output_type > *>(this) );
2163 }
2164
2165 };
2166
2167 template<typename OutputTuple>
2168 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2169 private:
2170 static const int N = std::tuple_size<OutputTuple>::value;
2171 typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2172 public:
2173 typedef OutputTuple output_type;
2174 typedef typename unfolded_type::input_ports_type input_ports_type;
2175 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2176 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2177 this->input_ports(), static_cast< sender< output_type > *>(this) );
2178 }
2179
2180 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2181 template <typename... Args>
2182 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2183 make_edges_in_order(nodes, *this);
2184 }
2185 #endif
2186
2187 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2188 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2189 this->input_ports(), static_cast< sender< output_type > *>(this) );
2190 }
2191
2192 };
2193
#if __TBB_CPP20_CONCEPTS_PRESENT

// Declaration-only helper: it is never called at run time and exists so that the
// concept below can pair each function type with the tuple element at the same
// index. Its constraints require
//   - as many functions as OutputTuple has elements, and
//   - join_node_function_object<F_i, tuple_element_t<i, OutputTuple>, K> for every i.
template <typename OutputTuple, typename K,
          typename... Functions, std::size_t... Indices>
void join_node_function_objects_helper( std::index_sequence<Indices...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Indices, OutputTuple>, K>);

// Satisfied when a call to the helper above with indices 0..sizeof...(Functions)-1
// is well-formed, i.e. when the helper's constraints hold.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2209
2210
2211
2212 template<typename OutputTuple, typename K, typename KHash>
2213 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2214 key_matching_port, OutputTuple, key_matching<K,KHash> > {
2215 private:
2216 static const int N = std::tuple_size<OutputTuple>::value;
2217 typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2218 public:
2219 typedef OutputTuple output_type;
2220 typedef typename unfolded_type::input_ports_type input_ports_type;
2221
2222 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2223 join_node(graph &g) : unfolded_type(g) {}
2224 #endif
2225
2226 template<typename __TBB_B0, typename __TBB_B1>
2227 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2228 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2229 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2230 this->input_ports(), static_cast< sender< output_type > *>(this) );
2231 }
2232 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2233 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2234 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2235 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2236 this->input_ports(), static_cast< sender< output_type > *>(this) );
2237 }
2238 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2239 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
2240 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2241 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2242 this->input_ports(), static_cast< sender< output_type > *>(this) );
2243 }
2244 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2245 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
2246 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2247 unfolded_type(g, b0, b1, b2, b3, b4) {
2248 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2249 this->input_ports(), static_cast< sender< output_type > *>(this) );
2250 }
2251 #if __TBB_VARIADIC_MAX >= 6
2252 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2253 typename __TBB_B5>
2254 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
2255 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2256 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2257 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2258 this->input_ports(), static_cast< sender< output_type > *>(this) );
2259 }
2260 #endif
2261 #if __TBB_VARIADIC_MAX >= 7
2262 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2263 typename __TBB_B5, typename __TBB_B6>
2264 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
2265 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2266 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2267 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2268 this->input_ports(), static_cast< sender< output_type > *>(this) );
2269 }
2270 #endif
2271 #if __TBB_VARIADIC_MAX >= 8
2272 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2273 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2274 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
2275 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2276 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2277 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2278 this->input_ports(), static_cast< sender< output_type > *>(this) );
2279 }
2280 #endif
2281 #if __TBB_VARIADIC_MAX >= 9
2282 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2283 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2284 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
2285 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2286 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2287 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2288 this->input_ports(), static_cast< sender< output_type > *>(this) );
2289 }
2290 #endif
2291 #if __TBB_VARIADIC_MAX >= 10
2292 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2293 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2294 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
2295 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2296 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2297 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2298 this->input_ports(), static_cast< sender< output_type > *>(this) );
2299 }
2300 #endif
2301
2302 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2303 template <
2304 #if (__clang_major__ == 3 && __clang_minor__ == 4)
2305
2306 template<typename...> class node_set,
2307 #endif
2308 typename... Args, typename... Bodies
2309 >
2310 __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
2311 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
2312 : join_node(nodes.graph_reference(), bodies...) {
2313 make_edges_in_order(nodes, *this);
2314 }
2315 #endif
2316
2317 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2318 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2319 this->input_ports(), static_cast< sender< output_type > *>(this) );
2320 }
2321
2322 };
2323
2324
2325 #include "detail/_flow_graph_indexer_impl.h"
2326
2327
2328 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2329 typename T4=null_type, typename T5=null_type, typename T6=null_type,
2330 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2331
2332
2333 template<typename T0>
2334 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2335 private:
2336 static const int N = 1;
2337 public:
2338 typedef std::tuple<T0> InputTuple;
2339 typedef tagged_msg<size_t, T0> output_type;
2340 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2341 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2342 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2343 this->input_ports(), static_cast< sender< output_type > *>(this) );
2344 }
2345
2346 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2347 template <typename... Args>
2348 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2349 make_edges_in_order(nodes, *this);
2350 }
2351 #endif
2352
2353
2354 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2355 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2356 this->input_ports(), static_cast< sender< output_type > *>(this) );
2357 }
2358 };
2359
2360 template<typename T0, typename T1>
2361 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2362 private:
2363 static const int N = 2;
2364 public:
2365 typedef std::tuple<T0, T1> InputTuple;
2366 typedef tagged_msg<size_t, T0, T1> output_type;
2367 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2368 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2369 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2370 this->input_ports(), static_cast< sender< output_type > *>(this) );
2371 }
2372
2373 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2374 template <typename... Args>
2375 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2376 make_edges_in_order(nodes, *this);
2377 }
2378 #endif
2379
2380
2381 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2382 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2383 this->input_ports(), static_cast< sender< output_type > *>(this) );
2384 }
2385
2386 };
2387
2388 template<typename T0, typename T1, typename T2>
2389 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2390 private:
2391 static const int N = 3;
2392 public:
2393 typedef std::tuple<T0, T1, T2> InputTuple;
2394 typedef tagged_msg<size_t, T0, T1, T2> output_type;
2395 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2396 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2397 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2398 this->input_ports(), static_cast< sender< output_type > *>(this) );
2399 }
2400
2401 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2402 template <typename... Args>
2403 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2404 make_edges_in_order(nodes, *this);
2405 }
2406 #endif
2407
2408
2409 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2410 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2411 this->input_ports(), static_cast< sender< output_type > *>(this) );
2412 }
2413
2414 };
2415
2416 template<typename T0, typename T1, typename T2, typename T3>
2417 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2418 private:
2419 static const int N = 4;
2420 public:
2421 typedef std::tuple<T0, T1, T2, T3> InputTuple;
2422 typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2423 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2424 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2425 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2426 this->input_ports(), static_cast< sender< output_type > *>(this) );
2427 }
2428
2429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2430 template <typename... Args>
2431 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2432 make_edges_in_order(nodes, *this);
2433 }
2434 #endif
2435
2436
2437 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2438 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2439 this->input_ports(), static_cast< sender< output_type > *>(this) );
2440 }
2441
2442 };
2443
2444 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2445 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2446 private:
2447 static const int N = 5;
2448 public:
2449 typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2450 typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2451 typedef unfolded_indexer_node<InputTuple> unfolded_type;
2452 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2453 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2454 this->input_ports(), static_cast< sender< output_type > *>(this) );
2455 }
2456
2457 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2458 template <typename... Args>
2459 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2460 make_edges_in_order(nodes, *this);
2461 }
2462 #endif
2463
2464
2465 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2466 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2467 this->input_ports(), static_cast< sender< output_type > *>(this) );
2468 }
2469
2470 };
2471
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node with six input types (only when __TBB_VARIADIC_MAX >= 6).
/** Wraps unfolded_indexer_node<std::tuple<T0, ..., T5>>; output_type is
    tagged_msg<size_t, T0, ..., T5>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
    // Number of input types.
    static const int N = 6;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Builds the node in graph g.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Builds the node in the graph of `nodes`, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from `other`.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif
2501
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node with seven input types (only when __TBB_VARIADIC_MAX >= 7).
/** Wraps unfolded_indexer_node<std::tuple<T0, ..., T6>>; output_type is
    tagged_msg<size_t, T0, ..., T6>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
    // Number of input types.
    static const int N = 7;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Builds the node in graph g.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Builds the node in the graph of `nodes`, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from `other`.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif
2532
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node with eight input types (only when __TBB_VARIADIC_MAX >= 8).
/** Wraps unfolded_indexer_node<std::tuple<T0, ..., T7>>; output_type is
    tagged_msg<size_t, T0, ..., T7>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    // Number of input types.
    static const int N = 8;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Builds the node in graph g.
    /** Marked __TBB_NOINLINE_SYM like the graph constructor of every other
        indexer_node arity; the macro expands to a noinline attribute in
        profiling builds on unix/Apple and to nothing otherwise. */
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Builds the node in the graph of `nodes`, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from `other`.
    /** Marked __TBB_NOINLINE_SYM for the same reason as the graph constructor. */
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif
2563
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node with nine input types (only when __TBB_VARIADIC_MAX >= 9).
/** Wraps unfolded_indexer_node<std::tuple<T0, ..., T8>>; output_type is
    tagged_msg<size_t, T0, ..., T8>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
    // Number of input types.
    static const int N = 9;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Builds the node in graph g.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Builds the node in the graph of `nodes`, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from `other`.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif
2594
#if __TBB_VARIADIC_MAX >= 10
//! indexer_node with ten input types (only when __TBB_VARIADIC_MAX >= 10).
/** This is the definition of the primary template rather than a partial
    specialization. Wraps unfolded_indexer_node<std::tuple<T0, ..., T9>>;
    output_type is tagged_msg<size_t, T0, ..., T9>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
    // Number of input types.
    static const int N = 10;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Builds the node in graph g.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Builds the node in the graph of `nodes`, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from `other`.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif
2625
2626 template< typename T >
2627 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2628 register_successor(p, s);
2629 fgt_make_edge( &p, &s );
2630 }
2631
2632
2633 template< typename T >
2634 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2635 internal_make_edge( p, s );
2636 }
2637
2638
//! Makes an edge from output port 0 of `output` to input port 0 of `input`.
/** Participates in overload resolution only when T has a nested
    output_ports_type and V has a nested input_ports_type. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    make_edge(first_output_port, first_input_port);
}
2644
2645
2646 template< typename T, typename R,
2647 typename = typename T::output_ports_type >
2648 inline void make_edge( T& output, receiver<R>& input) {
2649 make_edge(std::get<0>(output.output_ports()), input);
2650 }
2651
2652
2653 template< typename S, typename V,
2654 typename = typename V::input_ports_type >
2655 inline void make_edge( sender<S>& output, V& input) {
2656 make_edge(output, std::get<0>(input.input_ports()));
2657 }
2658
2659 template< typename T >
2660 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2661 remove_successor( p, s );
2662 fgt_remove_edge( &p, &s );
2663 }
2664
2665
2666 template< typename T >
2667 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2668 internal_remove_edge( p, s );
2669 }
2670
2671
//! Removes the edge from output port 0 of `output` to input port 0 of `input`.
/** Participates in overload resolution only when T has a nested
    output_ports_type and V has a nested input_ports_type. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    remove_edge(first_output_port, first_input_port);
}
2677
2678
2679 template< typename T, typename R,
2680 typename = typename T::output_ports_type >
2681 inline void remove_edge( T& output, receiver<R>& input) {
2682 remove_edge(std::get<0>(output.output_ports()), input);
2683 }
2684
2685 template< typename S, typename V,
2686 typename = typename V::input_ports_type >
2687 inline void remove_edge( sender<S>& output, V& input) {
2688 remove_edge(output, std::get<0>(input.input_ports()));
2689 }
2690
2691
//! Returns the result of n.copy_function_object<Body>().
/** Body must be given explicitly by the caller; Node is deduced from n. */
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
2696
2697
2698 template< typename InputTuple, typename OutputTuple > class composite_node;
2699
2700 template< typename... InputTypes, typename... OutputTypes>
2701 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2702
2703 public:
2704 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2705 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2706
2707 private:
2708 std::unique_ptr<input_ports_type> my_input_ports;
2709 std::unique_ptr<output_ports_type> my_output_ports;
2710
2711 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2712 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2713
2714 protected:
2715 void reset_node(reset_flags) override {}
2716
2717 public:
2718 composite_node( graph &g ) : graph_node(g) {
2719 fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2720 }
2721
2722 template<typename T1, typename T2>
2723 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2724 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2725 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2726
2727 fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2728 fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2729
2730 my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2731 my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2732 }
2733
2734 template< typename... NodeTypes >
2735 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2736
2737 template< typename... NodeTypes >
2738 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2739
2740
2741 input_ports_type& input_ports() {
2742 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2743 return *my_input_ports;
2744 }
2745
2746 output_ports_type& output_ports() {
2747 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2748 return *my_output_ports;
2749 }
2750 };
2751
2752
2753 template< typename... InputTypes>
2754 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2755 public:
2756 typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2757
2758 private:
2759 std::unique_ptr<input_ports_type> my_input_ports;
2760 static const size_t NUM_INPUTS = sizeof...(InputTypes);
2761
2762 protected:
2763 void reset_node(reset_flags) override {}
2764
2765 public:
2766 composite_node( graph &g ) : graph_node(g) {
2767 fgt_composite( CODEPTR(), this, &g );
2768 }
2769
2770 template<typename T>
2771 void set_external_ports(T&& input_ports_tuple) {
2772 static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2773
2774 fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2775
2776 my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2777 }
2778
2779 template< typename... NodeTypes >
2780 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2781
2782 template< typename... NodeTypes >
2783 void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2784
2785
2786 input_ports_type& input_ports() {
2787 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2788 return *my_input_ports;
2789 }
2790
2791 };
2792
2793
2794 template<typename... OutputTypes>
2795 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
2796 public:
2797 typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2798
2799 private:
2800 std::unique_ptr<output_ports_type> my_output_ports;
2801 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2802
2803 protected:
2804 void reset_node(reset_flags) override {}
2805
2806 public:
2807 __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
2808 fgt_composite( CODEPTR(), this, &g );
2809 }
2810
2811 template<typename T>
2812 void set_external_ports(T&& output_ports_tuple) {
2813 static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2814
2815 fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2816
2817 my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
2818 }
2819
2820 template<typename... NodeTypes >
2821 void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2822
2823 template<typename... NodeTypes >
2824 void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2825
2826
2827 output_ports_type& output_ports() {
2828 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2829 return *my_output_ports;
2830 }
2831
2832 };
2833
2834 template<typename Gateway>
2835 class async_body_base: no_assign {
2836 public:
2837 typedef Gateway gateway_type;
2838
2839 async_body_base(gateway_type *gateway): my_gateway(gateway) { }
2840 void set_gateway(gateway_type *gateway) {
2841 my_gateway = gateway;
2842 }
2843
2844 protected:
2845 gateway_type *my_gateway;
2846 };
2847
2848 template<typename Input, typename Ports, typename Gateway, typename Body>
2849 class async_body: public async_body_base<Gateway> {
2850 private:
2851 Body my_body;
2852
2853 public:
2854 typedef async_body_base<Gateway> base_type;
2855 typedef Gateway gateway_type;
2856
2857 async_body(const Body &body, gateway_type *gateway)
2858 : base_type(gateway), my_body(body) { }
2859
2860 void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
2861 tbb::detail::invoke(my_body, v, *this->my_gateway);
2862 }
2863
2864 Body get_body() { return my_body; }
2865 };
2866
2867
2868 template < typename Input, typename Output,
2869 typename Policy = queueing_lightweight >
2870 __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
2871 class async_node
2872 : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
2873 {
2874 typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
2875 typedef multifunction_input<
2876 Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
2877
2878 public:
2879 typedef Input input_type;
2880 typedef Output output_type;
2881 typedef receiver<input_type> receiver_type;
2882 typedef receiver<output_type> successor_type;
2883 typedef sender<input_type> predecessor_type;
2884 typedef receiver_gateway<output_type> gateway_type;
2885 typedef async_body_base<gateway_type> async_body_base_type;
2886 typedef typename base_type::output_ports_type output_ports_type;
2887
2888 private:
2889 class receiver_gateway_impl: public receiver_gateway<Output> {
2890 public:
2891 receiver_gateway_impl(async_node* node): my_node(node) {}
2892 void reserve_wait() override {
2893 fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
2894 my_node->my_graph.reserve_wait();
2895 }
2896
2897 void release_wait() override {
2898 async_node* n = my_node;
2899 graph* g = &n->my_graph;
2900 g->release_wait();
2901 fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
2902 }
2903
2904
2905 bool try_put(const Output &i) override {
2906 return my_node->try_put_impl(i);
2907 }
2908
2909 private:
2910 async_node* my_node;
2911 } my_gateway;
2912
2913
2914 async_node* self() { return this; }
2915
2916
2917 bool try_put_impl(const Output &i) {
2918 multifunction_output<Output> &port_0 = output_port<0>(*this);
2919 broadcast_cache<output_type>& port_successors = port_0.successors();
2920 fgt_async_try_put_begin(this, &port_0);
2921
2922 graph_task_list tasks;
2923 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
2924 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
2925 "Return status is inconsistent with the method operation." );
2926
2927 while( !tasks.empty() ) {
2928 enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
2929 }
2930 fgt_async_try_put_end(this, &port_0);
2931 return is_at_least_one_put_successful;
2932 }
2933
2934 public:
2935 template<typename Body>
2936 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2937 __TBB_NOINLINE_SYM async_node(
2938 graph &g, size_t concurrency,
2939 Body body, Policy = Policy(), node_priority_t a_priority = no_priority
2940 ) : base_type(
2941 g, concurrency,
2942 async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
2943 (body, &my_gateway), a_priority ), my_gateway(self()) {
2944 fgt_multioutput_node_with_body<1>(
2945 CODEPTR(), FLOW_ASYNC_NODE,
2946 &this->my_graph, static_cast<receiver<input_type> *>(this),
2947 this->output_ports(), this->my_body
2948 );
2949 }
2950
2951 template <typename Body>
2952 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2953 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2954 : async_node(g, concurrency, body, Policy(), a_priority) {}
2955
2956 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2957 template <typename Body, typename... Args>
2958 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2959 __TBB_NOINLINE_SYM async_node(
2960 const node_set<Args...>& nodes, size_t concurrency, Body body,
2961 Policy = Policy(), node_priority_t a_priority = no_priority )
2962 : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
2963 make_edges_in_order(nodes, *this);
2964 }
2965
2966 template <typename Body, typename... Args>
2967 __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2968 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2969 : async_node(nodes, concurrency, body, Policy(), a_priority) {}
2970 #endif
2971
2972 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
2973 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
2974 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
2975
2976 fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
2977 &this->my_graph, static_cast<receiver<input_type> *>(this),
2978 this->output_ports(), this->my_body );
2979 }
2980
2981 gateway_type& gateway() {
2982 return my_gateway;
2983 }
2984
2985
2986
2987
2988 bool register_successor(successor_type&) override {
2989 __TBB_ASSERT(false, "Successors must be registered only via ports");
2990 return false;
2991 }
2992
2993
2994 bool remove_successor(successor_type&) override {
2995 __TBB_ASSERT(false, "Successors must be removed only via ports");
2996 return false;
2997 }
2998
2999 template<typename Body>
3000 Body copy_function_object() {
3001 typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
3002 typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
3003 mfn_body_type &body_ref = *this->my_body;
3004 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3005 return ab.get_body();
3006 }
3007
3008 protected:
3009
3010 void reset_node( reset_flags f) override {
3011 base_type::reset_node(f);
3012 }
3013 };
3014
3015 #include "detail/_flow_graph_node_set_impl.h"
3016
3017 template< typename T >
3018 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3019 public:
3020 typedef T input_type;
3021 typedef T output_type;
3022 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3023 typedef typename sender<output_type>::successor_type successor_type;
3024
3025 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
3026 : graph_node(g), my_successors(this), my_buffer_is_valid(false)
3027 {
3028 fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
3029 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3030 }
3031
3032 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3033 template <typename... Args>
3034 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
3035 make_edges_in_order(nodes, *this);
3036 }
3037 #endif
3038
3039
3040 __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
3041
3042 ~overwrite_node() {}
3043
3044 bool register_successor( successor_type &s ) override {
3045 spin_mutex::scoped_lock l( my_mutex );
3046 if (my_buffer_is_valid && is_graph_active( my_graph )) {
3047
3048 bool ret = s.try_put( my_buffer );
3049 if ( ret ) {
3050
3051 my_successors.register_successor( s );
3052 } else {
3053
3054
3055
3056
3057 small_object_allocator allocator{};
3058 typedef register_predecessor_task task_type;
3059 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
3060 graph_reference().reserve_wait();
3061 spawn_in_graph_arena( my_graph, *t );
3062 }
3063 } else {
3064
3065 my_successors.register_successor( s );
3066 }
3067 return true;
3068 }
3069
3070 bool remove_successor( successor_type &s ) override {
3071 spin_mutex::scoped_lock l( my_mutex );
3072 my_successors.remove_successor(s);
3073 return true;
3074 }
3075
3076 bool try_get( input_type &v ) override {
3077 spin_mutex::scoped_lock l( my_mutex );
3078 if ( my_buffer_is_valid ) {
3079 v = my_buffer;
3080 return true;
3081 }
3082 return false;
3083 }
3084
3085
3086 bool try_reserve( T &v ) override {
3087 return try_get(v);
3088 }
3089
3090
3091 bool try_release() override { return true; }
3092
3093
3094 bool try_consume() override { return true; }
3095
3096 bool is_valid() {
3097 spin_mutex::scoped_lock l( my_mutex );
3098 return my_buffer_is_valid;
3099 }
3100
3101 void clear() {
3102 spin_mutex::scoped_lock l( my_mutex );
3103 my_buffer_is_valid = false;
3104 }
3105
3106 protected:
3107
3108 template< typename R, typename B > friend class run_and_put_task;
3109 template<typename X, typename Y> friend class broadcast_cache;
3110 template<typename X, typename Y> friend class round_robin_cache;
3111 graph_task* try_put_task( const input_type &v ) override {
3112 spin_mutex::scoped_lock l( my_mutex );
3113 return try_put_task_impl(v);
3114 }
3115
3116 graph_task * try_put_task_impl(const input_type &v) {
3117 my_buffer = v;
3118 my_buffer_is_valid = true;
3119 graph_task* rtask = my_successors.try_put_task(v);
3120 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3121 return rtask;
3122 }
3123
3124 graph& graph_reference() const override {
3125 return my_graph;
3126 }
3127
3128
3129 struct register_predecessor_task : public graph_task {
3130 register_predecessor_task(
3131 graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
3132 : graph_task(g, allocator), o(owner), s(succ) {};
3133
3134 task* execute(execution_data& ed) override {
3135
3136 using tbb::detail::d1::register_predecessor;
3137 using tbb::detail::d1::register_successor;
3138 if ( !register_predecessor(s, o) ) {
3139 register_successor(o, s);
3140 }
3141 finalize<register_predecessor_task>(ed);
3142 return nullptr;
3143 }
3144
3145 task* cancel(execution_data& ed) override {
3146 finalize<register_predecessor_task>(ed);
3147 return nullptr;
3148 }
3149
3150 predecessor_type& o;
3151 successor_type& s;
3152 };
3153
3154 spin_mutex my_mutex;
3155 broadcast_cache< input_type, null_rw_mutex > my_successors;
3156 input_type my_buffer;
3157 bool my_buffer_is_valid;
3158
3159 void reset_node( reset_flags f) override {
3160 my_buffer_is_valid = false;
3161 if (f&rf_clear_edges) {
3162 my_successors.clear();
3163 }
3164 }
3165 };
3166
3167 template< typename T >
3168 class write_once_node : public overwrite_node<T> {
3169 public:
3170 typedef T input_type;
3171 typedef T output_type;
3172 typedef overwrite_node<T> base_type;
3173 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3174 typedef typename sender<output_type>::successor_type successor_type;
3175
3176
3177 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3178 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3179 static_cast<receiver<input_type> *>(this),
3180 static_cast<sender<output_type> *>(this) );
3181 }
3182
3183 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3184 template <typename... Args>
3185 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3186 make_edges_in_order(nodes, *this);
3187 }
3188 #endif
3189
3190
3191 __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3192 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3193 static_cast<receiver<input_type> *>(this),
3194 static_cast<sender<output_type> *>(this) );
3195 }
3196
3197 protected:
3198 template< typename R, typename B > friend class run_and_put_task;
3199 template<typename X, typename Y> friend class broadcast_cache;
3200 template<typename X, typename Y> friend class round_robin_cache;
3201 graph_task *try_put_task( const T &v ) override {
3202 spin_mutex::scoped_lock l( this->my_mutex );
3203 return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
3204 }
3205 };
3206
3207 inline void set_name(const graph& g, const char *name) {
3208 fgt_graph_desc(&g, name);
3209 }
3210
3211 template <typename Output>
3212 inline void set_name(const input_node<Output>& node, const char *name) {
3213 fgt_node_desc(&node, name);
3214 }
3215
3216 template <typename Input, typename Output, typename Policy>
3217 inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
3218 fgt_node_desc(&node, name);
3219 }
3220
3221 template <typename Output, typename Policy>
3222 inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
3223 fgt_node_desc(&node, name);
3224 }
3225
3226 template <typename T>
3227 inline void set_name(const broadcast_node<T>& node, const char *name) {
3228 fgt_node_desc(&node, name);
3229 }
3230
3231 template <typename T>
3232 inline void set_name(const buffer_node<T>& node, const char *name) {
3233 fgt_node_desc(&node, name);
3234 }
3235
3236 template <typename T>
3237 inline void set_name(const queue_node<T>& node, const char *name) {
3238 fgt_node_desc(&node, name);
3239 }
3240
3241 template <typename T>
3242 inline void set_name(const sequencer_node<T>& node, const char *name) {
3243 fgt_node_desc(&node, name);
3244 }
3245
3246 template <typename T, typename Compare>
3247 inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
3248 fgt_node_desc(&node, name);
3249 }
3250
3251 template <typename T, typename DecrementType>
3252 inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
3253 fgt_node_desc(&node, name);
3254 }
3255
3256 template <typename OutputTuple, typename JP>
3257 inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
3258 fgt_node_desc(&node, name);
3259 }
3260
3261 template <typename... Types>
3262 inline void set_name(const indexer_node<Types...>& node, const char *name) {
3263 fgt_node_desc(&node, name);
3264 }
3265
3266 template <typename T>
3267 inline void set_name(const overwrite_node<T>& node, const char *name) {
3268 fgt_node_desc(&node, name);
3269 }
3270
3271 template <typename T>
3272 inline void set_name(const write_once_node<T>& node, const char *name) {
3273 fgt_node_desc(&node, name);
3274 }
3275
3276 template<typename Input, typename Output, typename Policy>
3277 inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
3278 fgt_multioutput_node_desc(&node, name);
3279 }
3280
3281 template<typename TupleType>
3282 inline void set_name(const split_node<TupleType>& node, const char *name) {
3283 fgt_multioutput_node_desc(&node, name);
3284 }
3285
3286 template< typename InputTuple, typename OutputTuple >
3287 inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
3288 fgt_multiinput_multioutput_node_desc(&node, name);
3289 }
3290
3291 template<typename Input, typename Output, typename Policy>
3292 inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
3293 {
3294 fgt_multioutput_node_desc(&node, name);
3295 }
3296 }
3297 }
3298 }
3299
3300
3301
3302 #include "detail/_flow_graph_nodes_deduction.h"
3303
3304 namespace tbb {
3305 namespace flow {
3306 inline namespace v1 {
3307 using detail::d1::receiver;
3308 using detail::d1::sender;
3309
3310 using detail::d1::serial;
3311 using detail::d1::unlimited;
3312
3313 using detail::d1::reset_flags;
3314 using detail::d1::rf_reset_protocol;
3315 using detail::d1::rf_reset_bodies;
3316 using detail::d1::rf_clear_edges;
3317
3318 using detail::d1::graph;
3319 using detail::d1::graph_node;
3320 using detail::d1::continue_msg;
3321
3322 using detail::d1::input_node;
3323 using detail::d1::function_node;
3324 using detail::d1::multifunction_node;
3325 using detail::d1::split_node;
3326 using detail::d1::output_port;
3327 using detail::d1::indexer_node;
3328 using detail::d1::tagged_msg;
3329 using detail::d1::cast_to;
3330 using detail::d1::is_a;
3331 using detail::d1::continue_node;
3332 using detail::d1::overwrite_node;
3333 using detail::d1::write_once_node;
3334 using detail::d1::broadcast_node;
3335 using detail::d1::buffer_node;
3336 using detail::d1::queue_node;
3337 using detail::d1::sequencer_node;
3338 using detail::d1::priority_queue_node;
3339 using detail::d1::limiter_node;
3340 using namespace detail::d1::graph_policy_namespace;
3341 using detail::d1::join_node;
3342 using detail::d1::input_port;
3343 using detail::d1::copy_body;
3344 using detail::d1::make_edge;
3345 using detail::d1::remove_edge;
3346 using detail::d1::tag_value;
3347 using detail::d1::composite_node;
3348 using detail::d1::async_node;
3349 using detail::d1::node_priority_t;
3350 using detail::d1::no_priority;
3351
3352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3353 using detail::d1::follows;
3354 using detail::d1::precedes;
3355 using detail::d1::make_node_set;
3356 using detail::d1::make_edges;
3357 #endif
3358
3359 }
3360 }
3361
3362 using detail::d1::flow_control;
3363
3364 namespace profiling {
3365 using detail::d1::set_name;
3366 }
3367
3368 }
3369
3370
3371 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
3372
3373 #undef __TBB_NOINLINE_SYM
3374 #endif
3375
3376 #endif