Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:25

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_flow_graph_H
0018 #define __TBB_flow_graph_H
0019 
0020 #include <atomic>
0021 #include <memory>
0022 #include <type_traits>
0023 
0024 #include "detail/_config.h"
0025 #include "detail/_namespace_injection.h"
0026 #include "spin_mutex.h"
0027 #include "null_mutex.h"
0028 #include "spin_rw_mutex.h"
0029 #include "null_rw_mutex.h"
0030 #include "detail/_pipeline_filters.h"
0031 #include "detail/_task.h"
0032 #include "detail/_small_object_pool.h"
0033 #include "cache_aligned_allocator.h"
0034 #include "detail/_exception.h"
0035 #include "detail/_template_helpers.h"
0036 #include "detail/_aggregator.h"
0037 #include "detail/_allocator_traits.h"
0038 #include "detail/_utils.h"
0039 #include "profiling.h"
0040 #include "task_arena.h"
0041 
0042 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
0043    #if __INTEL_COMPILER
0044        // Disabled warning "routine is both inline and noinline"
0045        #pragma warning (push)
0046        #pragma warning( disable: 2196 )
0047    #endif
0048    #define __TBB_NOINLINE_SYM __attribute__((noinline))
0049 #else
0050    #define __TBB_NOINLINE_SYM
0051 #endif
0052 
0053 #include <tuple>
0054 #include <list>
0055 #include <forward_list>
0056 #include <queue>
0057 #if __TBB_CPP20_CONCEPTS_PRESENT
0058 #include <concepts>
0059 #endif
0060 
0061 /** @file
0062   \brief The graph related classes and functions
0063 
0064   There are some applications that best express dependencies as messages
0065   passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
0067   class and its associated node classes can be used to express such
0068   applications.
0069 */
0070 
0071 namespace tbb {
0072 namespace detail {
0073 
0074 namespace d2 {
0075 
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };

//! A generic null type
// Used as a placeholder where a port or message type is required syntactically but carries no data.
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};
0084 
0085 } // namespace d2
0086 
0087 #if __TBB_CPP20_CONCEPTS_PRESENT
0088 namespace d0 {
0089 
// Constraint on a node body's return type: satisfied when the node output type is
// continue_msg, or when the output type is convertible to the body's return type.
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d2::continue_msg> ||
                                std::convertible_to<OutputType, ReturnType>;
0093 
// Body requirements for continue_node: copy-constructible and callable with a
// const continue_msg&, with a conforming return type.
// TODO: consider using std::invocable here
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d2::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };
0100 
// Body requirements for function_node: copy-constructible, invocable with
// const Input&, and with a conforming return type.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             std::invocable<Body&, const Input&> &&
                             node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;
0105 
// Requirements for the key-extraction function object used by key-matching
// join_node: its result for const Input& must be convertible to Key.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    std::invocable<FunctionObject&, const Input&> &&
                                    std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;
0110 
// Body requirements for input_node: copy-constructible and callable with a
// d1::flow_control&, yielding Output.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };
0116 
// Body requirements for multifunction_node: copy-constructible and invocable
// with the input item and the tuple of output ports.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  std::invocable<Body&, const Input&, OutputPortsType&>;
0120 
// Requirements for a sequencer_node ordering functor: maps a value to its
// std::size_t sequence number.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    std::invocable<Sequencer&, const Value&> &&
                    std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;
0125 
// Body requirements for async_node: copy-constructible and invocable with the
// input item and the gateway used to submit results back into the graph.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          std::invocable<Body&, const Input&, GatewayType&>;
0129 
0130 } // namespace d0
0131 #endif // __TBB_CPP20_CONCEPTS_PRESENT
0132 
0133 namespace d2 {
0134 
//! Forward declaration section
template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;

template< typename T, typename U > class limiter_node;  // needed for resetting decrementer

// Caches that manage a node's edges to its successors/predecessors; defined in
// detail/_flow_graph_cache_impl.h.
template<typename T, typename M> class successor_cache;
template<typename T, typename M> class broadcast_cache;
template<typename T, typename M> class round_robin_cache;
template<typename T, typename M> class predecessor_cache;
template<typename T, typename M> class reservable_predecessor_cache;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Tag types describing the direction of the edges captured by a node_set.
namespace order {
struct following;
struct preceding;
}
template<typename Order, typename... Args> struct node_set;
#endif
0155 
0156 
0157 } // namespace d2
0158 } // namespace detail
0159 } // namespace tbb
0160 
0161 //! The graph class
0162 #include "detail/_flow_graph_impl.h"
0163 
0164 namespace tbb {
0165 namespace detail {
0166 namespace d2 {
0167 
0168 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
0169     if (second->priority > first->priority)
0170         return std::make_pair(second, first);
0171     return std::make_pair(first, second);
0172 }
0173 
// submit task if necessary. Returns the non-enqueued task if there is one.
// Note: SUCCESSFULLY_ENQUEUED is a sentinel value, not a real task; it indicates
// the message was already handled and there is nothing left to spawn or bypass.
static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
    // if no RHS task, don't change left.
    if (right == nullptr) return left;
    // right != nullptr
    if (left == nullptr) return right;
    if (left == SUCCESSFULLY_ENQUEUED) return right;
    // left contains a task
    if (right != SUCCESSFULLY_ENQUEUED) {
        // both are valid tasks
        // Spawn the higher-priority one; return the other for scheduler bypass.
        auto tasks_pair = order_tasks(left, right);
        spawn_in_graph_arena(g, *tasks_pair.first);
        return tasks_pair.second;
    }
    return left;
}
0190 
0191 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Metadata that travels with a message when try_put_and_wait is used:
//! the list of wait-context vertices that must be notified once the
//! message is completely processed.
class message_metainfo {
public:
    using waiters_type = std::forward_list<d1::wait_context_vertex*>;

    message_metainfo() = default;

    message_metainfo(const waiters_type& waiters) : my_waiters(waiters) {}
    message_metainfo(waiters_type&& waiters) : my_waiters(std::move(waiters)) {}

    // Lvalue overload observes the waiters; rvalue overload moves them out.
    const waiters_type& waiters() const & { return my_waiters; }
    waiters_type&& waiters() && { return std::move(my_waiters); }

    bool empty() const { return my_waiters.empty(); }

    //! Prepends the other metainfo's waiters into this list.
    void merge(const message_metainfo& other) {
        // TODO: should we avoid duplications on merging
        my_waiters.insert_after(my_waiters.before_begin(),
                                other.waiters().begin(),
                                other.waiters().end());
    }
private:
    waiters_type my_waiters;
}; // class message_metainfo
0215 
0216 #define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo
0217 
0218 #else
0219 #define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)
0220 #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0221 
//! Pure virtual template class that defines a sender of messages of type T
/** Default implementations of the try_* protocol all fail, so concrete nodes
    override only the operations they support. */
template< typename T >
class sender {
public:
    virtual ~sender() {}

    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Request an item together with its associated metainfo
    virtual bool try_get( T &, message_metainfo& ) { return false; }
#endif

    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Reserves an item together with its associated metainfo
    virtual bool try_reserve( T &, message_metainfo& ) { return false; }
#endif

    //! Releases the reserved item
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }

protected:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    // The free-function friends below form the public edge-making interface
    // over these protected virtual hooks.
    template<typename C>
    friend bool register_successor(sender<C>& s, receiver<C>& r);

    template<typename C>
    friend bool remove_successor  (sender<C>& s, receiver<C>& r);
};  // class sender<T>
0267 
//! Connects sender s to receiver r via the sender's protected virtual hook.
template<typename C>
bool register_successor(sender<C>& s, receiver<C>& r) {
    return s.register_successor(r);
}

//! Disconnects receiver r from sender s via the sender's protected virtual hook.
template<typename C>
bool remove_successor(sender<C>& s, receiver<C>& r) {
    return s.remove_successor(r);
}
0277 
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
private:
    //! Shared implementation of try_put/try_put_and_wait: asks the node for a
    //! task and spawns it unless the message was already fully handled.
    template <typename... TryPutTaskArgs>
    bool internal_try_put(const T& t, TryPutTaskArgs&&... args) {
        graph_task* res = try_put_task(t, std::forward<TryPutTaskArgs>(args)...);
        if (!res) return false;
        // SUCCESSFULLY_ENQUEUED is a sentinel: nothing further to spawn here.
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

public:
    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    bool try_put( const T& t ) {
        return internal_try_put(t);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Put an item to the receiver and wait for completion
    bool try_put_and_wait( const T& t ) {
        // Since try_put_and_wait is a blocking call, it is safe to create wait_context on stack
        d1::wait_context_vertex msg_wait_vertex{};

        bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_vertex}});
        if (res) {
            __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph");
            wait(msg_wait_vertex.get_context(), *graph_reference().my_context);
        }
        return res;
    }
#endif

    //! put item to successor; return task to run the successor if possible.
protected:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class broadcast_cache;
    template< typename X, typename Y > friend class round_robin_cache;
    //! Core node hook: returns a task to run, SUCCESSFULLY_ENQUEUED, or nullptr on rejection.
    virtual graph_task *try_put_task(const T& t) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    virtual graph_task *try_put_task(const T& t, const message_metainfo&) = 0;
#endif
    virtual graph& graph_reference() const = 0;

    template<typename TT, typename M> friend class successor_cache;
    virtual bool is_continue_receiver() { return false; }

    // TODO revamp: reconsider the inheritance and move node priority out of receiver
    virtual node_priority_t priority() const { return no_priority; }

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

    template <typename C>
    friend bool register_predecessor(receiver<C>& r, sender<C>& s);
    template <typename C>
    friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
}; // class receiver<T>
0348 
//! Connects sender s as a predecessor of receiver r via the receiver's protected hook.
template <typename C>
bool register_predecessor(receiver<C>& r, sender<C>& s) {
    return r.register_predecessor(s);
}

//! Disconnects sender s from receiver r via the receiver's protected hook.
template <typename C>
bool remove_predecessor(receiver<C>& r, sender<C>& s) {
    return r.remove_predecessor(s);
}
0358 
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on.
    A continue_receiver counts incoming continue_msg signals and fires its
    execute() hook once signals from all registered predecessors have arrived;
    the count then resets so the node can fire again. */
class continue_receiver : public receiver< continue_msg > {
protected:

    //! Constructor
    explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        my_priority = a_priority;
    }

    //! Copy constructor
    // Copies the trigger threshold and priority, but starts with a fresh count.
    continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        my_priority = src.my_priority;
    }

    //! Increments the trigger threshold
    bool register_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold.  So removing a predecessor while the graph is active can cause
        unexpected results. */
    bool remove_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }

    //! The input type
    typedef continue_msg input_type;

    //! The predecessor type for this node
    typedef receiver<input_type>::predecessor_type predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;

private:
    // execute body is supposed to be too small to create a task for.
    //! Counts the incoming signal; runs execute() only when the threshold is reached.
    graph_task* try_put_task_impl( const input_type& __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        message_metainfo predecessor_metainfo;
#endif
        {
            spin_mutex::scoped_lock l(my_mutex);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            // Prolong the wait and store the metainfo until receiving signals from all the predecessors
            for (auto waiter : metainfo.waiters()) {
                waiter->reserve(1);
            }
            my_current_metainfo.merge(metainfo);
#endif
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else {
                // Threshold reached: reset the count and take ownership of the
                // accumulated metainfo before leaving the critical section.
                my_current_count = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
                predecessor_metainfo = my_current_metainfo;
                my_current_metainfo = message_metainfo{};
#endif
            }
        }
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        graph_task* res = execute(predecessor_metainfo);
        for (auto waiter : predecessor_metainfo.waiters()) {
            waiter->release(1);
        }
#else
        graph_task* res = execute();
#endif
        return res? res : SUCCESSFULLY_ENQUEUED;
    }

protected:
    graph_task* try_put_task( const input_type& input ) override {
        return try_put_task_impl(input __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task( const input_type& input, const message_metainfo& metainfo ) override {
        return try_put_task_impl(input, metainfo);
    }
#endif

    spin_mutex my_mutex;
    int my_predecessor_count;         // current trigger threshold
    int my_current_count;             // signals received so far in this round
    int my_initial_predecessor_count; // threshold restored by reset_receiver
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    message_metainfo my_current_metainfo;
#endif
    node_priority_t my_priority;
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class limiter_node;

    virtual void reset_receiver( reset_flags f ) {
        my_current_count = 0;
        if (f & rf_clear_edges) {
            my_predecessor_count = my_initial_predecessor_count;
        }
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task.  This is
        called while the sender is blocked in the try_put(). */
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    virtual graph_task* execute(const message_metainfo& metainfo) = 0;
#else
    virtual graph_task* execute() = 0;
#endif
    template<typename TT, typename M> friend class successor_cache;
    bool is_continue_receiver() override { return true; }

    node_priority_t priority() const override { return my_priority; }
}; // class continue_receiver
0484 
0485 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! Default key-extraction routine for message-based key matching:
    //! forwards to the message's key() member function.
    template <typename K, typename T>
    K key_from_message( const T &t ) {
        return t.key();
    }
0490 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
0491 
} // d2
0493 } // detail
0494 } // tbb
0495 
0496 #include "detail/_flow_graph_trace_impl.h"
0497 #include "detail/_hash_compare.h"
0498 
0499 namespace tbb {
0500 namespace detail {
0501 namespace d2 {
0502 
0503 #include "detail/_flow_graph_body_impl.h"
0504 #include "detail/_flow_graph_cache_impl.h"
0505 #include "detail/_flow_graph_types_impl.h"
0506 
0507 using namespace graph_policy_namespace;
0508 
0509 template <typename C, typename N>
0510 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
0511 {
0512     if (begin) current_node = my_graph->my_nodes;
0513     //else it is an end iterator by default
0514 }
0515 
0516 template <typename C, typename N>
0517 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
0518     __TBB_ASSERT(current_node, "graph_iterator at end");
0519     return *operator->();
0520 }
0521 
//! Returns a pointer to the current node (nullptr for an end iterator).
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}
0526 
//! Advances to the next node in the graph's node list; a no-op at end.
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
0531 
//! Constructs a graph with isolated task_group_context
inline graph::graph() : my_wait_context_vertex(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = true;   // this graph owns, and must destroy, its context
    cancelled = false;
    caught_exception = false;
    // Placement-construct the context in cache-aligned storage; ~graph()
    // destroys it manually because own_context is true.
    my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
    fgt_graph(this);
    my_is_active = true;
}
0542 
//! Constructs a graph that borrows a caller-provided task_group_context.
inline graph::graph(task_group_context& use_this_context) :
    my_wait_context_vertex(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = false;   // the caller keeps ownership of the context
    cancelled = false;
    caught_exception = false;
    fgt_graph(this);
    my_is_active = true;
}
0552 
//! Waits for all outstanding tasks, then destroys the owned context and arena.
inline graph::~graph() {
    wait_for_all();
    if (own_context) {
        // The context was placement-constructed, so destroy and deallocate manually.
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}
0561 
//! Registers one additional external interest in graph completion (pairs with release_wait).
inline void graph::reserve_wait() {
    my_wait_context_vertex.reserve();
    fgt_reserve_wait(this);
}
0566 
//! Drops one external interest in graph completion (pairs with reserve_wait).
inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context_vertex.release();
}
0571 
0572 inline void graph::register_node(graph_node *n) {
0573     n->next = nullptr;
0574     {
0575         spin_mutex::scoped_lock lock(nodelist_mutex);
0576         n->prev = my_nodes_last;
0577         if (my_nodes_last) my_nodes_last->next = n;
0578         my_nodes_last = n;
0579         if (!my_nodes) my_nodes = n;
0580     }
0581 }
0582 
0583 inline void graph::remove_node(graph_node *n) {
0584     {
0585         spin_mutex::scoped_lock lock(nodelist_mutex);
0586         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
0587         if (n->prev) n->prev->next = n->next;
0588         if (n->next) n->next->prev = n->prev;
0589         if (my_nodes_last == n) my_nodes_last = n->prev;
0590         if (my_nodes == n) my_nodes = n->next;
0591     }
0592     n->prev = n->next = nullptr;
0593 }
0594 
//! Deactivates the graph, resets its context and every node, then reactivates it.
inline void graph::reset( reset_flags f ) {
    // reset context
    deactivate_graph(*this);

    my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    activate_graph(*this);
}
0612 
//! Requests cancellation of all tasks executing in the graph's context.
inline void graph::cancel() {
    my_context->cancel_group_execution();
}
0616 
// Iteration over the graph's registered nodes (in registration order).
inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
0628 
//! A graph_node registers itself with its owning graph on construction...
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}

//! ...and unregisters itself on destruction.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
0636 
0637 #include "detail/_flow_graph_node_impl.h"
0638 
0639 
0640 //! An executable node that acts as a source, i.e. it has no predecessors
0641 
0642 template < typename Output >
0643     __TBB_requires(std::copyable<Output>)
0644 class input_node : public graph_node, public sender< Output > {
0645 public:
0646     //! The type of the output message, which is complete
0647     typedef Output output_type;
0648 
0649     //! The type of successors of this node
0650     typedef typename sender<output_type>::successor_type successor_type;
0651 
0652     // Input node has no input type
0653     typedef null_type input_type;
0654 
0655     //! Constructor for a node with a successor
0656     template< typename Body >
0657         __TBB_requires(input_node_body<Body, Output>)
0658      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
0659          : graph_node(g), my_active(false)
0660          , my_body( new input_body_leaf< output_type, Body>(body) )
0661          , my_init_body( new input_body_leaf< output_type, Body>(body) )
0662          , my_successors(this), my_reserved(false), my_has_cached_item(false)
0663     {
0664         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0665                            static_cast<sender<output_type> *>(this), this->my_body);
0666     }
0667 
0668 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0669     template <typename Body, typename... Successors>
0670         __TBB_requires(input_node_body<Body, Output>)
0671     input_node( const node_set<order::preceding, Successors...>& successors, Body body )
0672         : input_node(successors.graph_reference(), body)
0673     {
0674         make_edges(*this, successors);
0675     }
0676 #endif
0677 
0678     //! Copy constructor
0679     __TBB_NOINLINE_SYM input_node( const input_node& src )
0680         : graph_node(src.my_graph), sender<Output>()
0681         , my_active(false)
0682         , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
0683         , my_successors(this), my_reserved(false), my_has_cached_item(false)
0684     {
0685         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
0686                            static_cast<sender<output_type> *>(this), this->my_body);
0687     }
0688 
0689     //! The destructor
0690     ~input_node() { delete my_body; delete my_init_body; }
0691 
0692     //! Add a new successor to this node
0693     bool register_successor( successor_type &r ) override {
0694         spin_mutex::scoped_lock lock(my_mutex);
0695         my_successors.register_successor(r);
0696         if ( my_active )
0697             spawn_put();
0698         return true;
0699     }
0700 
0701     //! Removes a successor from this node
0702     bool remove_successor( successor_type &r ) override {
0703         spin_mutex::scoped_lock lock(my_mutex);
0704         my_successors.remove_successor(r);
0705         return true;
0706     }
0707 
0708     //! Request an item from the node
0709     bool try_get( output_type &v ) override {
0710         spin_mutex::scoped_lock lock(my_mutex);
0711         if ( my_reserved )
0712             return false;
0713 
0714         if ( my_has_cached_item ) {
0715             v = my_cached_item;
0716             my_has_cached_item = false;
0717             return true;
0718         }
0719         // we've been asked to provide an item, but we have none.  enqueue a task to
0720         // provide one.
0721         if ( my_active )
0722             spawn_put();
0723         return false;
0724     }
0725 
0726     //! Reserves an item.
0727     bool try_reserve( output_type &v ) override {
0728         spin_mutex::scoped_lock lock(my_mutex);
0729         if ( my_reserved ) {
0730             return false;
0731         }
0732 
0733         if ( my_has_cached_item ) {
0734             v = my_cached_item;
0735             my_reserved = true;
0736             return true;
0737         } else {
0738             return false;
0739         }
0740     }
0741 
0742 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0743 private:
0744     bool try_reserve( output_type& v, message_metainfo& ) override {
0745         return try_reserve(v);
0746     }
0747 
0748     bool try_get( output_type& v, message_metainfo& ) override {
0749         return try_get(v);
0750     }
0751 public:
0752 #endif
0753 
0754     //! Release a reserved item.
0755     /** true = item has been released and so remains in sender, dest must request or reserve future items */
0756     bool try_release( ) override {
0757         spin_mutex::scoped_lock lock(my_mutex);
0758         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
0759         my_reserved = false;
0760         if(!my_successors.empty())
0761             spawn_put();
0762         return true;
0763     }
0764 
0765     //! Consumes a reserved item
0766     bool try_consume( ) override {
0767         spin_mutex::scoped_lock lock(my_mutex);
0768         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
0769         my_reserved = false;
0770         my_has_cached_item = false;
0771         if ( !my_successors.empty() ) {
0772             spawn_put();
0773         }
0774         return true;
0775     }
0776 
0777     //! Activates a node that was created in the inactive state
0778     void activate() {
0779         spin_mutex::scoped_lock lock(my_mutex);
0780         my_active = true;
0781         if (!my_successors.empty())
0782             spawn_put();
0783     }
0784 
0785     template<typename Body>
0786     Body copy_function_object() {
0787         input_body<output_type> &body_ref = *this->my_body;
0788         return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
0789     }
0790 
0791 protected:
0792 
0793     //! resets the input_node to its initial state
0794     void reset_node( reset_flags f) override {
0795         my_active = false;
0796         my_reserved = false;
0797         my_has_cached_item = false;
0798 
0799         if(f & rf_clear_edges) my_successors.clear();
0800         if(f & rf_reset_bodies) {
0801             input_body<output_type> *tmp = my_init_body->clone();
0802             delete my_body;
0803             my_body = tmp;
0804         }
0805     }
0806 
0807 private:
0808     spin_mutex my_mutex;
0809     bool my_active;
0810     input_body<output_type> *my_body;
0811     input_body<output_type> *my_init_body;
0812     broadcast_cache< output_type > my_successors;
0813     bool my_reserved;
0814     bool my_has_cached_item;
0815     output_type my_cached_item;
0816 
0817     // used by apply_body_bypass, can invoke body of node.
0818     bool try_reserve_apply_body(output_type &v) {
0819         spin_mutex::scoped_lock lock(my_mutex);
0820         if ( my_reserved ) {
0821             return false;
0822         }
0823         if ( !my_has_cached_item ) {
0824             d1::flow_control control;
0825 
0826             fgt_begin_body( my_body );
0827 
0828             my_cached_item = (*my_body)(control);
0829             my_has_cached_item = !control.is_pipeline_stopped;
0830 
0831             fgt_end_body( my_body );
0832         }
0833         if ( my_has_cached_item ) {
0834             v = my_cached_item;
0835             my_reserved = true;
0836             return true;
0837         } else {
0838             return false;
0839         }
0840     }
0841 
0842     graph_task* create_put_task() {
0843         d1::small_object_allocator allocator{};
0844         typedef input_node_task_bypass< input_node<output_type> > task_type;
0845         graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
0846         return t;
0847     }
0848 
0849     //! Spawns a task that applies the body
0850     void spawn_put( ) {
0851         if(is_graph_active(this->my_graph)) {
0852             spawn_in_graph_arena(this->my_graph, *create_put_task());
0853         }
0854     }
0855 
    friend class input_node_task_bypass< input_node<output_type> >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    /** Reserves an item (running the body if necessary) and offers it to the
        successors.  Returns the successors' task for bypass, or nullptr if no
        item could be reserved. */
    graph_task* apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return nullptr;

        graph_task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();   // a successor accepted the item: finalize the reservation
        else
            try_release();   // no successor took it: release the reservation
        return last_task;
    }
0870 };  // class input_node
0871 
//! Implements a function node that supports Input -> Output
/** Combines a function_input (receiving side, with concurrency limiting and an
    optional input queue per Policy) and a function_output (broadcasting side). */
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input> &&
                   std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
        __TBB_requires(function_node_body<Body, Input, Output>)
     __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor taking an explicit priority with the default policy.
    template <typename Body>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that additionally wire this node to a set of nodes.
    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    /** Copies the input side's configuration from src; the output side is
        constructed fresh from src's graph, so successor edges are not copied. */
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the input side; optionally clears predecessor/successor edges.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node
0958 
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types; the node exposes one output port per
// tuple element (see output_ports_type).
template<typename Input, typename Output, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input>)
class multifunction_node :
    public graph_node,
    public multifunction_input
    <
        Input,
        typename wrap_tuple_elements<
            std::tuple_size<Output>::value,  // #elements in tuple
            multifunction_output,  // wrap this around each element
            Output // the tuple providing the types
        >::type,
        Policy,
        cache_aligned_allocator<Input>
    >
{
    typedef cache_aligned_allocator<Input> internals_allocator;

protected:
    //! Number of output ports (one per element of the Output tuple).
    static const int N = std::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
    typedef multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor
    template<typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
        fgt_multioutput_node_with_body<N>(
            CODEPTR(), FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Constructor taking an explicit priority with the default policy.
    template <typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that additionally wire this node to a set of nodes.
    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       Policy p = Policy(), node_priority_t a_priority = no_priority)
        : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) override { input_impl_type::reset(f); }
};  // multifunction_node
1036 
//! split_node: accepts a tuple as input, forwards each element of the tuple to its
//  successors.  The node has unlimited concurrency, so it does not reject inputs.
template<typename TupleType>
class split_node : public graph_node, public receiver<TupleType> {
    static const int N = std::tuple_size<TupleType>::value;
    typedef receiver<TupleType> base_type;
public:
    typedef TupleType input_type;
    //! One output port per tuple element, each carrying that element's type.
    typedef typename wrap_tuple_elements<
            N,  // #elements in tuple
            multifunction_output,  // wrap this around each element
            TupleType // the tuple providing the types
        >::type  output_ports_type;

    __TBB_NOINLINE_SYM explicit split_node(graph &g)
        : graph_node(g),
          my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that additionally wires this node to a set of nodes.
    template <typename... Args>
    __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; output ports are freshly initialized, so edges are not copied.
    __TBB_NOINLINE_SYM split_node(const split_node& other)
        : graph_node(other.my_graph), base_type(other),
          my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

    output_ports_type &output_ports() { return my_output_ports; }

protected:
    graph_task *try_put_task(const TupleType& t) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports());
    }
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const TupleType& t, const message_metainfo& metainfo) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports(), metainfo);
    }
#endif

    //! Clears every output port's edges when rf_clear_edges is set.
    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges)
            clear_element<N>::clear_this(my_output_ports);

        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
    }
    graph& graph_reference() const override {
        return my_graph;
    }

private:
    output_ports_type my_output_ports;
};
1103 
//! Implements an executable node that supports continue_msg -> Output
/** Fires its body once all predecessors have sent a continue_msg; the
    predecessor count can be given explicitly or is tracked via edges. */
template <typename Output, typename Policy = Policy<void> >
    __TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
        __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor taking an explicit priority with the default policy.
    template <typename Body>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that additionally wire this node to a set of nodes.
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    /** number_of_predecessors sets the trigger threshold explicitly instead of
        deriving it from registered edges. */
    template <typename Body >
        __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t a_priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
#endif

    //! Copy constructor
    /** The output side is constructed fresh, so successor edges are not copied. */
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        function_output<Output>(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;
    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the receiving side; optionally clears successor edges.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
};  // continue_node
1207 
//! Forwards messages of type T to all successors
/** Stateless pass-through: every incoming message is offered to every
    registered successor; the node never buffers or rejects messages. */
template <typename T>
class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
private:
    broadcast_cache<input_type> my_successors;   // all registered successors
public:

    __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
        fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that additionally wires this node to a set of nodes.
    template <typename... Args>
    broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: the node is stateless, so only the graph matters.
    __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}

    //! Adds a successor
    bool register_successor( successor_type &r ) override {
        my_successors.register_successor( r );
        return true;
    }

    //! Removes s as a successor
    bool remove_successor( successor_type &r ) override {
        my_successors.remove_successor( r );
        return true;
    }

private:
    // Common implementation for both try_put_task overloads; a null result from
    // the successors is reported as SUCCESSFULLY_ENQUEUED (the put never fails).
    graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
        graph_task* new_task = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! build a task to run the successor if possible.  Default is old behavior.
    graph_task* try_put_task(const T& t) override {
        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
        return try_put_task_impl(t, metainfo);
    }
#endif

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Clears successor edges when rf_clear_edges is set; nothing else to reset.
    void reset_node(reset_flags f) override {
        if (f&rf_clear_edges) {
           my_successors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
};  // broadcast_node
1280 
//! Forwards messages in arbitrary order
/** An unordered buffer: items are stored in a reservable_item_buffer and all
    mutations are serialized through an aggregator (see handle_operations). */
template <typename T>
class buffer_node
    : public graph_node
    , public reservable_item_buffer< T, cache_aligned_allocator<T> >
    , public receiver<T>, public sender<T>
{
    typedef cache_aligned_allocator<T> internals_allocator;

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef buffer_node<T> class_type;
1296 
protected:
    typedef size_t size_type;
    round_robin_cache< T, null_rw_mutex > my_successors;   // successors, offered items in round-robin order

    friend class forward_task_bypass< class_type >;

    // Operation codes dispatched by handle_operations.
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
    };
1305 
    // implements the aggregator_operation concept
    // One queued request against the buffer; fields are used per op_type.
    class buffer_operation : public d1::aggregated_operation< buffer_operation > {
    public:
        char type;            // an op_type value identifying the requested operation
        T* elem;              // item to push, or storage for a popped/reserved item
        graph_task* ltask;    // task produced while handling the operation, if any
        successor_type *r;    // successor argument for reg_succ / rem_succ
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        message_metainfo* metainfo{ nullptr };
#endif

        buffer_operation(const T& e, op_type t) : type(char(t))
                                                  , elem(const_cast<T*>(&e)) , ltask(nullptr)
                                                  , r(nullptr)
        {}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        buffer_operation(const T& e, op_type t, const message_metainfo& info)
            : type(char(t)), elem(const_cast<T*>(&e)), ltask(nullptr), r(nullptr)
            , metainfo(const_cast<message_metainfo*>(&info))
        {}

        buffer_operation(op_type t, message_metainfo& info)
            : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr), metainfo(&info) {}
#endif
        buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
    };
1333 
    bool forwarder_busy;   // a forwarding task is in flight; prevents spawning duplicates
    typedef d1::aggregating_functor<class_type, buffer_operation> handler_type;
    friend class d1::aggregating_functor<class_type, buffer_operation>;
    d1::aggregator< handler_type, buffer_operation> my_aggregator;   // serializes all buffer operations
1338 
    //! Processes a batch of queued operations; derived buffers override to hook in.
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }
1342 
    //! Dispatches each operation in op_list, then spawns a single forwarding
    //! task if any handled operation may have made items forwardable and no
    //! forwarder is already running.
    /** derived must be this object; passing it statically lets the derived
        type's order() hook be called without virtual dispatch. */
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = nullptr;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res:  internal_release(tmp); try_forwarding = true; break;
            case con_res:  internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
            }
        }

        // Give ordered derived buffers a chance to restore their invariant.
        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                typedef forward_task_bypass<class_type> task_type;
                d1::small_object_allocator allocator{};
                graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
                // tmp should point to the last item handled by the aggregator.  This is the operation
                // the handling thread enqueued.  So modifying that record will be okay.
                // TODO revamp: check that the issue is still present
                // workaround for icc bug  (at least 12.0 and 13.0)
                // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
                //        argument types are: (graph, graph_task *, graph_task *)
                graph_task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations
1384 
    //! Returns the task (if any) that was produced while handling op_data.
    inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }

    //! Spawns the task recorded in op_data, if any; returns whether one was spawned.
    inline bool enqueue_forwarding_task(buffer_operation &op_data) {
        graph_task *ft = grab_forwarding_task(op_data);
        if(ft) {
            spawn_in_graph_arena(graph_reference(), *ft);
            return true;
        }
        return false;
    }
1397 
    //! This is executed by an enqueued task, the "forwarder"
    /** Repeatedly issues try_fwd_task operations until one fails, combining all
        tasks produced along the way into a single task returned for bypass. */
    virtual graph_task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        graph_task *last_task = nullptr;
        do {
            op_data.status = WAIT;
            op_data.ltask = nullptr;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            graph_task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==SUCCEEDED);
        return last_task;
    }
1414 
    //! Register successor
    virtual void internal_reg_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.register_successor(*(op->r));
        // Release store publishes the result to the thread that queued the op.
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.remove_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }
1428 
private:
    // Hook invoked after a batch of operations; the unordered buffer needs no
    // reordering, so this is a no-op (ordered derived buffers shadow it).
    void order() {}

    // True if the item at the back of the buffer is valid and may be forwarded.
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }
1435 
    // Offers the back item to the successors; if accepted, folds the returned
    // task into last_task and removes the item from the buffer.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task* new_task = my_successors.try_put_task(this->back()
                                                          __TBB_FLOW_GRAPH_METAINFO_ARG(this->back_metainfo()));
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }
1446 
protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }
1452 
    //! Forwards buffered items to successors, giving each successor a chance.
    /** Fails immediately if the buffer is reserved or has no valid item.
        On failure (or when items remain unforwarded) the forwarder is marked
        idle so a new forwarding task can be spawned later.  derived must be
        this object; it supplies is_item_valid/try_put_and_add_task statically. */
    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        if (this->my_reserved || !derived->is_item_valid()) {
            op->status.store(FAILED, std::memory_order_release);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        graph_task* last_task = nullptr;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task;  // return task
        if (last_task && !counter) {
            // All successor slots were used and work was produced: report
            // success so the forwarder loop keeps going.
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
            forwarder_busy = false;
        }
    }
1477 
    //! Appends op->elem to the buffer; always succeeds and enables forwarding.
    virtual bool internal_push(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        // put_item operations are expected to always carry metainfo in this mode.
        __TBB_ASSERT(op->metainfo, nullptr);
        this->push_back(*(op->elem), (*op->metainfo));
#else
        this->push_back(*(op->elem));
#endif
        op->status.store(SUCCEEDED, std::memory_order_release);
        return true;
    }
1489 
    //! Pops the back item into *op->elem; fails if the buffer yields no item.
    virtual void internal_pop(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        // Metainfo is optional on pop: only retrieve it if the caller asked.
        bool pop_result = op->metainfo ? this->pop_back(*(op->elem), *(op->metainfo))
                                       : this->pop_back(*(op->elem));
#else
        bool pop_result = this->pop_back(*(op->elem));
#endif
        if (pop_result) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }
1505 
    //! Reserves the front item, copying it into *op->elem; fails if none available.
    virtual void internal_reserve(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        // Metainfo is optional on reserve: only retrieve it if the caller asked.
        bool reserve_result = op->metainfo ? this->reserve_front(*(op->elem), *(op->metainfo))
                                           : this->reserve_front(*(op->elem));
#else
        bool reserve_result = this->reserve_front(*(op->elem));
#endif
        if (reserve_result) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }
1521 
    //! Finalizes a reservation: the reserved front item is removed for good.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Cancels a reservation: the reserved front item becomes available again.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }
1531 
1532 public:
    //! Constructor: creates an empty buffer node attached to graph g.
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), my_successors(this), forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
        // Register the node with the profiling/tracing layer.
        fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }
1542 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node from a follows/precedes node set and wires the edges.
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
1549 
    //! Copy constructor
    /** Copies only the graph binding; buffered items, successors and the
        reservation state of src are intentionally not copied. */
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1552 
1553     //
1554     // message sender implementation
1555     //
1556 
1557     //! Adds a new successor.
1558     /** Adds successor r to the list of successors; may forward tasks.  */
1559     bool register_successor( successor_type &r ) override {
1560         buffer_operation op_data(reg_succ);
1561         op_data.r = &r;
1562         my_aggregator.execute(&op_data);
1563         (void)enqueue_forwarding_task(op_data);
1564         return true;
1565     }
1566 
    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor.
        Always returns true. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why full qualification is necessary here
        tbb::detail::d2::remove_predecessor(r, *this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
1582 
1583     //! Request an item from the buffer_node
1584     /**  true = v contains the returned item<BR>
1585          false = no item has been returned */
1586     bool try_get( T &v ) override {
1587         buffer_operation op_data(req_item);
1588         op_data.elem = &v;
1589         my_aggregator.execute(&op_data);
1590         (void)enqueue_forwarding_task(op_data);
1591         return (op_data.status==SUCCEEDED);
1592     }
1593 
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Request an item together with its metainfo (try_put_and_wait support).
    /** Returns true iff an item was written into v; metainfo receives the
        waiters associated with the retrieved message. */
    bool try_get( T &v, message_metainfo& metainfo ) override {
        buffer_operation op_data(req_item, metainfo);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }
#endif
1603 
1604     //! Reserves an item.
1605     /**  false = no item can be reserved<BR>
1606          true = an item is reserved */
1607     bool try_reserve( T &v ) override {
1608         buffer_operation op_data(res_item);
1609         op_data.elem = &v;
1610         my_aggregator.execute(&op_data);
1611         (void)enqueue_forwarding_task(op_data);
1612         return (op_data.status==SUCCEEDED);
1613     }
1614 
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Reserves an item together with its metainfo (try_put_and_wait support).
    /** Returns true iff an item was reserved into v. */
    bool try_reserve( output_type& v, message_metainfo& metainfo ) override {
        buffer_operation op_data(res_item, metainfo);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return op_data.status==SUCCEEDED;
    }
#endif
1624 
1625     //! Release a reserved item.
1626     /**  true = item has been released and so remains in sender */
1627     bool try_release() override {
1628         buffer_operation op_data(rel_res);
1629         my_aggregator.execute(&op_data);
1630         (void)enqueue_forwarding_task(op_data);
1631         return true;
1632     }
1633 
1634     //! Consumes a reserved item.
1635     /** true = item is removed from sender and reservation removed */
1636     bool try_consume() override {
1637         buffer_operation op_data(con_res);
1638         my_aggregator.execute(&op_data);
1639         (void)enqueue_forwarding_task(op_data);
1640         return true;
1641     }
1642 
1643 private:
    //! Common implementation behind both try_put_task overloads.
    /** Executes a put through the aggregator and translates the outcome into
        the task-bypass protocol: a real task for the caller to run/spawn,
        the SUCCESSFULLY_ENQUEUED sentinel, or nullptr on failure. */
    graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
        buffer_operation op_data(t, put_item __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        my_aggregator.execute(&op_data);
        graph_task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
        }
        else if(!ft && op_data.status ==SUCCEEDED) {
            // Put succeeded but produced no forwarding task: report the
            // sentinel so the caller knows the message was accepted.
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }
1662 
1663 protected:
1664 
1665     template< typename R, typename B > friend class run_and_put_task;
1666     template<typename X, typename Y> friend class broadcast_cache;
1667     template<typename X, typename Y> friend class round_robin_cache;
    //! receive an item, return a task *if possible
    graph_task *try_put_task(const T &t) override {
        // Forward to the shared implementation with empty metainfo (no waiters).
        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }
1672 
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! receive an item with its waiting metainfo, return a task *if possible
    graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
        return try_put_task_impl(t, metainfo);
    }
#endif
1678 
    //! Returns the graph this node is bound to.
    graph& graph_reference() const override {
        return my_graph;
    }
1682 
1683 protected:
    //! Discards buffered items; optionally drops edges per the reset flags.
    void reset_node( reset_flags f) override {
        reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
        forwarder_busy = false;
    }
1692 };  // buffer_node
1693 
//! Forwards messages in FIFO order
/** Specializes buffer_node: pops, reserves and forwards from the head
    (oldest item) rather than the back of the buffer. */
template <typename T>
class queue_node : public buffer_node<T> {
protected:
    typedef buffer_node<T> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

private:
    template<typename> friend class buffer_node;

    // True if the head slot currently holds an unconsumed item;
    // used by buffer_node's generic forwarding machinery.
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    // Offers the head item to the successors; on acceptance, combines the
    // returned task into last_task and removes the item from the buffer.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task* new_task = this->my_successors.try_put_task(this->front()
                                                                __TBB_FLOW_GRAPH_METAINFO_ARG(this->front_metainfo()));

        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    //! Tries to forward valid items to successors
    void internal_forward_task(queue_operation *op) override {
        this->internal_forward_task_impl(op, this);
    }

    //! Pops from the head (FIFO); fails if the queue is empty or reserved.
    void internal_pop(queue_operation *op) override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            if (op->metainfo) {
                this->pop_front(*(op->elem), *(op->metainfo));
            } else
#endif
            {
                this->pop_front(*(op->elem));
            }
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }
    //! Reserves the head item; fails if the queue is empty or already reserved.
    void internal_reserve(queue_operation *op) override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            if (op->metainfo) {
                this->reserve_front(*(op->elem), *(op->metainfo));
            }
            else
#endif
            {
                this->reserve_front(*(op->elem));
            }
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }
    //! Erases the reserved head item, completing the reservation.
    void internal_consume(queue_operation *op) override {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node from a follows/precedes set and wires the edges.
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; copies only the graph binding, not buffered items.
    __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }


protected:
    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};  // queue_node
1798 
//! Forwards messages in sequence order
/** Each incoming item is assigned a sequence number by the user-supplied
    Sequencer functor; items are buffered and emitted in increasing
    sequence order. */
template <typename T>
    __TBB_requires(std::copyable<T>)
class sequencer_node : public queue_node<T> {
    // Owned: cloned by the copy constructor, deleted in the destructor.
    // NOTE(review): copy assignment is not user-defined here; unless it is
    // suppressed by a base class, the implicit one would shallow-copy this
    // pointer (double delete) - verify.
    function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context.  Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor: s maps an item to its sequence number.
    template< typename Sequencer >
        __TBB_requires(sequencer<Sequencer, T>)
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
        my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node from a follows/precedes set and wires the edges.
    template <typename Sequencer, typename... Args>
        __TBB_requires(sequencer<Sequencer, T>)
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: clones the sequencer body, not the buffered items.
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }

protected:
    typedef typename buffer_node<T>::size_type size_type;
    typedef typename buffer_node<T>::buffer_operation sequencer_operation;

private:
    //! Places the item at the slot given by its sequence number.
    /** Fails (returns false) when a message with the same tag was already
        emitted, or when the slot cannot be filled. */
    bool internal_push(sequencer_operation *op) override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            op->status.store(FAILED, std::memory_order_release);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        __TBB_ASSERT(op->metainfo, nullptr);
        bool place_item_result = this->place_item(tag, *(op->elem), *(op->metainfo));
        const op_stat res = place_item_result ? SUCCEEDED : FAILED;
#else
        const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
#endif
        op->status.store(res, std::memory_order_release);
        return res ==SUCCEEDED;
    }
};  // sequencer_node
1875 
1876 //! Forwards messages in priority order
1877 template<typename T, typename Compare = std::less<T>>
1878 class priority_queue_node : public buffer_node<T> {
1879 public:
1880     typedef T input_type;
1881     typedef T output_type;
1882     typedef buffer_node<T> base_type;
1883     typedef priority_queue_node class_type;
1884     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1885     typedef typename sender<output_type>::successor_type successor_type;
1886 
1887     //! Constructor
1888     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1889         : buffer_node<T>(g), compare(comp), mark(0) {
1890         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1891                                  static_cast<receiver<input_type> *>(this),
1892                                  static_cast<sender<output_type> *>(this) );
1893     }
1894 
1895 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1896     template <typename... Args>
1897     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1898         : priority_queue_node(nodes.graph_reference(), comp) {
1899         make_edges_in_order(nodes, *this);
1900     }
1901 #endif
1902 
1903     //! Copy constructor
1904     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1905         : buffer_node<T>(src), mark(0)
1906     {
1907         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1908                                  static_cast<receiver<input_type> *>(this),
1909                                  static_cast<sender<output_type> *>(this) );
1910     }
1911 
1912 protected:
1913 
1914     void reset_node( reset_flags f) override {
1915         mark = 0;
1916         base_type::reset_node(f);
1917     }
1918 
1919     typedef typename buffer_node<T>::size_type size_type;
1920     typedef typename buffer_node<T>::item_type item_type;
1921     typedef typename buffer_node<T>::buffer_operation prio_operation;
1922 
1923     //! Tries to forward valid items to successors
1924     void internal_forward_task(prio_operation *op) override {
1925         this->internal_forward_task_impl(op, this);
1926     }
1927 
1928     void handle_operations(prio_operation *op_list) override {
1929         this->handle_operations_impl(op_list, this);
1930     }
1931 
1932     bool internal_push(prio_operation *op) override {
1933 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1934         __TBB_ASSERT(op->metainfo, nullptr);
1935         prio_push(*(op->elem), *(op->metainfo));
1936 #else
1937         prio_push(*(op->elem));
1938 #endif
1939         op->status.store(SUCCEEDED, std::memory_order_release);
1940         return true;
1941     }
1942 
1943     void internal_pop(prio_operation *op) override {
1944         // if empty or already reserved, don't pop
1945         if ( this->my_reserved == true || this->my_tail == 0 ) {
1946             op->status.store(FAILED, std::memory_order_release);
1947             return;
1948         }
1949 
1950         *(op->elem) = prio();
1951 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1952         if (op->metainfo) {
1953             *(op->metainfo) = std::move(prio_metainfo());
1954         }
1955 #endif
1956         op->status.store(SUCCEEDED, std::memory_order_release);
1957         prio_pop();
1958 
1959     }
1960 
1961     // pops the highest-priority item, saves copy
1962     void internal_reserve(prio_operation *op) override {
1963         if (this->my_reserved == true || this->my_tail == 0) {
1964             op->status.store(FAILED, std::memory_order_release);
1965             return;
1966         }
1967         this->my_reserved = true;
1968         *(op->elem) = prio();
1969 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1970         if (op->metainfo) {
1971             *(op->metainfo) = std::move(prio_metainfo());
1972             reserved_metainfo = *(op->metainfo);
1973         }
1974 #endif
1975         reserved_item = *(op->elem);
1976         op->status.store(SUCCEEDED, std::memory_order_release);
1977         prio_pop();
1978     }
1979 
1980     void internal_consume(prio_operation *op) override {
1981         op->status.store(SUCCEEDED, std::memory_order_release);
1982         this->my_reserved = false;
1983         reserved_item = input_type();
1984 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1985         for (auto waiter : reserved_metainfo.waiters()) {
1986             waiter->release(1);
1987         }
1988 
1989         reserved_metainfo = message_metainfo{};
1990 #endif
1991     }
1992 
1993     void internal_release(prio_operation *op) override {
1994         op->status.store(SUCCEEDED, std::memory_order_release);
1995         prio_push(reserved_item __TBB_FLOW_GRAPH_METAINFO_ARG(reserved_metainfo));
1996         this->my_reserved = false;
1997         reserved_item = input_type();
1998 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1999         for (auto waiter : reserved_metainfo.waiters()) {
2000             waiter->release(1);
2001         }
2002 
2003         reserved_metainfo = message_metainfo{};
2004 #endif
2005     }
2006 
2007 private:
2008     template<typename> friend class buffer_node;
2009 
2010     void order() {
2011         if (mark < this->my_tail) heapify();
2012         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2013     }
2014 
2015     bool is_item_valid() {
2016         return this->my_tail > 0;
2017     }
2018 
2019     void try_put_and_add_task(graph_task*& last_task) {
2020         graph_task* new_task = this->my_successors.try_put_task(this->prio()
2021                                                                 __TBB_FLOW_GRAPH_METAINFO_ARG(this->prio_metainfo()));
2022         if (new_task) {
2023             // workaround for icc bug
2024             graph& graph_ref = this->graph_reference();
2025             last_task = combine_tasks(graph_ref, last_task, new_task);
2026             prio_pop();
2027         }
2028     }
2029 
2030 private:
2031     Compare compare;
2032     size_type mark;
2033 
2034     input_type reserved_item;
2035 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2036     message_metainfo reserved_metainfo;
2037 #endif
2038 
2039     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2040     bool prio_use_tail() {
2041         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2042         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2043     }
2044 
2045     // prio_push: checks that the item will fit, expand array if necessary, put at end
2046     void prio_push(const T &src __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
2047         if ( this->my_tail >= this->my_array_size )
2048             this->grow_my_array( this->my_tail + 1 );
2049         (void) this->place_item(this->my_tail, src __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
2050         ++(this->my_tail);
2051         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2052     }
2053 
2054     // prio_pop: deletes highest priority item from the array, and if it is item
2055     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
2056     // and mark.  Assumes the array has already been tested for emptiness; no failure.
2057     void prio_pop()  {
2058         if (prio_use_tail()) {
2059             // there are newly pushed elements; last one higher than top
2060             // copy the data
2061             this->destroy_item(this->my_tail-1);
2062             --(this->my_tail);
2063             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2064             return;
2065         }
2066         this->destroy_item(0);
2067         if(this->my_tail > 1) {
2068             // push the last element down heap
2069             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
2070             this->move_item(0,this->my_tail - 1);
2071         }
2072         --(this->my_tail);
2073         if(mark > this->my_tail) --mark;
2074         if (this->my_tail > 1) // don't reheap for heap of size 1
2075             reheap();
2076         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2077     }
2078 
2079     const T& prio() {
2080         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2081     }
2082 
2083 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2084     message_metainfo& prio_metainfo() {
2085         return this->get_my_metainfo(prio_use_tail() ? this->my_tail-1 : 0);
2086     }
2087 #endif
2088 
2089     // turn array into heap
2090     void heapify() {
2091         if(this->my_tail == 0) {
2092             mark = 0;
2093             return;
2094         }
2095         if (!mark) mark = 1;
2096         for (; mark<this->my_tail; ++mark) { // for each unheaped element
2097             size_type cur_pos = mark;
2098             input_type to_place;
2099 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2100             message_metainfo metainfo;
2101 #endif
2102             this->fetch_item(mark, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
2103             do { // push to_place up the heap
2104                 size_type parent = (cur_pos-1)>>1;
2105                 if (!compare(this->get_my_item(parent), to_place))
2106                     break;
2107                 this->move_item(cur_pos, parent);
2108                 cur_pos = parent;
2109             } while( cur_pos );
2110             this->place_item(cur_pos, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
2111         }
2112     }
2113 
2114     // otherwise heapified array with new root element; rearrange to heap
2115     void reheap() {
2116         size_type cur_pos=0, child=1;
2117         while (child < mark) {
2118             size_type target = child;
2119             if (child+1<mark &&
2120                 compare(this->get_my_item(child),
2121                         this->get_my_item(child+1)))
2122                 ++target;
2123             // target now has the higher priority child
2124             if (compare(this->get_my_item(target),
2125                         this->get_my_item(cur_pos)))
2126                 break;
2127             // swap
2128             this->swap_items(cur_pos, target);
2129             cur_pos = target;
2130             child = (cur_pos<<1)+1;
2131         }
2132     }
2133 };  // priority_queue_node
2134 
2135 //! Forwards messages only if the threshold has not been reached
2136 /** This node forwards items until its threshold is reached.
2137     It contains no buffering.  If the downstream node rejects, the
2138     message is dropped. */
2139 template< typename T, typename DecrementType=continue_msg >
2140 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2141 public:
2142     typedef T input_type;
2143     typedef T output_type;
2144     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2145     typedef typename sender<output_type>::successor_type successor_type;
2146     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2147 
2148 private:
2149     size_t my_threshold;
2150     size_t my_count; // number of successful puts
2151     size_t my_tries; // number of active put attempts
2152     size_t my_future_decrement; // number of active decrement
2153     reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2154     spin_mutex my_mutex;
2155     broadcast_cache< T > my_successors;
2156 
2157     //! The internal receiver< DecrementType > that adjusts the count
2158     threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
2159 
2160     graph_task* decrement_counter( long long delta ) {
2161         if ( delta > 0 && size_t(delta) > my_threshold ) {
2162             delta = my_threshold;
2163         }
2164 
2165         {
2166             spin_mutex::scoped_lock lock(my_mutex);
2167             if ( delta > 0 && size_t(delta) > my_count ) {
2168                 if( my_tries > 0 ) {
2169                     my_future_decrement += (size_t(delta) - my_count);
2170                 }
2171                 my_count = 0;
2172             }
2173             else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
2174                 my_count = my_threshold;
2175             }
2176             else {
2177                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
2178             }
2179             __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
2180         }
2181         return forward_task();
2182     }
2183 
2184     // Let threshold_regulator call decrement_counter()
2185     friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
2186 
2187     friend class forward_task_bypass< limiter_node<T,DecrementType> >;
2188 
2189     bool check_conditions() {  // always called under lock
2190         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2191     }
2192 
2193     // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
2194     graph_task* forward_task() {
2195         input_type v;
2196         graph_task* rval = nullptr;
2197         bool reserved = false;
2198 
2199         {
2200             spin_mutex::scoped_lock lock(my_mutex);
2201             if ( check_conditions() )
2202                 ++my_tries;
2203             else
2204                 return nullptr;
2205         }
2206 
2207         //SUCCESS
2208         // if we can reserve and can put, we consume the reservation
2209         // we increment the count and decrement the tries
2210 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
2211         message_metainfo metainfo;
2212 #endif
2213         if ( (my_predecessors.try_reserve(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) == true ) {
2214             reserved = true;
2215             if ( (rval = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) != nullptr ) {
2216                 {
2217                     spin_mutex::scoped_lock lock(my_mutex);
2218                     ++my_count;
2219                     if ( my_future_decrement ) {
2220                         if ( my_count > my_future_decrement ) {
2221                             my_count -= my_future_decrement;
2222                             my_future_decrement = 0;
2223                         }
2224                         else {
2225                             my_future_decrement -= my_count;
2226                             my_count = 0;
2227                         }
2228                     }
2229                     --my_tries;
2230                     my_predecessors.try_consume();
2231                     if ( check_conditions() ) {
2232                         if ( is_graph_active(this->my_graph) ) {
2233                             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2234                             d1::small_object_allocator allocator{};
2235                             graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
2236                             spawn_in_graph_arena(graph_reference(), *rtask);
2237                         }
2238                     }
2239                 }
2240                 return rval;
2241             }
2242         }
2243         //FAILURE
2244         //if we can't reserve, we decrement the tries
2245         //if we can reserve but can't put, we decrement the tries and release the reservation
2246         {
2247             spin_mutex::scoped_lock lock(my_mutex);
2248             --my_tries;
2249             if (reserved) my_predecessors.try_release();
2250             if ( check_conditions() ) {
2251                 if ( is_graph_active(this->my_graph) ) {
2252                     d1::small_object_allocator allocator{};
2253                     typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2254                     graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2255                     __TBB_ASSERT(!rval, "Have two tasks to handle");
2256                     return t;
2257                 }
2258             }
2259             return rval;
2260         }
2261     }
2262 
2263     void initialize() {
2264         fgt_node(
2265             CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2266             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2267             static_cast<sender<output_type> *>(this)
2268         );
2269     }
2270 
2271 public:
2272     //! Constructor
2273     limiter_node(graph &g, size_t threshold)
2274         : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2275         my_predecessors(this), my_successors(this), decrement(this)
2276     {
2277         initialize();
2278     }
2279 
2280 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2281     template <typename... Args>
2282     limiter_node(const node_set<Args...>& nodes, size_t threshold)
2283         : limiter_node(nodes.graph_reference(), threshold) {
2284         make_edges_in_order(nodes, *this);
2285     }
2286 #endif
2287 
2288     //! Copy constructor
2289     limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2290 
    //! The interface for accessing internal receiver< DecrementType > that adjusts the count
    /** Connect a sender here to deliver decrement messages to the node. */
    receiver<DecrementType>& decrementer() { return decrement; }
2293 
2294     //! Replace the current successor with this new successor
2295     bool register_successor( successor_type &r ) override {
2296         spin_mutex::scoped_lock lock(my_mutex);
2297         bool was_empty = my_successors.empty();
2298         my_successors.register_successor(r);
2299         //spawn a forward task if this is the only successor
2300         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2301             if ( is_graph_active(this->my_graph) ) {
2302                 d1::small_object_allocator allocator{};
2303                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2304                 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2305                 spawn_in_graph_arena(graph_reference(), *t);
2306             }
2307         }
2308         return true;
2309     }
2310 
    //! Removes a successor from this node
    /** r.remove_predecessor(*this) is also called. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why qualification is needed for remove_predecessor() call
        // NOTE(review): unlike register_successor, no my_mutex lock is taken here — confirm intended.
        tbb::detail::d2::remove_predecessor(r, *this);
        my_successors.remove_successor(r);
        return true;
    }
2319 
2320     //! Adds src to the list of cached predecessors.
2321     bool register_predecessor( predecessor_type &src ) override {
2322         spin_mutex::scoped_lock lock(my_mutex);
2323         my_predecessors.add( src );
2324         if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2325             d1::small_object_allocator allocator{};
2326             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2327             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2328             spawn_in_graph_arena(graph_reference(), *t);
2329         }
2330         return true;
2331     }
2332 
    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        // NOTE(review): no my_mutex lock here, unlike register_predecessor — confirm intended.
        my_predecessors.remove( src );
        return true;
    }
2338 
2339 protected:
2340 
2341     template< typename R, typename B > friend class run_and_put_task;
2342     template<typename X, typename Y> friend class broadcast_cache;
2343     template<typename X, typename Y> friend class round_robin_cache;
2344 
2345 private:
    //! Puts an item to this receiver
    /** Returns a task to bypass-execute, or nullptr when the item is rejected.
        Counter updates happen under my_mutex; the successor broadcast itself is
        performed outside the lock. */
    graph_task* try_put_task_impl( const T &t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            // Reject when accepted items plus in-flight attempts already hit the threshold;
            // otherwise record this attempt before releasing the lock.
            if ( my_count + my_tries >= my_threshold )
                return nullptr;
            else
                ++my_tries;
        }

        graph_task* rtask = my_successors.try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        if ( !rtask ) {  // try_put_task failed.
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            // The failed attempt freed capacity; schedule another forward try if possible.
            if (check_conditions() && is_graph_active(this->my_graph)) {
                d1::small_object_allocator allocator{};
                typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
            }
        }
        else {
            spin_mutex::scoped_lock lock(my_mutex);
            ++my_count;
            // Apply decrements that arrived before this item was accepted:
            // consume as much of my_future_decrement as my_count allows.
            if ( my_future_decrement ) {
                if ( my_count > my_future_decrement ) {
                    my_count -= my_future_decrement;
                    my_future_decrement = 0;
                }
                else {
                    my_future_decrement -= my_count;
                    my_count = 0;
                }
            }
            --my_tries;
        }
        return rtask;
    }
2383 
2384 protected:
    //! Forwards to the implementation with empty metainfo.
    graph_task* try_put_task(const T& t) override {
        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    //! Forwards to the implementation, preserving the caller's metainfo.
    graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override {
        return try_put_task_impl(t, metainfo);
    }
#endif
2393 
2394     graph& graph_reference() const override { return my_graph; }
2395 
2396     void reset_node( reset_flags f ) override {
2397         my_count = 0;
2398         if ( f & rf_clear_edges ) {
2399             my_predecessors.clear();
2400             my_successors.clear();
2401         }
2402         else {
2403             my_predecessors.reset();
2404         }
2405         decrement.reset_receiver(f);
2406     }
2407 };  // limiter_node
2408 
2409 #include "detail/_flow_graph_join_impl.h"
2410 
2411 template<typename OutputTuple, typename JP=queueing> class join_node;
2412 
2413 template<typename OutputTuple>
2414 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2415 private:
2416     static const int N = std::tuple_size<OutputTuple>::value;
2417     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2418 public:
2419     typedef OutputTuple output_type;
2420     typedef typename unfolded_type::input_ports_type input_ports_type;
2421      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2422         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2423                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2424     }
2425 
2426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2427     template <typename... Args>
2428     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2429         make_edges_in_order(nodes, *this);
2430     }
2431 #endif
2432 
2433     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2434         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2435                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2436     }
2437 
2438 };
2439 
2440 template<typename OutputTuple>
2441 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2442 private:
2443     static const int N = std::tuple_size<OutputTuple>::value;
2444     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2445 public:
2446     typedef OutputTuple output_type;
2447     typedef typename unfolded_type::input_ports_type input_ports_type;
2448      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2449         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2450                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2451     }
2452 
2453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2454     template <typename... Args>
2455     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2456         make_edges_in_order(nodes, *this);
2457     }
2458 #endif
2459 
2460     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2461         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2462                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2463     }
2464 
2465 };
2466 
#if __TBB_CPP20_CONCEPTS_PRESENT
// Helper function which is well-formed only if all of the elements in OutputTuple
// satisfy join_node_function_object<body[i], tuple[i], K>.
// Declaration only — it is never defined or called; it exists purely so the
// concept below can test its well-formedness in an unevaluated requires-expression.
template <typename OutputTuple, typename K,
          typename... Functions, std::size_t... Idx>
void join_node_function_objects_helper( std::index_sequence<Idx...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);

// Satisfied when each body type is a valid key-extracting function object for
// the corresponding OutputTuple element (and the counts match).
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2482 
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
// Each constructor takes one key-extraction body per input port; the bodies map an
// incoming message to its matching key K.
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
      key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
    // One input port per tuple element.
    static const int N = std::tuple_size<OutputTuple>::value;
    typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    // Preview mode: keys are carried by the messages themselves, so no bodies are needed.
    join_node(graph &g) : unfolded_type(g) {}
#endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    // Constructors for 2..__TBB_VARIADIC_MAX key-extraction bodies; each registers
    // the node with the flow-graph tracing layer.
    template<typename __TBB_B0, typename __TBB_B1>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructs the node in the graph that owns `nodes` and connects the edges.
    template <
#if (__clang_major__ == 3 && __clang_minor__ == 4)
        // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
        template<typename...> class node_set,
#endif
        typename... Args, typename... Bodies
    >
    __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...) {
        make_edges_in_order(nodes, *this);
    }
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
2596 
2597 // indexer node
2598 #include "detail/_flow_graph_indexer_impl.h"
2599 
2600 // TODO: Implement interface with variadic template or tuple
// TODO: Implement interface with variadic template or tuple
// Primary template; specialized below for 1..__TBB_VARIADIC_MAX input types,
// with unused slots defaulted to null_type.
template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
                      typename T4=null_type, typename T5=null_type, typename T6=null_type,
                      typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2604 
2605 //indexer node specializations
2606 template<typename T0>
2607 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2608 private:
2609     static const int N = 1;
2610 public:
2611     typedef std::tuple<T0> InputTuple;
2612     typedef tagged_msg<size_t, T0> output_type;
2613     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2614     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2615         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2616                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2617     }
2618 
2619 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2620     template <typename... Args>
2621     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2622         make_edges_in_order(nodes, *this);
2623     }
2624 #endif
2625 
2626     // Copy constructor
2627     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2628         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2629                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2630     }
2631 };
2632 
2633 template<typename T0, typename T1>
2634 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2635 private:
2636     static const int N = 2;
2637 public:
2638     typedef std::tuple<T0, T1> InputTuple;
2639     typedef tagged_msg<size_t, T0, T1> output_type;
2640     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2641     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2642         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2643                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2644     }
2645 
2646 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2647     template <typename... Args>
2648     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2649         make_edges_in_order(nodes, *this);
2650     }
2651 #endif
2652 
2653     // Copy constructor
2654     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2655         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2656                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2657     }
2658 
2659 };
2660 
2661 template<typename T0, typename T1, typename T2>
2662 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2663 private:
2664     static const int N = 3;
2665 public:
2666     typedef std::tuple<T0, T1, T2> InputTuple;
2667     typedef tagged_msg<size_t, T0, T1, T2> output_type;
2668     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2669     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2670         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2671                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2672     }
2673 
2674 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2675     template <typename... Args>
2676     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2677         make_edges_in_order(nodes, *this);
2678     }
2679 #endif
2680 
2681     // Copy constructor
2682     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2683         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2684                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2685     }
2686 
2687 };
2688 
2689 template<typename T0, typename T1, typename T2, typename T3>
2690 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2691 private:
2692     static const int N = 4;
2693 public:
2694     typedef std::tuple<T0, T1, T2, T3> InputTuple;
2695     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2696     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2697     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2698         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2699                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2700     }
2701 
2702 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2703     template <typename... Args>
2704     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2705         make_edges_in_order(nodes, *this);
2706     }
2707 #endif
2708 
2709     // Copy constructor
2710     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2711         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2712                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2713     }
2714 
2715 };
2716 
2717 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2718 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2719 private:
2720     static const int N = 5;
2721 public:
2722     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2723     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2724     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2725     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2726         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2727                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2728     }
2729 
2730 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2731     template <typename... Args>
2732     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2733         make_edges_in_order(nodes, *this);
2734     }
2735 #endif
2736 
2737     // Copy constructor
2738     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2739         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2740                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2741     }
2742 
2743 };
2744 
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization for six inputs.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static constexpr int N = 6;  // number of input ports
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and registers it with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph that owns `nodes` and connects the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
};
#endif //variadic max 6
2774 
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization for seven inputs.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static constexpr int N = 7;  // number of input ports
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and registers it with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph that owns `nodes` and connects the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
};
#endif //variadic max 7
2805 
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node specialization for eight inputs.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    static const int N = 8;  // number of input ports
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    //! Constructs the node in graph g and registers it with the tracing layer.
    // __TBB_NOINLINE_SYM added for consistency: every other indexer_node
    // specialization in this file marks its constructors with it.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph that owns `nodes` and connects the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8
2836 
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization for nine inputs.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static constexpr int N = 9;  // number of input ports
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and registers it with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph that owns `nodes` and connects the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
};
#endif //variadic max 9
2867 
#if __TBB_VARIADIC_MAX >= 10
//! indexer_node primary template: the full ten-input case.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static constexpr int N = 10;  // number of input ports
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and registers it with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph that owns `nodes` and connects the edges.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; registers the new node with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
};
#endif //variadic max 10
2898 
// Registers s as a successor of p, then notifies the tracing layer of the new edge.
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
    register_successor(p, s);
    fgt_make_edge( &p, &s );
}
2904 
//! Makes an edge between a single predecessor and a single successor
/** Registers s as a successor of p and reports the edge to the tracing layer. */
template< typename T >
inline void make_edge( sender<T> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}
2910 
// Makes an edge from port 0 of a multi-output predecessor to port 0 of a
// multi-input successor. SFINAE on the nested port-tuple typedefs selects
// this overload only for multi-port node types.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    auto& first_out = std::get<0>(output.output_ports());
    auto& first_in = std::get<0>(input.input_ports());
    make_edge(first_out, first_in);
}
2917 
2918 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2919 template< typename T, typename R,
2920           typename = typename T::output_ports_type >
2921 inline void make_edge( T& output, receiver<R>& input) {
2922      make_edge(std::get<0>(output.output_ports()), input);
2923 }
2924 
2925 //Makes an edge from a sender to port 0 of a multi-input successor.
2926 template< typename S,  typename V,
2927           typename = typename V::input_ports_type >
2928 inline void make_edge( sender<S>& output, V& input) {
2929      make_edge(output, std::get<0>(input.input_ports()));
2930 }
2931 
2932 template< typename T >
2933 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2934     remove_successor( p, s );
2935     fgt_remove_edge( &p, &s );
2936 }
2937 
2938 //! Removes an edge between a single predecessor and a single successor
2939 template< typename T >
2940 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2941     internal_remove_edge( p, s );
2942 }
2943 
// Removes the edge between port 0 of a multi-output predecessor and port 0 of
// a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    auto& first_out = std::get<0>(output.output_ports());
    auto& first_in = std::get<0>(input.input_ports());
    remove_edge(first_out, first_in);
}
2950 
2951 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2952 template< typename T, typename R,
2953           typename = typename T::output_ports_type >
2954 inline void remove_edge( T& output, receiver<R>& input) {
2955      remove_edge(std::get<0>(output.output_ports()), input);
2956 }
2957 //Removes an edge between a sender and port 0 of a multi-input successor.
2958 template< typename S,  typename V,
2959           typename = typename V::input_ports_type >
2960 inline void remove_edge( sender<S>& output, V& input) {
2961      remove_edge(output, std::get<0>(input.input_ports()));
2962 }
2963 
//! Returns a copy of the body from a function or continue node.
//! The Body template argument must be supplied explicitly by the caller;
//! it is forwarded to the node's copy_function_object member template.
template< typename Body, typename Node >
Body copy_body( Node &node ) {
    return node.template copy_function_object<Body>();
}
2969 
//composite_node
// A composite_node encapsulates a subgraph behind a fixed set of externally
// visible input and output ports. The ports are references to ports of inner
// nodes, installed via set_external_ports(); the node itself does no message
// processing.
template< typename InputTuple, typename OutputTuple > class composite_node;

// Primary specialization: both input and output ports present.
template< typename... InputTypes, typename... OutputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {

public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    // Heap-allocated so the reference tuples can be (re)bound after
    // construction; empty until set_external_ports() is called.
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;

    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // Resetting is delegated to the inner nodes owned by the graph; the
    // composite wrapper itself holds no resettable state.
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }

    // Binds tuples of references to inner-node ports as this node's external
    // ports, and aliases them in the tracing layer so tools attribute them to
    // this composite node.
    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
        my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
    }

    // Registers inner nodes with the tracing layer; "visible" nodes are shown
    // inside this composite by profiling tools.
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }


    // Valid only after set_external_ports(); asserts otherwise.
    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

    // Valid only after set_external_ports(); asserts otherwise.
    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }
};  // class composite_node
3024 
//composite_node with only input ports
// Specialization for an empty output tuple: the composite exposes inputs only.
template< typename... InputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;

private:
    // Empty until set_external_ports() binds references to inner-node ports.
    std::unique_ptr<input_ports_type> my_input_ports;
    static const size_t NUM_INPUTS = sizeof...(InputTypes);

protected:
    // No state of its own to reset; inner nodes are reset by the graph.
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

   // Binds a tuple of references to inner-node receivers as the external
   // input ports and aliases them in the tracing layer.
   template<typename T>
   void set_external_ports(T&& input_ports_tuple) {
       static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");

       fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);

       my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
   }

    // Registers inner nodes with the tracing layer ("visible" ones are shown
    // inside this composite by profiling tools).
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template< typename... NodeTypes >
    void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }


    // Valid only after set_external_ports(); asserts otherwise.
    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

};  // class composite_node
3065 
//composite_nodes with only output_ports
// Specialization for an empty input tuple: the composite exposes outputs only.
template<typename... OutputTypes>
class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
public:
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    // Empty until set_external_ports() binds references to inner-node ports.
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // No state of its own to reset; inner nodes are reset by the graph.
    void reset_node(reset_flags) override {}

public:
    __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

   // Binds a tuple of references to inner-node senders as the external
   // output ports and aliases them in the tracing layer.
   template<typename T>
   void set_external_ports(T&& output_ports_tuple) {
       static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

       fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

       my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
   }

    // Registers inner nodes with the tracing layer ("visible" ones are shown
    // inside this composite by profiling tools).
    template<typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template<typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }


    // Valid only after set_external_ports(); asserts otherwise.
    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }

};  // class composite_node
3106 
// Common base for async_node bodies: stores the (non-owning) pointer to the
// node's gateway so the user body can hand results back into the graph.
// Non-assignable; the gateway pointer is rebound via set_gateway() when a
// node is copy-constructed (see async_node's copy constructor).
template<typename Gateway>
class async_body_base: no_assign {
public:
    typedef Gateway gateway_type;

    async_body_base(gateway_type *gateway): my_gateway(gateway) { }
    // Rebinds the body to a different node's gateway (used after node copy).
    void set_gateway(gateway_type *gateway) {
        my_gateway = gateway;
    }

protected:
    gateway_type *my_gateway;   // not owned; points into the owning async_node
};
3120 
// Adapter that wraps the user-supplied async body so it fits the
// multifunction_node body signature (input + output-ports tuple). The ports
// argument is ignored: results are submitted through the gateway instead.
template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
private:
    Body my_body;   // copy of the user's callable

public:
    typedef async_body_base<Gateway> base_type;
    typedef Gateway gateway_type;

    async_body(const Body &body, gateway_type *gateway)
        : base_type(gateway), my_body(body) { }

    // Invokes the user body with the input and the gateway; noexcept exactly
    // when the user body's invocation with (input, gateway&) is noexcept.
    void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
        tbb::detail::invoke(my_body, v, *this->my_gateway);
    }

    // Returns a copy of the wrapped user body (used by copy_function_object).
    Body get_body() { return my_body; }
};
3139 
//! Implements async node
// An async_node passes each input message to a user body together with a
// gateway object; the body hands the message to an external activity, which
// later submits results back into the graph via gateway().try_put(). The
// gateway's reserve_wait()/release_wait() keep the graph's wait_for_all from
// returning while external work is in flight.
template < typename Input, typename Output,
           typename Policy = queueing_lightweight >
    __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
class async_node
    : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
{
    typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
    typedef multifunction_input<
        Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef receiver<input_type> receiver_type;
    typedef receiver<output_type> successor_type;
    typedef sender<input_type> predecessor_type;
    typedef receiver_gateway<output_type> gateway_type;
    typedef async_body_base<gateway_type> async_body_base_type;
    typedef typename base_type::output_ports_type output_ports_type;

private:
    // Concrete gateway handed to the user body; forwards everything to the
    // owning node. Stored by value as a member (my_gateway) so its address is
    // stable for the lifetime of the node.
    class receiver_gateway_impl: public receiver_gateway<Output> {
    public:
        receiver_gateway_impl(async_node* node): my_node(node) {}
        // Tells the graph an external activity is pending so wait_for_all blocks.
        void reserve_wait() override {
            fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
            my_node->my_graph.reserve_wait();
        }

        // Balances a prior reserve_wait(). Copies the node/graph pointers to
        // locals before releasing, since release may unblock graph teardown.
        void release_wait() override {
            async_node* n = my_node;
            graph* g = &n->my_graph;
            g->release_wait();
            fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
        }

        //! Implements gateway_type::try_put for an external activity to submit a message to FG
        bool try_put(const Output &i) override {
            return my_node->try_put_impl(i);
        }

    private:
        async_node* my_node;    // back-pointer to the owning node (not owned)
    } my_gateway;

    //The substitute of 'this' for member construction, to prevent compiler warnings
    async_node* self() { return this; }

    //! Implements gateway_type::try_put for an external activity to submit a message to FG
    // Broadcasts the result through output port 0, collecting the successor
    // tasks and enqueueing them in the graph arena (enqueue rather than spawn:
    // the caller is an external thread, possibly outside the arena).
    bool try_put_impl(const Output &i) {
        multifunction_output<Output> &port_0 = output_port<0>(*this);
        broadcast_cache<output_type>& port_successors = port_0.successors();
        fgt_async_try_put_begin(this, &port_0);
        // TODO revamp: change to std::list<graph_task*>
        graph_task_list tasks;
        bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
        __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
                      "Return status is inconsistent with the method operation." );

        while( !tasks.empty() ) {
            enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
        }
        fgt_async_try_put_end(this, &port_0);
        return is_at_least_one_put_successful;
    }

public:
    // Main constructor. Note the initialization order subtlety: the async_body
    // wrapper is constructed with &my_gateway before my_gateway itself is
    // initialized — only the address is taken, which is valid; self() avoids
    // a compiler warning about using 'this' in the initializer list.
    template<typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : base_type(
        g, concurrency,
        async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
        (body, &my_gateway), a_priority ), my_gateway(self()) {
        fgt_multioutput_node_with_body<1>(
            CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    // Convenience overload: priority without an explicit policy tag.
    template <typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview: construct from a node_set and wire edges to/from its members.
    // (The Policy parameter is a tag type; its value is not needed to delegate.)
    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        const node_set<Args...>& nodes, size_t concurrency, Body body,
        Policy = Policy(), node_priority_t a_priority = no_priority )
        : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy constructor: the copied bodies still point at the source node's
    // gateway, so both the active and the initial body are rebound to this
    // node's gateway before the node is announced to the tracing layer.
    __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // The object the user body passes to the external activity.
    gateway_type& gateway() {
        return my_gateway;
    }

    // Define sender< Output >

    //! Add a new successor to this node
    // Intentionally rejected: successors attach to output ports, not the node.
    bool register_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be registered only via ports");
        return false;
    }

    //! Removes a successor from this node
    bool remove_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be removed only via ports");
        return false;
    }

    // Extracts a copy of the user-supplied body by unwrapping the async_body
    // adapter. The dynamic_cast to the concrete leaf type will throw
    // std::bad_cast if Body does not match the stored body's type.
    template<typename Body>
    Body copy_function_object() {
        typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }

protected:

    void reset_node( reset_flags f) override {
       base_type::reset_node(f);
    }
};
3287 
3288 #include "detail/_flow_graph_node_set_impl.h"
3289 
// Single-item buffer that keeps the most recent value put to it. Every new
// put overwrites the buffer and is broadcast to all successors; try_get
// succeeds as long as a valid value is buffered. All buffer state is guarded
// by my_mutex.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
        : graph_node(g), my_successors(this), my_buffer_is_valid(false)
    {
        fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview: construct from a node_set and wire edges to/from its members.
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; doesn't take anything from src; default won't work
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}

    ~overwrite_node() {}

    // Registering a successor while a valid value is buffered immediately
    // forwards that value to the new successor.
    bool register_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                d1::small_object_allocator allocator{};
                typedef register_predecessor_task task_type;
                graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
                spawn_in_graph_arena( my_graph, *t );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    bool remove_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

    // Returns a copy of the buffered value if one is present.
    bool try_get( input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    // Metainfo-aware try_get for try_put_and_wait support.
    bool try_get( input_type &v, message_metainfo& metainfo ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid) {
            v = my_buffer;
            metainfo = my_buffered_metainfo;

            // Since the successor of the node will use move semantics while wrapping the metainfo
            // that is designed to transfer the ownership of the value from single-push buffer to the task
            // It is required to reserve one more reference here because the value keeps in the buffer
            // and the ownership is not transferred
            for (auto msg_waiter : metainfo.waiters()) {
                msg_waiter->reserve(1);
            }
            return true;
        }
        return false;
    }
#endif

    //! Reserves an item
    // Reservation does not lock the buffer: the value can still be
    // overwritten, so reserve is equivalent to get here.
    bool try_reserve( T &v ) override {
        return try_get(v);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
private:
    // Metainfo-aware reserve: unlike try_get(v, metainfo) above, it does NOT
    // bump waiter reference counts (the reservation is released or consumed,
    // not owned).
    bool try_reserve(T& v, message_metainfo& metainfo) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid) {
            v = my_buffer;
            metainfo = my_buffered_metainfo;
            return true;
        }
        return false;
    }
public:
#endif

    //! Releases the reserved item
    bool try_release() override { return true; }

    //! Consumes the reserved item
    bool try_consume() override { return true; }

    // True if a value is currently buffered.
    bool is_valid() {
       spin_mutex::scoped_lock l( my_mutex );
       return my_buffer_is_valid;
    }

    // Invalidates the buffered value (and releases any waiters it held).
    void clear() {
       spin_mutex::scoped_lock l( my_mutex );
       my_buffer_is_valid = false;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
       for (auto msg_waiter : my_buffered_metainfo.waiters()) {
           msg_waiter->release(1);
       }
       my_buffered_metainfo = message_metainfo{};
#endif
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    graph_task* try_put_task( const input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v, metainfo);
    }
#endif

    // Overwrites the buffer with v and broadcasts it to successors.
    // Caller must hold my_mutex. Always "succeeds": a null successor task is
    // mapped to SUCCESSFULLY_ENQUEUED so the put is never reported rejected.
    graph_task * try_put_task_impl(const input_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
        my_buffer = v;
        my_buffer_is_valid = true;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        // Since the new item is pushed to the buffer - reserving the waiters
        for (auto msg_waiter : metainfo.waiters()) {
            msg_waiter->reserve(1);
        }

        // Since the item is taken out from the buffer - releasing the stored waiters
        for (auto msg_waiter : my_buffered_metainfo.waiters()) {
            msg_waiter->release(1);
        }

        my_buffered_metainfo = metainfo;
#endif
        graph_task* rtask = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(my_buffered_metainfo) );
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    // Spawned from register_successor when an immediate push to a reserving
    // successor failed; retries establishing the edge from a task context.
    struct register_predecessor_task : public graph_task {
        register_predecessor_task(
            graph& g, d1::small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
            : graph_task(g, allocator), o(owner), s(succ) {};

        d1::task* execute(d1::execution_data& ed) override {
            // TODO revamp: investigate why qualification is needed for register_successor() call
            using tbb::detail::d2::register_predecessor;
            using tbb::detail::d2::register_successor;
            if ( !register_predecessor(s, o) ) {
                register_successor(o, s);
            }
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        d1::task* cancel(d1::execution_data& ed) override {
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        predecessor_type& o;    // this node, acting as predecessor
        successor_type& s;      // the successor to (re)connect
    };

    spin_mutex my_mutex;    // guards my_buffer, my_buffer_is_valid, my_successors
    broadcast_cache< input_type, null_rw_mutex > my_successors;
    input_type my_buffer;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    message_metainfo my_buffered_metainfo;
#endif
    bool my_buffer_is_valid;

    void reset_node( reset_flags f) override {
        my_buffer_is_valid = false;
       if (f&rf_clear_edges) {
           my_successors.clear();
       }
    }
};  // overwrite_node
3501 
// Like overwrite_node, but the buffered value is written only once: further
// puts are rejected (try_put_task returns nullptr) until clear() is called.
// All buffering/broadcast machinery is inherited from overwrite_node.
template< typename T >
class write_once_node : public overwrite_node<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef overwrite_node<T> base_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
        fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview: construct from a node_set and wire edges to/from its members.
    template <typename... Args>
    write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: call base class copy constructor
    __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // Accepts the put only while the buffer is empty; the lock is held across
    // the base-class impl call, which expects the caller to hold it.
    graph_task *try_put_task( const T &v ) override {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const T& v, const message_metainfo& metainfo) override {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v, metainfo);
    }
#endif
}; // write_once_node
3548 
// set_name overloads: attach a user-visible name to a graph or node for
// profiling/tracing tools. Each overload forwards to the fgt_* descriptor
// call matching the node's port arity (plain, multi-output, or
// multi-input/multi-output).
inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}

template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

// Multi-output nodes use the multioutput descriptor.
template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
3638 } // d2
3639 } // detail
3640 } // tbb
3641 
3642 
3643 // Include deduction guides for node classes
3644 #include "detail/_flow_graph_nodes_deduction.h"
3645 
3646 namespace tbb {
3647 namespace flow {
3648 inline namespace v1 {
3649     using detail::d2::receiver;
3650     using detail::d2::sender;
3651 
3652     using detail::d2::serial;
3653     using detail::d2::unlimited;
3654 
3655     using detail::d2::reset_flags;
3656     using detail::d2::rf_reset_protocol;
3657     using detail::d2::rf_reset_bodies;
3658     using detail::d2::rf_clear_edges;
3659 
3660     using detail::d2::graph;
3661     using detail::d2::graph_node;
3662     using detail::d2::continue_msg;
3663 
3664     using detail::d2::input_node;
3665     using detail::d2::function_node;
3666     using detail::d2::multifunction_node;
3667     using detail::d2::split_node;
3668     using detail::d2::output_port;
3669     using detail::d2::indexer_node;
3670     using detail::d2::tagged_msg;
3671     using detail::d2::cast_to;
3672     using detail::d2::is_a;
3673     using detail::d2::continue_node;
3674     using detail::d2::overwrite_node;
3675     using detail::d2::write_once_node;
3676     using detail::d2::broadcast_node;
3677     using detail::d2::buffer_node;
3678     using detail::d2::queue_node;
3679     using detail::d2::sequencer_node;
3680     using detail::d2::priority_queue_node;
3681     using detail::d2::limiter_node;
3682     using namespace detail::d2::graph_policy_namespace;
3683     using detail::d2::join_node;
3684     using detail::d2::input_port;
3685     using detail::d2::copy_body;
3686     using detail::d2::make_edge;
3687     using detail::d2::remove_edge;
3688     using detail::d2::tag_value;
3689     using detail::d2::composite_node;
3690     using detail::d2::async_node;
3691     using detail::d2::node_priority_t;
3692     using detail::d2::no_priority;
3693 
3694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3695     using detail::d2::follows;
3696     using detail::d2::precedes;
3697     using detail::d2::make_node_set;
3698     using detail::d2::make_edges;
3699 #endif
3700 
3701 } // v1
3702 } // flow
3703 
3704     using detail::d1::flow_control;
3705 
3706 namespace profiling {
3707     using detail::d2::set_name;
3708 } // profiling
3709 
3710 } // tbb
3711 
3712 
3713 #if TBB_USE_PROFILING_TOOLS  && ( __unix__ || __APPLE__ )
3714    // We don't do pragma pop here, since it still gives warning on the USER side
3715    #undef __TBB_NOINLINE_SYM
3716 #endif
3717 
3718 #endif // __TBB_flow_graph_H