Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/oneapi/tbb/flow_graph.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 /*
0002     Copyright (c) 2005-2023 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_flow_graph_H
0018 #define __TBB_flow_graph_H
0019 
0020 #include <atomic>
0021 #include <memory>
0022 #include <type_traits>
0023 
0024 #include "detail/_config.h"
0025 #include "detail/_namespace_injection.h"
0026 #include "spin_mutex.h"
0027 #include "null_mutex.h"
0028 #include "spin_rw_mutex.h"
0029 #include "null_rw_mutex.h"
0030 #include "detail/_pipeline_filters.h"
0031 #include "detail/_task.h"
0032 #include "detail/_small_object_pool.h"
0033 #include "cache_aligned_allocator.h"
0034 #include "detail/_exception.h"
0035 #include "detail/_template_helpers.h"
0036 #include "detail/_aggregator.h"
0037 #include "detail/_allocator_traits.h"
0038 #include "detail/_utils.h"
0039 #include "profiling.h"
0040 #include "task_arena.h"
0041 
0042 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
0043    #if __INTEL_COMPILER
0044        // Disabled warning "routine is both inline and noinline"
0045        #pragma warning (push)
0046        #pragma warning( disable: 2196 )
0047    #endif
0048    #define __TBB_NOINLINE_SYM __attribute__((noinline))
0049 #else
0050    #define __TBB_NOINLINE_SYM
0051 #endif
0052 
0053 #include <tuple>
0054 #include <list>
0055 #include <queue>
0056 #if __TBB_CPP20_CONCEPTS_PRESENT
0057 #include <concepts>
0058 #endif
0059 
0060 /** @file
0061   \brief The graph related classes and functions
0062 
0063   There are some applications that best express dependencies as messages
0064   passed between nodes in a graph.  These messages may contain data or
0065   simply act as signals that a predecessors has completed. The graph
0066   class and its associated node classes can be used to express such
0067   applications.
0068 */
0069 
0070 namespace tbb {
0071 namespace detail {
0072 
0073 namespace d1 {
0074 
0075 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
0076 enum concurrency { unlimited = 0, serial = 1 };
0077 
0078 //! A generic null type
0079 struct null_type {};
0080 
0081 //! An empty class used for messages that mean "I'm done"
0082 class continue_msg {};
0083 
0084 } // namespace d1
0085 
0086 #if __TBB_CPP20_CONCEPTS_PRESENT
0087 namespace d0 {
0088 
0089 template <typename ReturnType, typename OutputType>
0090 concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
0091                                 std::convertible_to<OutputType, ReturnType>;
0092 
0093 // TODO: consider using std::invocable here
0094 template <typename Body, typename Output>
0095 concept continue_node_body = std::copy_constructible<Body> &&
0096                              requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
0097                                  { body(v) } -> node_body_return_type<Output>;
0098                              };
0099 
0100 template <typename Body, typename Input, typename Output>
0101 concept function_node_body = std::copy_constructible<Body> &&
0102                              std::invocable<Body&, const Input&> &&
0103                              node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;
0104 
0105 template <typename FunctionObject, typename Input, typename Key>
0106 concept join_node_function_object = std::copy_constructible<FunctionObject> &&
0107                                     std::invocable<FunctionObject&, const Input&> &&
0108                                     std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;
0109 
0110 template <typename Body, typename Output>
0111 concept input_node_body = std::copy_constructible<Body> &&
0112                           requires( Body& body, tbb::detail::d1::flow_control& fc ) {
0113                               { body(fc) } -> adaptive_same_as<Output>;
0114                           };
0115 
0116 template <typename Body, typename Input, typename OutputPortsType>
0117 concept multifunction_node_body = std::copy_constructible<Body> &&
0118                                   std::invocable<Body&, const Input&, OutputPortsType&>;
0119 
0120 template <typename Sequencer, typename Value>
0121 concept sequencer = std::copy_constructible<Sequencer> &&
0122                     std::invocable<Sequencer&, const Value&> &&
0123                     std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;
0124 
0125 template <typename Body, typename Input, typename GatewayType>
0126 concept async_node_body = std::copy_constructible<Body> &&
0127                           std::invocable<Body&, const Input&, GatewayType&>;
0128 
0129 } // namespace d0
0130 #endif // __TBB_CPP20_CONCEPTS_PRESENT
0131 
0132 namespace d1 {
0133 
0134 //! Forward declaration section
0135 template< typename T > class sender;
0136 template< typename T > class receiver;
0137 class continue_receiver;
0138 
0139 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
0140 
0141 template<typename T, typename M> class successor_cache;
0142 template<typename T, typename M> class broadcast_cache;
0143 template<typename T, typename M> class round_robin_cache;
0144 template<typename T, typename M> class predecessor_cache;
0145 template<typename T, typename M> class reservable_predecessor_cache;
0146 
0147 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0148 namespace order {
0149 struct following;
0150 struct preceding;
0151 }
0152 template<typename Order, typename... Args> struct node_set;
0153 #endif
0154 
0155 
0156 } // namespace d1
0157 } // namespace detail
0158 } // namespace tbb
0159 
0160 //! The graph class
0161 #include "detail/_flow_graph_impl.h"
0162 
0163 namespace tbb {
0164 namespace detail {
0165 namespace d1 {
0166 
0167 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
0168     if (second->priority > first->priority)
0169         return std::make_pair(second, first);
0170     return std::make_pair(first, second);
0171 }
0172 
0173 // submit task if necessary. Returns the non-enqueued task if there is one.
0174 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
0175     // if no RHS task, don't change left.
0176     if (right == nullptr) return left;
0177     // right != nullptr
0178     if (left == nullptr) return right;
0179     if (left == SUCCESSFULLY_ENQUEUED) return right;
0180     // left contains a task
0181     if (right != SUCCESSFULLY_ENQUEUED) {
0182         // both are valid tasks
0183         auto tasks_pair = order_tasks(left, right);
0184         spawn_in_graph_arena(g, *tasks_pair.first);
0185         return tasks_pair.second;
0186     }
0187     return left;
0188 }
0189 
0190 //! Pure virtual template class that defines a sender of messages of type T
0191 template< typename T >
0192 class sender {
0193 public:
0194     virtual ~sender() {}
0195 
0196     //! Request an item from the sender
0197     virtual bool try_get( T & ) { return false; }
0198 
0199     //! Reserves an item in the sender
0200     virtual bool try_reserve( T & ) { return false; }
0201 
0202     //! Releases the reserved item
0203     virtual bool try_release( ) { return false; }
0204 
0205     //! Consumes the reserved item
0206     virtual bool try_consume( ) { return false; }
0207 
0208 protected:
0209     //! The output type of this sender
0210     typedef T output_type;
0211 
0212     //! The successor type for this node
0213     typedef receiver<T> successor_type;
0214 
0215     //! Add a new successor to this node
0216     virtual bool register_successor( successor_type &r ) = 0;
0217 
0218     //! Removes a successor from this node
0219     virtual bool remove_successor( successor_type &r ) = 0;
0220 
0221     template<typename C>
0222     friend bool register_successor(sender<C>& s, receiver<C>& r);
0223 
0224     template<typename C>
0225     friend bool remove_successor  (sender<C>& s, receiver<C>& r);
0226 };  // class sender<T>
0227 
0228 template<typename C>
0229 bool register_successor(sender<C>& s, receiver<C>& r) {
0230     return s.register_successor(r);
0231 }
0232 
0233 template<typename C>
0234 bool remove_successor(sender<C>& s, receiver<C>& r) {
0235     return s.remove_successor(r);
0236 }
0237 
0238 //! Pure virtual template class that defines a receiver of messages of type T
0239 template< typename T >
0240 class receiver {
0241 public:
0242     //! Destructor
0243     virtual ~receiver() {}
0244 
0245     //! Put an item to the receiver
0246     bool try_put( const T& t ) {
0247         graph_task *res = try_put_task(t);
0248         if (!res) return false;
0249         if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
0250         return true;
0251     }
0252 
0253     //! put item to successor; return task to run the successor if possible.
0254 protected:
0255     //! The input type of this receiver
0256     typedef T input_type;
0257 
0258     //! The predecessor type for this node
0259     typedef sender<T> predecessor_type;
0260 
0261     template< typename R, typename B > friend class run_and_put_task;
0262     template< typename X, typename Y > friend class broadcast_cache;
0263     template< typename X, typename Y > friend class round_robin_cache;
0264     virtual graph_task *try_put_task(const T& t) = 0;
0265     virtual graph& graph_reference() const = 0;
0266 
0267     template<typename TT, typename M> friend class successor_cache;
0268     virtual bool is_continue_receiver() { return false; }
0269 
0270     // TODO revamp: reconsider the inheritance and move node priority out of receiver
0271     virtual node_priority_t priority() const { return no_priority; }
0272 
0273     //! Add a predecessor to the node
0274     virtual bool register_predecessor( predecessor_type & ) { return false; }
0275 
0276     //! Remove a predecessor from the node
0277     virtual bool remove_predecessor( predecessor_type & ) { return false; }
0278 
0279     template <typename C>
0280     friend bool register_predecessor(receiver<C>& r, sender<C>& s);
0281     template <typename C>
0282     friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
0283 }; // class receiver<T>
0284 
0285 template <typename C>
0286 bool register_predecessor(receiver<C>& r, sender<C>& s) {
0287     return r.register_predecessor(s);
0288 }
0289 
0290 template <typename C>
0291 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
0292     return r.remove_predecessor(s);
0293 }
0294 
0295 //! Base class for receivers of completion messages
0296 /** These receivers automatically reset, but cannot be explicitly waited on */
0297 class continue_receiver : public receiver< continue_msg > {
0298 protected:
0299 
0300     //! Constructor
0301     explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
0302         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
0303         my_current_count = 0;
0304         my_priority = a_priority;
0305     }
0306 
0307     //! Copy constructor
0308     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
0309         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
0310         my_current_count = 0;
0311         my_priority = src.my_priority;
0312     }
0313 
0314     //! Increments the trigger threshold
0315     bool register_predecessor( predecessor_type & ) override {
0316         spin_mutex::scoped_lock l(my_mutex);
0317         ++my_predecessor_count;
0318         return true;
0319     }
0320 
0321     //! Decrements the trigger threshold
0322     /** Does not check to see if the removal of the predecessor now makes the current count
0323         exceed the new threshold.  So removing a predecessor while the graph is active can cause
0324         unexpected results. */
0325     bool remove_predecessor( predecessor_type & ) override {
0326         spin_mutex::scoped_lock l(my_mutex);
0327         --my_predecessor_count;
0328         return true;
0329     }
0330 
0331     //! The input type
0332     typedef continue_msg input_type;
0333 
0334     //! The predecessor type for this node
0335     typedef receiver<input_type>::predecessor_type predecessor_type;
0336 
0337     template< typename R, typename B > friend class run_and_put_task;
0338     template<typename X, typename Y> friend class broadcast_cache;
0339     template<typename X, typename Y> friend class round_robin_cache;
0340     // execute body is supposed to be too small to create a task for.
0341     graph_task* try_put_task( const input_type & ) override {
0342         {
0343             spin_mutex::scoped_lock l(my_mutex);
0344             if ( ++my_current_count < my_predecessor_count )
0345                 return SUCCESSFULLY_ENQUEUED;
0346             else
0347                 my_current_count = 0;
0348         }
0349         graph_task* res = execute();
0350         return res? res : SUCCESSFULLY_ENQUEUED;
0351     }
0352 
0353     spin_mutex my_mutex;
0354     int my_predecessor_count;
0355     int my_current_count;
0356     int my_initial_predecessor_count;
0357     node_priority_t my_priority;
0358     // the friend declaration in the base class did not eliminate the "protected class"
0359     // error in gcc 4.1.2
0360     template<typename U, typename V> friend class limiter_node;
0361 
0362     virtual void reset_receiver( reset_flags f ) {
0363         my_current_count = 0;
0364         if (f & rf_clear_edges) {
0365             my_predecessor_count = my_initial_predecessor_count;
0366         }
0367     }
0368 
0369     //! Does whatever should happen when the threshold is reached
0370     /** This should be very fast or else spawn a task.  This is
0371         called while the sender is blocked in the try_put(). */
0372     virtual graph_task* execute() = 0;
0373     template<typename TT, typename M> friend class successor_cache;
0374     bool is_continue_receiver() override { return true; }
0375 
0376     node_priority_t priority() const override { return my_priority; }
0377 }; // class continue_receiver
0378 
0379 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
0380     template <typename K, typename T>
0381     K key_from_message( const T &t ) {
0382         return t.key();
0383     }
0384 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
0385 
0386 } // d1
0387 } // detail
0388 } // tbb
0389 
0390 #include "detail/_flow_graph_trace_impl.h"
0391 #include "detail/_hash_compare.h"
0392 
0393 namespace tbb {
0394 namespace detail {
0395 namespace d1 {
0396 
0397 #include "detail/_flow_graph_body_impl.h"
0398 #include "detail/_flow_graph_cache_impl.h"
0399 #include "detail/_flow_graph_types_impl.h"
0400 
0401 using namespace graph_policy_namespace;
0402 
0403 template <typename C, typename N>
0404 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
0405 {
0406     if (begin) current_node = my_graph->my_nodes;
0407     //else it is an end iterator by default
0408 }
0409 
0410 template <typename C, typename N>
0411 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
0412     __TBB_ASSERT(current_node, "graph_iterator at end");
0413     return *operator->();
0414 }
0415 
0416 template <typename C, typename N>
0417 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
0418     return current_node;
0419 }
0420 
0421 template <typename C, typename N>
0422 void graph_iterator<C,N>::internal_forward() {
0423     if (current_node) current_node = current_node->next;
0424 }
0425 
0426 //! Constructs a graph with isolated task_group_context
0427 inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0428     prepare_task_arena();
0429     own_context = true;
0430     cancelled = false;
0431     caught_exception = false;
0432     my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
0433     fgt_graph(this);
0434     my_is_active = true;
0435 }
0436 
0437 inline graph::graph(task_group_context& use_this_context) :
0438     my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
0439     prepare_task_arena();
0440     own_context = false;
0441     cancelled = false;
0442     caught_exception = false;
0443     fgt_graph(this);
0444     my_is_active = true;
0445 }
0446 
0447 inline graph::~graph() {
0448     wait_for_all();
0449     if (own_context) {
0450         my_context->~task_group_context();
0451         r1::cache_aligned_deallocate(my_context);
0452     }
0453     delete my_task_arena;
0454 }
0455 
0456 inline void graph::reserve_wait() {
0457     my_wait_context.reserve();
0458     fgt_reserve_wait(this);
0459 }
0460 
0461 inline void graph::release_wait() {
0462     fgt_release_wait(this);
0463     my_wait_context.release();
0464 }
0465 
0466 inline void graph::register_node(graph_node *n) {
0467     n->next = nullptr;
0468     {
0469         spin_mutex::scoped_lock lock(nodelist_mutex);
0470         n->prev = my_nodes_last;
0471         if (my_nodes_last) my_nodes_last->next = n;
0472         my_nodes_last = n;
0473         if (!my_nodes) my_nodes = n;
0474     }
0475 }
0476 
0477 inline void graph::remove_node(graph_node *n) {
0478     {
0479         spin_mutex::scoped_lock lock(nodelist_mutex);
0480         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
0481         if (n->prev) n->prev->next = n->next;
0482         if (n->next) n->next->prev = n->prev;
0483         if (my_nodes_last == n) my_nodes_last = n->prev;
0484         if (my_nodes == n) my_nodes = n->next;
0485     }
0486     n->prev = n->next = nullptr;
0487 }
0488 
0489 inline void graph::reset( reset_flags f ) {
0490     // reset context
0491     deactivate_graph(*this);
0492 
0493     my_context->reset();
0494     cancelled = false;
0495     caught_exception = false;
0496     // reset all the nodes comprising the graph
0497     for(iterator ii = begin(); ii != end(); ++ii) {
0498         graph_node *my_p = &(*ii);
0499         my_p->reset_node(f);
0500     }
0501     // Reattach the arena. Might be useful to run the graph in a particular task_arena
0502     // while not limiting graph lifetime to a single task_arena::execute() call.
0503     prepare_task_arena( /*reinit=*/true );
0504     activate_graph(*this);
0505 }
0506 
0507 inline void graph::cancel() {
0508     my_context->cancel_group_execution();
0509 }
0510 
0511 inline graph::iterator graph::begin() { return iterator(this, true); }
0512 
0513 inline graph::iterator graph::end() { return iterator(this, false); }
0514 
0515 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
0516 
0517 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
0518 
0519 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
0520 
0521 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
0522 
0523 inline graph_node::graph_node(graph& g) : my_graph(g) {
0524     my_graph.register_node(this);
0525 }
0526 
0527 inline graph_node::~graph_node() {
0528     my_graph.remove_node(this);
0529 }
0530 
0531 #include "detail/_flow_graph_node_impl.h"
0532 
0533 
0534 //! An executable node that acts as a source, i.e. it has no predecessors
0535 
0536 template < typename Output >
0537     __TBB_requires(std::copyable<Output>)
0538 class input_node : public graph_node, public sender< Output > {
0539 public:
0540     //! The type of the output message, which is complete
0541     typedef Output output_type;
0542 
0543     //! The type of successors of this node
0544     typedef typename sender<output_type>::successor_type successor_type;
0545 
0546     // Input node has no input type
0547     typedef null_type input_type;
0548 
0549     //! Constructor for a node with a successor
0550     template< typename Body >
0551         __TBB_requires(input_node_body<Body, Output>)
0552      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
0553          : graph_node(g), my_active(false)
0554          , my_body( new input_body_leaf< output_type, Body>(body) )
0555          , my_init_body( new input_body_leaf< output_type, Body>(body) )
0556          , my_successors(this), my_reserved(false), my_has_cached_item(false)
0557     {
0558         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0559                            static_cast<sender<output_type> *>(this), this->my_body);
0560     }
0561 
0562 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0563     template <typename Body, typename... Successors>
0564         __TBB_requires(input_node_body<Body, Output>)
0565     input_node( const node_set<order::preceding, Successors...>& successors, Body body )
0566         : input_node(successors.graph_reference(), body)
0567     {
0568         make_edges(*this, successors);
0569     }
0570 #endif
0571 
0572     //! Copy constructor
0573     __TBB_NOINLINE_SYM input_node( const input_node& src )
0574         : graph_node(src.my_graph), sender<Output>()
0575         , my_active(false)
0576         , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
0577         , my_successors(this), my_reserved(false), my_has_cached_item(false)
0578     {
0579         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0580                            static_cast<sender<output_type> *>(this), this->my_body);
0581     }
0582 
0583     //! The destructor
0584     ~input_node() { delete my_body; delete my_init_body; }
0585 
0586     //! Add a new successor to this node
0587     bool register_successor( successor_type &r ) override {
0588         spin_mutex::scoped_lock lock(my_mutex);
0589         my_successors.register_successor(r);
0590         if ( my_active )
0591             spawn_put();
0592         return true;
0593     }
0594 
0595     //! Removes a successor from this node
0596     bool remove_successor( successor_type &r ) override {
0597         spin_mutex::scoped_lock lock(my_mutex);
0598         my_successors.remove_successor(r);
0599         return true;
0600     }
0601 
0602     //! Request an item from the node
0603     bool try_get( output_type &v ) override {
0604         spin_mutex::scoped_lock lock(my_mutex);
0605         if ( my_reserved )
0606             return false;
0607 
0608         if ( my_has_cached_item ) {
0609             v = my_cached_item;
0610             my_has_cached_item = false;
0611             return true;
0612         }
0613         // we've been asked to provide an item, but we have none.  enqueue a task to
0614         // provide one.
0615         if ( my_active )
0616             spawn_put();
0617         return false;
0618     }
0619 
0620     //! Reserves an item.
0621     bool try_reserve( output_type &v ) override {
0622         spin_mutex::scoped_lock lock(my_mutex);
0623         if ( my_reserved ) {
0624             return false;
0625         }
0626 
0627         if ( my_has_cached_item ) {
0628             v = my_cached_item;
0629             my_reserved = true;
0630             return true;
0631         } else {
0632             return false;
0633         }
0634     }
0635 
0636     //! Release a reserved item.
0637     /** true = item has been released and so remains in sender, dest must request or reserve future items */
0638     bool try_release( ) override {
0639         spin_mutex::scoped_lock lock(my_mutex);
0640         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
0641         my_reserved = false;
0642         if(!my_successors.empty())
0643             spawn_put();
0644         return true;
0645     }
0646 
0647     //! Consumes a reserved item
0648     bool try_consume( ) override {
0649         spin_mutex::scoped_lock lock(my_mutex);
0650         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
0651         my_reserved = false;
0652         my_has_cached_item = false;
0653         if ( !my_successors.empty() ) {
0654             spawn_put();
0655         }
0656         return true;
0657     }
0658 
0659     //! Activates a node that was created in the inactive state
0660     void activate() {
0661         spin_mutex::scoped_lock lock(my_mutex);
0662         my_active = true;
0663         if (!my_successors.empty())
0664             spawn_put();
0665     }
0666 
0667     template<typename Body>
0668     Body copy_function_object() {
0669         input_body<output_type> &body_ref = *this->my_body;
0670         return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
0671     }
0672 
0673 protected:
0674 
0675     //! resets the input_node to its initial state
0676     void reset_node( reset_flags f) override {
0677         my_active = false;
0678         my_reserved = false;
0679         my_has_cached_item = false;
0680 
0681         if(f & rf_clear_edges) my_successors.clear();
0682         if(f & rf_reset_bodies) {
0683             input_body<output_type> *tmp = my_init_body->clone();
0684             delete my_body;
0685             my_body = tmp;
0686         }
0687     }
0688 
0689 private:
0690     spin_mutex my_mutex;
0691     bool my_active;
0692     input_body<output_type> *my_body;
0693     input_body<output_type> *my_init_body;
0694     broadcast_cache< output_type > my_successors;
0695     bool my_reserved;
0696     bool my_has_cached_item;
0697     output_type my_cached_item;
0698 
0699     // used by apply_body_bypass, can invoke body of node.
0700     bool try_reserve_apply_body(output_type &v) {
0701         spin_mutex::scoped_lock lock(my_mutex);
0702         if ( my_reserved ) {
0703             return false;
0704         }
0705         if ( !my_has_cached_item ) {
0706             flow_control control;
0707 
0708             fgt_begin_body( my_body );
0709 
0710             my_cached_item = (*my_body)(control);
0711             my_has_cached_item = !control.is_pipeline_stopped;
0712 
0713             fgt_end_body( my_body );
0714         }
0715         if ( my_has_cached_item ) {
0716             v = my_cached_item;
0717             my_reserved = true;
0718             return true;
0719         } else {
0720             return false;
0721         }
0722     }
0723 
0724     graph_task* create_put_task() {
0725         small_object_allocator allocator{};
0726         typedef input_node_task_bypass< input_node<output_type> > task_type;
0727         graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
0728         my_graph.reserve_wait();
0729         return t;
0730     }
0731 
0732     //! Spawns a task that applies the body
0733     void spawn_put( ) {
0734         if(is_graph_active(this->my_graph)) {
0735             spawn_in_graph_arena(this->my_graph, *create_put_task());
0736         }
0737     }
0738 
0739     friend class input_node_task_bypass< input_node<output_type> >;
0740     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
0741     graph_task* apply_body_bypass( ) {
0742         output_type v;
0743         if ( !try_reserve_apply_body(v) )
0744             return nullptr;
0745 
0746         graph_task *last_task = my_successors.try_put_task(v);
0747         if ( last_task )
0748             try_consume();
0749         else
0750             try_release();
0751         return last_task;
0752     }
0753 };  // class input_node
0754 
0755 //! Implements a function node that supports Input -> Output
0756 template<typename Input, typename Output = continue_msg, typename Policy = queueing>
0757     __TBB_requires(std::default_initializable<Input> &&
0758                    std::copy_constructible<Input> &&
0759                    std::copy_constructible<Output>)
0760 class function_node
0761     : public graph_node
0762     , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
0763     , public function_output<Output>
0764 {
0765     typedef cache_aligned_allocator<Input> internals_allocator;
0766 
0767 public:
0768     typedef Input input_type;
0769     typedef Output output_type;
0770     typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
0771     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0772     typedef function_output<output_type> fOutput_type;
0773     typedef typename input_impl_type::predecessor_type predecessor_type;
0774     typedef typename fOutput_type::successor_type successor_type;
0775 
0776     using input_impl_type::my_predecessors;
0777 
0778     //! Constructor
0779     // input_queue_type is allocated here, but destroyed in the function_input_base.
0780     // TODO: pass the graph_buffer_policy to the function_input_base so it can all
0781     // be done in one place.  This would be an interface-breaking change.
0782     template< typename Body >
0783         __TBB_requires(function_node_body<Body, Input, Output>)
0784      __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
0785                    Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
0786         : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
0787           fOutput_type(g) {
0788         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0789                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0790     }
0791 
0792     template <typename Body>
0793         __TBB_requires(function_node_body<Body, Input, Output>)
0794     function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
0795         : function_node(g, concurrency, body, Policy(), a_priority) {}
0796 
0797 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0798     template <typename Body, typename... Args>
0799         __TBB_requires(function_node_body<Body, Input, Output>)
0800     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
0801                    Policy p = Policy(), node_priority_t a_priority = no_priority )
0802         : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
0803         make_edges_in_order(nodes, *this);
0804     }
0805 
0806     template <typename Body, typename... Args>
0807         __TBB_requires(function_node_body<Body, Input, Output>)
0808     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
0809         : function_node(nodes, concurrency, body, Policy(), a_priority) {}
0810 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0811 
0812     //! Copy constructor
0813     __TBB_NOINLINE_SYM function_node( const function_node& src ) :
0814         graph_node(src.my_graph),
0815         input_impl_type(src),
0816         fOutput_type(src.my_graph) {
0817         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
0818                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
0819     }
0820 
0821 protected:
0822     template< typename R, typename B > friend class run_and_put_task;
0823     template<typename X, typename Y> friend class broadcast_cache;
0824     template<typename X, typename Y> friend class round_robin_cache;
0825     using input_impl_type::try_put_task;
0826 
0827     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
0828 
0829     void reset_node(reset_flags f) override {
0830         input_impl_type::reset_function_input(f);
0831         // TODO: use clear() instead.
0832         if(f & rf_clear_edges) {
0833             successors().clear();
0834             my_predecessors.clear();
0835         }
0836         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
0837         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
0838     }
0839 
0840 };  // class function_node
0841 
0842 //! implements a function node that supports Input -> (set of outputs)
0843 // Output is a tuple of output types.
0844 template<typename Input, typename Output, typename Policy = queueing>
0845     __TBB_requires(std::default_initializable<Input> &&
0846                    std::copy_constructible<Input>)
0847 class multifunction_node :
0848     public graph_node,
0849     public multifunction_input
0850     <
0851         Input,
0852         typename wrap_tuple_elements<
0853             std::tuple_size<Output>::value,  // #elements in tuple
0854             multifunction_output,  // wrap this around each element
0855             Output // the tuple providing the types
0856         >::type,
0857         Policy,
0858         cache_aligned_allocator<Input>
0859     >
0860 {
0861     typedef cache_aligned_allocator<Input> internals_allocator;
0862 
0863 protected:
0864     static const int N = std::tuple_size<Output>::value;
0865 public:
0866     typedef Input input_type;
0867     typedef null_type output_type;
0868     typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
0869     typedef multifunction_input<
0870         input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
0871     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
0872 private:
0873     using input_impl_type::my_predecessors;
0874 public:
0875     template<typename Body>
0876         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0877     __TBB_NOINLINE_SYM multifunction_node(
0878         graph &g, size_t concurrency,
0879         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
0880     ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
0881         fgt_multioutput_node_with_body<N>(
0882             CODEPTR(), FLOW_MULTIFUNCTION_NODE,
0883             &this->my_graph, static_cast<receiver<input_type> *>(this),
0884             this->output_ports(), this->my_body
0885         );
0886     }
0887 
0888     template <typename Body>
0889         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0890     __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
0891         : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
0892 
0893 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0894     template <typename Body, typename... Args>
0895         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0896     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
0897                        Policy p = Policy(), node_priority_t a_priority = no_priority)
0898         : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
0899         make_edges_in_order(nodes, *this);
0900     }
0901 
0902     template <typename Body, typename... Args>
0903         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
0904     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
0905         : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
0906 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0907 
0908     __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
0909         graph_node(other.my_graph), input_impl_type(other) {
0910         fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
0911                 &this->my_graph, static_cast<receiver<input_type> *>(this),
0912                 this->output_ports(), this->my_body );
0913     }
0914 
0915     // all the guts are in multifunction_input...
0916 protected:
0917     void reset_node(reset_flags f) override { input_impl_type::reset(f); }
0918 };  // multifunction_node
0919 
0920 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
0921 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
0922 template<typename TupleType>
0923 class split_node : public graph_node, public receiver<TupleType> {
0924     static const int N = std::tuple_size<TupleType>::value;
0925     typedef receiver<TupleType> base_type;
0926 public:
0927     typedef TupleType input_type;
0928     typedef typename wrap_tuple_elements<
0929             N,  // #elements in tuple
0930             multifunction_output,  // wrap this around each element
0931             TupleType // the tuple providing the types
0932         >::type  output_ports_type;
0933 
0934     __TBB_NOINLINE_SYM explicit split_node(graph &g)
0935         : graph_node(g),
0936           my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
0937     {
0938         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
0939             static_cast<receiver<input_type> *>(this), this->output_ports());
0940     }
0941 
0942 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0943     template <typename... Args>
0944     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
0945         make_edges_in_order(nodes, *this);
0946     }
0947 #endif
0948 
0949     __TBB_NOINLINE_SYM split_node(const split_node& other)
0950         : graph_node(other.my_graph), base_type(other),
0951           my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
0952     {
0953         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
0954             static_cast<receiver<input_type> *>(this), this->output_ports());
0955     }
0956 
0957     output_ports_type &output_ports() { return my_output_ports; }
0958 
0959 protected:
0960     graph_task *try_put_task(const TupleType& t) override {
0961         // Sending split messages in parallel is not justified, as overheads would prevail.
0962         // Also, we do not have successors here. So we just tell the task returned here is successful.
0963         return emit_element<N>::emit_this(this->my_graph, t, output_ports());
0964     }
0965     void reset_node(reset_flags f) override {
0966         if (f & rf_clear_edges)
0967             clear_element<N>::clear_this(my_output_ports);
0968 
0969         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
0970     }
0971     graph& graph_reference() const override {
0972         return my_graph;
0973     }
0974 
0975 private:
0976     output_ports_type my_output_ports;
0977 };
0978 
0979 //! Implements an executable node that supports continue_msg -> Output
0980 template <typename Output, typename Policy = Policy<void> >
0981     __TBB_requires(std::copy_constructible<Output>)
0982 class continue_node : public graph_node, public continue_input<Output, Policy>,
0983                       public function_output<Output> {
0984 public:
0985     typedef continue_msg input_type;
0986     typedef Output output_type;
0987     typedef continue_input<Output, Policy> input_impl_type;
0988     typedef function_output<output_type> fOutput_type;
0989     typedef typename input_impl_type::predecessor_type predecessor_type;
0990     typedef typename fOutput_type::successor_type successor_type;
0991 
0992     //! Constructor for executable node with continue_msg -> Output
0993     template <typename Body >
0994         __TBB_requires(continue_node_body<Body, Output>)
0995     __TBB_NOINLINE_SYM continue_node(
0996         graph &g,
0997         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
0998     ) : graph_node(g), input_impl_type( g, body, a_priority ),
0999         fOutput_type(g) {
1000         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1001 
1002                                            static_cast<receiver<input_type> *>(this),
1003                                            static_cast<sender<output_type> *>(this), this->my_body );
1004     }
1005 
1006     template <typename Body>
1007         __TBB_requires(continue_node_body<Body, Output>)
1008     continue_node( graph& g, Body body, node_priority_t a_priority )
1009         : continue_node(g, body, Policy(), a_priority) {}
1010 
1011 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1012     template <typename Body, typename... Args>
1013         __TBB_requires(continue_node_body<Body, Output>)
1014     continue_node( const node_set<Args...>& nodes, Body body,
1015                    Policy p = Policy(), node_priority_t a_priority = no_priority )
1016         : continue_node(nodes.graph_reference(), body, p, a_priority ) {
1017         make_edges_in_order(nodes, *this);
1018     }
1019     template <typename Body, typename... Args>
1020         __TBB_requires(continue_node_body<Body, Output>)
1021     continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
1022         : continue_node(nodes, body, Policy(), a_priority) {}
1023 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1024 
1025     //! Constructor for executable node with continue_msg -> Output
1026     template <typename Body >
1027         __TBB_requires(continue_node_body<Body, Output>)
1028     __TBB_NOINLINE_SYM continue_node(
1029         graph &g, int number_of_predecessors,
1030         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1031     ) : graph_node(g)
1032       , input_impl_type(g, number_of_predecessors, body, a_priority),
1033         fOutput_type(g) {
1034         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1035                                            static_cast<receiver<input_type> *>(this),
1036                                            static_cast<sender<output_type> *>(this), this->my_body );
1037     }
1038 
1039     template <typename Body>
1040         __TBB_requires(continue_node_body<Body, Output>)
1041     continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
1042         : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
1043 
1044 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1045     template <typename Body, typename... Args>
1046         __TBB_requires(continue_node_body<Body, Output>)
1047     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1048                    Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
1049         : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
1050         make_edges_in_order(nodes, *this);
1051     }
1052 
1053     template <typename Body, typename... Args>
1054         __TBB_requires(continue_node_body<Body, Output>)
1055     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1056                    Body body, node_priority_t a_priority )
1057         : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
1058 #endif
1059 
1060     //! Copy constructor
1061     __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1062         graph_node(src.my_graph), input_impl_type(src),
1063         function_output<Output>(src.my_graph) {
1064         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1065                                            static_cast<receiver<input_type> *>(this),
1066                                            static_cast<sender<output_type> *>(this), this->my_body );
1067     }
1068 
1069 protected:
1070     template< typename R, typename B > friend class run_and_put_task;
1071     template<typename X, typename Y> friend class broadcast_cache;
1072     template<typename X, typename Y> friend class round_robin_cache;
1073     using input_impl_type::try_put_task;
1074     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
1075 
1076     void reset_node(reset_flags f) override {
1077         input_impl_type::reset_receiver(f);
1078         if(f & rf_clear_edges)successors().clear();
1079         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1080     }
1081 };  // continue_node
1082 
1083 //! Forwards messages of type T to all successors
1084 template <typename T>
1085 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1086 public:
1087     typedef T input_type;
1088     typedef T output_type;
1089     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1090     typedef typename sender<output_type>::successor_type successor_type;
1091 private:
1092     broadcast_cache<input_type> my_successors;
1093 public:
1094 
1095     __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1096         fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1097                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1098     }
1099 
1100 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1101     template <typename... Args>
1102     broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1103         make_edges_in_order(nodes, *this);
1104     }
1105 #endif
1106 
1107     // Copy constructor
1108     __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1109 
1110     //! Adds a successor
1111     bool register_successor( successor_type &r ) override {
1112         my_successors.register_successor( r );
1113         return true;
1114     }
1115 
1116     //! Removes s as a successor
1117     bool remove_successor( successor_type &r ) override {
1118         my_successors.remove_successor( r );
1119         return true;
1120     }
1121 
1122 protected:
1123     template< typename R, typename B > friend class run_and_put_task;
1124     template<typename X, typename Y> friend class broadcast_cache;
1125     template<typename X, typename Y> friend class round_robin_cache;
1126     //! build a task to run the successor if possible.  Default is old behavior.
1127     graph_task *try_put_task(const T& t) override {
1128         graph_task *new_task = my_successors.try_put_task(t);
1129         if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1130         return new_task;
1131     }
1132 
1133     graph& graph_reference() const override {
1134         return my_graph;
1135     }
1136 
1137     void reset_node(reset_flags f) override {
1138         if (f&rf_clear_edges) {
1139            my_successors.clear();
1140         }
1141         __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1142     }
1143 };  // broadcast_node
1144 
1145 //! Forwards messages in arbitrary order
1146 template <typename T>
1147 class buffer_node
1148     : public graph_node
1149     , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1150     , public receiver<T>, public sender<T>
1151 {
1152     typedef cache_aligned_allocator<T> internals_allocator;
1153 
1154 public:
1155     typedef T input_type;
1156     typedef T output_type;
1157     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1158     typedef typename sender<output_type>::successor_type successor_type;
1159     typedef buffer_node<T> class_type;
1160 
1161 protected:
1162     typedef size_t size_type;
1163     round_robin_cache< T, null_rw_mutex > my_successors;
1164 
1165     friend class forward_task_bypass< class_type >;
1166 
1167     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1168     };
1169 
1170     // implements the aggregator_operation concept
1171     class buffer_operation : public aggregated_operation< buffer_operation > {
1172     public:
1173         char type;
1174         T* elem;
1175         graph_task* ltask;
1176         successor_type *r;
1177 
1178         buffer_operation(const T& e, op_type t) : type(char(t))
1179                                                   , elem(const_cast<T*>(&e)) , ltask(nullptr)
1180                                                   , r(nullptr)
1181         {}
1182         buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
1183     };
1184 
1185     bool forwarder_busy;
1186     typedef aggregating_functor<class_type, buffer_operation> handler_type;
1187     friend class aggregating_functor<class_type, buffer_operation>;
1188     aggregator< handler_type, buffer_operation> my_aggregator;
1189 
1190     virtual void handle_operations(buffer_operation *op_list) {
1191         handle_operations_impl(op_list, this);
1192     }
1193 
1194     template<typename derived_type>
1195     void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1196         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1197 
1198         buffer_operation *tmp = nullptr;
1199         bool try_forwarding = false;
1200         while (op_list) {
1201             tmp = op_list;
1202             op_list = op_list->next;
1203             switch (tmp->type) {
1204             case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1205             case rem_succ: internal_rem_succ(tmp); break;
1206             case req_item: internal_pop(tmp); break;
1207             case res_item: internal_reserve(tmp); break;
1208             case rel_res:  internal_release(tmp); try_forwarding = true; break;
1209             case con_res:  internal_consume(tmp); try_forwarding = true; break;
1210             case put_item: try_forwarding = internal_push(tmp); break;
1211             case try_fwd_task: internal_forward_task(tmp); break;
1212             }
1213         }
1214 
1215         derived->order();
1216 
1217         if (try_forwarding && !forwarder_busy) {
1218             if(is_graph_active(this->my_graph)) {
1219                 forwarder_busy = true;
1220                 typedef forward_task_bypass<class_type> task_type;
1221                 small_object_allocator allocator{};
1222                 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1223                 my_graph.reserve_wait();
1224                 // tmp should point to the last item handled by the aggregator.  This is the operation
1225                 // the handling thread enqueued.  So modifying that record will be okay.
1226                 // TODO revamp: check that the issue is still present
1227                 // workaround for icc bug  (at least 12.0 and 13.0)
1228                 // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
1229                 //        argument types are: (graph, graph_task *, graph_task *)
1230                 graph_task *z = tmp->ltask;
1231                 graph &g = this->my_graph;
1232                 tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
1233             }
1234         }
1235     }  // handle_operations
1236 
1237     inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1238         return op_data.ltask;
1239     }
1240 
1241     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1242         graph_task *ft = grab_forwarding_task(op_data);
1243         if(ft) {
1244             spawn_in_graph_arena(graph_reference(), *ft);
1245             return true;
1246         }
1247         return false;
1248     }
1249 
1250     //! This is executed by an enqueued task, the "forwarder"
1251     virtual graph_task *forward_task() {
1252         buffer_operation op_data(try_fwd_task);
1253         graph_task *last_task = nullptr;
1254         do {
1255             op_data.status = WAIT;
1256             op_data.ltask = nullptr;
1257             my_aggregator.execute(&op_data);
1258 
1259             // workaround for icc bug
1260             graph_task *xtask = op_data.ltask;
1261             graph& g = this->my_graph;
1262             last_task = combine_tasks(g, last_task, xtask);
1263         } while (op_data.status ==SUCCEEDED);
1264         return last_task;
1265     }
1266 
1267     //! Register successor
1268     virtual void internal_reg_succ(buffer_operation *op) {
1269         __TBB_ASSERT(op->r, nullptr);
1270         my_successors.register_successor(*(op->r));
1271         op->status.store(SUCCEEDED, std::memory_order_release);
1272     }
1273 
1274     //! Remove successor
1275     virtual void internal_rem_succ(buffer_operation *op) {
1276         __TBB_ASSERT(op->r, nullptr);
1277         my_successors.remove_successor(*(op->r));
1278         op->status.store(SUCCEEDED, std::memory_order_release);
1279     }
1280 
1281 private:
1282     void order() {}
1283 
1284     bool is_item_valid() {
1285         return this->my_item_valid(this->my_tail - 1);
1286     }
1287 
1288     void try_put_and_add_task(graph_task*& last_task) {
1289         graph_task *new_task = my_successors.try_put_task(this->back());
1290         if (new_task) {
1291             // workaround for icc bug
1292             graph& g = this->my_graph;
1293             last_task = combine_tasks(g, last_task, new_task);
1294             this->destroy_back();
1295         }
1296     }
1297 
1298 protected:
1299     //! Tries to forward valid items to successors
1300     virtual void internal_forward_task(buffer_operation *op) {
1301         internal_forward_task_impl(op, this);
1302     }
1303 
1304     template<typename derived_type>
1305     void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1306         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1307 
1308         if (this->my_reserved || !derived->is_item_valid()) {
1309             op->status.store(FAILED, std::memory_order_release);
1310             this->forwarder_busy = false;
1311             return;
1312         }
1313         // Try forwarding, giving each successor a chance
1314         graph_task* last_task = nullptr;
1315         size_type counter = my_successors.size();
1316         for (; counter > 0 && derived->is_item_valid(); --counter)
1317             derived->try_put_and_add_task(last_task);
1318 
1319         op->ltask = last_task;  // return task
1320         if (last_task && !counter) {
1321             op->status.store(SUCCEEDED, std::memory_order_release);
1322         }
1323         else {
1324             op->status.store(FAILED, std::memory_order_release);
1325             forwarder_busy = false;
1326         }
1327     }
1328 
1329     virtual bool internal_push(buffer_operation *op) {
1330         __TBB_ASSERT(op->elem, nullptr);
1331         this->push_back(*(op->elem));
1332         op->status.store(SUCCEEDED, std::memory_order_release);
1333         return true;
1334     }
1335 
1336     virtual void internal_pop(buffer_operation *op) {
1337         __TBB_ASSERT(op->elem, nullptr);
1338         if(this->pop_back(*(op->elem))) {
1339             op->status.store(SUCCEEDED, std::memory_order_release);
1340         }
1341         else {
1342             op->status.store(FAILED, std::memory_order_release);
1343         }
1344     }
1345 
1346     virtual void internal_reserve(buffer_operation *op) {
1347         __TBB_ASSERT(op->elem, nullptr);
1348         if(this->reserve_front(*(op->elem))) {
1349             op->status.store(SUCCEEDED, std::memory_order_release);
1350         }
1351         else {
1352             op->status.store(FAILED, std::memory_order_release);
1353         }
1354     }
1355 
1356     virtual void internal_consume(buffer_operation *op) {
1357         this->consume_front();
1358         op->status.store(SUCCEEDED, std::memory_order_release);
1359     }
1360 
1361     virtual void internal_release(buffer_operation *op) {
1362         this->release_front();
1363         op->status.store(SUCCEEDED, std::memory_order_release);
1364     }
1365 
1366 public:
1367     //! Constructor
1368     __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1369         : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1370           sender<T>(), my_successors(this), forwarder_busy(false)
1371     {
1372         my_aggregator.initialize_handler(handler_type(this));
1373         fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1374                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1375     }
1376 
1377 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1378     template <typename... Args>
1379     buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1380         make_edges_in_order(nodes, *this);
1381     }
1382 #endif
1383 
1384     //! Copy constructor
1385     __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1386 
1387     //
1388     // message sender implementation
1389     //
1390 
1391     //! Adds a new successor.
1392     /** Adds successor r to the list of successors; may forward tasks.  */
1393     bool register_successor( successor_type &r ) override {
1394         buffer_operation op_data(reg_succ);
1395         op_data.r = &r;
1396         my_aggregator.execute(&op_data);
1397         (void)enqueue_forwarding_task(op_data);
1398         return true;
1399     }
1400 
1401     //! Removes a successor.
1402     /** Removes successor r from the list of successors.
1403         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1404     bool remove_successor( successor_type &r ) override {
1405         // TODO revamp: investigate why full qualification is necessary here
1406         tbb::detail::d1::remove_predecessor(r, *this);
1407         buffer_operation op_data(rem_succ);
1408         op_data.r = &r;
1409         my_aggregator.execute(&op_data);
1410         // even though this operation does not cause a forward, if we are the handler, and
1411         // a forward is scheduled, we may be the first to reach this point after the aggregator,
1412         // and so should check for the task.
1413         (void)enqueue_forwarding_task(op_data);
1414         return true;
1415     }
1416 
1417     //! Request an item from the buffer_node
1418     /**  true = v contains the returned item<BR>
1419          false = no item has been returned */
1420     bool try_get( T &v ) override {
1421         buffer_operation op_data(req_item);
1422         op_data.elem = &v;
1423         my_aggregator.execute(&op_data);
1424         (void)enqueue_forwarding_task(op_data);
1425         return (op_data.status==SUCCEEDED);
1426     }
1427 
1428     //! Reserves an item.
1429     /**  false = no item can be reserved<BR>
1430          true = an item is reserved */
1431     bool try_reserve( T &v ) override {
1432         buffer_operation op_data(res_item);
1433         op_data.elem = &v;
1434         my_aggregator.execute(&op_data);
1435         (void)enqueue_forwarding_task(op_data);
1436         return (op_data.status==SUCCEEDED);
1437     }
1438 
1439     //! Release a reserved item.
1440     /**  true = item has been released and so remains in sender */
1441     bool try_release() override {
1442         buffer_operation op_data(rel_res);
1443         my_aggregator.execute(&op_data);
1444         (void)enqueue_forwarding_task(op_data);
1445         return true;
1446     }
1447 
1448     //! Consumes a reserved item.
1449     /** true = item is removed from sender and reservation removed */
1450     bool try_consume() override {
1451         buffer_operation op_data(con_res);
1452         my_aggregator.execute(&op_data);
1453         (void)enqueue_forwarding_task(op_data);
1454         return true;
1455     }
1456 
1457 protected:
1458 
1459     template< typename R, typename B > friend class run_and_put_task;
1460     template<typename X, typename Y> friend class broadcast_cache;
1461     template<typename X, typename Y> friend class round_robin_cache;
1462     //! receive an item, return a task *if possible
1463     graph_task *try_put_task(const T &t) override {
1464         buffer_operation op_data(t, put_item);
1465         my_aggregator.execute(&op_data);
1466         graph_task *ft = grab_forwarding_task(op_data);
1467         // sequencer_nodes can return failure (if an item has been previously inserted)
1468         // We have to spawn the returned task if our own operation fails.
1469 
1470         if(ft && op_data.status ==FAILED) {
1471             // we haven't succeeded queueing the item, but for some reason the
1472             // call returned a task (if another request resulted in a successful
1473             // forward this could happen.)  Queue the task and reset the pointer.
1474             spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
1475         }
1476         else if(!ft && op_data.status ==SUCCEEDED) {
1477             ft = SUCCESSFULLY_ENQUEUED;
1478         }
1479         return ft;
1480     }
1481 
1482     graph& graph_reference() const override {
1483         return my_graph;
1484     }
1485 
1486 protected:
1487     void reset_node( reset_flags f) override {
1488         reservable_item_buffer<T, internals_allocator>::reset();
1489         // TODO: just clear structures
1490         if (f&rf_clear_edges) {
1491             my_successors.clear();
1492         }
1493         forwarder_busy = false;
1494     }
1495 };  // buffer_node
1496 
1497 //! Forwards messages in FIFO order
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501     typedef buffer_node<T> base_type;
1502     typedef typename base_type::size_type size_type;
1503     typedef typename base_type::buffer_operation queue_operation;
1504     typedef queue_node class_type;
1505 
1506 private:
1507     template<typename> friend class buffer_node;
1508 
1509     bool is_item_valid() {
1510         return this->my_item_valid(this->my_head);
1511     }
1512 
1513     void try_put_and_add_task(graph_task*& last_task) {
1514         graph_task *new_task = this->my_successors.try_put_task(this->front());
1515         if (new_task) {
1516             // workaround for icc bug
1517             graph& graph_ref = this->graph_reference();
1518             last_task = combine_tasks(graph_ref, last_task, new_task);
1519             this->destroy_front();
1520         }
1521     }
1522 
1523 protected:
1524     void internal_forward_task(queue_operation *op) override {
1525         this->internal_forward_task_impl(op, this);
1526     }
1527 
1528     void internal_pop(queue_operation *op) override {
1529         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530             op->status.store(FAILED, std::memory_order_release);
1531         }
1532         else {
1533             this->pop_front(*(op->elem));
1534             op->status.store(SUCCEEDED, std::memory_order_release);
1535         }
1536     }
1537     void internal_reserve(queue_operation *op) override {
1538         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539             op->status.store(FAILED, std::memory_order_release);
1540         }
1541         else {
1542             this->reserve_front(*(op->elem));
1543             op->status.store(SUCCEEDED, std::memory_order_release);
1544         }
1545     }
1546     void internal_consume(queue_operation *op) override {
1547         this->consume_front();
1548         op->status.store(SUCCEEDED, std::memory_order_release);
1549     }
1550 
1551 public:
1552     typedef T input_type;
1553     typedef T output_type;
1554     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555     typedef typename sender<output_type>::successor_type successor_type;
1556 
1557     //! Constructor
1558     __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560                                  static_cast<receiver<input_type> *>(this),
1561                                  static_cast<sender<output_type> *>(this) );
1562     }
1563 
1564 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1565     template <typename... Args>
1566     queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
1567         make_edges_in_order(nodes, *this);
1568     }
1569 #endif
1570 
1571     //! Copy constructor
1572     __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574                                  static_cast<receiver<input_type> *>(this),
1575                                  static_cast<sender<output_type> *>(this) );
1576     }
1577 
1578 
1579 protected:
1580     void reset_node( reset_flags f) override {
1581         base_type::reset_node(f);
1582     }
1583 };  // queue_node
1584 
1585 //! Forwards messages in sequence order
1586 template <typename T>
1587     __TBB_requires(std::copyable<T>)
1588 class sequencer_node : public queue_node<T> {
1589     function_body< T, size_t > *my_sequencer;
1590     // my_sequencer should be a benign function and must be callable
1591     // from a parallel context.  Does this mean it needn't be reset?
1592 public:
1593     typedef T input_type;
1594     typedef T output_type;
1595     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1596     typedef typename sender<output_type>::successor_type successor_type;
1597 
1598     //! Constructor
1599     template< typename Sequencer >
1600         __TBB_requires(sequencer<Sequencer, T>)
1601     __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
1602         my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
1603         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1604                                  static_cast<receiver<input_type> *>(this),
1605                                  static_cast<sender<output_type> *>(this) );
1606     }
1607 
1608 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1609     template <typename Sequencer, typename... Args>
1610         __TBB_requires(sequencer<Sequencer, T>)
1611     sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
1612         : sequencer_node(nodes.graph_reference(), s) {
1613         make_edges_in_order(nodes, *this);
1614     }
1615 #endif
1616 
1617     //! Copy constructor
1618     __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
1619         my_sequencer( src.my_sequencer->clone() ) {
1620         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1621                                  static_cast<receiver<input_type> *>(this),
1622                                  static_cast<sender<output_type> *>(this) );
1623     }
1624 
1625     //! Destructor
1626     ~sequencer_node() { delete my_sequencer; }
1627 
1628 protected:
1629     typedef typename buffer_node<T>::size_type size_type;
1630     typedef typename buffer_node<T>::buffer_operation sequencer_operation;
1631 
1632 private:
1633     bool internal_push(sequencer_operation *op) override {
1634         size_type tag = (*my_sequencer)(*(op->elem));
1635 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
1636         if (tag < this->my_head) {
1637             // have already emitted a message with this tag
1638             op->status.store(FAILED, std::memory_order_release);
1639             return false;
1640         }
1641 #endif
1642         // cannot modify this->my_tail now; the buffer would be inconsistent.
1643         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1644 
1645         if (this->size(new_tail) > this->capacity()) {
1646             this->grow_my_array(this->size(new_tail));
1647         }
1648         this->my_tail = new_tail;
1649 
1650         const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
1651         op->status.store(res, std::memory_order_release);
1652         return res ==SUCCEEDED;
1653     }
1654 };  // sequencer_node
1655 
1656 //! Forwards messages in priority order
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660     typedef T input_type;
1661     typedef T output_type;
1662     typedef buffer_node<T> base_type;
1663     typedef priority_queue_node class_type;
1664     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665     typedef typename sender<output_type>::successor_type successor_type;
1666 
1667     //! Constructor
1668     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1669         : buffer_node<T>(g), compare(comp), mark(0) {
1670         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1671                                  static_cast<receiver<input_type> *>(this),
1672                                  static_cast<sender<output_type> *>(this) );
1673     }
1674 
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1676     template <typename... Args>
1677     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1678         : priority_queue_node(nodes.graph_reference(), comp) {
1679         make_edges_in_order(nodes, *this);
1680     }
1681 #endif
1682 
1683     //! Copy constructor
1684     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1685         : buffer_node<T>(src), mark(0)
1686     {
1687         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1688                                  static_cast<receiver<input_type> *>(this),
1689                                  static_cast<sender<output_type> *>(this) );
1690     }
1691 
1692 protected:
1693 
1694     void reset_node( reset_flags f) override {
1695         mark = 0;
1696         base_type::reset_node(f);
1697     }
1698 
1699     typedef typename buffer_node<T>::size_type size_type;
1700     typedef typename buffer_node<T>::item_type item_type;
1701     typedef typename buffer_node<T>::buffer_operation prio_operation;
1702 
1703     //! Tries to forward valid items to successors
1704     void internal_forward_task(prio_operation *op) override {
1705         this->internal_forward_task_impl(op, this);
1706     }
1707 
1708     void handle_operations(prio_operation *op_list) override {
1709         this->handle_operations_impl(op_list, this);
1710     }
1711 
1712     bool internal_push(prio_operation *op) override {
1713         prio_push(*(op->elem));
1714         op->status.store(SUCCEEDED, std::memory_order_release);
1715         return true;
1716     }
1717 
1718     void internal_pop(prio_operation *op) override {
1719         // if empty or already reserved, don't pop
1720         if ( this->my_reserved == true || this->my_tail == 0 ) {
1721             op->status.store(FAILED, std::memory_order_release);
1722             return;
1723         }
1724 
1725         *(op->elem) = prio();
1726         op->status.store(SUCCEEDED, std::memory_order_release);
1727         prio_pop();
1728 
1729     }
1730 
1731     // pops the highest-priority item, saves copy
1732     void internal_reserve(prio_operation *op) override {
1733         if (this->my_reserved == true || this->my_tail == 0) {
1734             op->status.store(FAILED, std::memory_order_release);
1735             return;
1736         }
1737         this->my_reserved = true;
1738         *(op->elem) = prio();
1739         reserved_item = *(op->elem);
1740         op->status.store(SUCCEEDED, std::memory_order_release);
1741         prio_pop();
1742     }
1743 
1744     void internal_consume(prio_operation *op) override {
1745         op->status.store(SUCCEEDED, std::memory_order_release);
1746         this->my_reserved = false;
1747         reserved_item = input_type();
1748     }
1749 
1750     void internal_release(prio_operation *op) override {
1751         op->status.store(SUCCEEDED, std::memory_order_release);
1752         prio_push(reserved_item);
1753         this->my_reserved = false;
1754         reserved_item = input_type();
1755     }
1756 
1757 private:
1758     template<typename> friend class buffer_node;
1759 
1760     void order() {
1761         if (mark < this->my_tail) heapify();
1762         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763     }
1764 
1765     bool is_item_valid() {
1766         return this->my_tail > 0;
1767     }
1768 
1769     void try_put_and_add_task(graph_task*& last_task) {
1770         graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771         if (new_task) {
1772             // workaround for icc bug
1773             graph& graph_ref = this->graph_reference();
1774             last_task = combine_tasks(graph_ref, last_task, new_task);
1775             prio_pop();
1776         }
1777     }
1778 
1779 private:
1780     Compare compare;
1781     size_type mark;
1782 
1783     input_type reserved_item;
1784 
1785     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
1786     bool prio_use_tail() {
1787         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789     }
1790 
1791     // prio_push: checks that the item will fit, expand array if necessary, put at end
1792     void prio_push(const T &src) {
1793         if ( this->my_tail >= this->my_array_size )
1794             this->grow_my_array( this->my_tail + 1 );
1795         (void) this->place_item(this->my_tail, src);
1796         ++(this->my_tail);
1797         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798     }
1799 
1800     // prio_pop: deletes highest priority item from the array, and if it is item
1801     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
1802     // and mark.  Assumes the array has already been tested for emptiness; no failure.
1803     void prio_pop()  {
1804         if (prio_use_tail()) {
1805             // there are newly pushed elements; last one higher than top
1806             // copy the data
1807             this->destroy_item(this->my_tail-1);
1808             --(this->my_tail);
1809             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810             return;
1811         }
1812         this->destroy_item(0);
1813         if(this->my_tail > 1) {
1814             // push the last element down heap
1815             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
1816             this->move_item(0,this->my_tail - 1);
1817         }
1818         --(this->my_tail);
1819         if(mark > this->my_tail) --mark;
1820         if (this->my_tail > 1) // don't reheap for heap of size 1
1821             reheap();
1822         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823     }
1824 
1825     const T& prio() {
1826         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827     }
1828 
1829     // turn array into heap
1830     void heapify() {
1831         if(this->my_tail == 0) {
1832             mark = 0;
1833             return;
1834         }
1835         if (!mark) mark = 1;
1836         for (; mark<this->my_tail; ++mark) { // for each unheaped element
1837             size_type cur_pos = mark;
1838             input_type to_place;
1839             this->fetch_item(mark,to_place);
1840             do { // push to_place up the heap
1841                 size_type parent = (cur_pos-1)>>1;
1842                 if (!compare(this->get_my_item(parent), to_place))
1843                     break;
1844                 this->move_item(cur_pos, parent);
1845                 cur_pos = parent;
1846             } while( cur_pos );
1847             (void) this->place_item(cur_pos, to_place);
1848         }
1849     }
1850 
1851     // otherwise heapified array with new root element; rearrange to heap
1852     void reheap() {
1853         size_type cur_pos=0, child=1;
1854         while (child < mark) {
1855             size_type target = child;
1856             if (child+1<mark &&
1857                 compare(this->get_my_item(child),
1858                         this->get_my_item(child+1)))
1859                 ++target;
1860             // target now has the higher priority child
1861             if (compare(this->get_my_item(target),
1862                         this->get_my_item(cur_pos)))
1863                 break;
1864             // swap
1865             this->swap_items(cur_pos, target);
1866             cur_pos = target;
1867             child = (cur_pos<<1)+1;
1868         }
1869     }
1870 };  // priority_queue_node
1871 
1872 //! Forwards messages only if the threshold has not been reached
1873 /** This node forwards items until its threshold is reached.
1874     It contains no buffering.  If the downstream node rejects, the
1875     message is dropped. */
1876 template< typename T, typename DecrementType=continue_msg >
1877 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1878 public:
1879     typedef T input_type;
1880     typedef T output_type;
1881     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1882     typedef typename sender<output_type>::successor_type successor_type;
1883     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
1884 
1885 private:
1886     size_t my_threshold;
1887     size_t my_count; // number of successful puts
1888     size_t my_tries; // number of active put attempts
1889     size_t my_future_decrement; // number of active decrement
1890     reservable_predecessor_cache< T, spin_mutex > my_predecessors;
1891     spin_mutex my_mutex;
1892     broadcast_cache< T > my_successors;
1893 
1894     //! The internal receiver< DecrementType > that adjusts the count
1895     threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
1896 
1897     graph_task* decrement_counter( long long delta ) {
1898         if ( delta > 0 && size_t(delta) > my_threshold ) {
1899             delta = my_threshold;
1900         }
1901 
1902         {
1903             spin_mutex::scoped_lock lock(my_mutex);
1904             if ( delta > 0 && size_t(delta) > my_count ) {
1905                 if( my_tries > 0 ) {
1906                     my_future_decrement += (size_t(delta) - my_count);
1907                 }
1908                 my_count = 0;
1909             }
1910             else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
1911                 my_count = my_threshold;
1912             }
1913             else {
1914                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
1915             }
1916             __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
1917         }
1918         return forward_task();
1919     }
1920 
1921     // Let threshold_regulator call decrement_counter()
1922     friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
1923 
1924     friend class forward_task_bypass< limiter_node<T,DecrementType> >;
1925 
1926     bool check_conditions() {  // always called under lock
1927         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
1928     }
1929 
1930     // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
1931     graph_task* forward_task() {
1932         input_type v;
1933         graph_task* rval = nullptr;
1934         bool reserved = false;
1935 
1936         {
1937             spin_mutex::scoped_lock lock(my_mutex);
1938             if ( check_conditions() )
1939                 ++my_tries;
1940             else
1941                 return nullptr;
1942         }
1943 
1944         //SUCCESS
1945         // if we can reserve and can put, we consume the reservation
1946         // we increment the count and decrement the tries
1947         if ( (my_predecessors.try_reserve(v)) == true ) {
1948             reserved = true;
1949             if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
1950                 {
1951                     spin_mutex::scoped_lock lock(my_mutex);
1952                     ++my_count;
1953                     if ( my_future_decrement ) {
1954                         if ( my_count > my_future_decrement ) {
1955                             my_count -= my_future_decrement;
1956                             my_future_decrement = 0;
1957                         }
1958                         else {
1959                             my_future_decrement -= my_count;
1960                             my_count = 0;
1961                         }
1962                     }
1963                     --my_tries;
1964                     my_predecessors.try_consume();
1965                     if ( check_conditions() ) {
1966                         if ( is_graph_active(this->my_graph) ) {
1967                             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1968                             small_object_allocator allocator{};
1969                             graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
1970                             my_graph.reserve_wait();
1971                             spawn_in_graph_arena(graph_reference(), *rtask);
1972                         }
1973                     }
1974                 }
1975                 return rval;
1976             }
1977         }
1978         //FAILURE
1979         //if we can't reserve, we decrement the tries
1980         //if we can reserve but can't put, we decrement the tries and release the reservation
1981         {
1982             spin_mutex::scoped_lock lock(my_mutex);
1983             --my_tries;
1984             if (reserved) my_predecessors.try_release();
1985             if ( check_conditions() ) {
1986                 if ( is_graph_active(this->my_graph) ) {
1987                     small_object_allocator allocator{};
1988                     typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1989                     graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1990                     my_graph.reserve_wait();
1991                     __TBB_ASSERT(!rval, "Have two tasks to handle");
1992                     return t;
1993                 }
1994             }
1995             return rval;
1996         }
1997     }
1998 
1999     void initialize() {
2000         fgt_node(
2001             CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2002             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2003             static_cast<sender<output_type> *>(this)
2004         );
2005     }
2006 
2007 public:
2008     //! Constructor
2009     limiter_node(graph &g, size_t threshold)
2010         : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2011         my_predecessors(this), my_successors(this), decrement(this)
2012     {
2013         initialize();
2014     }
2015 
2016 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2017     template <typename... Args>
2018     limiter_node(const node_set<Args...>& nodes, size_t threshold)
2019         : limiter_node(nodes.graph_reference(), threshold) {
2020         make_edges_in_order(nodes, *this);
2021     }
2022 #endif
2023 
2024     //! Copy constructor
2025     limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2026 
2027     //! The interface for accessing internal receiver< DecrementType > that adjusts the count
2028     receiver<DecrementType>& decrementer() { return decrement; }
2029 
2030     //! Replace the current successor with this new successor
2031     bool register_successor( successor_type &r ) override {
2032         spin_mutex::scoped_lock lock(my_mutex);
2033         bool was_empty = my_successors.empty();
2034         my_successors.register_successor(r);
2035         //spawn a forward task if this is the only successor
2036         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2037             if ( is_graph_active(this->my_graph) ) {
2038                 small_object_allocator allocator{};
2039                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2040                 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2041                 my_graph.reserve_wait();
2042                 spawn_in_graph_arena(graph_reference(), *t);
2043             }
2044         }
2045         return true;
2046     }
2047 
2048     //! Removes a successor from this node
2049     /** r.remove_predecessor(*this) is also called. */
2050     bool remove_successor( successor_type &r ) override {
2051         // TODO revamp: investigate why qualification is needed for remove_predecessor() call
2052         tbb::detail::d1::remove_predecessor(r, *this);
2053         my_successors.remove_successor(r);
2054         return true;
2055     }
2056 
2057     //! Adds src to the list of cached predecessors.
2058     bool register_predecessor( predecessor_type &src ) override {
2059         spin_mutex::scoped_lock lock(my_mutex);
2060         my_predecessors.add( src );
2061         if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2062             small_object_allocator allocator{};
2063             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2064             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2065             my_graph.reserve_wait();
2066             spawn_in_graph_arena(graph_reference(), *t);
2067         }
2068         return true;
2069     }
2070 
2071     //! Removes src from the list of cached predecessors.
2072     bool remove_predecessor( predecessor_type &src ) override {
2073         my_predecessors.remove( src );
2074         return true;
2075     }
2076 
2077 protected:
2078 
2079     template< typename R, typename B > friend class run_and_put_task;
2080     template<typename X, typename Y> friend class broadcast_cache;
2081     template<typename X, typename Y> friend class round_robin_cache;
2082     //! Puts an item to this receiver
2083     graph_task* try_put_task( const T &t ) override {
2084         {
2085             spin_mutex::scoped_lock lock(my_mutex);
2086             if ( my_count + my_tries >= my_threshold )
2087                 return nullptr;
2088             else
2089                 ++my_tries;
2090         }
2091 
2092         graph_task* rtask = my_successors.try_put_task(t);
2093         if ( !rtask ) {  // try_put_task failed.
2094             spin_mutex::scoped_lock lock(my_mutex);
2095             --my_tries;
2096             if (check_conditions() && is_graph_active(this->my_graph)) {
2097                 small_object_allocator allocator{};
2098                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2099                 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2100                 my_graph.reserve_wait();
2101             }
2102         }
2103         else {
2104             spin_mutex::scoped_lock lock(my_mutex);
2105             ++my_count;
2106             if ( my_future_decrement ) {
2107                 if ( my_count > my_future_decrement ) {
2108                     my_count -= my_future_decrement;
2109                     my_future_decrement = 0;
2110                 }
2111                 else {
2112                     my_future_decrement -= my_count;
2113                     my_count = 0;
2114                 }
2115             }
2116             --my_tries;
2117         }
2118         return rtask;
2119     }
2120 
2121     graph& graph_reference() const override { return my_graph; }
2122 
2123     void reset_node( reset_flags f ) override {
2124         my_count = 0;
2125         if ( f & rf_clear_edges ) {
2126             my_predecessors.clear();
2127             my_successors.clear();
2128         }
2129         else {
2130             my_predecessors.reset();
2131         }
2132         decrement.reset_receiver(f);
2133     }
2134 };  // limiter_node
2135 
2136 #include "detail/_flow_graph_join_impl.h"
2137 
2138 template<typename OutputTuple, typename JP=queueing> class join_node;
2139 
2140 template<typename OutputTuple>
2141 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2142 private:
2143     static const int N = std::tuple_size<OutputTuple>::value;
2144     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2145 public:
2146     typedef OutputTuple output_type;
2147     typedef typename unfolded_type::input_ports_type input_ports_type;
2148      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2149         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2150                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2151     }
2152 
2153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2154     template <typename... Args>
2155     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2156         make_edges_in_order(nodes, *this);
2157     }
2158 #endif
2159 
2160     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2161         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2162                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2163     }
2164 
2165 };
2166 
2167 template<typename OutputTuple>
2168 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2169 private:
2170     static const int N = std::tuple_size<OutputTuple>::value;
2171     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2172 public:
2173     typedef OutputTuple output_type;
2174     typedef typename unfolded_type::input_ports_type input_ports_type;
2175      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2176         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2177                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2178     }
2179 
2180 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2181     template <typename... Args>
2182     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2183         make_edges_in_order(nodes, *this);
2184     }
2185 #endif
2186 
2187     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2188         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2189                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2190     }
2191 
2192 };
2193 
2194 #if __TBB_CPP20_CONCEPTS_PRESENT
2195 // Helper function which is well-formed only if all of the elements in OutputTuple
2196 // satisfies join_node_function_object<body[i], tuple[i], K>
2197 template <typename OutputTuple, typename K,
2198           typename... Functions, std::size_t... Idx>
2199 void join_node_function_objects_helper( std::index_sequence<Idx...> )
2200     requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
2201              (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);
2202 
2203 template <typename OutputTuple, typename K, typename... Functions>
2204 concept join_node_functions = requires {
2205     join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
2206 };
2207 
2208 #endif
2209 
2210 // template for key_matching join_node
2211 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2212 template<typename OutputTuple, typename K, typename KHash>
2213 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2214       key_matching_port, OutputTuple, key_matching<K,KHash> > {
2215 private:
2216     static const int N = std::tuple_size<OutputTuple>::value;
2217     typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2218 public:
2219     typedef OutputTuple output_type;
2220     typedef typename unfolded_type::input_ports_type input_ports_type;
2221 
2222 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2223     join_node(graph &g) : unfolded_type(g) {}
2224 #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2225 
2226     template<typename __TBB_B0, typename __TBB_B1>
2227         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2228      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2229         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2230                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2231     }
2232     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2233         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2234      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2235         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2236                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2237     }
2238     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2239         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
2240      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2241         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2242                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2243     }
2244     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2245         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
2246      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2247             unfolded_type(g, b0, b1, b2, b3, b4) {
2248         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2249                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2250     }
2251 #if __TBB_VARIADIC_MAX >= 6
2252     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2253         typename __TBB_B5>
2254         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
2255      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2256             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2257         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2258                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2259     }
2260 #endif
2261 #if __TBB_VARIADIC_MAX >= 7
2262     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2263         typename __TBB_B5, typename __TBB_B6>
2264         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
2265      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2266             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2267         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2268                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2269     }
2270 #endif
2271 #if __TBB_VARIADIC_MAX >= 8
2272     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2273         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2274         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
2275      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2276             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2277         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2278                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2279     }
2280 #endif
2281 #if __TBB_VARIADIC_MAX >= 9
2282     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2283         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2284         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
2285      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2286             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2287         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2288                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2289     }
2290 #endif
2291 #if __TBB_VARIADIC_MAX >= 10
2292     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2293         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2294         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
2295      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2296             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2297         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2298                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2299     }
2300 #endif
2301 
2302 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2303     template <
2304 #if (__clang_major__ == 3 && __clang_minor__ == 4)
2305         // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
2306         template<typename...> class node_set,
2307 #endif
2308         typename... Args, typename... Bodies
2309     >
2310     __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
2311     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
2312         : join_node(nodes.graph_reference(), bodies...) {
2313         make_edges_in_order(nodes, *this);
2314     }
2315 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2316 
2317     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2318         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2319                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2320     }
2321 
2322 };
2323 
2324 // indexer node
2325 #include "detail/_flow_graph_indexer_impl.h"
2326 
2327 // TODO: Implement interface with variadic template or tuple
2328 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2329                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
2330                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2331 
2332 //indexer node specializations
2333 template<typename T0>
2334 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2335 private:
2336     static const int N = 1;
2337 public:
2338     typedef std::tuple<T0> InputTuple;
2339     typedef tagged_msg<size_t, T0> output_type;
2340     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2341     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2342         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2343                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2344     }
2345 
2346 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2347     template <typename... Args>
2348     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2349         make_edges_in_order(nodes, *this);
2350     }
2351 #endif
2352 
2353     // Copy constructor
2354     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2355         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2356                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2357     }
2358 };
2359 
2360 template<typename T0, typename T1>
2361 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2362 private:
2363     static const int N = 2;
2364 public:
2365     typedef std::tuple<T0, T1> InputTuple;
2366     typedef tagged_msg<size_t, T0, T1> output_type;
2367     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2368     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2369         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2370                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2371     }
2372 
2373 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2374     template <typename... Args>
2375     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2376         make_edges_in_order(nodes, *this);
2377     }
2378 #endif
2379 
2380     // Copy constructor
2381     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2382         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2383                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2384     }
2385 
2386 };
2387 
2388 template<typename T0, typename T1, typename T2>
2389 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2390 private:
2391     static const int N = 3;
2392 public:
2393     typedef std::tuple<T0, T1, T2> InputTuple;
2394     typedef tagged_msg<size_t, T0, T1, T2> output_type;
2395     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2396     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2397         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2398                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2399     }
2400 
2401 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2402     template <typename... Args>
2403     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2404         make_edges_in_order(nodes, *this);
2405     }
2406 #endif
2407 
2408     // Copy constructor
2409     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2410         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2411                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2412     }
2413 
2414 };
2415 
2416 template<typename T0, typename T1, typename T2, typename T3>
2417 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2418 private:
2419     static const int N = 4;
2420 public:
2421     typedef std::tuple<T0, T1, T2, T3> InputTuple;
2422     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2423     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2424     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2425         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2426                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2427     }
2428 
2429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2430     template <typename... Args>
2431     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2432         make_edges_in_order(nodes, *this);
2433     }
2434 #endif
2435 
2436     // Copy constructor
2437     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2438         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2439                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2440     }
2441 
2442 };
2443 
2444 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2445 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2446 private:
2447     static const int N = 5;
2448 public:
2449     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2450     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2451     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2452     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2453         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2454                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2455     }
2456 
2457 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2458     template <typename... Args>
2459     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2460         make_edges_in_order(nodes, *this);
2461     }
2462 #endif
2463 
2464     // Copy constructor
2465     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2466         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2467                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2468     }
2469 
2470 };
2471 
2472 #if __TBB_VARIADIC_MAX >= 6
2473 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2474 class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
2475 private:
2476     static const int N = 6;
2477 public:
2478     typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2479     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
2480     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2481     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2482         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2483                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2484     }
2485 
2486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2487     template <typename... Args>
2488     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2489         make_edges_in_order(nodes, *this);
2490     }
2491 #endif
2492 
2493     // Copy constructor
2494     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2495         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2496                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2497     }
2498 
2499 };
2500 #endif //variadic max 6
2501 
2502 #if __TBB_VARIADIC_MAX >= 7
2503 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2504          typename T6>
2505 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
2506 private:
2507     static const int N = 7;
2508 public:
2509     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
2510     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
2511     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2512     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2513         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2514                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2515     }
2516 
2517 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2518     template <typename... Args>
2519     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2520         make_edges_in_order(nodes, *this);
2521     }
2522 #endif
2523 
2524     // Copy constructor
2525     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2526         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2527                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2528     }
2529 
2530 };
2531 #endif //variadic max 7
2532 
2533 #if __TBB_VARIADIC_MAX >= 8
2534 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2535          typename T6, typename T7>
2536 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
2537 private:
2538     static const int N = 8;
2539 public:
2540     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
2541     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
2542     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2543     indexer_node(graph& g) : unfolded_type(g) {
2544         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2545                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2546     }
2547 
2548 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2549     template <typename... Args>
2550     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2551         make_edges_in_order(nodes, *this);
2552     }
2553 #endif
2554 
2555     // Copy constructor
2556     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2557         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2558                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2559     }
2560 
2561 };
2562 #endif //variadic max 8
2563 
2564 #if __TBB_VARIADIC_MAX >= 9
2565 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2566          typename T6, typename T7, typename T8>
2567 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
2568 private:
2569     static const int N = 9;
2570 public:
2571     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
2572     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
2573     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2574     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2575         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2576                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2577     }
2578 
2579 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2580     template <typename... Args>
2581     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2582         make_edges_in_order(nodes, *this);
2583     }
2584 #endif
2585 
2586     // Copy constructor
2587     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2588         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2589                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2590     }
2591 
2592 };
2593 #endif //variadic max 9
2594 
2595 #if __TBB_VARIADIC_MAX >= 10
2596 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2597          typename T6, typename T7, typename T8, typename T9>
2598 class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
2599 private:
2600     static const int N = 10;
2601 public:
2602     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
2603     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
2604     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2605     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2606         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2607                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2608     }
2609 
2610 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2611     template <typename... Args>
2612     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2613         make_edges_in_order(nodes, *this);
2614     }
2615 #endif
2616 
2617     // Copy constructor
2618     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2619         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2620                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2621     }
2622 
2623 };
2624 #endif //variadic max 10
2625 
2626 template< typename T >
2627 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2628     register_successor(p, s);
2629     fgt_make_edge( &p, &s );
2630 }
2631 
2632 //! Makes an edge between a single predecessor and a single successor
2633 template< typename T >
2634 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2635     internal_make_edge( p, s );
2636 }
2637 
2638 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
2639 template< typename T, typename V,
2640           typename = typename T::output_ports_type, typename = typename V::input_ports_type >
2641 inline void make_edge( T& output, V& input) {
2642     make_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
2643 }
2644 
2645 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2646 template< typename T, typename R,
2647           typename = typename T::output_ports_type >
2648 inline void make_edge( T& output, receiver<R>& input) {
2649      make_edge(std::get<0>(output.output_ports()), input);
2650 }
2651 
2652 //Makes an edge from a sender to port 0 of a multi-input successor.
2653 template< typename S,  typename V,
2654           typename = typename V::input_ports_type >
2655 inline void make_edge( sender<S>& output, V& input) {
2656      make_edge(output, std::get<0>(input.input_ports()));
2657 }
2658 
2659 template< typename T >
2660 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2661     remove_successor( p, s );
2662     fgt_remove_edge( &p, &s );
2663 }
2664 
2665 //! Removes an edge between a single predecessor and a single successor
2666 template< typename T >
2667 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2668     internal_remove_edge( p, s );
2669 }
2670 
2671 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
2672 template< typename T, typename V,
2673           typename = typename T::output_ports_type, typename = typename V::input_ports_type >
2674 inline void remove_edge( T& output, V& input) {
2675     remove_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
2676 }
2677 
2678 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2679 template< typename T, typename R,
2680           typename = typename T::output_ports_type >
2681 inline void remove_edge( T& output, receiver<R>& input) {
2682      remove_edge(std::get<0>(output.output_ports()), input);
2683 }
2684 //Removes an edge between a sender and port 0 of a multi-input successor.
2685 template< typename S,  typename V,
2686           typename = typename V::input_ports_type >
2687 inline void remove_edge( sender<S>& output, V& input) {
2688      remove_edge(output, std::get<0>(input.input_ports()));
2689 }
2690 
2691 //! Returns a copy of the body from a function or continue node
2692 template< typename Body, typename Node >
2693 Body copy_body( Node &n ) {
2694     return n.template copy_function_object<Body>();
2695 }
2696 
2697 //composite_node
2698 template< typename InputTuple, typename OutputTuple > class composite_node;
2699 
2700 template< typename... InputTypes, typename... OutputTypes>
2701 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2702 
2703 public:
2704     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2705     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2706 
2707 private:
2708     std::unique_ptr<input_ports_type> my_input_ports;
2709     std::unique_ptr<output_ports_type> my_output_ports;
2710 
2711     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2712     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2713 
2714 protected:
2715     void reset_node(reset_flags) override {}
2716 
2717 public:
2718     composite_node( graph &g ) : graph_node(g) {
2719         fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2720     }
2721 
2722     template<typename T1, typename T2>
2723     void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2724         static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2725         static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2726 
2727         fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2728         fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2729 
2730         my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2731         my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2732     }
2733 
2734     template< typename... NodeTypes >
2735     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2736 
2737     template< typename... NodeTypes >
2738     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2739 
2740 
2741     input_ports_type& input_ports() {
2742          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2743          return *my_input_ports;
2744     }
2745 
2746     output_ports_type& output_ports() {
2747          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2748          return *my_output_ports;
2749     }
2750 };  // class composite_node
2751 
2752 //composite_node with only input ports
2753 template< typename... InputTypes>
2754 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2755 public:
2756     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2757 
2758 private:
2759     std::unique_ptr<input_ports_type> my_input_ports;
2760     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2761 
2762 protected:
2763     void reset_node(reset_flags) override {}
2764 
2765 public:
2766     composite_node( graph &g ) : graph_node(g) {
2767         fgt_composite( CODEPTR(), this, &g );
2768     }
2769 
2770    template<typename T>
2771    void set_external_ports(T&& input_ports_tuple) {
2772        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2773 
2774        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2775 
2776        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2777    }
2778 
2779     template< typename... NodeTypes >
2780     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2781 
2782     template< typename... NodeTypes >
2783     void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2784 
2785 
2786     input_ports_type& input_ports() {
2787          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2788          return *my_input_ports;
2789     }
2790 
2791 };  // class composite_node
2792 
2793 //composite_nodes with only output_ports
2794 template<typename... OutputTypes>
2795 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
2796 public:
2797     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2798 
2799 private:
2800     std::unique_ptr<output_ports_type> my_output_ports;
2801     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2802 
2803 protected:
2804     void reset_node(reset_flags) override {}
2805 
2806 public:
2807     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
2808         fgt_composite( CODEPTR(), this, &g );
2809     }
2810 
2811    template<typename T>
2812    void set_external_ports(T&& output_ports_tuple) {
2813        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2814 
2815        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2816 
2817        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
2818    }
2819 
2820     template<typename... NodeTypes >
2821     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2822 
2823     template<typename... NodeTypes >
2824     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2825 
2826 
2827     output_ports_type& output_ports() {
2828          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2829          return *my_output_ports;
2830     }
2831 
2832 };  // class composite_node
2833 
2834 template<typename Gateway>
2835 class async_body_base: no_assign {
2836 public:
2837     typedef Gateway gateway_type;
2838 
2839     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
2840     void set_gateway(gateway_type *gateway) {
2841         my_gateway = gateway;
2842     }
2843 
2844 protected:
2845     gateway_type *my_gateway;
2846 };
2847 
2848 template<typename Input, typename Ports, typename Gateway, typename Body>
2849 class async_body: public async_body_base<Gateway> {
2850 private:
2851     Body my_body;
2852 
2853 public:
2854     typedef async_body_base<Gateway> base_type;
2855     typedef Gateway gateway_type;
2856 
2857     async_body(const Body &body, gateway_type *gateway)
2858         : base_type(gateway), my_body(body) { }
2859 
2860     void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
2861         tbb::detail::invoke(my_body, v, *this->my_gateway);
2862     }
2863 
2864     Body get_body() { return my_body; }
2865 };
2866 
2867 //! Implements async node
2868 template < typename Input, typename Output,
2869            typename Policy = queueing_lightweight >
2870     __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
2871 class async_node
2872     : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
2873 {
2874     typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
2875     typedef multifunction_input<
2876         Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
2877 
2878 public:
2879     typedef Input input_type;
2880     typedef Output output_type;
2881     typedef receiver<input_type> receiver_type;
2882     typedef receiver<output_type> successor_type;
2883     typedef sender<input_type> predecessor_type;
2884     typedef receiver_gateway<output_type> gateway_type;
2885     typedef async_body_base<gateway_type> async_body_base_type;
2886     typedef typename base_type::output_ports_type output_ports_type;
2887 
2888 private:
2889     class receiver_gateway_impl: public receiver_gateway<Output> {
2890     public:
2891         receiver_gateway_impl(async_node* node): my_node(node) {}
2892         void reserve_wait() override {
2893             fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
2894             my_node->my_graph.reserve_wait();
2895         }
2896 
2897         void release_wait() override {
2898             async_node* n = my_node;
2899             graph* g = &n->my_graph;
2900             g->release_wait();
2901             fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
2902         }
2903 
2904         //! Implements gateway_type::try_put for an external activity to submit a message to FG
2905         bool try_put(const Output &i) override {
2906             return my_node->try_put_impl(i);
2907         }
2908 
2909     private:
2910         async_node* my_node;
2911     } my_gateway;
2912 
2913     //The substitute of 'this' for member construction, to prevent compiler warnings
2914     async_node* self() { return this; }
2915 
2916     //! Implements gateway_type::try_put for an external activity to submit a message to FG
2917     bool try_put_impl(const Output &i) {
2918         multifunction_output<Output> &port_0 = output_port<0>(*this);
2919         broadcast_cache<output_type>& port_successors = port_0.successors();
2920         fgt_async_try_put_begin(this, &port_0);
2921         // TODO revamp: change to std::list<graph_task*>
2922         graph_task_list tasks;
2923         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
2924         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
2925                       "Return status is inconsistent with the method operation." );
2926 
2927         while( !tasks.empty() ) {
2928             enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
2929         }
2930         fgt_async_try_put_end(this, &port_0);
2931         return is_at_least_one_put_successful;
2932     }
2933 
2934 public:
2935     template<typename Body>
2936         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2937     __TBB_NOINLINE_SYM async_node(
2938         graph &g, size_t concurrency,
2939         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
2940     ) : base_type(
2941         g, concurrency,
2942         async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
2943         (body, &my_gateway), a_priority ), my_gateway(self()) {
2944         fgt_multioutput_node_with_body<1>(
2945             CODEPTR(), FLOW_ASYNC_NODE,
2946             &this->my_graph, static_cast<receiver<input_type> *>(this),
2947             this->output_ports(), this->my_body
2948         );
2949     }
2950 
2951     template <typename Body>
2952         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2953     __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2954         : async_node(g, concurrency, body, Policy(), a_priority) {}
2955 
2956 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2957     template <typename Body, typename... Args>
2958         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2959     __TBB_NOINLINE_SYM async_node(
2960         const node_set<Args...>& nodes, size_t concurrency, Body body,
2961         Policy = Policy(), node_priority_t a_priority = no_priority )
2962         : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
2963         make_edges_in_order(nodes, *this);
2964     }
2965 
2966     template <typename Body, typename... Args>
2967         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2968     __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2969         : async_node(nodes, concurrency, body, Policy(), a_priority) {}
2970 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2971 
2972     __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
2973         static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
2974         static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
2975 
2976         fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
2977                 &this->my_graph, static_cast<receiver<input_type> *>(this),
2978                 this->output_ports(), this->my_body );
2979     }
2980 
2981     gateway_type& gateway() {
2982         return my_gateway;
2983     }
2984 
2985     // Define sender< Output >
2986 
2987     //! Add a new successor to this node
2988     bool register_successor(successor_type&) override {
2989         __TBB_ASSERT(false, "Successors must be registered only via ports");
2990         return false;
2991     }
2992 
2993     //! Removes a successor from this node
2994     bool remove_successor(successor_type&) override {
2995         __TBB_ASSERT(false, "Successors must be removed only via ports");
2996         return false;
2997     }
2998 
2999     template<typename Body>
3000     Body copy_function_object() {
3001         typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
3002         typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
3003         mfn_body_type &body_ref = *this->my_body;
3004         async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3005         return ab.get_body();
3006     }
3007 
3008 protected:
3009 
3010     void reset_node( reset_flags f) override {
3011        base_type::reset_node(f);
3012     }
3013 };
3014 
3015 #include "detail/_flow_graph_node_set_impl.h"
3016 
3017 template< typename T >
3018 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3019 public:
3020     typedef T input_type;
3021     typedef T output_type;
3022     typedef typename receiver<input_type>::predecessor_type predecessor_type;
3023     typedef typename sender<output_type>::successor_type successor_type;
3024 
3025     __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
3026         : graph_node(g), my_successors(this), my_buffer_is_valid(false)
3027     {
3028         fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
3029                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3030     }
3031 
3032 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3033     template <typename... Args>
3034     overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
3035         make_edges_in_order(nodes, *this);
3036     }
3037 #endif
3038 
3039     //! Copy constructor; doesn't take anything from src; default won't work
3040     __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
3041 
3042     ~overwrite_node() {}
3043 
3044     bool register_successor( successor_type &s ) override {
3045         spin_mutex::scoped_lock l( my_mutex );
3046         if (my_buffer_is_valid && is_graph_active( my_graph )) {
3047             // We have a valid value that must be forwarded immediately.
3048             bool ret = s.try_put( my_buffer );
3049             if ( ret ) {
3050                 // We add the successor that accepted our put
3051                 my_successors.register_successor( s );
3052             } else {
3053                 // In case of reservation a race between the moment of reservation and register_successor can appear,
3054                 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
3055                 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
3056                 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
3057                 small_object_allocator allocator{};
3058                 typedef register_predecessor_task task_type;
3059                 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
3060                 graph_reference().reserve_wait();
3061                 spawn_in_graph_arena( my_graph, *t );
3062             }
3063         } else {
3064             // No valid value yet, just add as successor
3065             my_successors.register_successor( s );
3066         }
3067         return true;
3068     }
3069 
3070     bool remove_successor( successor_type &s ) override {
3071         spin_mutex::scoped_lock l( my_mutex );
3072         my_successors.remove_successor(s);
3073         return true;
3074     }
3075 
3076     bool try_get( input_type &v ) override {
3077         spin_mutex::scoped_lock l( my_mutex );
3078         if ( my_buffer_is_valid ) {
3079             v = my_buffer;
3080             return true;
3081         }
3082         return false;
3083     }
3084 
3085     //! Reserves an item
3086     bool try_reserve( T &v ) override {
3087         return try_get(v);
3088     }
3089 
3090     //! Releases the reserved item
3091     bool try_release() override { return true; }
3092 
3093     //! Consumes the reserved item
3094     bool try_consume() override { return true; }
3095 
3096     bool is_valid() {
3097        spin_mutex::scoped_lock l( my_mutex );
3098        return my_buffer_is_valid;
3099     }
3100 
3101     void clear() {
3102        spin_mutex::scoped_lock l( my_mutex );
3103        my_buffer_is_valid = false;
3104     }
3105 
3106 protected:
3107 
3108     template< typename R, typename B > friend class run_and_put_task;
3109     template<typename X, typename Y> friend class broadcast_cache;
3110     template<typename X, typename Y> friend class round_robin_cache;
3111     graph_task* try_put_task( const input_type &v ) override {
3112         spin_mutex::scoped_lock l( my_mutex );
3113         return try_put_task_impl(v);
3114     }
3115 
3116     graph_task * try_put_task_impl(const input_type &v) {
3117         my_buffer = v;
3118         my_buffer_is_valid = true;
3119         graph_task* rtask = my_successors.try_put_task(v);
3120         if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3121         return rtask;
3122     }
3123 
3124     graph& graph_reference() const override {
3125         return my_graph;
3126     }
3127 
3128     //! Breaks an infinite loop between the node reservation and register_successor call
3129     struct register_predecessor_task : public graph_task {
3130         register_predecessor_task(
3131             graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
3132             : graph_task(g, allocator), o(owner), s(succ) {};
3133 
3134         task* execute(execution_data& ed) override {
3135             // TODO revamp: investigate why qualification is needed for register_successor() call
3136             using tbb::detail::d1::register_predecessor;
3137             using tbb::detail::d1::register_successor;
3138             if ( !register_predecessor(s, o) ) {
3139                 register_successor(o, s);
3140             }
3141             finalize<register_predecessor_task>(ed);
3142             return nullptr;
3143         }
3144 
3145         task* cancel(execution_data& ed) override {
3146             finalize<register_predecessor_task>(ed);
3147             return nullptr;
3148         }
3149 
3150         predecessor_type& o;
3151         successor_type& s;
3152     };
3153 
3154     spin_mutex my_mutex;
3155     broadcast_cache< input_type, null_rw_mutex > my_successors;
3156     input_type my_buffer;
3157     bool my_buffer_is_valid;
3158 
3159     void reset_node( reset_flags f) override {
3160         my_buffer_is_valid = false;
3161        if (f&rf_clear_edges) {
3162            my_successors.clear();
3163        }
3164     }
3165 };  // overwrite_node
3166 
3167 template< typename T >
3168 class write_once_node : public overwrite_node<T> {
3169 public:
3170     typedef T input_type;
3171     typedef T output_type;
3172     typedef overwrite_node<T> base_type;
3173     typedef typename receiver<input_type>::predecessor_type predecessor_type;
3174     typedef typename sender<output_type>::successor_type successor_type;
3175 
3176     //! Constructor
3177     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3178         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3179                                  static_cast<receiver<input_type> *>(this),
3180                                  static_cast<sender<output_type> *>(this) );
3181     }
3182 
3183 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3184     template <typename... Args>
3185     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3186         make_edges_in_order(nodes, *this);
3187     }
3188 #endif
3189 
3190     //! Copy constructor: call base class copy constructor
3191     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3192         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3193                                  static_cast<receiver<input_type> *>(this),
3194                                  static_cast<sender<output_type> *>(this) );
3195     }
3196 
3197 protected:
3198     template< typename R, typename B > friend class run_and_put_task;
3199     template<typename X, typename Y> friend class broadcast_cache;
3200     template<typename X, typename Y> friend class round_robin_cache;
3201     graph_task *try_put_task( const T &v ) override {
3202         spin_mutex::scoped_lock l( this->my_mutex );
3203         return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
3204     }
3205 }; // write_once_node
3206 
3207 inline void set_name(const graph& g, const char *name) {
3208     fgt_graph_desc(&g, name);
3209 }
3210 
3211 template <typename Output>
3212 inline void set_name(const input_node<Output>& node, const char *name) {
3213     fgt_node_desc(&node, name);
3214 }
3215 
3216 template <typename Input, typename Output, typename Policy>
3217 inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
3218     fgt_node_desc(&node, name);
3219 }
3220 
3221 template <typename Output, typename Policy>
3222 inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
3223     fgt_node_desc(&node, name);
3224 }
3225 
3226 template <typename T>
3227 inline void set_name(const broadcast_node<T>& node, const char *name) {
3228     fgt_node_desc(&node, name);
3229 }
3230 
3231 template <typename T>
3232 inline void set_name(const buffer_node<T>& node, const char *name) {
3233     fgt_node_desc(&node, name);
3234 }
3235 
3236 template <typename T>
3237 inline void set_name(const queue_node<T>& node, const char *name) {
3238     fgt_node_desc(&node, name);
3239 }
3240 
3241 template <typename T>
3242 inline void set_name(const sequencer_node<T>& node, const char *name) {
3243     fgt_node_desc(&node, name);
3244 }
3245 
3246 template <typename T, typename Compare>
3247 inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
3248     fgt_node_desc(&node, name);
3249 }
3250 
3251 template <typename T, typename DecrementType>
3252 inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
3253     fgt_node_desc(&node, name);
3254 }
3255 
3256 template <typename OutputTuple, typename JP>
3257 inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
3258     fgt_node_desc(&node, name);
3259 }
3260 
3261 template <typename... Types>
3262 inline void set_name(const indexer_node<Types...>& node, const char *name) {
3263     fgt_node_desc(&node, name);
3264 }
3265 
3266 template <typename T>
3267 inline void set_name(const overwrite_node<T>& node, const char *name) {
3268     fgt_node_desc(&node, name);
3269 }
3270 
3271 template <typename T>
3272 inline void set_name(const write_once_node<T>& node, const char *name) {
3273     fgt_node_desc(&node, name);
3274 }
3275 
3276 template<typename Input, typename Output, typename Policy>
3277 inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
3278     fgt_multioutput_node_desc(&node, name);
3279 }
3280 
3281 template<typename TupleType>
3282 inline void set_name(const split_node<TupleType>& node, const char *name) {
3283     fgt_multioutput_node_desc(&node, name);
3284 }
3285 
3286 template< typename InputTuple, typename OutputTuple >
3287 inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
3288     fgt_multiinput_multioutput_node_desc(&node, name);
3289 }
3290 
3291 template<typename Input, typename Output, typename Policy>
3292 inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
3293 {
3294     fgt_multioutput_node_desc(&node, name);
3295 }
3296 } // d1
3297 } // detail
3298 } // tbb
3299 
3300 
3301 // Include deduction guides for node classes
3302 #include "detail/_flow_graph_nodes_deduction.h"
3303 
3304 namespace tbb {
3305 namespace flow {
3306 inline namespace v1 {
3307     using detail::d1::receiver;
3308     using detail::d1::sender;
3309 
3310     using detail::d1::serial;
3311     using detail::d1::unlimited;
3312 
3313     using detail::d1::reset_flags;
3314     using detail::d1::rf_reset_protocol;
3315     using detail::d1::rf_reset_bodies;
3316     using detail::d1::rf_clear_edges;
3317 
3318     using detail::d1::graph;
3319     using detail::d1::graph_node;
3320     using detail::d1::continue_msg;
3321 
3322     using detail::d1::input_node;
3323     using detail::d1::function_node;
3324     using detail::d1::multifunction_node;
3325     using detail::d1::split_node;
3326     using detail::d1::output_port;
3327     using detail::d1::indexer_node;
3328     using detail::d1::tagged_msg;
3329     using detail::d1::cast_to;
3330     using detail::d1::is_a;
3331     using detail::d1::continue_node;
3332     using detail::d1::overwrite_node;
3333     using detail::d1::write_once_node;
3334     using detail::d1::broadcast_node;
3335     using detail::d1::buffer_node;
3336     using detail::d1::queue_node;
3337     using detail::d1::sequencer_node;
3338     using detail::d1::priority_queue_node;
3339     using detail::d1::limiter_node;
3340     using namespace detail::d1::graph_policy_namespace;
3341     using detail::d1::join_node;
3342     using detail::d1::input_port;
3343     using detail::d1::copy_body;
3344     using detail::d1::make_edge;
3345     using detail::d1::remove_edge;
3346     using detail::d1::tag_value;
3347     using detail::d1::composite_node;
3348     using detail::d1::async_node;
3349     using detail::d1::node_priority_t;
3350     using detail::d1::no_priority;
3351 
3352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3353     using detail::d1::follows;
3354     using detail::d1::precedes;
3355     using detail::d1::make_node_set;
3356     using detail::d1::make_edges;
3357 #endif
3358 
3359 } // v1
3360 } // flow
3361 
3362     using detail::d1::flow_control;
3363 
3364 namespace profiling {
3365     using detail::d1::set_name;
3366 } // profiling
3367 
3368 } // tbb
3369 
3370 
3371 #if TBB_USE_PROFILING_TOOLS  && ( __unix__ || __APPLE__ )
3372    // We don't do pragma pop here, since it still gives warning on the USER side
3373    #undef __TBB_NOINLINE_SYM
3374 #endif
3375 
3376 #endif // __TBB_flow_graph_H