Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:56

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_flow_graph_H
0018 #define __TBB_flow_graph_H
0019 
0020 #define __TBB_flow_graph_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022 
0023 #include "tbb_stddef.h"
0024 #include "atomic.h"
0025 #include "spin_mutex.h"
0026 #include "null_mutex.h"
0027 #include "spin_rw_mutex.h"
0028 #include "null_rw_mutex.h"
0029 #include "task.h"
0030 #include "cache_aligned_allocator.h"
0031 #include "tbb_exception.h"
0032 #include "pipeline.h"
0033 #include "internal/_template_helpers.h"
0034 #include "internal/_aggregator_impl.h"
0035 #include "tbb/internal/_allocator_traits.h"
0036 #include "tbb_profiling.h"
0037 #include "task_arena.h"
0038 
0039 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
0040    #if __INTEL_COMPILER
0041        // Disabled warning "routine is both inline and noinline"
0042        #pragma warning (push)
0043        #pragma warning( disable: 2196 )
0044    #endif
0045    #define __TBB_NOINLINE_SYM __attribute__((noinline))
0046 #else
0047    #define __TBB_NOINLINE_SYM
0048 #endif 
0049 
0050 #if __TBB_PREVIEW_ASYNC_MSG
0051 #include <vector>    // std::vector in internal::async_storage
0052 #include <memory>    // std::shared_ptr in async_msg
0053 #endif
0054 
0055 #if __TBB_PREVIEW_STREAMING_NODE
0056 // For streaming_node
0057 #include <array>            // std::array
0058 #include <unordered_map>    // std::unordered_map
0059 #include <type_traits>      // std::decay, std::true_type, std::false_type
0060 #endif // __TBB_PREVIEW_STREAMING_NODE
0061 
0062 #if TBB_DEPRECATED_FLOW_ENQUEUE
0063 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
0064 #else
0065 #define FLOW_SPAWN(a) tbb::task::spawn((a))
0066 #endif
0067 
0068 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
0069 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
0070 #else
0071 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
0072 #endif
0073 
0074 // use the VC10 or gcc version of tuple if it is available.
0075 #if __TBB_CPP11_TUPLE_PRESENT
0076     #include <tuple>
0077 namespace tbb {
0078     namespace flow {
0079         using std::tuple;
0080         using std::tuple_size;
0081         using std::tuple_element;
0082         using std::get;
0083     }
0084 }
0085 #else
0086     #include "compat/tuple"
0087 #endif
0088 
0089 #include<list>
0090 #include<queue>
0091 
0092 /** @file
0093   \brief The graph related classes and functions
0094 
0095   There are some applications that best express dependencies as messages
0096   passed between nodes in a graph.  These messages may contain data or
0097   simply act as signals that a predecessors has completed. The graph
0098   class and its associated node classes can be used to express such
0099   applications.
0100 */
0101 
0102 namespace tbb {
0103 namespace flow {
0104 
0105 //! An enumeration the provides the two most common concurrency levels: unlimited and serial
0106 enum concurrency { unlimited = 0, serial = 1 };
0107 
0108 namespace interface11 {
0109 
0110 //! A generic null type
0111 struct null_type {};
0112 
0113 //! An empty class used for messages that mean "I'm done"
0114 class continue_msg {};
0115 
0116 //! Forward declaration section
0117 template< typename T > class sender;
0118 template< typename T > class receiver;
0119 class continue_receiver;
0120 
0121 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
0122 
0123 template< typename R, typename B > class run_and_put_task;
0124 
0125 namespace internal {
0126 
0127 template<typename T, typename M> class successor_cache;
0128 template<typename T, typename M> class broadcast_cache;
0129 template<typename T, typename M> class round_robin_cache;
0130 template<typename T, typename M> class predecessor_cache;
0131 template<typename T, typename M> class reservable_predecessor_cache;
0132 
0133 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0134 namespace order {
0135 struct following;
0136 struct preceding;
0137 }
0138 template<typename Order, typename... Args> struct node_set;
0139 #endif
0140 
0141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0142 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
0143 // C == receiver< ... > or sender< ... >, depending.
0144 template<typename C>
0145 class edge_container {
0146 
0147 public:
0148     typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
0149 
0150     void add_edge(C &s) {
0151         built_edges.push_back(&s);
0152     }
0153 
0154     void delete_edge(C &s) {
0155         for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
0156             if (*i == &s) {
0157                 (void)built_edges.erase(i);
0158                 return;  // only remove one predecessor per request
0159             }
0160         }
0161     }
0162 
0163     void copy_edges(edge_list_type &v) {
0164         v = built_edges;
0165     }
0166 
0167     size_t edge_count() {
0168         return (size_t)(built_edges.size());
0169     }
0170 
0171     void clear() {
0172         built_edges.clear();
0173     }
0174 
0175     // methods remove the statement from all predecessors/successors liste in the edge
0176     // container.
0177     template< typename S > void sender_extract(S &s);
0178     template< typename R > void receiver_extract(R &r);
0179 
0180 private:
0181     edge_list_type built_edges;
0182 };  // class edge_container
0183 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0184 
0185 } // namespace internal
0186 
0187 } // namespace interfaceX
0188 } // namespace flow
0189 } // namespace tbb
0190 
0191 //! The graph class
0192 #include "internal/_flow_graph_impl.h"
0193 
0194 namespace tbb {
0195 namespace flow {
0196 namespace interface11 {
0197 
0198 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
0199 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
0200     // if no RHS task, don't change left.
0201     if (right == NULL) return left;
0202     // right != NULL
0203     if (left == NULL) return right;
0204     if (left == SUCCESSFULLY_ENQUEUED) return right;
0205     // left contains a task
0206     if (right != SUCCESSFULLY_ENQUEUED) {
0207         // both are valid tasks
0208         internal::spawn_in_graph_arena(g, *left);
0209         return right;
0210     }
0211     return left;
0212 }
0213 
0214 #if __TBB_PREVIEW_ASYNC_MSG
0215 
0216 template < typename T > class __TBB_DEPRECATED async_msg;
0217 
0218 namespace internal {
0219 
0220 template < typename T > class async_storage;
0221 
0222 template< typename T, typename = void >
0223 struct async_helpers {
0224     typedef async_msg<T> async_type;
0225     typedef T filtered_type;
0226 
0227     static const bool is_async_type = false;
0228 
0229     static const void* to_void_ptr(const T& t) {
0230         return static_cast<const void*>(&t);
0231     }
0232 
0233     static void* to_void_ptr(T& t) {
0234         return static_cast<void*>(&t);
0235     }
0236 
0237     static const T& from_void_ptr(const void* p) {
0238         return *static_cast<const T*>(p);
0239     }
0240 
0241     static T& from_void_ptr(void* p) {
0242         return *static_cast<T*>(p);
0243     }
0244 
0245     static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
0246         if (is_async) {
0247             // This (T) is NOT async and incoming 'A<X> t' IS async
0248             // Get data from async_msg
0249             const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
0250             task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
0251             // finalize() must be called after subscribe() because set() can be called in finalize()
0252             // and 'this_recv' client must be subscribed by this moment
0253             msg.finalize();
0254             return new_task;
0255         }
0256         else {
0257             // Incoming 't' is NOT async
0258             return this_recv->try_put_task(from_void_ptr(p));
0259         }
0260     }
0261 };
0262 
0263 template< typename T >
0264 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
0265     typedef T async_type;
0266     typedef typename T::async_msg_data_type filtered_type;
0267 
0268     static const bool is_async_type = true;
0269 
0270     // Receiver-classes use const interfaces
0271     static const void* to_void_ptr(const T& t) {
0272         return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
0273     }
0274 
0275     static void* to_void_ptr(T& t) {
0276         return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
0277     }
0278 
0279     // Sender-classes use non-const interfaces
0280     static const T& from_void_ptr(const void* p) {
0281         return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
0282     }
0283 
0284     static T& from_void_ptr(void* p) {
0285         return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
0286     }
0287 
0288     // Used in receiver<T> class
0289     static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
0290         if (is_async) {
0291             // Both are async
0292             return this_recv->try_put_task(from_void_ptr(p));
0293         }
0294         else {
0295             // This (T) is async and incoming 'X t' is NOT async
0296             // Create async_msg for X
0297             const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
0298             const T msg(t);
0299             return this_recv->try_put_task(msg);
0300         }
0301     }
0302 };
0303 
0304 class untyped_receiver;
0305 
0306 class untyped_sender {
0307     template< typename, typename > friend class internal::predecessor_cache;
0308     template< typename, typename > friend class internal::reservable_predecessor_cache;
0309 public:
0310     //! The successor type for this node
0311     typedef untyped_receiver successor_type;
0312 
0313     virtual ~untyped_sender() {}
0314 
0315     // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
0316 
0317     // TODO: Prevent untyped successor registration
0318 
0319     //! Add a new successor to this node
0320     virtual bool register_successor( successor_type &r ) = 0;
0321 
0322     //! Removes a successor from this node
0323     virtual bool remove_successor( successor_type &r ) = 0;
0324 
0325     //! Releases the reserved item
0326     virtual bool try_release( ) { return false; }
0327 
0328     //! Consumes the reserved item
0329     virtual bool try_consume( ) { return false; }
0330 
0331 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0332     //! interface to record edges for traversal & deletion
0333     typedef internal::edge_container<successor_type> built_successors_type;
0334     typedef built_successors_type::edge_list_type successor_list_type;
0335     virtual built_successors_type &built_successors()                   = 0;
0336     virtual void    internal_add_built_successor( successor_type & )    = 0;
0337     virtual void    internal_delete_built_successor( successor_type & ) = 0;
0338     virtual void    copy_successors( successor_list_type &)             = 0;
0339     virtual size_t  successor_count()                                   = 0;
0340 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0341 protected:
0342     //! Request an item from the sender
0343     template< typename X >
0344     bool try_get( X &t ) {
0345         return try_get_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
0346     }
0347 
0348     //! Reserves an item in the sender
0349     template< typename X >
0350     bool try_reserve( X &t ) {
0351         return try_reserve_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
0352     }
0353 
0354     virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
0355     virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
0356 };
0357 
0358 class untyped_receiver  {
0359     template< typename, typename > friend class run_and_put_task;
0360 
0361     template< typename, typename > friend class internal::broadcast_cache;
0362     template< typename, typename > friend class internal::round_robin_cache;
0363     template< typename, typename > friend class internal::successor_cache;
0364 
0365 #if __TBB_PREVIEW_OPENCL_NODE
0366     template< typename, typename > friend class proxy_dependency_receiver;
0367 #endif /* __TBB_PREVIEW_OPENCL_NODE */
0368 public:
0369     //! The predecessor type for this node
0370     typedef untyped_sender predecessor_type;
0371 
0372     //! Destructor
0373     virtual ~untyped_receiver() {}
0374 
0375     //! Put an item to the receiver
0376     template<typename X>
0377     bool try_put(const X& t) {
0378         task *res = try_put_task(t);
0379         if (!res) return false;
0380         if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
0381         return true;
0382     }
0383 
0384     // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
0385 
0386     // TODO: Prevent untyped predecessor registration
0387 
0388     //! Add a predecessor to the node
0389     virtual bool register_predecessor( predecessor_type & ) { return false; }
0390 
0391     //! Remove a predecessor from the node
0392     virtual bool remove_predecessor( predecessor_type & ) { return false; }
0393 
0394 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0395     typedef internal::edge_container<predecessor_type> built_predecessors_type;
0396     typedef built_predecessors_type::edge_list_type predecessor_list_type;
0397     virtual built_predecessors_type &built_predecessors()                  = 0;
0398     virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
0399     virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
0400     virtual void   copy_predecessors( predecessor_list_type & )            = 0;
0401     virtual size_t predecessor_count()                                     = 0;
0402 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0403 protected:
0404     template<typename X>
0405     task *try_put_task(const X& t) {
0406         return try_put_task_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
0407     }
0408 
0409     virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
0410 
0411     virtual graph& graph_reference() const = 0;
0412 
0413     // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
0414 
0415     //! put receiver back in initial state
0416     virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
0417 
0418     virtual bool is_continue_receiver() { return false; }
0419 };
0420 
0421 } // namespace internal
0422 
0423 //! Pure virtual template class that defines a sender of messages of type T
0424 template< typename T >
0425 class sender : public internal::untyped_sender {
0426 public:
0427     //! The output type of this sender
0428     __TBB_DEPRECATED typedef T output_type;
0429 
0430     __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
0431 
0432     //! Request an item from the sender
0433     virtual bool try_get( T & ) { return false; }
0434 
0435     //! Reserves an item in the sender
0436     virtual bool try_reserve( T & ) { return false; }
0437 
0438 protected:
0439     virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
0440         // Both async OR both are NOT async
0441         if ( internal::async_helpers<T>::is_async_type == is_async ) {
0442             return try_get( internal::async_helpers<T>::from_void_ptr(p) );
0443         }
0444         // Else: this (T) is async OR incoming 't' is async
0445         __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
0446         return false;
0447     }
0448 
0449     virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
0450         // Both async OR both are NOT async
0451         if ( internal::async_helpers<T>::is_async_type == is_async ) {
0452             return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
0453         }
0454         // Else: this (T) is async OR incoming 't' is async
0455         __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
0456         return false;
0457     }
0458 };  // class sender<T>
0459 
0460 //! Pure virtual template class that defines a receiver of messages of type T
0461 template< typename T >
0462 class receiver : public internal::untyped_receiver {
0463     template< typename > friend class internal::async_storage;
0464     template< typename, typename > friend struct internal::async_helpers;
0465 public:
0466     //! The input type of this receiver
0467     __TBB_DEPRECATED typedef T input_type;
0468 
0469     __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
0470 
0471     //! Put an item to the receiver
0472     bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
0473         return internal::untyped_receiver::try_put(t);
0474     }
0475 
0476     bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
0477         return internal::untyped_receiver::try_put(t);
0478     }
0479 
0480 protected:
0481     virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
0482         return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
0483     }
0484 
0485     //! Put item to successor; return task to run the successor if possible.
0486     virtual task *try_put_task(const T& t) = 0;
0487 
0488 }; // class receiver<T>
0489 
0490 #else // __TBB_PREVIEW_ASYNC_MSG
0491 
0492 //! Pure virtual template class that defines a sender of messages of type T
0493 template< typename T >
0494 class sender {
0495 public:
0496     //! The output type of this sender
0497     __TBB_DEPRECATED typedef T output_type;
0498 
0499     //! The successor type for this node
0500     __TBB_DEPRECATED typedef receiver<T> successor_type;
0501 
0502     virtual ~sender() {}
0503 
0504     // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
0505 
0506     //! Add a new successor to this node
0507     __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
0508 
0509     //! Removes a successor from this node
0510     __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
0511 
0512     //! Request an item from the sender
0513     virtual bool try_get( T & ) { return false; }
0514 
0515     //! Reserves an item in the sender
0516     virtual bool try_reserve( T & ) { return false; }
0517 
0518     //! Releases the reserved item
0519     virtual bool try_release( ) { return false; }
0520 
0521     //! Consumes the reserved item
0522     virtual bool try_consume( ) { return false; }
0523 
0524 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0525     //! interface to record edges for traversal & deletion
0526     __TBB_DEPRECATED typedef typename  internal::edge_container<successor_type> built_successors_type;
0527     __TBB_DEPRECATED typedef typename  built_successors_type::edge_list_type successor_list_type;
0528     __TBB_DEPRECATED virtual built_successors_type &built_successors()                   = 0;
0529     __TBB_DEPRECATED virtual void    internal_add_built_successor( successor_type & )    = 0;
0530     __TBB_DEPRECATED virtual void    internal_delete_built_successor( successor_type & ) = 0;
0531     __TBB_DEPRECATED virtual void    copy_successors( successor_list_type &)             = 0;
0532     __TBB_DEPRECATED virtual size_t  successor_count()                                   = 0;
0533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0534 };  // class sender<T>
0535 
0536 //! Pure virtual template class that defines a receiver of messages of type T
0537 template< typename T >
0538 class receiver {
0539 public:
0540     //! The input type of this receiver
0541     __TBB_DEPRECATED typedef T input_type;
0542 
0543     //! The predecessor type for this node
0544     __TBB_DEPRECATED typedef sender<T> predecessor_type;
0545 
0546     //! Destructor
0547     virtual ~receiver() {}
0548 
0549     //! Put an item to the receiver
0550     bool try_put( const T& t ) {
0551         task *res = try_put_task(t);
0552         if (!res) return false;
0553         if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
0554         return true;
0555     }
0556 
0557     //! put item to successor; return task to run the successor if possible.
0558 protected:
0559     template< typename R, typename B > friend class run_and_put_task;
0560     template< typename X, typename Y > friend class internal::broadcast_cache;
0561     template< typename X, typename Y > friend class internal::round_robin_cache;
0562     virtual task *try_put_task(const T& t) = 0;
0563     virtual graph& graph_reference() const = 0;
0564 public:
0565     // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
0566 
0567     //! Add a predecessor to the node
0568     __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
0569 
0570     //! Remove a predecessor from the node
0571     __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
0572 
0573 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0574     __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
0575     __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
0576     __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors()                  = 0;
0577     __TBB_DEPRECATED virtual void   internal_add_built_predecessor( predecessor_type & )    = 0;
0578     __TBB_DEPRECATED virtual void   internal_delete_built_predecessor( predecessor_type & ) = 0;
0579     __TBB_DEPRECATED virtual void   copy_predecessors( predecessor_list_type & )            = 0;
0580     __TBB_DEPRECATED virtual size_t predecessor_count()                                     = 0;
0581 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0582 
0583 protected:
0584     //! put receiver back in initial state
0585     virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
0586 
0587     template<typename TT, typename M> friend class internal::successor_cache;
0588     virtual bool is_continue_receiver() { return false; }
0589 
0590 #if __TBB_PREVIEW_OPENCL_NODE
0591     template< typename, typename > friend class proxy_dependency_receiver;
0592 #endif /* __TBB_PREVIEW_OPENCL_NODE */
0593 }; // class receiver<T>
0594 
0595 #endif // __TBB_PREVIEW_ASYNC_MSG
0596 
0597 //! Base class for receivers of completion messages
0598 /** These receivers automatically reset, but cannot be explicitly waited on */
0599 class continue_receiver : public receiver< continue_msg > {
0600 public:
0601 
0602     //! The input type
0603     __TBB_DEPRECATED typedef continue_msg input_type;
0604 
0605     //! The predecessor type for this node
0606     __TBB_DEPRECATED typedef receiver<input_type>::predecessor_type predecessor_type;
0607 
0608     //! Constructor
0609     __TBB_DEPRECATED explicit continue_receiver(
0610         __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
0611         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
0612         my_current_count = 0;
0613         __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
0614     }
0615 
0616     //! Copy constructor
0617     __TBB_DEPRECATED continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
0618         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
0619         my_current_count = 0;
0620         __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
0621     }
0622 
0623     //! Increments the trigger threshold
0624     __TBB_DEPRECATED bool register_predecessor( predecessor_type & ) __TBB_override {
0625         spin_mutex::scoped_lock l(my_mutex);
0626         ++my_predecessor_count;
0627         return true;
0628     }
0629 
0630     //! Decrements the trigger threshold
0631     /** Does not check to see if the removal of the predecessor now makes the current count
0632         exceed the new threshold.  So removing a predecessor while the graph is active can cause
0633         unexpected results. */
0634     __TBB_DEPRECATED bool remove_predecessor( predecessor_type & ) __TBB_override {
0635         spin_mutex::scoped_lock l(my_mutex);
0636         --my_predecessor_count;
0637         return true;
0638     }
0639 
0640 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0641     __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
0642     __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
0643     built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
0644 
0645     __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
0646         spin_mutex::scoped_lock l(my_mutex);
0647         my_built_predecessors.add_edge( s );
0648     }
0649 
0650     __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
0651         spin_mutex::scoped_lock l(my_mutex);
0652         my_built_predecessors.delete_edge(s);
0653     }
0654 
0655     __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
0656         spin_mutex::scoped_lock l(my_mutex);
0657         my_built_predecessors.copy_edges(v);
0658     }
0659 
0660     __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
0661         spin_mutex::scoped_lock l(my_mutex);
0662         return my_built_predecessors.edge_count();
0663     }
0664 
0665 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0666 
0667 protected:
0668     template< typename R, typename B > friend class run_and_put_task;
0669     template<typename X, typename Y> friend class internal::broadcast_cache;
0670     template<typename X, typename Y> friend class internal::round_robin_cache;
0671     // execute body is supposed to be too small to create a task for.
0672     task *try_put_task( const input_type & ) __TBB_override {
0673         {
0674             spin_mutex::scoped_lock l(my_mutex);
0675             if ( ++my_current_count < my_predecessor_count )
0676                 return SUCCESSFULLY_ENQUEUED;
0677             else
0678                 my_current_count = 0;
0679         }
0680         task * res = execute();
0681         return res? res : SUCCESSFULLY_ENQUEUED;
0682     }
0683 
0684 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0685     // continue_receiver must contain its own built_predecessors because it does
0686     // not have a node_cache.
0687     built_predecessors_type my_built_predecessors;
0688 #endif
0689     spin_mutex my_mutex;
0690     int my_predecessor_count;
0691     int my_current_count;
0692     int my_initial_predecessor_count;
0693     __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
0694     // the friend declaration in the base class did not eliminate the "protected class"
0695     // error in gcc 4.1.2
0696     template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
0697 
0698     void reset_receiver( reset_flags f ) __TBB_override {
0699         my_current_count = 0;
0700         if (f & rf_clear_edges) {
0701 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0702             my_built_predecessors.clear();
0703 #endif
0704             my_predecessor_count = my_initial_predecessor_count;
0705         }
0706     }
0707 
0708     //! Does whatever should happen when the threshold is reached
0709     /** This should be very fast or else spawn a task.  This is
0710         called while the sender is blocked in the try_put(). */
0711     virtual task * execute() = 0;
0712     template<typename TT, typename M> friend class internal::successor_cache;
0713     bool is_continue_receiver() __TBB_override { return true; }
0714 
0715 }; // class continue_receiver
0716 
0717 }  // interfaceX
0718 
0719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
0720     template <typename K, typename T>
0721     K key_from_message( const T &t ) {
0722         return t.key();
0723     }
0724 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
0725 
0726     using interface11::sender;
0727     using interface11::receiver;
0728     using interface11::continue_receiver;
0729 }  // flow
0730 }  // tbb
0731 
0732 #include "internal/_flow_graph_trace_impl.h"
0733 #include "internal/_tbb_hash_compare_impl.h"
0734 
0735 namespace tbb {
0736 namespace flow {
0737 namespace interface11 {
0738 
0739 #include "internal/_flow_graph_body_impl.h"
0740 #include "internal/_flow_graph_cache_impl.h"
0741 #include "internal/_flow_graph_types_impl.h"
0742 #if __TBB_PREVIEW_ASYNC_MSG
0743 #include "internal/_flow_graph_async_msg_impl.h"
0744 #endif
0745 using namespace internal::graph_policy_namespace;
0746 
0747 template <typename C, typename N>
0748 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
0749 {
0750     if (begin) current_node = my_graph->my_nodes;
0751     //else it is an end iterator by default
0752 }
0753 
0754 template <typename C, typename N>
0755 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
0756     __TBB_ASSERT(current_node, "graph_iterator at end");
0757     return *operator->();
0758 }
0759 
0760 template <typename C, typename N>
0761 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
0762     return current_node;
0763 }
0764 
0765 template <typename C, typename N>
0766 void graph_iterator<C,N>::internal_forward() {
0767     if (current_node) current_node = current_node->next;
0768 }
0769 
0770 } // namespace interfaceX
0771 
0772 namespace interface10 {
0773 //! Constructs a graph with isolated task_group_context
0774 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
0775     prepare_task_arena();
0776     own_context = true;
0777     cancelled = false;
0778     caught_exception = false;
0779     my_context = new task_group_context(tbb::internal::FLOW_TASKS);
0780     my_root_task = (new (task::allocate_root(*my_context)) empty_task);
0781     my_root_task->set_ref_count(1);
0782     tbb::internal::fgt_graph(this);
0783     my_is_active = true;
0784 }
0785 
0786 inline graph::graph(task_group_context& use_this_context) :
0787     my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
0788     prepare_task_arena();
0789     own_context = false;
0790     cancelled = false;
0791     caught_exception = false;
0792     my_root_task = (new (task::allocate_root(*my_context)) empty_task);
0793     my_root_task->set_ref_count(1);
0794     tbb::internal::fgt_graph(this);
0795     my_is_active = true;
0796 }
0797 
0798 inline graph::~graph() {
0799     wait_for_all();
0800     my_root_task->set_ref_count(0);
0801     tbb::task::destroy(*my_root_task);
0802     if (own_context) delete my_context;
0803     delete my_task_arena;
0804 }
0805 
0806 inline void graph::reserve_wait() {
0807     if (my_root_task) {
0808         my_root_task->increment_ref_count();
0809         tbb::internal::fgt_reserve_wait(this);
0810     }
0811 }
0812 
0813 inline void graph::release_wait() {
0814     if (my_root_task) {
0815         tbb::internal::fgt_release_wait(this);
0816         my_root_task->decrement_ref_count();
0817     }
0818 }
0819 
0820 inline void graph::register_node(tbb::flow::interface11::graph_node *n) {
0821     n->next = NULL;
0822     {
0823         spin_mutex::scoped_lock lock(nodelist_mutex);
0824         n->prev = my_nodes_last;
0825         if (my_nodes_last) my_nodes_last->next = n;
0826         my_nodes_last = n;
0827         if (!my_nodes) my_nodes = n;
0828     }
0829 }
0830 
0831 inline void graph::remove_node(tbb::flow::interface11::graph_node *n) {
0832     {
0833         spin_mutex::scoped_lock lock(nodelist_mutex);
0834         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
0835         if (n->prev) n->prev->next = n->next;
0836         if (n->next) n->next->prev = n->prev;
0837         if (my_nodes_last == n) my_nodes_last = n->prev;
0838         if (my_nodes == n) my_nodes = n->next;
0839     }
0840     n->prev = n->next = NULL;
0841 }
0842 
0843 inline void graph::reset( tbb::flow::interface11::reset_flags f ) {
0844     // reset context
0845     tbb::flow::interface11::internal::deactivate_graph(*this);
0846 
0847     if(my_context) my_context->reset();
0848     cancelled = false;
0849     caught_exception = false;
0850     // reset all the nodes comprising the graph
0851     for(iterator ii = begin(); ii != end(); ++ii) {
0852         tbb::flow::interface11::graph_node *my_p = &(*ii);
0853         my_p->reset_node(f);
0854     }
0855     // Reattach the arena. Might be useful to run the graph in a particular task_arena
0856     // while not limiting graph lifetime to a single task_arena::execute() call.
0857     prepare_task_arena( /*reinit=*/true );
0858     tbb::flow::interface11::internal::activate_graph(*this);
0859     // now spawn the tasks necessary to start the graph
0860     for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
0861         tbb::flow::interface11::internal::spawn_in_graph_arena(*this, *(*rti));
0862     }
0863     my_reset_task_list.clear();
0864 }
0865 
0866 inline graph::iterator graph::begin() { return iterator(this, true); }
0867 
0868 inline graph::iterator graph::end() { return iterator(this, false); }
0869 
0870 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
0871 
0872 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
0873 
0874 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
0875 
0876 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
0877 
0878 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0879 inline void graph::set_name(const char *name) {
0880     tbb::internal::fgt_graph_desc(this, name);
0881 }
0882 #endif
0883 
0884 } // namespace interface10
0885 
0886 namespace interface11 {
0887 
0888 inline graph_node::graph_node(graph& g) : my_graph(g) {
0889     my_graph.register_node(this);
0890 }
0891 
0892 inline graph_node::~graph_node() {
0893     my_graph.remove_node(this);
0894 }
0895 
0896 #include "internal/_flow_graph_node_impl.h"
0897 
0898 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0899 using internal::node_set;
0900 #endif
0901 
0902 //! An executable node that acts as a source, i.e. it has no predecessors
0903 template < typename Output >
0904 class input_node : public graph_node, public sender< Output > {
0905 public:
0906     //! The type of the output message, which is complete
0907     typedef Output output_type;
0908 
0909     //! The type of successors of this node
0910     typedef typename sender<output_type>::successor_type successor_type;
0911 
0912     //Source node has no input type
0913     typedef null_type input_type;
0914 
0915 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0916     typedef typename sender<output_type>::built_successors_type built_successors_type;
0917     typedef typename sender<output_type>::successor_list_type successor_list_type;
0918 #endif
0919 
0920     //! Constructor for a node with a successor
0921     template< typename Body >
0922      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
0923         : graph_node(g), my_active(false),
0924         my_body( new internal::input_body_leaf< output_type, Body>(body) ),
0925         my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
0926         my_reserved(false), my_has_cached_item(false)
0927     {
0928         my_successors.set_owner(this);
0929         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
0930                                            static_cast<sender<output_type> *>(this), this->my_body );
0931     }
0932 
0933 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0934     template <typename Body, typename... Successors>
0935     input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
0936         : input_node(successors.graph_reference(), body) {
0937         make_edges(*this, successors);
0938     }
0939 #endif
0940 
0941     //! Copy constructor
0942     __TBB_NOINLINE_SYM input_node( const input_node& src ) :
0943         graph_node(src.my_graph), sender<Output>(),
0944         my_active(false),
0945         my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
0946         my_reserved(false), my_has_cached_item(false)
0947     {
0948         my_successors.set_owner(this);
0949         tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
0950                                            static_cast<sender<output_type> *>(this), this->my_body );
0951     }
0952 
0953     //! The destructor
0954     ~input_node() { delete my_body; delete my_init_body; }
0955 
0956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0957     void set_name( const char *name ) __TBB_override {
0958         tbb::internal::fgt_node_desc( this, name );
0959     }
0960 #endif
0961 
0962     //! Add a new successor to this node
0963     bool register_successor( successor_type &r ) __TBB_override {
0964         spin_mutex::scoped_lock lock(my_mutex);
0965         my_successors.register_successor(r);
0966         if ( my_active )
0967             spawn_put();
0968         return true;
0969     }
0970 
0971     //! Removes a successor from this node
0972     bool remove_successor( successor_type &r ) __TBB_override {
0973         spin_mutex::scoped_lock lock(my_mutex);
0974         my_successors.remove_successor(r);
0975         return true;
0976     }
0977 
0978 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0979 
0980     built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
0981 
0982     void internal_add_built_successor( successor_type &r) __TBB_override {
0983         spin_mutex::scoped_lock lock(my_mutex);
0984         my_successors.internal_add_built_successor(r);
0985     }
0986 
0987     void internal_delete_built_successor( successor_type &r) __TBB_override {
0988         spin_mutex::scoped_lock lock(my_mutex);
0989         my_successors.internal_delete_built_successor(r);
0990     }
0991 
0992     size_t successor_count() __TBB_override {
0993         spin_mutex::scoped_lock lock(my_mutex);
0994         return my_successors.successor_count();
0995     }
0996 
0997     void copy_successors(successor_list_type &v) __TBB_override {
0998         spin_mutex::scoped_lock l(my_mutex);
0999         my_successors.copy_successors(v);
1000     }
1001 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1002 
1003     //! Request an item from the node
1004     bool try_get( output_type &v ) __TBB_override {
1005         spin_mutex::scoped_lock lock(my_mutex);
1006         if ( my_reserved )
1007             return false;
1008 
1009         if ( my_has_cached_item ) {
1010             v = my_cached_item;
1011             my_has_cached_item = false;
1012             return true;
1013         }
1014         // we've been asked to provide an item, but we have none.  enqueue a task to
1015         // provide one.
1016         if ( my_active )
1017             spawn_put();
1018         return false;
1019     }
1020 
1021     //! Reserves an item.
1022     bool try_reserve( output_type &v ) __TBB_override {
1023         spin_mutex::scoped_lock lock(my_mutex);
1024         if ( my_reserved ) {
1025             return false;
1026         }
1027 
1028         if ( my_has_cached_item ) {
1029             v = my_cached_item;
1030             my_reserved = true;
1031             return true;
1032         } else {
1033             return false;
1034         }
1035     }
1036 
1037     //! Release a reserved item.
1038     /** true = item has been released and so remains in sender, dest must request or reserve future items */
1039     bool try_release( ) __TBB_override {
1040         spin_mutex::scoped_lock lock(my_mutex);
1041         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1042         my_reserved = false;
1043         if(!my_successors.empty())
1044             spawn_put();
1045         return true;
1046     }
1047 
1048     //! Consumes a reserved item
1049     bool try_consume( ) __TBB_override {
1050         spin_mutex::scoped_lock lock(my_mutex);
1051         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1052         my_reserved = false;
1053         my_has_cached_item = false;
1054         if ( !my_successors.empty() ) {
1055             spawn_put();
1056         }
1057         return true;
1058     }
1059 
1060     //! Activates a node that was created in the inactive state
1061     void activate() {
1062         spin_mutex::scoped_lock lock(my_mutex);
1063         my_active = true;
1064         if (!my_successors.empty())
1065             spawn_put();
1066     }
1067 
1068     template<typename Body>
1069     Body copy_function_object() {
1070         internal::input_body<output_type> &body_ref = *this->my_body;
1071         return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
1072     }
1073 
1074 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1075     void extract( ) __TBB_override {
1076         my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
1077         my_active = false;
1078         my_reserved = false;
1079         if(my_has_cached_item) my_has_cached_item = false;
1080     }
1081 #endif
1082 
1083 protected:
1084 
1085     //! resets the input_node to its initial state
1086     void reset_node( reset_flags f) __TBB_override {
1087         my_active = false;
1088         my_reserved = false;
1089         my_has_cached_item = false;
1090 
1091         if(f & rf_clear_edges) my_successors.clear();
1092         if(f & rf_reset_bodies) {
1093             internal::input_body<output_type> *tmp = my_init_body->clone();
1094             delete my_body;
1095             my_body = tmp;
1096         }
1097     }
1098 
1099 private:
1100     spin_mutex my_mutex;
1101     bool my_active;
1102     internal::input_body<output_type> *my_body;
1103     internal::input_body<output_type> *my_init_body;
1104     internal::broadcast_cache< output_type > my_successors;
1105     bool my_reserved;
1106     bool my_has_cached_item;
1107     output_type my_cached_item;
1108 
1109     // used by apply_body_bypass, can invoke body of node.
1110     bool try_reserve_apply_body(output_type &v) {
1111         spin_mutex::scoped_lock lock(my_mutex);
1112         if ( my_reserved ) {
1113             return false;
1114         }
1115         if ( !my_has_cached_item ) {
1116             tbb::internal::fgt_begin_body( my_body );
1117 
1118 #if TBB_DEPRECATED_INPUT_NODE_BODY
1119             bool r = (*my_body)(my_cached_item);
1120             if (r) {
1121                 my_has_cached_item = true;
1122             }
1123 #else
1124             flow_control control;
1125             my_cached_item = (*my_body)(control);
1126             my_has_cached_item = !control.is_pipeline_stopped;
1127 #endif
1128             tbb::internal::fgt_end_body( my_body );
1129         }
1130         if ( my_has_cached_item ) {
1131             v = my_cached_item;
1132             my_reserved = true;
1133             return true;
1134         } else {
1135             return false;
1136         }
1137     }
1138 
1139     task* create_put_task() {
1140         return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1141                         internal:: source_task_bypass < input_node< output_type > >( *this ) );
1142     }
1143 
1144     //! Spawns a task that applies the body
1145     void spawn_put( ) {
1146         if(internal::is_graph_active(this->my_graph)) {
1147             internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1148         }
1149     }
1150 
1151     friend class internal::source_task_bypass< input_node< output_type > >;
1152     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
1153     task * apply_body_bypass( ) {
1154         output_type v;
1155         if ( !try_reserve_apply_body(v) )
1156             return NULL;
1157 
1158         task *last_task = my_successors.try_put_task(v);
1159         if ( last_task )
1160             try_consume();
1161         else
1162             try_release();
1163         return last_task;
1164     }
1165 };  // class input_node
1166 
1167 #if TBB_USE_SOURCE_NODE_AS_ALIAS
1168 template < typename Output >
1169 class source_node : public input_node <Output> {
1170 public:
1171     //! Constructor for a node with a successor
1172     template< typename Body >
1173      __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1174          : input_node<Output>(g, body)
1175     {
1176     }
1177 
1178 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1179     template <typename Body, typename... Successors>
1180     source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1181         : input_node<Output>(successors, body) {
1182      }
1183 #endif
1184 };
1185 #else // TBB_USE_SOURCE_NODE_AS_ALIAS
1186 //! An executable node that acts as a source, i.e. it has no predecessors
1187 template < typename Output > class
1188 __TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1189 source_node : public graph_node, public sender< Output > {
1190 public:
1191     //! The type of the output message, which is complete
1192     typedef Output output_type;
1193 
1194     //! The type of successors of this node
1195     typedef typename sender<output_type>::successor_type successor_type;
1196 
1197     //Source node has no input type
1198     typedef null_type input_type;
1199 
1200 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1201     typedef typename sender<output_type>::built_successors_type built_successors_type;
1202     typedef typename sender<output_type>::successor_list_type successor_list_type;
1203 #endif
1204 
1205     //! Constructor for a node with a successor
1206     template< typename Body >
1207      __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1208         : graph_node(g), my_active(is_active), init_my_active(is_active),
1209         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1210         my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1211         my_reserved(false), my_has_cached_item(false)
1212     {
1213         my_successors.set_owner(this);
1214     tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1215                                            static_cast<sender<output_type> *>(this), this->my_body );
1216     }
1217 
1218 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1219     template <typename Body, typename... Successors>
1220     source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1221         : source_node(successors.graph_reference(), body, is_active) {
1222         make_edges(*this, successors);
1223     }
1224 #endif
1225 
1226     //! Copy constructor
1227     __TBB_NOINLINE_SYM source_node( const source_node& src ) :
1228         graph_node(src.my_graph), sender<Output>(),
1229         my_active(src.init_my_active),
1230         init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1231         my_reserved(false), my_has_cached_item(false)
1232     {
1233         my_successors.set_owner(this);
1234         tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1235                                            static_cast<sender<output_type> *>(this), this->my_body );
1236     }
1237 
1238     //! The destructor
1239     ~source_node() { delete my_body; delete my_init_body; }
1240 
1241 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1242     void set_name( const char *name ) __TBB_override {
1243         tbb::internal::fgt_node_desc( this, name );
1244     }
1245 #endif
1246 
1247     //! Add a new successor to this node
1248     bool register_successor( successor_type &r ) __TBB_override {
1249         spin_mutex::scoped_lock lock(my_mutex);
1250         my_successors.register_successor(r);
1251         if ( my_active )
1252             spawn_put();
1253         return true;
1254     }
1255 
1256     //! Removes a successor from this node
1257     bool remove_successor( successor_type &r ) __TBB_override {
1258         spin_mutex::scoped_lock lock(my_mutex);
1259         my_successors.remove_successor(r);
1260         return true;
1261     }
1262 
1263 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264 
1265     built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1266 
1267     void internal_add_built_successor( successor_type &r) __TBB_override {
1268         spin_mutex::scoped_lock lock(my_mutex);
1269         my_successors.internal_add_built_successor(r);
1270     }
1271 
1272     void internal_delete_built_successor( successor_type &r) __TBB_override {
1273         spin_mutex::scoped_lock lock(my_mutex);
1274         my_successors.internal_delete_built_successor(r);
1275     }
1276 
1277     size_t successor_count() __TBB_override {
1278         spin_mutex::scoped_lock lock(my_mutex);
1279         return my_successors.successor_count();
1280     }
1281 
1282     void copy_successors(successor_list_type &v) __TBB_override {
1283         spin_mutex::scoped_lock l(my_mutex);
1284         my_successors.copy_successors(v);
1285     }
1286 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1287 
1288     //! Request an item from the node
1289     bool try_get( output_type &v ) __TBB_override {
1290         spin_mutex::scoped_lock lock(my_mutex);
1291         if ( my_reserved )
1292             return false;
1293 
1294         if ( my_has_cached_item ) {
1295             v = my_cached_item;
1296             my_has_cached_item = false;
1297             return true;
1298         }
1299         // we've been asked to provide an item, but we have none.  enqueue a task to
1300         // provide one.
1301         spawn_put();
1302         return false;
1303     }
1304 
1305     //! Reserves an item.
1306     bool try_reserve( output_type &v ) __TBB_override {
1307         spin_mutex::scoped_lock lock(my_mutex);
1308         if ( my_reserved ) {
1309             return false;
1310         }
1311 
1312         if ( my_has_cached_item ) {
1313             v = my_cached_item;
1314             my_reserved = true;
1315             return true;
1316         } else {
1317             return false;
1318         }
1319     }
1320 
1321     //! Release a reserved item.
1322     /** true = item has been released and so remains in sender, dest must request or reserve future items */
1323     bool try_release( ) __TBB_override {
1324         spin_mutex::scoped_lock lock(my_mutex);
1325         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1326         my_reserved = false;
1327         if(!my_successors.empty())
1328             spawn_put();
1329         return true;
1330     }
1331 
1332     //! Consumes a reserved item
1333     bool try_consume( ) __TBB_override {
1334         spin_mutex::scoped_lock lock(my_mutex);
1335         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1336         my_reserved = false;
1337         my_has_cached_item = false;
1338         if ( !my_successors.empty() ) {
1339             spawn_put();
1340         }
1341         return true;
1342     }
1343 
1344     //! Activates a node that was created in the inactive state
1345     void activate() {
1346         spin_mutex::scoped_lock lock(my_mutex);
1347         my_active = true;
1348         if (!my_successors.empty())
1349             spawn_put();
1350     }
1351 
1352     template<typename Body>
1353     Body copy_function_object() {
1354         internal::source_body<output_type> &body_ref = *this->my_body;
1355         return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1356     }
1357 
1358 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1359     void extract( ) __TBB_override {
1360         my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
1361         my_active = init_my_active;
1362         my_reserved = false;
1363         if(my_has_cached_item) my_has_cached_item = false;
1364     }
1365 #endif
1366 
1367 protected:
1368 
1369     //! resets the source_node to its initial state
1370     void reset_node( reset_flags f) __TBB_override {
1371         my_active = init_my_active;
1372         my_reserved =false;
1373         if(my_has_cached_item) {
1374             my_has_cached_item = false;
1375         }
1376         if(f & rf_clear_edges) my_successors.clear();
1377         if(f & rf_reset_bodies) {
1378             internal::source_body<output_type> *tmp = my_init_body->clone();
1379             delete my_body;
1380             my_body = tmp;
1381         }
1382         if(my_active)
1383             internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1384     }
1385 
1386 private:
1387     spin_mutex my_mutex;
1388     bool my_active;
1389     bool init_my_active;
1390     internal::source_body<output_type> *my_body;
1391     internal::source_body<output_type> *my_init_body;
1392     internal::broadcast_cache< output_type > my_successors;
1393     bool my_reserved;
1394     bool my_has_cached_item;
1395     output_type my_cached_item;
1396 
1397     // used by apply_body_bypass, can invoke body of node.
1398     bool try_reserve_apply_body(output_type &v) {
1399         spin_mutex::scoped_lock lock(my_mutex);
1400         if ( my_reserved ) {
1401             return false;
1402         }
1403         if ( !my_has_cached_item ) {
1404             tbb::internal::fgt_begin_body( my_body );
1405             bool r = (*my_body)(my_cached_item);
1406             tbb::internal::fgt_end_body( my_body );
1407             if (r) {
1408                 my_has_cached_item = true;
1409             }
1410         }
1411         if ( my_has_cached_item ) {
1412             v = my_cached_item;
1413             my_reserved = true;
1414             return true;
1415         } else {
1416             return false;
1417         }
1418     }
1419 
1420     // when resetting, and if the source_node was created with my_active == true, then
1421     // when we reset the node we must store a task to run the node, and spawn it only
1422     // after the reset is complete and is_active() is again true.  This is why we don't
1423     // test for is_active() here.
1424     task* create_put_task() {
1425         return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1426                         internal:: source_task_bypass < source_node< output_type > >( *this ) );
1427     }
1428 
1429     //! Spawns a task that applies the body
1430     void spawn_put( ) {
1431         if(internal::is_graph_active(this->my_graph)) {
1432             internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1433         }
1434     }
1435 
1436     friend class internal::source_task_bypass< source_node< output_type > >;
1437     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
1438     task * apply_body_bypass( ) {
1439         output_type v;
1440         if ( !try_reserve_apply_body(v) )
1441             return NULL;
1442 
1443         task *last_task = my_successors.try_put_task(v);
1444         if ( last_task )
1445             try_consume();
1446         else
1447             try_release();
1448         return last_task;
1449     }
1450 };  // class source_node
1451 #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1452 
1453 //! Implements a function node that supports Input -> Output
1454 template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1455          typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1456 class function_node
1457     : public graph_node
1458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1459     , public internal::function_input< Input, Output, Policy, Allocator >
1460 #else
1461     , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1462 #endif
1463     , public internal::function_output<Output> {
1464 
1465 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1466     typedef Allocator internals_allocator;
1467 #else
1468     typedef cache_aligned_allocator<Input> internals_allocator;
1469 
1470     __TBB_STATIC_ASSERT(
1471         (tbb::internal::is_same_type<Allocator, null_type>::value),
1472         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1473         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1474     );
1475 #endif
1476 
1477 public:
1478     typedef Input input_type;
1479     typedef Output output_type;
1480     typedef internal::function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
1481     typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
1482     typedef internal::function_output<output_type> fOutput_type;
1483     typedef typename input_impl_type::predecessor_type predecessor_type;
1484     typedef typename fOutput_type::successor_type successor_type;
1485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1486     typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1487     typedef typename fOutput_type::successor_list_type successor_list_type;
1488 #endif
1489     using input_impl_type::my_predecessors;
1490 
1491     //! Constructor
1492     // input_queue_type is allocated here, but destroyed in the function_input_base.
1493     // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1494     // be done in one place.  This would be an interface-breaking change.
1495     template< typename Body >
1496      __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
1497 #if __TBB_CPP11_PRESENT
1498                    Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1499 #else
1500                    __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority ))
1501 #endif
1502         : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1503           fOutput_type(g) {
1504         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1505                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1506     }
1507 
1508 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1509     template <typename Body>
1510     function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1511         : function_node(g, concurrency, body, Policy(), priority) {}
1512 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1513 
1514 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1515     template <typename Body, typename... Args>
1516     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1517                    __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1518         : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1519         make_edges_in_order(nodes, *this);
1520     }
1521 
1522 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1523     template <typename Body, typename... Args>
1524     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1525         : function_node(nodes, concurrency, body, Policy(), priority) {}
1526 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1528 
1529     //! Copy constructor
1530     __TBB_NOINLINE_SYM function_node( const function_node& src ) :
1531         graph_node(src.my_graph),
1532         input_impl_type(src),
1533         fOutput_type(src.my_graph) {
1534         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1535                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1536     }
1537 
1538 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1539     void set_name( const char *name ) __TBB_override {
1540         tbb::internal::fgt_node_desc( this, name );
1541     }
1542 #endif
1543 
1544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1545     void extract( ) __TBB_override {
1546         my_predecessors.built_predecessors().receiver_extract(*this);
1547         successors().built_successors().sender_extract(*this);
1548     }
1549 #endif
1550 
1551 protected:
1552     template< typename R, typename B > friend class run_and_put_task;
1553     template<typename X, typename Y> friend class internal::broadcast_cache;
1554     template<typename X, typename Y> friend class internal::round_robin_cache;
1555     using input_impl_type::try_put_task;
1556 
1557     internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1558 
1559     void reset_node(reset_flags f) __TBB_override {
1560         input_impl_type::reset_function_input(f);
1561         // TODO: use clear() instead.
1562         if(f & rf_clear_edges) {
1563             successors().clear();
1564             my_predecessors.clear();
1565         }
1566         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1567         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1568     }
1569 
1570 };  // class function_node
1571 
1572 //! implements a function node that supports Input -> (set of outputs)
1573 // Output is a tuple of output types.
1574 template<typename Input, typename Output, typename Policy = queueing,
1575          typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1576 class multifunction_node :
1577     public graph_node,
1578     public internal::multifunction_input
1579     <
1580         Input,
1581         typename internal::wrap_tuple_elements<
1582             tbb::flow::tuple_size<Output>::value,  // #elements in tuple
1583             internal::multifunction_output,  // wrap this around each element
1584             Output // the tuple providing the types
1585         >::type,
1586         Policy,
1587 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1588         Allocator
1589 #else
1590         cache_aligned_allocator<Input>
1591 #endif
1592     > {
1593 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1594     typedef Allocator internals_allocator;
1595 #else
1596     typedef cache_aligned_allocator<Input> internals_allocator;
1597 
1598     __TBB_STATIC_ASSERT(
1599         (tbb::internal::is_same_type<Allocator, null_type>::value),
1600         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1601         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1602     );
1603 #endif
1604 
1605 protected:
1606     static const int N = tbb::flow::tuple_size<Output>::value;
1607 public:
1608     typedef Input input_type;
1609     typedef null_type output_type;
1610     typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
1611     typedef internal::multifunction_input<
1612         input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
1613     typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
1614 private:
1615     using input_impl_type::my_predecessors;
1616 public:
1617     template<typename Body>
1618     __TBB_NOINLINE_SYM multifunction_node(
1619         graph &g, size_t concurrency,
1620 #if __TBB_CPP11_PRESENT
1621         Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1622 #else
1623         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
1624 #endif
1625     ) : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1626         tbb::internal::fgt_multioutput_node_with_body<N>(
1627             CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1628             &this->my_graph, static_cast<receiver<input_type> *>(this),
1629             this->output_ports(), this->my_body
1630         );
1631     }
1632 
1633 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1634     template <typename Body>
1635     __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1636         : multifunction_node(g, concurrency, body, Policy(), priority) {}
1637 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1638 
1639 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1640     template <typename Body, typename... Args>
1641     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1642                        __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
1643         : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1644         make_edges_in_order(nodes, *this);
1645     }
1646 
1647 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1648     template <typename Body, typename... Args>
1649     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1650         : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1651 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1652 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1653 
1654     __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
1655         graph_node(other.my_graph), input_impl_type(other) {
1656         tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1657                 &this->my_graph, static_cast<receiver<input_type> *>(this),
1658                 this->output_ports(), this->my_body );
1659     }
1660 
1661 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1662     void set_name( const char *name ) __TBB_override {
1663         tbb::internal::fgt_multioutput_node_desc( this, name );
1664     }
1665 #endif
1666 
1667 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1668     void extract( ) __TBB_override {
1669         my_predecessors.built_predecessors().receiver_extract(*this);
1670         input_impl_type::extract();
1671     }
1672 #endif
1673     // all the guts are in multifunction_input...
1674 protected:
1675     void reset_node(reset_flags f) __TBB_override { input_impl_type::reset(f); }
1676 };  // multifunction_node
1677 
1678 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
1679 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
1680 template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681 class split_node : public graph_node, public receiver<TupleType> {
1682     static const int N = tbb::flow::tuple_size<TupleType>::value;
1683     typedef receiver<TupleType> base_type;
1684 public:
1685     typedef TupleType input_type;
1686 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687     typedef Allocator allocator_type;
1688 #else
1689     __TBB_STATIC_ASSERT(
1690         (tbb::internal::is_same_type<Allocator, null_type>::value),
1691         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1693     );
1694 #endif
1695 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1696     typedef typename base_type::predecessor_type predecessor_type;
1697     typedef typename base_type::predecessor_list_type predecessor_list_type;
1698     typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699     typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700 #endif
1701 
1702     typedef typename internal::wrap_tuple_elements<
1703             N,  // #elements in tuple
1704             internal::multifunction_output,  // wrap this around each element
1705             TupleType // the tuple providing the types
1706         >::type  output_ports_type;
1707 
1708     __TBB_NOINLINE_SYM explicit split_node(graph &g)
1709         : graph_node(g),
1710           my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
1711     {
1712         tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713             static_cast<receiver<input_type> *>(this), this->output_ports());
1714     }
1715 
1716 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1717     template <typename... Args>
1718     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719         make_edges_in_order(nodes, *this);
1720     }
1721 #endif
1722 
1723     __TBB_NOINLINE_SYM split_node(const split_node& other)
1724         : graph_node(other.my_graph), base_type(other),
1725           my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1726     {
1727         tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728             static_cast<receiver<input_type> *>(this), this->output_ports());
1729     }
1730 
1731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732     void set_name( const char *name ) __TBB_override {
1733         tbb::internal::fgt_multioutput_node_desc( this, name );
1734     }
1735 #endif
1736 
1737     output_ports_type &output_ports() { return my_output_ports; }
1738 
1739 protected:
1740     task *try_put_task(const TupleType& t) __TBB_override {
1741         // Sending split messages in parallel is not justified, as overheads would prevail.
1742         // Also, we do not have successors here. So we just tell the task returned here is successful.
1743         return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1744     }
1745     void reset_node(reset_flags f) __TBB_override {
1746         if (f & rf_clear_edges)
1747             internal::clear_element<N>::clear_this(my_output_ports);
1748 
1749         __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1750     }
1751     void reset_receiver(reset_flags /*f*/) __TBB_override {}
1752     graph& graph_reference() const __TBB_override {
1753         return my_graph;
1754     }
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756 private: //! split_node doesn't use this "predecessors" functionality; so, we have "dummies" here;
1757     void extract() __TBB_override {}
1758 
1759     //! Adds to list of predecessors added by make_edge
1760     void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1761 
1762     //! removes from to list of predecessors (used by remove_edge)
1763     void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1764 
1765     size_t predecessor_count() __TBB_override { return 0; }
1766 
1767     void copy_predecessors(predecessor_list_type&) __TBB_override {}
1768 
1769     built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770 
1771     //! dummy member
1772     built_predecessors_type my_predessors;
1773 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774 
1775 private:
1776     output_ports_type my_output_ports;
1777 };
1778 
1779 //! Implements an executable node that supports continue_msg -> Output
1780 template <typename Output, typename Policy = internal::Policy<void> >
1781 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1782                       public internal::function_output<Output> {
1783 public:
1784     typedef continue_msg input_type;
1785     typedef Output output_type;
1786     typedef internal::continue_input<Output, Policy> input_impl_type;
1787     typedef internal::function_output<output_type> fOutput_type;
1788     typedef typename input_impl_type::predecessor_type predecessor_type;
1789     typedef typename fOutput_type::successor_type successor_type;
1790 
1791     //! Constructor for executable node with continue_msg -> Output
1792     template <typename Body >
1793     __TBB_NOINLINE_SYM continue_node(
1794         graph &g,
1795 #if __TBB_CPP11_PRESENT
1796         Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1797 #else
1798         __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
1799 #endif
1800     ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1801         fOutput_type(g) {
1802         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1803 
1804                                            static_cast<receiver<input_type> *>(this),
1805                                            static_cast<sender<output_type> *>(this), this->my_body );
1806     }
1807 
1808 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1809     template <typename Body>
1810     continue_node( graph& g, Body body, node_priority_t priority )
1811         : continue_node(g, body, Policy(), priority) {}
1812 #endif
1813 
1814 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1815     template <typename Body, typename... Args>
1816     continue_node( const node_set<Args...>& nodes, Body body,
1817                    __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
1818         : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1819         make_edges_in_order(nodes, *this);
1820     }
1821 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1822     template <typename Body, typename... Args>
1823     continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1824         : continue_node(nodes, body, Policy(), priority) {}
1825 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1826 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1827 
1828     //! Constructor for executable node with continue_msg -> Output
1829     template <typename Body >
1830     __TBB_NOINLINE_SYM continue_node(
1831         graph &g, int number_of_predecessors,
1832 #if __TBB_CPP11_PRESENT
1833         Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1834 #else
1835         __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
1836 #endif
1837     ) : graph_node(g)
1838       , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1839         fOutput_type(g) {
1840         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1841                                            static_cast<receiver<input_type> *>(this),
1842                                            static_cast<sender<output_type> *>(this), this->my_body );
1843     }
1844 
1845 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1846     template <typename Body>
1847     continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1848         : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1849 #endif
1850 
1851 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1852     template <typename Body, typename... Args>
1853     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1854                    Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1855         : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1856         make_edges_in_order(nodes, *this);
1857     }
1858 
1859 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1860     template <typename Body, typename... Args>
1861     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1862                    Body body, node_priority_t priority )
1863         : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1864 #endif
1865 #endif
1866 
1867     //! Copy constructor
1868     __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1869         graph_node(src.my_graph), input_impl_type(src),
1870         internal::function_output<Output>(src.my_graph) {
1871         tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1872                                            static_cast<receiver<input_type> *>(this),
1873                                            static_cast<sender<output_type> *>(this), this->my_body );
1874     }
1875 
1876 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1877     void set_name( const char *name ) __TBB_override {
1878         tbb::internal::fgt_node_desc( this, name );
1879     }
1880 #endif
1881 
1882 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1883     void extract() __TBB_override {
1884         input_impl_type::my_built_predecessors.receiver_extract(*this);
1885         successors().built_successors().sender_extract(*this);
1886     }
1887 #endif
1888 
1889 protected:
1890     template< typename R, typename B > friend class run_and_put_task;
1891     template<typename X, typename Y> friend class internal::broadcast_cache;
1892     template<typename X, typename Y> friend class internal::round_robin_cache;
1893     using input_impl_type::try_put_task;
1894     internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1895 
1896     void reset_node(reset_flags f) __TBB_override {
1897         input_impl_type::reset_receiver(f);
1898         if(f & rf_clear_edges)successors().clear();
1899         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1900     }
1901 };  // continue_node
1902 
1903 //! Forwards messages of type T to all successors
1904 template <typename T>
1905 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1906 public:
1907     typedef T input_type;
1908     typedef T output_type;
1909     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1910     typedef typename sender<output_type>::successor_type successor_type;
1911 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1912     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1913     typedef typename sender<output_type>::successor_list_type successor_list_type;
1914 #endif
1915 private:
1916     internal::broadcast_cache<input_type> my_successors;
1917 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1918     internal::edge_container<predecessor_type> my_built_predecessors;
1919     spin_mutex pred_mutex;  // serialize accesses on edge_container
1920 #endif
1921 public:
1922 
1923     __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g) {
1924         my_successors.set_owner( this );
1925         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1926                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1927     }
1928 
1929 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1930     template <typename... Args>
1931     broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1932         make_edges_in_order(nodes, *this);
1933     }
1934 #endif
1935 
1936     // Copy constructor
1937     __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
1938         graph_node(src.my_graph), receiver<T>(), sender<T>()
1939     {
1940         my_successors.set_owner( this );
1941         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1942                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1943     }
1944 
1945 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1946     void set_name( const char *name ) __TBB_override {
1947         tbb::internal::fgt_node_desc( this, name );
1948     }
1949 #endif
1950 
1951     //! Adds a successor
1952     bool register_successor( successor_type &r ) __TBB_override {
1953         my_successors.register_successor( r );
1954         return true;
1955     }
1956 
1957     //! Removes s as a successor
1958     bool remove_successor( successor_type &r ) __TBB_override {
1959         my_successors.remove_successor( r );
1960         return true;
1961     }
1962 
1963 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1964     typedef typename sender<T>::built_successors_type built_successors_type;
1965 
1966     built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1967 
1968     void internal_add_built_successor(successor_type &r) __TBB_override {
1969         my_successors.internal_add_built_successor(r);
1970     }
1971 
1972     void internal_delete_built_successor(successor_type &r) __TBB_override {
1973         my_successors.internal_delete_built_successor(r);
1974     }
1975 
1976     size_t successor_count() __TBB_override {
1977         return my_successors.successor_count();
1978     }
1979 
1980     void copy_successors(successor_list_type &v) __TBB_override {
1981         my_successors.copy_successors(v);
1982     }
1983 
1984     typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1985 
1986     built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1987 
1988     void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1989         spin_mutex::scoped_lock l(pred_mutex);
1990         my_built_predecessors.add_edge(p);
1991     }
1992 
1993     void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1994         spin_mutex::scoped_lock l(pred_mutex);
1995         my_built_predecessors.delete_edge(p);
1996     }
1997 
1998     size_t predecessor_count() __TBB_override {
1999         spin_mutex::scoped_lock l(pred_mutex);
2000         return my_built_predecessors.edge_count();
2001     }
2002 
2003     void copy_predecessors(predecessor_list_type &v) __TBB_override {
2004         spin_mutex::scoped_lock l(pred_mutex);
2005         my_built_predecessors.copy_edges(v);
2006     }
2007 
2008     void extract() __TBB_override {
2009         my_built_predecessors.receiver_extract(*this);
2010         my_successors.built_successors().sender_extract(*this);
2011     }
2012 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2013 
2014 protected:
2015     template< typename R, typename B > friend class run_and_put_task;
2016     template<typename X, typename Y> friend class internal::broadcast_cache;
2017     template<typename X, typename Y> friend class internal::round_robin_cache;
2018     //! build a task to run the successor if possible.  Default is old behavior.
2019     task *try_put_task(const T& t) __TBB_override {
2020         task *new_task = my_successors.try_put_task(t);
2021         if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2022         return new_task;
2023     }
2024 
2025     graph& graph_reference() const __TBB_override {
2026         return my_graph;
2027     }
2028 
2029     void reset_receiver(reset_flags /*f*/) __TBB_override {}
2030 
2031     void reset_node(reset_flags f) __TBB_override {
2032         if (f&rf_clear_edges) {
2033            my_successors.clear();
2034 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2035            my_built_predecessors.clear();
2036 #endif
2037         }
2038         __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2039     }
2040 };  // broadcast_node
2041 
2042 //! Forwards messages in arbitrary order
2043 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2044 class buffer_node
2045     : public graph_node
2046 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047     , public internal::reservable_item_buffer< T, Allocator >
2048 #else
2049     , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050 #endif
2051     , public receiver<T>, public sender<T> {
2052 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053     typedef Allocator internals_allocator;
2054 #else
2055     typedef cache_aligned_allocator<T> internals_allocator;
2056 #endif
2057 public:
2058     typedef T input_type;
2059     typedef T output_type;
2060     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2061     typedef typename sender<output_type>::successor_type successor_type;
2062     typedef buffer_node<T, Allocator> class_type;
2063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065     typedef typename sender<output_type>::successor_list_type successor_list_type;
2066 #endif
2067 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2068     __TBB_STATIC_ASSERT(
2069         (tbb::internal::is_same_type<Allocator, null_type>::value),
2070         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072     );
2073 #endif
2074 
2075 protected:
2076     typedef size_t size_type;
2077     internal::round_robin_cache< T, null_rw_mutex > my_successors;
2078 
2079 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080     internal::edge_container<predecessor_type> my_built_predecessors;
2081 #endif
2082 
2083     friend class internal::forward_task_bypass< class_type >;
2084 
2085     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
2086 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2087         , add_blt_succ, del_blt_succ,
2088         add_blt_pred, del_blt_pred,
2089         blt_succ_cnt, blt_pred_cnt,
2090         blt_succ_cpy, blt_pred_cpy   // create vector copies of preds and succs
2091 #endif
2092     };
2093 
2094     // implements the aggregator_operation concept
2095     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2096     public:
2097         char type;
2098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2099         task * ltask;
2100         union {
2101             input_type *elem;
2102             successor_type *r;
2103             predecessor_type *p;
2104             size_t cnt_val;
2105             successor_list_type *svec;
2106             predecessor_list_type *pvec;
2107         };
2108 #else
2109         T *elem;
2110         task * ltask;
2111         successor_type *r;
2112 #endif
2113         buffer_operation(const T& e, op_type t) : type(char(t))
2114 
2115 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2116                                                   , ltask(NULL), elem(const_cast<T*>(&e))
2117 #else
2118                                                   , elem(const_cast<T*>(&e)) , ltask(NULL)
2119 #endif
2120         {}
2121         buffer_operation(op_type t) : type(char(t)),  ltask(NULL) {}
2122     };
2123 
2124     bool forwarder_busy;
2125     typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
2126     friend class internal::aggregating_functor<class_type, buffer_operation>;
2127     internal::aggregator< handler_type, buffer_operation> my_aggregator;
2128 
2129     virtual void handle_operations(buffer_operation *op_list) {
2130         handle_operations_impl(op_list, this);
2131     }
2132 
2133     template<typename derived_type>
2134     void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2135         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2136 
2137         buffer_operation *tmp = NULL;
2138         bool try_forwarding = false;
2139         while (op_list) {
2140             tmp = op_list;
2141             op_list = op_list->next;
2142             switch (tmp->type) {
2143             case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2144             case rem_succ: internal_rem_succ(tmp); break;
2145             case req_item: internal_pop(tmp); break;
2146             case res_item: internal_reserve(tmp); break;
2147             case rel_res:  internal_release(tmp); try_forwarding = true; break;
2148             case con_res:  internal_consume(tmp); try_forwarding = true; break;
2149             case put_item: try_forwarding = internal_push(tmp); break;
2150             case try_fwd_task: internal_forward_task(tmp); break;
2151 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2152             // edge recording
2153             case add_blt_succ: internal_add_built_succ(tmp); break;
2154             case del_blt_succ: internal_del_built_succ(tmp); break;
2155             case add_blt_pred: internal_add_built_pred(tmp); break;
2156             case del_blt_pred: internal_del_built_pred(tmp); break;
2157             case blt_succ_cnt: internal_succ_cnt(tmp); break;
2158             case blt_pred_cnt: internal_pred_cnt(tmp); break;
2159             case blt_succ_cpy: internal_copy_succs(tmp); break;
2160             case blt_pred_cpy: internal_copy_preds(tmp); break;
2161 #endif
2162             }
2163         }
2164 
2165         derived->order();
2166 
2167         if (try_forwarding && !forwarder_busy) {
2168             if(internal::is_graph_active(this->my_graph)) {
2169                 forwarder_busy = true;
2170                 task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2171                     forward_task_bypass<class_type>(*this);
2172                 // tmp should point to the last item handled by the aggregator.  This is the operation
2173                 // the handling thread enqueued.  So modifying that record will be okay.
2174                 // workaround for icc bug
2175                 tbb::task *z = tmp->ltask;
2176                 graph &g = this->my_graph;
2177                 tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
2178             }
2179         }
2180     }  // handle_operations
2181 
2182     inline task *grab_forwarding_task( buffer_operation &op_data) {
2183         return op_data.ltask;
2184     }
2185 
2186     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
2187         task *ft = grab_forwarding_task(op_data);
2188         if(ft) {
2189             internal::spawn_in_graph_arena(graph_reference(), *ft);
2190             return true;
2191         }
2192         return false;
2193     }
2194 
2195     //! This is executed by an enqueued task, the "forwarder"
2196     virtual task *forward_task() {
2197         buffer_operation op_data(try_fwd_task);
2198         task *last_task = NULL;
2199         do {
2200             op_data.status = internal::WAIT;
2201             op_data.ltask = NULL;
2202             my_aggregator.execute(&op_data);
2203 
2204             // workaround for icc bug
2205             tbb::task *xtask = op_data.ltask;
2206             graph& g = this->my_graph;
2207             last_task = combine_tasks(g, last_task, xtask);
2208         } while (op_data.status ==internal::SUCCEEDED);
2209         return last_task;
2210     }
2211 
2212     //! Register successor
2213     virtual void internal_reg_succ(buffer_operation *op) {
2214         my_successors.register_successor(*(op->r));
2215         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2216     }
2217 
2218     //! Remove successor
2219     virtual void internal_rem_succ(buffer_operation *op) {
2220         my_successors.remove_successor(*(op->r));
2221         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2222     }
2223 
2224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2225     typedef typename sender<T>::built_successors_type built_successors_type;
2226 
2227     built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2228 
2229     virtual void internal_add_built_succ(buffer_operation *op) {
2230         my_successors.internal_add_built_successor(*(op->r));
2231         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2232     }
2233 
2234     virtual void internal_del_built_succ(buffer_operation *op) {
2235         my_successors.internal_delete_built_successor(*(op->r));
2236         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2237     }
2238 
2239     typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2240 
2241     built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2242 
2243     virtual void internal_add_built_pred(buffer_operation *op) {
2244         my_built_predecessors.add_edge(*(op->p));
2245         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2246     }
2247 
2248     virtual void internal_del_built_pred(buffer_operation *op) {
2249         my_built_predecessors.delete_edge(*(op->p));
2250         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2251     }
2252 
2253     virtual void internal_succ_cnt(buffer_operation *op) {
2254         op->cnt_val = my_successors.successor_count();
2255         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2256     }
2257 
2258     virtual void internal_pred_cnt(buffer_operation *op) {
2259         op->cnt_val = my_built_predecessors.edge_count();
2260         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2261     }
2262 
2263     virtual void internal_copy_succs(buffer_operation *op) {
2264         my_successors.copy_successors(*(op->svec));
2265         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2266     }
2267 
2268     virtual void internal_copy_preds(buffer_operation *op) {
2269         my_built_predecessors.copy_edges(*(op->pvec));
2270         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2271     }
2272 
2273 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274 
2275 private:
2276     void order() {}
2277 
2278     bool is_item_valid() {
2279         return this->my_item_valid(this->my_tail - 1);
2280     }
2281 
2282     void try_put_and_add_task(task*& last_task) {
2283         task *new_task = my_successors.try_put_task(this->back());
2284         if (new_task) {
2285             // workaround for icc bug
2286             graph& g = this->my_graph;
2287             last_task = combine_tasks(g, last_task, new_task);
2288             this->destroy_back();
2289         }
2290     }
2291 
2292 protected:
2293     //! Tries to forward valid items to successors
2294     virtual void internal_forward_task(buffer_operation *op) {
2295         internal_forward_task_impl(op, this);
2296     }
2297 
2298     template<typename derived_type>
2299     void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2300         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2301 
2302         if (this->my_reserved || !derived->is_item_valid()) {
2303             __TBB_store_with_release(op->status, internal::FAILED);
2304             this->forwarder_busy = false;
2305             return;
2306         }
2307         // Try forwarding, giving each successor a chance
2308         task * last_task = NULL;
2309         size_type counter = my_successors.size();
2310         for (; counter > 0 && derived->is_item_valid(); --counter)
2311             derived->try_put_and_add_task(last_task);
2312 
2313         op->ltask = last_task;  // return task
2314         if (last_task && !counter) {
2315             __TBB_store_with_release(op->status, internal::SUCCEEDED);
2316         }
2317         else {
2318             __TBB_store_with_release(op->status, internal::FAILED);
2319             forwarder_busy = false;
2320         }
2321     }
2322 
2323     virtual bool internal_push(buffer_operation *op) {
2324         this->push_back(*(op->elem));
2325         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2326         return true;
2327     }
2328 
2329     virtual void internal_pop(buffer_operation *op) {
2330         if(this->pop_back(*(op->elem))) {
2331             __TBB_store_with_release(op->status, internal::SUCCEEDED);
2332         }
2333         else {
2334             __TBB_store_with_release(op->status, internal::FAILED);
2335         }
2336     }
2337 
2338     virtual void internal_reserve(buffer_operation *op) {
2339         if(this->reserve_front(*(op->elem))) {
2340             __TBB_store_with_release(op->status, internal::SUCCEEDED);
2341         }
2342         else {
2343             __TBB_store_with_release(op->status, internal::FAILED);
2344         }
2345     }
2346 
2347     virtual void internal_consume(buffer_operation *op) {
2348         this->consume_front();
2349         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2350     }
2351 
2352     virtual void internal_release(buffer_operation *op) {
2353         this->release_front();
2354         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2355     }
2356 
2357 public:
2358     //! Constructor
2359     __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
2360         : graph_node(g), internal::reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
2361           sender<T>(), forwarder_busy(false)
2362     {
2363         my_successors.set_owner(this);
2364         my_aggregator.initialize_handler(handler_type(this));
2365         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2366                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2367     }
2368 
2369 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2370     template <typename... Args>
2371     buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2372         make_edges_in_order(nodes, *this);
2373     }
2374 #endif
2375 
2376     //! Copy constructor
2377     __TBB_NOINLINE_SYM buffer_node( const buffer_node& src )
2378         : graph_node(src.my_graph), internal::reservable_item_buffer<T, internals_allocator>(),
2379           receiver<T>(), sender<T>(), forwarder_busy(false)
2380     {
2381         my_successors.set_owner(this);
2382         my_aggregator.initialize_handler(handler_type(this));
2383         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2384                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2385     }
2386 
2387 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2388     void set_name( const char *name ) __TBB_override {
2389         tbb::internal::fgt_node_desc( this, name );
2390     }
2391 #endif
2392 
2393     //
2394     // message sender implementation
2395     //
2396 
2397     //! Adds a new successor.
2398     /** Adds successor r to the list of successors; may forward tasks.  */
2399     bool register_successor( successor_type &r ) __TBB_override {
2400         buffer_operation op_data(reg_succ);
2401         op_data.r = &r;
2402         my_aggregator.execute(&op_data);
2403         (void)enqueue_forwarding_task(op_data);
2404         return true;
2405     }
2406 
2407 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2408     void internal_add_built_successor( successor_type &r) __TBB_override {
2409         buffer_operation op_data(add_blt_succ);
2410         op_data.r = &r;
2411         my_aggregator.execute(&op_data);
2412     }
2413 
2414     void internal_delete_built_successor( successor_type &r) __TBB_override {
2415         buffer_operation op_data(del_blt_succ);
2416         op_data.r = &r;
2417         my_aggregator.execute(&op_data);
2418     }
2419 
2420     void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2421         buffer_operation op_data(add_blt_pred);
2422         op_data.p = &p;
2423         my_aggregator.execute(&op_data);
2424     }
2425 
2426     void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2427         buffer_operation op_data(del_blt_pred);
2428         op_data.p = &p;
2429         my_aggregator.execute(&op_data);
2430     }
2431 
2432     size_t predecessor_count() __TBB_override {
2433         buffer_operation op_data(blt_pred_cnt);
2434         my_aggregator.execute(&op_data);
2435         return op_data.cnt_val;
2436     }
2437 
2438     size_t successor_count() __TBB_override {
2439         buffer_operation op_data(blt_succ_cnt);
2440         my_aggregator.execute(&op_data);
2441         return op_data.cnt_val;
2442     }
2443 
2444     void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2445         buffer_operation op_data(blt_pred_cpy);
2446         op_data.pvec = &v;
2447         my_aggregator.execute(&op_data);
2448     }
2449 
2450     void copy_successors( successor_list_type &v ) __TBB_override {
2451         buffer_operation op_data(blt_succ_cpy);
2452         op_data.svec = &v;
2453         my_aggregator.execute(&op_data);
2454     }
2455 
2456 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457 
2458     //! Removes a successor.
2459     /** Removes successor r from the list of successors.
2460         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
2461     bool remove_successor( successor_type &r ) __TBB_override {
2462         r.remove_predecessor(*this);
2463         buffer_operation op_data(rem_succ);
2464         op_data.r = &r;
2465         my_aggregator.execute(&op_data);
2466         // even though this operation does not cause a forward, if we are the handler, and
2467         // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468         // and so should check for the task.
2469         (void)enqueue_forwarding_task(op_data);
2470         return true;
2471     }
2472 
2473     //! Request an item from the buffer_node
2474     /**  true = v contains the returned item<BR>
2475          false = no item has been returned */
2476     bool try_get( T &v ) __TBB_override {
2477         buffer_operation op_data(req_item);
2478         op_data.elem = &v;
2479         my_aggregator.execute(&op_data);
2480         (void)enqueue_forwarding_task(op_data);
2481         return (op_data.status==internal::SUCCEEDED);
2482     }
2483 
2484     //! Reserves an item.
2485     /**  false = no item can be reserved<BR>
2486          true = an item is reserved */
2487     bool try_reserve( T &v ) __TBB_override {
2488         buffer_operation op_data(res_item);
2489         op_data.elem = &v;
2490         my_aggregator.execute(&op_data);
2491         (void)enqueue_forwarding_task(op_data);
2492         return (op_data.status==internal::SUCCEEDED);
2493     }
2494 
2495     //! Release a reserved item.
2496     /**  true = item has been released and so remains in sender */
2497     bool try_release() __TBB_override {
2498         buffer_operation op_data(rel_res);
2499         my_aggregator.execute(&op_data);
2500         (void)enqueue_forwarding_task(op_data);
2501         return true;
2502     }
2503 
2504     //! Consumes a reserved item.
2505     /** true = item is removed from sender and reservation removed */
2506     bool try_consume() __TBB_override {
2507         buffer_operation op_data(con_res);
2508         my_aggregator.execute(&op_data);
2509         (void)enqueue_forwarding_task(op_data);
2510         return true;
2511     }
2512 
2513 protected:
2514 
2515     template< typename R, typename B > friend class run_and_put_task;
2516     template<typename X, typename Y> friend class internal::broadcast_cache;
2517     template<typename X, typename Y> friend class internal::round_robin_cache;
2518     //! receive an item, return a task *if possible
2519     task *try_put_task(const T &t) __TBB_override {
2520         buffer_operation op_data(t, put_item);
2521         my_aggregator.execute(&op_data);
2522         task *ft = grab_forwarding_task(op_data);
2523         // sequencer_nodes can return failure (if an item has been previously inserted)
2524         // We have to spawn the returned task if our own operation fails.
2525 
2526         if(ft && op_data.status ==internal::FAILED) {
2527             // we haven't succeeded queueing the item, but for some reason the
2528             // call returned a task (if another request resulted in a successful
2529             // forward this could happen.)  Queue the task and reset the pointer.
2530             internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
2531         }
2532         else if(!ft && op_data.status ==internal::SUCCEEDED) {
2533             ft = SUCCESSFULLY_ENQUEUED;
2534         }
2535         return ft;
2536     }
2537 
2538     graph& graph_reference() const __TBB_override {
2539         return my_graph;
2540     }
2541 
2542     void reset_receiver(reset_flags /*f*/) __TBB_override { }
2543 
2544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2545 public:
2546     void extract() __TBB_override {
2547         my_built_predecessors.receiver_extract(*this);
2548         my_successors.built_successors().sender_extract(*this);
2549     }
2550 #endif
2551 
2552 protected:
2553     void reset_node( reset_flags f) __TBB_override {
2554         internal::reservable_item_buffer<T, internals_allocator>::reset();
2555         // TODO: just clear structures
2556         if (f&rf_clear_edges) {
2557             my_successors.clear();
2558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2559             my_built_predecessors.clear();
2560 #endif
2561         }
2562         forwarder_busy = false;
2563     }
2564 };  // buffer_node
2565 
2566 //! Forwards messages in FIFO order
2567 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2568 class queue_node : public buffer_node<T, Allocator> {
2569 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2570     __TBB_STATIC_ASSERT(
2571         (tbb::internal::is_same_type<Allocator, null_type>::value),
2572         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2573         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2574     );
2575 #endif
2576 protected:
2577     typedef buffer_node<T, Allocator> base_type;
2578     typedef typename base_type::size_type size_type;
2579     typedef typename base_type::buffer_operation queue_operation;
2580     typedef queue_node class_type;
2581 
2582 private:
2583     template<typename, typename> friend class buffer_node;
2584 
2585     bool is_item_valid() {
2586         return this->my_item_valid(this->my_head);
2587     }
2588 
2589     void try_put_and_add_task(task*& last_task) {
2590         task *new_task = this->my_successors.try_put_task(this->front());
2591         if (new_task) {
2592             // workaround for icc bug
2593             graph& graph_ref = this->graph_reference();
2594             last_task = combine_tasks(graph_ref, last_task, new_task);
2595             this->destroy_front();
2596         }
2597     }
2598 
2599 protected:
2600     void internal_forward_task(queue_operation *op) __TBB_override {
2601         this->internal_forward_task_impl(op, this);
2602     }
2603 
2604     void internal_pop(queue_operation *op) __TBB_override {
2605         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2606             __TBB_store_with_release(op->status, internal::FAILED);
2607         }
2608         else {
2609             this->pop_front(*(op->elem));
2610             __TBB_store_with_release(op->status, internal::SUCCEEDED);
2611         }
2612     }
2613     void internal_reserve(queue_operation *op) __TBB_override {
2614         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2615             __TBB_store_with_release(op->status, internal::FAILED);
2616         }
2617         else {
2618             this->reserve_front(*(op->elem));
2619             __TBB_store_with_release(op->status, internal::SUCCEEDED);
2620         }
2621     }
2622     void internal_consume(queue_operation *op) __TBB_override {
2623         this->consume_front();
2624         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2625     }
2626 
2627 public:
2628     typedef T input_type;
2629     typedef T output_type;
2630     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2631     typedef typename sender<output_type>::successor_type successor_type;
2632 
2633     //! Constructor
2634     __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
2635         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2636                                  static_cast<receiver<input_type> *>(this),
2637                                  static_cast<sender<output_type> *>(this) );
2638     }
2639 
2640 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2641     template <typename... Args>
2642     queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2643         make_edges_in_order(nodes, *this);
2644     }
2645 #endif
2646 
2647     //! Copy constructor
2648     __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
2649         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2650                                  static_cast<receiver<input_type> *>(this),
2651                                  static_cast<sender<output_type> *>(this) );
2652     }
2653 
2654 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2655     void set_name( const char *name ) __TBB_override {
2656         tbb::internal::fgt_node_desc( this, name );
2657     }
2658 #endif
2659 
2660 protected:
2661     void reset_node( reset_flags f) __TBB_override {
2662         base_type::reset_node(f);
2663     }
2664 };  // queue_node
2665 
2666 //! Forwards messages in sequence order
2667 template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2668 class sequencer_node : public queue_node<T, Allocator> {
2669     internal::function_body< T, size_t > *my_sequencer;
2670     // my_sequencer should be a benign function and must be callable
2671     // from a parallel context.  Does this mean it needn't be reset?
2672 public:
2673 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2674     __TBB_STATIC_ASSERT(
2675         (tbb::internal::is_same_type<Allocator, null_type>::value),
2676         "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2677         "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2678     );
2679 #endif
2680     typedef T input_type;
2681     typedef T output_type;
2682     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2683     typedef typename sender<output_type>::successor_type successor_type;
2684 
2685     //! Constructor
2686     template< typename Sequencer >
2687     __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2688         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2689         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2690                                  static_cast<receiver<input_type> *>(this),
2691                                  static_cast<sender<output_type> *>(this) );
2692     }
2693 
2694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2695     template <typename Sequencer, typename... Args>
2696     sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2697         : sequencer_node(nodes.graph_reference(), s) {
2698         make_edges_in_order(nodes, *this);
2699     }
2700 #endif
2701 
2702     //! Copy constructor
2703     __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
2704         my_sequencer( src.my_sequencer->clone() ) {
2705         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2706                                  static_cast<receiver<input_type> *>(this),
2707                                  static_cast<sender<output_type> *>(this) );
2708     }
2709 
2710     //! Destructor
2711     ~sequencer_node() { delete my_sequencer; }
2712 
2713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2714     void set_name( const char *name ) __TBB_override {
2715         tbb::internal::fgt_node_desc( this, name );
2716     }
2717 #endif
2718 
2719 protected:
2720     typedef typename buffer_node<T, Allocator>::size_type size_type;
2721     typedef typename buffer_node<T, Allocator>::buffer_operation sequencer_operation;
2722 
2723 private:
2724     bool internal_push(sequencer_operation *op) __TBB_override {
2725         size_type tag = (*my_sequencer)(*(op->elem));
2726 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2727         if (tag < this->my_head) {
2728             // have already emitted a message with this tag
2729             __TBB_store_with_release(op->status, internal::FAILED);
2730             return false;
2731         }
2732 #endif
2733         // cannot modify this->my_tail now; the buffer would be inconsistent.
2734         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2735 
2736         if (this->size(new_tail) > this->capacity()) {
2737             this->grow_my_array(this->size(new_tail));
2738         }
2739         this->my_tail = new_tail;
2740 
2741         const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2742         __TBB_store_with_release(op->status, res);
2743         return res ==internal::SUCCEEDED;
2744     }
2745 };  // sequencer_node
2746 
2747 //! Forwards messages in priority order
2748 template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749 class priority_queue_node : public buffer_node<T, Allocator> {
2750 public:
2751 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2752     __TBB_STATIC_ASSERT(
2753         (tbb::internal::is_same_type<Allocator, null_type>::value),
2754         "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755         "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756     );
2757 #endif
2758     typedef T input_type;
2759     typedef T output_type;
2760     typedef buffer_node<T,Allocator> base_type;
2761     typedef priority_queue_node class_type;
2762     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2763     typedef typename sender<output_type>::successor_type successor_type;
2764 
2765     //! Constructor
2766     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767         : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769                                  static_cast<receiver<input_type> *>(this),
2770                                  static_cast<sender<output_type> *>(this) );
2771     }
2772 
2773 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774     template <typename... Args>
2775     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776         : priority_queue_node(nodes.graph_reference(), comp) {
2777         make_edges_in_order(nodes, *this);
2778     }
2779 #endif
2780 
2781     //! Copy constructor
2782     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
2783         : buffer_node<T, Allocator>(src), mark(0)
2784     {
2785         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786                                  static_cast<receiver<input_type> *>(this),
2787                                  static_cast<sender<output_type> *>(this) );
2788     }
2789 
2790 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791     void set_name( const char *name ) __TBB_override {
2792         tbb::internal::fgt_node_desc( this, name );
2793     }
2794 #endif
2795 
2796 protected:
2797 
2798     void reset_node( reset_flags f) __TBB_override {
2799         mark = 0;
2800         base_type::reset_node(f);
2801     }
2802 
2803     typedef typename buffer_node<T, Allocator>::size_type size_type;
2804     typedef typename buffer_node<T, Allocator>::item_type item_type;
2805     typedef typename buffer_node<T, Allocator>::buffer_operation prio_operation;
2806 
2807     //! Tries to forward valid items to successors
2808     void internal_forward_task(prio_operation *op) __TBB_override {
2809         this->internal_forward_task_impl(op, this);
2810     }
2811 
2812     void handle_operations(prio_operation *op_list) __TBB_override {
2813         this->handle_operations_impl(op_list, this);
2814     }
2815 
2816     bool internal_push(prio_operation *op) __TBB_override {
2817         prio_push(*(op->elem));
2818         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2819         return true;
2820     }
2821 
2822     void internal_pop(prio_operation *op) __TBB_override {
2823         // if empty or already reserved, don't pop
2824         if ( this->my_reserved == true || this->my_tail == 0 ) {
2825             __TBB_store_with_release(op->status, internal::FAILED);
2826             return;
2827         }
2828 
2829         *(op->elem) = prio();
2830         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2831         prio_pop();
2832 
2833     }
2834 
2835     // pops the highest-priority item, saves copy
2836     void internal_reserve(prio_operation *op) __TBB_override {
2837         if (this->my_reserved == true || this->my_tail == 0) {
2838             __TBB_store_with_release(op->status, internal::FAILED);
2839             return;
2840         }
2841         this->my_reserved = true;
2842         *(op->elem) = prio();
2843         reserved_item = *(op->elem);
2844         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2845         prio_pop();
2846     }
2847 
2848     void internal_consume(prio_operation *op) __TBB_override {
2849         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2850         this->my_reserved = false;
2851         reserved_item = input_type();
2852     }
2853 
2854     void internal_release(prio_operation *op) __TBB_override {
2855         __TBB_store_with_release(op->status, internal::SUCCEEDED);
2856         prio_push(reserved_item);
2857         this->my_reserved = false;
2858         reserved_item = input_type();
2859     }
2860 
2861 private:
2862     template<typename, typename> friend class buffer_node;
2863 
2864     void order() {
2865         if (mark < this->my_tail) heapify();
2866         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867     }
2868 
2869     bool is_item_valid() {
2870         return this->my_tail > 0;
2871     }
2872 
2873     void try_put_and_add_task(task*& last_task) {
2874         task * new_task = this->my_successors.try_put_task(this->prio());
2875         if (new_task) {
2876             // workaround for icc bug
2877             graph& graph_ref = this->graph_reference();
2878             last_task = combine_tasks(graph_ref, last_task, new_task);
2879             prio_pop();
2880         }
2881     }
2882 
2883 private:
2884     Compare compare;
2885     size_type mark;
2886 
2887     input_type reserved_item;
2888 
2889     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2890     bool prio_use_tail() {
2891         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893     }
2894 
2895     // prio_push: checks that the item will fit, expand array if necessary, put at end
2896     void prio_push(const T &src) {
2897         if ( this->my_tail >= this->my_array_size )
2898             this->grow_my_array( this->my_tail + 1 );
2899         (void) this->place_item(this->my_tail, src);
2900         ++(this->my_tail);
2901         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902     }
2903 
2904     // prio_pop: deletes highest priority item from the array, and if it is item
2905     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
2906     // and mark.  Assumes the array has already been tested for emptiness; no failure.
2907     void prio_pop()  {
2908         if (prio_use_tail()) {
2909             // there are newly pushed elements; last one higher than top
2910             // copy the data
2911             this->destroy_item(this->my_tail-1);
2912             --(this->my_tail);
2913             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914             return;
2915         }
2916         this->destroy_item(0);
2917         if(this->my_tail > 1) {
2918             // push the last element down heap
2919             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920             this->move_item(0,this->my_tail - 1);
2921         }
2922         --(this->my_tail);
2923         if(mark > this->my_tail) --mark;
2924         if (this->my_tail > 1) // don't reheap for heap of size 1
2925             reheap();
2926         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927     }
2928 
2929     const T& prio() {
2930         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931     }
2932 
2933     // turn array into heap
2934     void heapify() {
2935         if(this->my_tail == 0) {
2936             mark = 0;
2937             return;
2938         }
2939         if (!mark) mark = 1;
2940         for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941             size_type cur_pos = mark;
2942             input_type to_place;
2943             this->fetch_item(mark,to_place);
2944             do { // push to_place up the heap
2945                 size_type parent = (cur_pos-1)>>1;
2946                 if (!compare(this->get_my_item(parent), to_place))
2947                     break;
2948                 this->move_item(cur_pos, parent);
2949                 cur_pos = parent;
2950             } while( cur_pos );
2951             (void) this->place_item(cur_pos, to_place);
2952         }
2953     }
2954 
2955     // otherwise heapified array with new root element; rearrange to heap
2956     void reheap() {
2957         size_type cur_pos=0, child=1;
2958         while (child < mark) {
2959             size_type target = child;
2960             if (child+1<mark &&
2961                 compare(this->get_my_item(child),
2962                         this->get_my_item(child+1)))
2963                 ++target;
2964             // target now has the higher priority child
2965             if (compare(this->get_my_item(target),
2966                         this->get_my_item(cur_pos)))
2967                 break;
2968             // swap
2969             this->swap_items(cur_pos, target);
2970             cur_pos = target;
2971             child = (cur_pos<<1)+1;
2972         }
2973     }
2974 };  // priority_queue_node
2975 
2976 } // interfaceX
2977 
2978 namespace interface11 {
2979 
2980 //! Forwards messages only if the threshold has not been reached
2981 /** This node forwards items until its threshold is reached.
2982     It contains no buffering.  If the downstream node rejects, the
2983     message is dropped. */
2984 template< typename T, typename DecrementType=continue_msg >
2985 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986 public:
2987     typedef T input_type;
2988     typedef T output_type;
2989     typedef typename receiver<input_type>::predecessor_type predecessor_type;
2990     typedef typename sender<output_type>::successor_type successor_type;
2991 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992     typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993     typedef typename sender<output_type>::built_successors_type built_successors_type;
2994     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995     typedef typename sender<output_type>::successor_list_type successor_list_type;
2996 #endif
2997     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2998 
2999 private:
3000     size_t my_threshold;
3001     size_t my_count; //number of successful puts
3002     size_t my_tries; //number of active put attempts
3003     internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
3004     spin_mutex my_mutex;
3005     internal::broadcast_cache< T > my_successors;
3006     __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007 
3008     friend class internal::forward_task_bypass< limiter_node<T,DecrementType> >;
3009 
3010     // Let decrementer call decrement_counter()
3011     friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012 
3013     bool check_conditions() {  // always called under lock
3014         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3015     }
3016 
3017     // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3018     task *forward_task() {
3019         input_type v;
3020         task *rval = NULL;
3021         bool reserved = false;
3022             {
3023                 spin_mutex::scoped_lock lock(my_mutex);
3024                 if ( check_conditions() )
3025                     ++my_tries;
3026                 else
3027                     return NULL;
3028             }
3029 
3030         //SUCCESS
3031         // if we can reserve and can put, we consume the reservation
3032         // we increment the count and decrement the tries
3033         if ( (my_predecessors.try_reserve(v)) == true ){
3034             reserved=true;
3035             if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036                 {
3037                     spin_mutex::scoped_lock lock(my_mutex);
3038                     ++my_count;
3039                     --my_tries;
3040                     my_predecessors.try_consume();
3041                     if ( check_conditions() ) {
3042                         if ( internal::is_graph_active(this->my_graph) ) {
3043                             task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3044                                 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3045                             internal::spawn_in_graph_arena(graph_reference(), *rtask);
3046                         }
3047                     }
3048                 }
3049                 return rval;
3050             }
3051         }
3052         //FAILURE
3053         //if we can't reserve, we decrement the tries
3054         //if we can reserve but can't put, we decrement the tries and release the reservation
3055         {
3056             spin_mutex::scoped_lock lock(my_mutex);
3057             --my_tries;
3058             if (reserved) my_predecessors.try_release();
3059             if ( check_conditions() ) {
3060                 if ( internal::is_graph_active(this->my_graph) ) {
3061                     task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3062                         internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3063                     __TBB_ASSERT(!rval, "Have two tasks to handle");
3064                     return rtask;
3065                 }
3066             }
3067             return rval;
3068         }
3069     }
3070 
3071     void forward() {
3072         __TBB_ASSERT(false, "Should never be called");
3073         return;
3074     }
3075 
3076     task* decrement_counter( long long delta ) {
3077         {
3078             spin_mutex::scoped_lock lock(my_mutex);
3079             if( delta > 0 && size_t(delta) > my_count )
3080                 my_count = 0;
3081             else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3082                 my_count = my_threshold;
3083             else
3084                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085         }
3086         return forward_task();
3087     }
3088 
3089     void initialize() {
3090         my_predecessors.set_owner(this);
3091         my_successors.set_owner(this);
3092         decrement.set_owner(this);
3093         tbb::internal::fgt_node(
3094             CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096             static_cast<sender<output_type> *>(this)
3097         );
3098     }
3099 public:
3100     //! The internal receiver< DecrementType > that decrements the count
3101     internal::decrementer< limiter_node<T, DecrementType>, DecrementType > decrement;
3102 
3103 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3104     __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<DecrementType, continue_msg>::value),
3105                          "Deprecated interface of the limiter node can be used only in conjunction "
3106                          "with continue_msg as the type of DecrementType template parameter." );
3107 #endif // Check for incompatible interface
3108 
3109     //! Constructor
3110     limiter_node(graph &g,
3111                  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3112         : graph_node(g), my_threshold(threshold), my_count(0),
3113           __TBB_DEPRECATED_LIMITER_ARG4(
3114               my_tries(0), decrement(),
3115               init_decrement_predecessors(num_decrement_predecessors),
3116               decrement(num_decrement_predecessors)) {
3117         initialize();
3118     }
3119 
3120 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3121     template <typename... Args>
3122     limiter_node(const node_set<Args...>& nodes, size_t threshold)
3123         : limiter_node(nodes.graph_reference(), threshold) {
3124         make_edges_in_order(nodes, *this);
3125     }
3126 #endif
3127 
3128     //! Copy constructor
3129     limiter_node( const limiter_node& src ) :
3130         graph_node(src.my_graph), receiver<T>(), sender<T>(),
3131         my_threshold(src.my_threshold), my_count(0),
3132         __TBB_DEPRECATED_LIMITER_ARG4(
3133             my_tries(0), decrement(),
3134             init_decrement_predecessors(src.init_decrement_predecessors),
3135             decrement(src.init_decrement_predecessors)) {
3136         initialize();
3137     }
3138 
3139 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3140     void set_name( const char *name ) __TBB_override {
3141         tbb::internal::fgt_node_desc( this, name );
3142     }
3143 #endif
3144 
3145     //! Replace the current successor with this new successor
3146     bool register_successor( successor_type &r ) __TBB_override {
3147         spin_mutex::scoped_lock lock(my_mutex);
3148         bool was_empty = my_successors.empty();
3149         my_successors.register_successor(r);
3150         //spawn a forward task if this is the only successor
3151         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152             if ( internal::is_graph_active(this->my_graph) ) {
3153                 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3154                     internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3155                 internal::spawn_in_graph_arena(graph_reference(), *task);
3156             }
3157         }
3158         return true;
3159     }
3160 
3161     //! Removes a successor from this node
3162     /** r.remove_predecessor(*this) is also called. */
3163     bool remove_successor( successor_type &r ) __TBB_override {
3164         r.remove_predecessor(*this);
3165         my_successors.remove_successor(r);
3166         return true;
3167     }
3168 
3169 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3170     built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3171     built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172 
3173     void internal_add_built_successor(successor_type &src) __TBB_override {
3174         my_successors.internal_add_built_successor(src);
3175     }
3176 
3177     void internal_delete_built_successor(successor_type &src) __TBB_override {
3178         my_successors.internal_delete_built_successor(src);
3179     }
3180 
3181     size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182 
3183     void copy_successors(successor_list_type &v) __TBB_override {
3184         my_successors.copy_successors(v);
3185     }
3186 
3187     void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3188         my_predecessors.internal_add_built_predecessor(src);
3189     }
3190 
3191     void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3192         my_predecessors.internal_delete_built_predecessor(src);
3193     }
3194 
3195     size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196 
3197     void copy_predecessors(predecessor_list_type &v) __TBB_override {
3198         my_predecessors.copy_predecessors(v);
3199     }
3200 
3201     void extract() __TBB_override {
3202         my_count = 0;
3203         my_successors.built_successors().sender_extract(*this);
3204         my_predecessors.built_predecessors().receiver_extract(*this);
3205         decrement.built_predecessors().receiver_extract(decrement);
3206     }
3207 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208 
3209     //! Adds src to the list of cached predecessors.
3210     bool register_predecessor( predecessor_type &src ) __TBB_override {
3211         spin_mutex::scoped_lock lock(my_mutex);
3212         my_predecessors.add( src );
3213         if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214             task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3215                 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3216             internal::spawn_in_graph_arena(graph_reference(), *task);
3217         }
3218         return true;
3219     }
3220 
3221     //! Removes src from the list of cached predecessors.
3222     bool remove_predecessor( predecessor_type &src ) __TBB_override {
3223         my_predecessors.remove( src );
3224         return true;
3225     }
3226 
3227 protected:
3228 
3229     template< typename R, typename B > friend class run_and_put_task;
3230     template<typename X, typename Y> friend class internal::broadcast_cache;
3231     template<typename X, typename Y> friend class internal::round_robin_cache;
3232     //! Puts an item to this receiver
3233     task *try_put_task( const T &t ) __TBB_override {
3234         {
3235             spin_mutex::scoped_lock lock(my_mutex);
3236             if ( my_count + my_tries >= my_threshold )
3237                 return NULL;
3238             else
3239                 ++my_tries;
3240         }
3241 
3242         task * rtask = my_successors.try_put_task(t);
3243 
3244         if ( !rtask ) {  // try_put_task failed.
3245             spin_mutex::scoped_lock lock(my_mutex);
3246             --my_tries;
3247             if (check_conditions() && internal::is_graph_active(this->my_graph)) {
3248                 rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3249                     internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3250             }
3251         }
3252         else {
3253             spin_mutex::scoped_lock lock(my_mutex);
3254             ++my_count;
3255             --my_tries;
3256              }
3257         return rtask;
3258     }
3259 
3260     graph& graph_reference() const __TBB_override { return my_graph; }
3261 
3262     void reset_receiver(reset_flags /*f*/) __TBB_override {
3263         __TBB_ASSERT(false,NULL);  // should never be called
3264     }
3265 
3266     void reset_node( reset_flags f) __TBB_override {
3267         my_count = 0;
3268         if(f & rf_clear_edges) {
3269             my_predecessors.clear();
3270             my_successors.clear();
3271         }
3272         else
3273         {
3274             my_predecessors.reset( );
3275         }
3276         decrement.reset_receiver(f);
3277     }
3278 };  // limiter_node
3279 
3280 #include "internal/_flow_graph_join_impl.h"
3281 
3282 using internal::reserving_port;
3283 using internal::queueing_port;
3284 using internal::key_matching_port;
3285 using internal::input_port;
3286 using internal::tag_value;
3287 
3288 template<typename OutputTuple, typename JP=queueing> class join_node;
3289 
3290 template<typename OutputTuple>
3291 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3292 private:
3293     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3294     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
3295 public:
3296     typedef OutputTuple output_type;
3297     typedef typename unfolded_type::input_ports_type input_ports_type;
3298      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
3299         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3300                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
3301     }
3302 
3303 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3304     template <typename... Args>
3305     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3306         make_edges_in_order(nodes, *this);
3307     }
3308 #endif
3309 
3310     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3311         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3312                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
3313     }
3314 
3315 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3316     void set_name( const char *name ) __TBB_override {
3317         tbb::internal::fgt_node_desc( this, name );
3318     }
3319 #endif
3320 
3321 };
3322 
3323 template<typename OutputTuple>
3324 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3325 private:
3326     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3327     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
3328 public:
3329     typedef OutputTuple output_type;
3330     typedef typename unfolded_type::input_ports_type input_ports_type;
3331      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
3332         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3333                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
3334     }
3335 
3336 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3337     template <typename... Args>
3338     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3339         make_edges_in_order(nodes, *this);
3340     }
3341 #endif
3342 
3343     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3344         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3345                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
3346     }
3347 
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349     void set_name( const char *name ) __TBB_override {
3350         tbb::internal::fgt_node_desc( this, name );
3351     }
3352 #endif
3353 
3354 };
3355 
3356 // template for key_matching join_node
3357 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
3358 template<typename OutputTuple, typename K, typename KHash>
3359 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3360       key_matching_port, OutputTuple, key_matching<K,KHash> > {
3361 private:
3362     static const int N = tbb::flow::tuple_size<OutputTuple>::value;
3363     typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
3364 public:
3365     typedef OutputTuple output_type;
3366     typedef typename unfolded_type::input_ports_type input_ports_type;
3367 
3368 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3369     join_node(graph &g) : unfolded_type(g) {}
3370 
3371 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3372     template <typename... Args>
3373     join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3374         : join_node(nodes.graph_reference()) {
3375         make_edges_in_order(nodes, *this);
3376     }
3377 #endif
3378 
3379 #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3380 
3381     template<typename __TBB_B0, typename __TBB_B1>
3382      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3383         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3384                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3385     }
3386     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3387      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3388         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3389                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3390     }
3391     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3392      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3393         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3394                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3395     }
3396     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3397      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3398             unfolded_type(g, b0, b1, b2, b3, b4) {
3399         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3401     }
3402 #if __TBB_VARIADIC_MAX >= 6
3403     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3404         typename __TBB_B5>
3405      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3406             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3407         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3408                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3409     }
3410 #endif
3411 #if __TBB_VARIADIC_MAX >= 7
3412     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3413         typename __TBB_B5, typename __TBB_B6>
3414      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3415             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3416         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3417                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3418     }
3419 #endif
3420 #if __TBB_VARIADIC_MAX >= 8
3421     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3422         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3423      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3424             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3425         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3426                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3427     }
3428 #endif
3429 #if __TBB_VARIADIC_MAX >= 9
3430     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3431         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3432      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3433             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3434         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3435                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3436     }
3437 #endif
3438 #if __TBB_VARIADIC_MAX >= 10
3439     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3440         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3441      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3442             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3443         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3444                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3445     }
3446 #endif
3447 
3448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3449     template <typename... Args, typename... Bodies>
3450     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3451         : join_node(nodes.graph_reference(), bodies...) {
3452         make_edges_in_order(nodes, *this);
3453     }
3454 #endif
3455 
3456     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3457         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3458                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3459     }
3460 
3461 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3462     void set_name( const char *name ) __TBB_override {
3463         tbb::internal::fgt_node_desc( this, name );
3464     }
3465 #endif
3466 
3467 };
3468 
3469 // indexer node
3470 #include "internal/_flow_graph_indexer_impl.h"
3471 
3472 // TODO: Implement interface with variadic template or tuple
3473 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476 
3477 //indexer node specializations
3478 template<typename T0>
3479 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3480 private:
3481     static const int N = 1;
3482 public:
3483     typedef tuple<T0> InputTuple;
3484     typedef typename internal::tagged_msg<size_t, T0> output_type;
3485     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3486     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3487         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3488                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3489     }
3490 
3491 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3492     template <typename... Args>
3493     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3494         make_edges_in_order(nodes, *this);
3495     }
3496 #endif
3497 
3498     // Copy constructor
3499     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3500         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3501                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3502     }
3503 
3504 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3505      void set_name( const char *name ) __TBB_override {
3506         tbb::internal::fgt_node_desc( this, name );
3507     }
3508 #endif
3509 };
3510 
3511 template<typename T0, typename T1>
3512 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3513 private:
3514     static const int N = 2;
3515 public:
3516     typedef tuple<T0, T1> InputTuple;
3517     typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
3518     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3519     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3520         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3521                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3522     }
3523 
3524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3525     template <typename... Args>
3526     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3527         make_edges_in_order(nodes, *this);
3528     }
3529 #endif
3530 
3531     // Copy constructor
3532     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3533         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3534                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3535     }
3536 
3537 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3538      void set_name( const char *name ) __TBB_override {
3539         tbb::internal::fgt_node_desc( this, name );
3540     }
3541 #endif
3542 };
3543 
3544 template<typename T0, typename T1, typename T2>
3545 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3546 private:
3547     static const int N = 3;
3548 public:
3549     typedef tuple<T0, T1, T2> InputTuple;
3550     typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
3551     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3552     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3553         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3554                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3555     }
3556 
3557 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3558     template <typename... Args>
3559     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3560         make_edges_in_order(nodes, *this);
3561     }
3562 #endif
3563 
3564     // Copy constructor
3565     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3566         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3567                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3568     }
3569 
3570 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3571     void set_name( const char *name ) __TBB_override {
3572         tbb::internal::fgt_node_desc( this, name );
3573     }
3574 #endif
3575 };
3576 
3577 template<typename T0, typename T1, typename T2, typename T3>
3578 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3579 private:
3580     static const int N = 4;
3581 public:
3582     typedef tuple<T0, T1, T2, T3> InputTuple;
3583     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
3584     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3585     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3586         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3587                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3588     }
3589 
3590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3591     template <typename... Args>
3592     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3593         make_edges_in_order(nodes, *this);
3594     }
3595 #endif
3596 
3597     // Copy constructor
3598     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3599         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3600                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3601     }
3602 
3603 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3604     void set_name( const char *name ) __TBB_override {
3605         tbb::internal::fgt_node_desc( this, name );
3606     }
3607 #endif
3608 };
3609 
3610 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3611 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3612 private:
3613     static const int N = 5;
3614 public:
3615     typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3616     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
3617     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3618     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3619         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3620                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3621     }
3622 
3623 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3624     template <typename... Args>
3625     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3626         make_edges_in_order(nodes, *this);
3627     }
3628 #endif
3629 
3630     // Copy constructor
3631     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3632         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3633                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3634     }
3635 
3636 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3637     void set_name( const char *name ) __TBB_override {
3638         tbb::internal::fgt_node_desc( this, name );
3639     }
3640 #endif
3641 };
3642 
3643 #if __TBB_VARIADIC_MAX >= 6
3644 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3645 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3646 private:
3647     static const int N = 6;
3648 public:
3649     typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3650     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
3651     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3652     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3653         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3654                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3655     }
3656 
3657 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3658     template <typename... Args>
3659     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3660         make_edges_in_order(nodes, *this);
3661     }
3662 #endif
3663 
3664     // Copy constructor
3665     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3666         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3667                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3668     }
3669 
3670 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3671     void set_name( const char *name ) __TBB_override {
3672         tbb::internal::fgt_node_desc( this, name );
3673     }
3674 #endif
3675 };
3676 #endif //variadic max 6
3677 
3678 #if __TBB_VARIADIC_MAX >= 7
3679 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3680          typename T6>
3681 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3682 private:
3683     static const int N = 7;
3684 public:
3685     typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3686     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
3687     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3688     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3689         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3690                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3691     }
3692 
3693 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3694     template <typename... Args>
3695     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3696         make_edges_in_order(nodes, *this);
3697     }
3698 #endif
3699 
3700     // Copy constructor
3701     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3702         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3703                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3704     }
3705 
3706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3707     void set_name( const char *name ) __TBB_override {
3708         tbb::internal::fgt_node_desc( this, name );
3709     }
3710 #endif
3711 };
3712 #endif //variadic max 7
3713 
3714 #if __TBB_VARIADIC_MAX >= 8
3715 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3716          typename T6, typename T7>
3717 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3718 private:
3719     static const int N = 8;
3720 public:
3721     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3722     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
3723     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3724     indexer_node(graph& g) : unfolded_type(g) {
3725         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3726                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3727     }
3728 
3729 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3730     template <typename... Args>
3731     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3732         make_edges_in_order(nodes, *this);
3733     }
3734 #endif
3735 
3736     // Copy constructor
3737     indexer_node( const indexer_node& other ) : unfolded_type(other) {
3738         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3739                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3740     }
3741 
3742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3743     void set_name( const char *name ) __TBB_override {
3744         tbb::internal::fgt_node_desc( this, name );
3745     }
3746 #endif
3747 };
3748 #endif //variadic max 8
3749 
3750 #if __TBB_VARIADIC_MAX >= 9
3751 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3752          typename T6, typename T7, typename T8>
3753 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3754 private:
3755     static const int N = 9;
3756 public:
3757     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3758     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
3759     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3760     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3761         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3762                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3763     }
3764 
3765 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3766     template <typename... Args>
3767     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3768         make_edges_in_order(nodes, *this);
3769     }
3770 #endif
3771 
3772     // Copy constructor
3773     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3774         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3775                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3776     }
3777 
3778 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3779     void set_name( const char *name ) __TBB_override {
3780         tbb::internal::fgt_node_desc( this, name );
3781     }
3782 #endif
3783 };
3784 #endif //variadic max 9
3785 
3786 #if __TBB_VARIADIC_MAX >= 10
3787 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3788          typename T6, typename T7, typename T8, typename T9>
3789 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3790 private:
3791     static const int N = 10;
3792 public:
3793     typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3794     typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3795     typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
3796     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3797         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3798                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3799     }
3800 
3801 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3802     template <typename... Args>
3803     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3804         make_edges_in_order(nodes, *this);
3805     }
3806 #endif
3807 
3808     // Copy constructor
3809     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3810         tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3811                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
3812     }
3813 
3814 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3815     void set_name( const char *name ) __TBB_override {
3816         tbb::internal::fgt_node_desc( this, name );
3817     }
3818 #endif
3819 };
3820 #endif //variadic max 10
3821 
3822 #if __TBB_PREVIEW_ASYNC_MSG
3823 inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3824 #else
3825 template< typename T >
3826 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3827 #endif
3828 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3829     s.internal_add_built_predecessor(p);
3830     p.internal_add_built_successor(s);
3831 #endif
3832     p.register_successor( s );
3833     tbb::internal::fgt_make_edge( &p, &s );
3834 }
3835 
3836 //! Makes an edge between a single predecessor and a single successor
3837 template< typename T >
3838 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3839     internal_make_edge( p, s );
3840 }
3841 
3842 #if __TBB_PREVIEW_ASYNC_MSG
3843 template< typename TS, typename TR,
3844     typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
3845                                               || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
3846 inline void make_edge( TS &p, TR &s ) {
3847     internal_make_edge( p, s );
3848 }
3849 
3850 template< typename T >
3851 inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
3852     internal_make_edge( p, s );
3853 }
3854 
3855 template< typename T >
3856 inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
3857     internal_make_edge( p, s );
3858 }
3859 
3860 #endif // __TBB_PREVIEW_ASYNC_MSG
3861 
3862 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3863 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3864 template< typename T, typename V,
3865           typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3866 inline void make_edge( T& output, V& input) {
3867     make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3868 }
3869 
3870 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3871 template< typename T, typename R,
3872           typename = typename T::output_ports_type >
3873 inline void make_edge( T& output, receiver<R>& input) {
3874      make_edge(get<0>(output.output_ports()), input);
3875 }
3876 
3877 //Makes an edge from a sender to port 0 of a multi-input successor.
3878 template< typename S,  typename V,
3879           typename = typename V::input_ports_type >
3880 inline void make_edge( sender<S>& output, V& input) {
3881      make_edge(output, get<0>(input.input_ports()));
3882 }
3883 #endif
3884 
3885 #if __TBB_PREVIEW_ASYNC_MSG
3886 inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
3887 #else
3888 template< typename T >
3889 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3890 #endif
3891     p.remove_successor( s );
3892 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3893     // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3894     p.internal_delete_built_successor(s);
3895     s.internal_delete_built_predecessor(p);
3896 #endif
3897     tbb::internal::fgt_remove_edge( &p, &s );
3898 }
3899 
3900 //! Removes an edge between a single predecessor and a single successor
3901 template< typename T >
3902 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3903     internal_remove_edge( p, s );
3904 }
3905 
3906 #if __TBB_PREVIEW_ASYNC_MSG
3907 template< typename TS, typename TR,
3908     typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
3909                                               || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
3910 inline void remove_edge( TS &p, TR &s ) {
3911     internal_remove_edge( p, s );
3912 }
3913 
3914 template< typename T >
3915 inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
3916     internal_remove_edge( p, s );
3917 }
3918 
3919 template< typename T >
3920 inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
3921     internal_remove_edge( p, s );
3922 }
3923 #endif // __TBB_PREVIEW_ASYNC_MSG
3924 
3925 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3926 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3927 template< typename T, typename V,
3928           typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3929 inline void remove_edge( T& output, V& input) {
3930     remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3931 }
3932 
3933 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3934 template< typename T, typename R,
3935           typename = typename T::output_ports_type >
3936 inline void remove_edge( T& output, receiver<R>& input) {
3937      remove_edge(get<0>(output.output_ports()), input);
3938 }
3939 //Removes an edge between a sender and port 0 of a multi-input successor.
3940 template< typename S,  typename V,
3941           typename = typename V::input_ports_type >
3942 inline void remove_edge( sender<S>& output, V& input) {
3943      remove_edge(output, get<0>(input.input_ports()));
3944 }
3945 #endif
3946 
3947 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3948 template<typename C >
3949 template< typename S >
3950 void internal::edge_container<C>::sender_extract( S &s ) {
3951     edge_list_type e = built_edges;
3952     for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953         remove_edge(s, **i);
3954     }
3955 }
3956 
3957 template<typename C >
3958 template< typename R >
3959 void internal::edge_container<C>::receiver_extract( R &r ) {
3960     edge_list_type e = built_edges;
3961     for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962         remove_edge(**i, r);
3963     }
3964 }
3965 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966 
3967 //! Returns a copy of the body from a function or continue node
3968 template< typename Body, typename Node >
3969 Body copy_body( Node &n ) {
3970     return n.template copy_function_object<Body>();
3971 }
3972 
3973 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974 
3975 //composite_node
3976 template< typename InputTuple, typename OutputTuple > class composite_node;
3977 
3978 template< typename... InputTypes, typename... OutputTypes>
3979 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3980 
3981 public:
3982     typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3983     typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3984 
3985 private:
3986     std::unique_ptr<input_ports_type> my_input_ports;
3987     std::unique_ptr<output_ports_type> my_output_ports;
3988 
3989     static const size_t NUM_INPUTS = sizeof...(InputTypes);
3990     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3991 
3992 protected:
3993     void reset_node(reset_flags) __TBB_override {}
3994 
3995 public:
3996 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3997     composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3998         tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3999         tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4000     }
4001 #else
4002     composite_node( graph &g ) : graph_node(g) {
4003         tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4004     }
4005 #endif
4006 
4007     template<typename T1, typename T2>
4008     void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4009         __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4010         __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4011         my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4012         my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4013 
4014         tbb::internal::fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
4015         tbb::internal::fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
4016     }
4017 
4018     template< typename... NodeTypes >
4019     void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4020 
4021     template< typename... NodeTypes >
4022     void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4023 
4024 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4025     void set_name( const char *name ) __TBB_override {
4026         tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4027     }
4028 #endif
4029 
4030     input_ports_type& input_ports() {
4031          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4032          return *my_input_ports;
4033     }
4034 
4035     output_ports_type& output_ports() {
4036          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4037          return *my_output_ports;
4038     }
4039 
4040 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4041     void extract() __TBB_override {
4042         __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4043     }
4044 #endif
4045 };  // class composite_node
4046 
4047 //composite_node with only input ports
4048 template< typename... InputTypes>
4049 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050 public:
4051     typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052 
4053 private:
4054     std::unique_ptr<input_ports_type> my_input_ports;
4055     static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056 
4057 protected:
4058     void reset_node(reset_flags) __TBB_override {}
4059 
4060 public:
4061 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4062     composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4063         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4064         tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4065     }
4066 #else
4067     composite_node( graph &g ) : graph_node(g) {
4068         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4069     }
4070 #endif
4071 
4072    template<typename T>
4073    void set_external_ports(T&& input_ports_tuple) {
4074        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075 
4076        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077 
4078        tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079    }
4080 
4081     template< typename... NodeTypes >
4082     void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083 
4084     template< typename... NodeTypes >
4085     void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086 
4087 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4088     void set_name( const char *name ) __TBB_override {
4089         tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4090     }
4091 #endif
4092 
4093     input_ports_type& input_ports() {
4094          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095          return *my_input_ports;
4096     }
4097 
4098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4099     void extract() __TBB_override {
4100         __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101     }
4102 #endif
4103 
4104 };  // class composite_node
4105 
4106 //composite_nodes with only output_ports
4107 template<typename... OutputTypes>
4108 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109 public:
4110     typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111 
4112 private:
4113     std::unique_ptr<output_ports_type> my_output_ports;
4114     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115 
4116 protected:
4117     void reset_node(reset_flags) __TBB_override {}
4118 
4119 public:
4120 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4121     __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4122         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4123         tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
4124     }
4125 #else
4126     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
4127         tbb::internal::fgt_composite( CODEPTR(), this, &g );
4128     }
4129 #endif
4130 
4131    template<typename T>
4132    void set_external_ports(T&& output_ports_tuple) {
4133        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134 
4135        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136 
4137        tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138    }
4139 
4140     template<typename... NodeTypes >
4141     void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142 
4143     template<typename... NodeTypes >
4144     void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145 
4146 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4147     void set_name( const char *name ) __TBB_override {
4148         tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
4149     }
4150 #endif
4151 
4152     output_ports_type& output_ports() {
4153          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154          return *my_output_ports;
4155     }
4156 
4157 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4158     void extract() __TBB_override {
4159         __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160     }
4161 #endif
4162 
4163 };  // class composite_node
4164 
4165 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166 
4167 namespace internal {
4168 
4169 template<typename Gateway>
4170 class async_body_base: tbb::internal::no_assign {
4171 public:
4172     typedef Gateway gateway_type;
4173 
4174     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
4175     void set_gateway(gateway_type *gateway) {
4176         my_gateway = gateway;
4177     }
4178 
4179 protected:
4180     gateway_type *my_gateway;
4181 };
4182 
4183 template<typename Input, typename Ports, typename Gateway, typename Body>
4184 class async_body: public async_body_base<Gateway> {
4185 public:
4186     typedef async_body_base<Gateway> base_type;
4187     typedef Gateway gateway_type;
4188 
4189     async_body(const Body &body, gateway_type *gateway)
4190         : base_type(gateway), my_body(body) { }
4191 
4192     void operator()( const Input &v, Ports & ) {
4193         my_body(v, *this->my_gateway);
4194     }
4195 
4196     Body get_body() { return my_body; }
4197 
4198 private:
4199     Body my_body;
4200 };
4201 
4202 } // namespace internal
4203 
4204 } // namespace interfaceX
4205 namespace interface11 {
4206 
4207 //! Implements async node
4208 template < typename Input, typename Output,
4209            typename Policy = queueing_lightweight,
4210            typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4211 class async_node
4212     : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213 {
4214 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4215     __TBB_STATIC_ASSERT(
4216         (tbb::internal::is_same_type<Allocator, null_type>::value),
4217         "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218         "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219     );
4220 #endif
4221     typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
4222     typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Policy, Allocator> mfn_input_type;
4223 
4224 public:
4225     typedef Input input_type;
4226     typedef Output output_type;
4227     typedef receiver<input_type> receiver_type;
4228     typedef typename receiver_type::predecessor_type predecessor_type;
4229     typedef typename sender<output_type>::successor_type successor_type;
4230     typedef receiver_gateway<output_type> gateway_type;
4231     typedef internal::async_body_base<gateway_type> async_body_base_type;
4232     typedef typename base_type::output_ports_type output_ports_type;
4233 
4234 private:
4235     struct try_put_functor {
4236         typedef internal::multifunction_output<Output> output_port_type;
4237         output_port_type *port;
4238         // TODO: pass value by copy since we do not want to block asynchronous thread.
4239         const Output *value;
4240         bool result;
4241         try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242         void operator()() {
4243             result = port->try_put(*value);
4244         }
4245     };
4246 
4247     class receiver_gateway_impl: public receiver_gateway<Output> {
4248     public:
4249         receiver_gateway_impl(async_node* node): my_node(node) {}
4250         void reserve_wait() __TBB_override {
4251             tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252             my_node->my_graph.reserve_wait();
4253         }
4254 
4255         void release_wait() __TBB_override {
4256             my_node->my_graph.release_wait();
4257             tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258         }
4259 
4260         //! Implements gateway_type::try_put for an external activity to submit a message to FG
4261         bool try_put(const Output &i) __TBB_override {
4262             return my_node->try_put_impl(i);
4263         }
4264 
4265     private:
4266         async_node* my_node;
4267     } my_gateway;
4268 
4269     //The substitute of 'this' for member construction, to prevent compiler warnings
4270     async_node* self() { return this; }
4271 
4272     //! Implements gateway_type::try_put for an external activity to submit a message to FG
4273     bool try_put_impl(const Output &i) {
4274         internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275         internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4276         tbb::internal::fgt_async_try_put_begin(this, &port_0);
4277         task_list tasks;
4278         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280                       "Return status is inconsistent with the method operation." );
4281 
4282         while( !tasks.empty() ) {
4283             internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284         }
4285         tbb::internal::fgt_async_try_put_end(this, &port_0);
4286         return is_at_least_one_put_successful;
4287     }
4288 
4289 public:
4290     template<typename Body>
4291     __TBB_NOINLINE_SYM async_node(
4292         graph &g, size_t concurrency,
4293 #if __TBB_CPP11_PRESENT
4294         Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4295 #else
4296         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
4297 #endif
4298     ) : base_type(
4299         g, concurrency,
4300         internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301         (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302         tbb::internal::fgt_multioutput_node_with_body<1>(
4303             CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304             &this->my_graph, static_cast<receiver<input_type> *>(this),
4305             this->output_ports(), this->my_body
4306         );
4307     }
4308 
4309 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
4310     template <typename Body, typename... Args>
4311     __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312         : async_node(g, concurrency, body, Policy(), priority) {}
4313 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314 
4315 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4316     template <typename Body, typename... Args>
4317     __TBB_NOINLINE_SYM async_node(
4318         const node_set<Args...>& nodes, size_t concurrency, Body body,
4319         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4320     ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321         make_edges_in_order(nodes, *this);
4322     }
4323 
4324 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4325     template <typename Body, typename... Args>
4326     __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327         : async_node(nodes, concurrency, body, Policy(), priority) {}
4328 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330 
4331     __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332         static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333         static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334 
4335         tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336                 &this->my_graph, static_cast<receiver<input_type> *>(this),
4337                 this->output_ports(), this->my_body );
4338     }
4339 
4340     gateway_type& gateway() {
4341         return my_gateway;
4342     }
4343 
4344 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4345     void set_name( const char *name ) __TBB_override {
4346         tbb::internal::fgt_multioutput_node_desc( this, name );
4347     }
4348 #endif
4349 
4350     // Define sender< Output >
4351 
4352     //! Add a new successor to this node
4353     bool register_successor( successor_type &r ) __TBB_override {
4354         return internal::output_port<0>(*this).register_successor(r);
4355     }
4356 
4357     //! Removes a successor from this node
4358     bool remove_successor( successor_type &r ) __TBB_override {
4359         return internal::output_port<0>(*this).remove_successor(r);
4360     }
4361 
4362     template<typename Body>
4363     Body copy_function_object() {
4364         typedef internal::multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
4365         typedef internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
4366         mfn_body_type &body_ref = *this->my_body;
4367         async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368         return ab.get_body();
4369     }
4370 
4371 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4372     //! interface to record edges for traversal & deletion
4373     typedef typename  internal::edge_container<successor_type> built_successors_type;
4374     typedef typename  built_successors_type::edge_list_type successor_list_type;
4375     built_successors_type &built_successors() __TBB_override {
4376         return internal::output_port<0>(*this).built_successors();
4377     }
4378 
4379     void internal_add_built_successor( successor_type &r ) __TBB_override {
4380         internal::output_port<0>(*this).internal_add_built_successor(r);
4381     }
4382 
4383     void internal_delete_built_successor( successor_type &r ) __TBB_override {
4384         internal::output_port<0>(*this).internal_delete_built_successor(r);
4385     }
4386 
4387     void copy_successors( successor_list_type &l ) __TBB_override {
4388         internal::output_port<0>(*this).copy_successors(l);
4389     }
4390 
4391     size_t  successor_count() __TBB_override {
4392         return internal::output_port<0>(*this).successor_count();
4393     }
4394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395 
4396 protected:
4397 
4398     void reset_node( reset_flags f) __TBB_override {
4399        base_type::reset_node(f);
4400     }
4401 };
4402 
4403 #if __TBB_PREVIEW_STREAMING_NODE
4404 #include "internal/_flow_graph_streaming_node.h"
4405 #endif // __TBB_PREVIEW_STREAMING_NODE
4406 
4407 #include "internal/_flow_graph_node_set_impl.h"
4408 
4409 template< typename T >
4410 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4411 public:
4412     typedef T input_type;
4413     typedef T output_type;
4414     typedef typename receiver<input_type>::predecessor_type predecessor_type;
4415     typedef typename sender<output_type>::successor_type successor_type;
4416 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4417     typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4418     typedef typename sender<output_type>::built_successors_type built_successors_type;
4419     typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4420     typedef typename sender<output_type>::successor_list_type successor_list_type;
4421 #endif
4422 
4423     __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4424         my_successors.set_owner( this );
4425         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4426                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4427     }
4428 
4429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4430     template <typename... Args>
4431     overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4432         make_edges_in_order(nodes, *this);
4433     }
4434 #endif
4435 
4436     //! Copy constructor; doesn't take anything from src; default won't work
4437     __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
4438         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4439     {
4440         my_successors.set_owner( this );
4441         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4442                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4443     }
4444 
4445     ~overwrite_node() {}
4446 
4447 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4448     void set_name( const char *name ) __TBB_override {
4449         tbb::internal::fgt_node_desc( this, name );
4450     }
4451 #endif
4452 
4453    bool register_successor( successor_type &s ) __TBB_override {
4454         spin_mutex::scoped_lock l( my_mutex );
4455         if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4456             // We have a valid value that must be forwarded immediately.
4457             bool ret = s.try_put( my_buffer );
4458             if ( ret ) {
4459                 // We add the successor that accepted our put
4460                 my_successors.register_successor( s );
4461             } else {
4462                 // In case of reservation a race between the moment of reservation and register_successor can appear,
4463                 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4464                 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4465                 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
4466                 task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4467                     register_predecessor_task( *this, s );
4468                 internal::spawn_in_graph_arena( my_graph, *rtask );
4469             }
4470         } else {
4471             // No valid value yet, just add as successor
4472             my_successors.register_successor( s );
4473         }
4474         return true;
4475     }
4476 
4477     bool remove_successor( successor_type &s ) __TBB_override {
4478         spin_mutex::scoped_lock l( my_mutex );
4479         my_successors.remove_successor(s);
4480         return true;
4481     }
4482 
4483 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4484     built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4485     built_successors_type   &built_successors()   __TBB_override { return my_successors.built_successors(); }
4486 
4487     void internal_add_built_successor( successor_type &s) __TBB_override {
4488         spin_mutex::scoped_lock l( my_mutex );
4489         my_successors.internal_add_built_successor(s);
4490     }
4491 
4492     void internal_delete_built_successor( successor_type &s) __TBB_override {
4493         spin_mutex::scoped_lock l( my_mutex );
4494         my_successors.internal_delete_built_successor(s);
4495     }
4496 
4497     size_t successor_count() __TBB_override {
4498         spin_mutex::scoped_lock l( my_mutex );
4499         return my_successors.successor_count();
4500     }
4501 
4502     void copy_successors(successor_list_type &v) __TBB_override {
4503         spin_mutex::scoped_lock l( my_mutex );
4504         my_successors.copy_successors(v);
4505     }
4506 
4507     void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4508         spin_mutex::scoped_lock l( my_mutex );
4509         my_built_predecessors.add_edge(p);
4510     }
4511 
4512     void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4513         spin_mutex::scoped_lock l( my_mutex );
4514         my_built_predecessors.delete_edge(p);
4515     }
4516 
4517     size_t predecessor_count() __TBB_override {
4518         spin_mutex::scoped_lock l( my_mutex );
4519         return my_built_predecessors.edge_count();
4520     }
4521 
4522     void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4523         spin_mutex::scoped_lock l( my_mutex );
4524         my_built_predecessors.copy_edges(v);
4525     }
4526 
4527     void extract() __TBB_override {
4528         my_buffer_is_valid = false;
4529         built_successors().sender_extract(*this);
4530         built_predecessors().receiver_extract(*this);
4531     }
4532 
4533 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4534 
4535     bool try_get( input_type &v ) __TBB_override {
4536         spin_mutex::scoped_lock l( my_mutex );
4537         if ( my_buffer_is_valid ) {
4538             v = my_buffer;
4539             return true;
4540         }
4541         return false;
4542     }
4543 
4544     //! Reserves an item
4545     bool try_reserve( T &v ) __TBB_override {
4546         return try_get(v);
4547     }
4548 
4549     //! Releases the reserved item
4550     bool try_release() __TBB_override { return true; }
4551 
4552     //! Consumes the reserved item
4553     bool try_consume() __TBB_override { return true; }
4554 
4555     bool is_valid() {
4556        spin_mutex::scoped_lock l( my_mutex );
4557        return my_buffer_is_valid;
4558     }
4559 
4560     void clear() {
4561        spin_mutex::scoped_lock l( my_mutex );
4562        my_buffer_is_valid = false;
4563     }
4564 
4565 protected:
4566 
4567     template< typename R, typename B > friend class run_and_put_task;
4568     template<typename X, typename Y> friend class internal::broadcast_cache;
4569     template<typename X, typename Y> friend class internal::round_robin_cache;
4570     task * try_put_task( const input_type &v ) __TBB_override {
4571         spin_mutex::scoped_lock l( my_mutex );
4572         return try_put_task_impl(v);
4573     }
4574 
4575     task * try_put_task_impl(const input_type &v) {
4576         my_buffer = v;
4577         my_buffer_is_valid = true;
4578         task * rtask = my_successors.try_put_task(v);
4579         if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4580         return rtask;
4581     }
4582 
4583     graph& graph_reference() const __TBB_override {
4584         return my_graph;
4585     }
4586 
4587     //! Breaks an infinite loop between the node reservation and register_successor call
4588     struct register_predecessor_task : public graph_task {
4589 
4590         register_predecessor_task(predecessor_type& owner, successor_type& succ) :
4591             o(owner), s(succ) {};
4592 
4593         tbb::task* execute() __TBB_override {
4594             if (!s.register_predecessor(o)) {
4595                 o.register_successor(s);
4596             }
4597             return NULL;
4598         }
4599 
4600         predecessor_type& o;
4601         successor_type& s;
4602     };
4603 
4604     spin_mutex my_mutex;
4605     internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
4606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4607     internal::edge_container<predecessor_type> my_built_predecessors;
4608 #endif
4609     input_type my_buffer;
4610     bool my_buffer_is_valid;
4611     void reset_receiver(reset_flags /*f*/) __TBB_override {}
4612 
4613     void reset_node( reset_flags f) __TBB_override {
4614         my_buffer_is_valid = false;
4615        if (f&rf_clear_edges) {
4616            my_successors.clear();
4617        }
4618     }
4619 };  // overwrite_node
4620 
4621 template< typename T >
4622 class write_once_node : public overwrite_node<T> {
4623 public:
4624     typedef T input_type;
4625     typedef T output_type;
4626     typedef overwrite_node<T> base_type;
4627     typedef typename receiver<input_type>::predecessor_type predecessor_type;
4628     typedef typename sender<output_type>::successor_type successor_type;
4629 
4630     //! Constructor
4631     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
4632         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633                                  static_cast<receiver<input_type> *>(this),
4634                                  static_cast<sender<output_type> *>(this) );
4635     }
4636 
4637 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4638     template <typename... Args>
4639     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640         make_edges_in_order(nodes, *this);
4641     }
4642 #endif
4643 
4644     //! Copy constructor: call base class copy constructor
4645     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
4646         tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647                                  static_cast<receiver<input_type> *>(this),
4648                                  static_cast<sender<output_type> *>(this) );
4649     }
4650 
4651 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652     void set_name( const char *name ) __TBB_override {
4653         tbb::internal::fgt_node_desc( this, name );
4654     }
4655 #endif
4656 
4657 protected:
4658     template< typename R, typename B > friend class run_and_put_task;
4659     template<typename X, typename Y> friend class internal::broadcast_cache;
4660     template<typename X, typename Y> friend class internal::round_robin_cache;
4661     task *try_put_task( const T &v ) __TBB_override {
4662         spin_mutex::scoped_lock l( this->my_mutex );
4663         return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664     }
4665 };
4666 
4667 } // interfaceX
4668 
4669     using interface11::reset_flags;
4670     using interface11::rf_reset_protocol;
4671     using interface11::rf_reset_bodies;
4672     using interface11::rf_clear_edges;
4673 
4674     using interface11::graph;
4675     using interface11::graph_node;
4676     using interface11::continue_msg;
4677     using interface11::source_node;
4678     using interface11::input_node;
4679     using interface11::function_node;
4680     using interface11::multifunction_node;
4681     using interface11::split_node;
4682     using interface11::internal::output_port;
4683     using interface11::indexer_node;
4684     using interface11::internal::tagged_msg;
4685     using interface11::internal::cast_to;
4686     using interface11::internal::is_a;
4687     using interface11::continue_node;
4688     using interface11::overwrite_node;
4689     using interface11::write_once_node;
4690     using interface11::broadcast_node;
4691     using interface11::buffer_node;
4692     using interface11::queue_node;
4693     using interface11::sequencer_node;
4694     using interface11::priority_queue_node;
4695     using interface11::limiter_node;
4696     using namespace interface11::internal::graph_policy_namespace;
4697     using interface11::join_node;
4698     using interface11::input_port;
4699     using interface11::copy_body;
4700     using interface11::make_edge;
4701     using interface11::remove_edge;
4702     using interface11::internal::tag_value;
4703 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704     using interface11::composite_node;
4705 #endif
4706     using interface11::async_node;
4707 #if __TBB_PREVIEW_ASYNC_MSG
4708     using interface11::async_msg;
4709 #endif
4710 #if __TBB_PREVIEW_STREAMING_NODE
4711     using interface11::port_ref;
4712     using interface11::streaming_node;
4713 #endif // __TBB_PREVIEW_STREAMING_NODE
4714 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4715     using internal::node_priority_t;
4716     using internal::no_priority;
4717 #endif
4718 
4719 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720     using interface11::internal::follows;
4721     using interface11::internal::precedes;
4722     using interface11::internal::make_node_set;
4723     using interface11::internal::make_edges;
4724 #endif
4725 
4726 } // flow
4727 } // tbb
4728 
4729 // Include deduction guides for node classes
4730 #include "internal/_flow_graph_nodes_deduction.h"
4731 
4732 #undef __TBB_PFG_RESET_ARG
4733 #undef __TBB_COMMA
4734 #undef __TBB_DEFAULT_NODE_ALLOCATOR
4735 
4736 #include "internal/_warning_suppress_disable_notice.h"
4737 #undef __TBB_flow_graph_H_include_area
4738 
4739 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740    #undef __TBB_NOINLINE_SYM
4741 #endif
4742 
4743 #endif // __TBB_flow_graph_H