Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:18

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included into namespace tbb::detail::d2
0025 
0026     struct forwarding_base : no_assign {
0027         forwarding_base(graph &g) : graph_ref(g) {}
0028         virtual ~forwarding_base() {}
0029         graph& graph_ref;
0030     };
0031 
0032     struct queueing_forwarding_base : forwarding_base {
0033         using forwarding_base::forwarding_base;
0034         // decrement_port_count may create a forwarding task.  If we cannot handle the task
0035         // ourselves, ask decrement_port_count to deal with it.
0036         virtual graph_task* decrement_port_count(bool handle_task) = 0;
0037     };
0038 
0039     struct reserving_forwarding_base : forwarding_base {
0040         using forwarding_base::forwarding_base;
0041         // decrement_port_count may create a forwarding task.  If we cannot handle the task
0042         // ourselves, ask decrement_port_count to deal with it.
0043         virtual graph_task* decrement_port_count() = 0;
0044         virtual void increment_port_count() = 0;
0045     };
0046 
0047     // specialization that lets us keep a copy of the current_key for building results.
0048     // KeyType can be a reference type.
0049     template<typename KeyType>
0050     struct matching_forwarding_base : public forwarding_base {
0051         typedef typename std::decay<KeyType>::type current_key_type;
0052         matching_forwarding_base(graph &g) : forwarding_base(g) { }
0053         virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0;
0054         current_key_type current_key; // so ports can refer to FE's desired items
0055     };
0056 
0057     template< int N >
0058     struct join_helper {
0059 
0060         template< typename TupleType, typename PortType >
0061         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0062             std::get<N-1>( my_input ).set_join_node_pointer(port);
0063             join_helper<N-1>::set_join_node_pointer( my_input, port );
0064         }
0065         template< typename TupleType >
0066         static inline void consume_reservations( TupleType &my_input ) {
0067             std::get<N-1>( my_input ).consume();
0068             join_helper<N-1>::consume_reservations( my_input );
0069         }
0070 
0071         template< typename TupleType >
0072         static inline void release_my_reservation( TupleType &my_input ) {
0073             std::get<N-1>( my_input ).release();
0074         }
0075 
0076         template <typename TupleType>
0077         static inline void release_reservations( TupleType &my_input) {
0078             join_helper<N-1>::release_reservations(my_input);
0079             release_my_reservation(my_input);
0080         }
0081 
0082         template< typename InputTuple, typename OutputTuple >
0083         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0084             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
0085             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0086                 release_my_reservation( my_input );
0087                 return false;
0088             }
0089             return true;
0090         }
0091 
0092 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0093         template <typename InputTuple, typename OutputTuple>
0094         static inline bool reserve(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0095             message_metainfo element_metainfo;
0096             if (!std::get<N - 1>(my_input).reserve(std::get<N - 1>(out), element_metainfo)) return false;
0097             if (!join_helper<N - 1>::reserve(my_input, out, metainfo)) {
0098                 release_my_reservation(my_input);
0099                 return false;
0100             }
0101             metainfo.merge(element_metainfo);
0102             return true;
0103 
0104         }
0105 #endif
0106 
0107         template<typename InputTuple, typename OutputTuple>
0108         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0109             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
0110             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
0111         }
0112 
0113 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0114         template <typename InputTuple, typename OutputTuple>
0115         static inline bool get_my_item(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0116             message_metainfo element_metainfo;
0117             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out), element_metainfo);
0118             metainfo.merge(element_metainfo);
0119             return join_helper<N-1>::get_my_item(my_input, out, metainfo) && res;
0120         }
0121 #endif
0122 
0123         template<typename InputTuple, typename OutputTuple>
0124         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0125             return get_my_item(my_input, out);
0126         }
0127 
0128 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0129         template <typename InputTuple, typename OutputTuple>
0130         static inline bool get_items(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0131             return get_my_item(my_input, out, metainfo);
0132         }
0133 #endif
0134 
0135         template<typename InputTuple>
0136         static inline void reset_my_port(InputTuple &my_input) {
0137             join_helper<N-1>::reset_my_port(my_input);
0138             std::get<N-1>(my_input).reset_port();
0139         }
0140 
0141         template<typename InputTuple>
0142         static inline void reset_ports(InputTuple& my_input) {
0143             reset_my_port(my_input);
0144         }
0145 
0146         template<typename InputTuple, typename KeyFuncTuple>
0147         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0148             std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
0149             std::get<N-1>(my_key_funcs) = nullptr;
0150             join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0151         }
0152 
0153         template< typename KeyFuncTuple>
0154         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0155             __TBB_ASSERT(
0156                 std::get<N-1>(other_inputs).get_my_key_func(),
0157                 "key matching join node should not be instantiated without functors."
0158             );
0159             std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
0160             join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0161         }
0162 
0163         template<typename InputTuple>
0164         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0165             join_helper<N-1>::reset_inputs(my_input, f);
0166             std::get<N-1>(my_input).reset_receiver(f);
0167         }
0168     };  // join_helper<N>
0169 
0170     template< >
0171     struct join_helper<1> {
0172 
0173         template< typename TupleType, typename PortType >
0174         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0175             std::get<0>( my_input ).set_join_node_pointer(port);
0176         }
0177 
0178         template< typename TupleType >
0179         static inline void consume_reservations( TupleType &my_input ) {
0180             std::get<0>( my_input ).consume();
0181         }
0182 
0183         template< typename TupleType >
0184         static inline void release_my_reservation( TupleType &my_input ) {
0185             std::get<0>( my_input ).release();
0186         }
0187 
0188         template<typename TupleType>
0189         static inline void release_reservations( TupleType &my_input) {
0190             release_my_reservation(my_input);
0191         }
0192 
0193         template< typename InputTuple, typename OutputTuple >
0194         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0195             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
0196         }
0197 
0198 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0199         template <typename InputTuple, typename OutputTuple>
0200         static inline bool reserve(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0201             message_metainfo element_metainfo;
0202             bool result = std::get<0>(my_input).reserve(std::get<0>(out), element_metainfo);
0203             metainfo.merge(element_metainfo);
0204             return result;
0205         }
0206 #endif
0207 
0208         template<typename InputTuple, typename OutputTuple>
0209         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0210             return std::get<0>(my_input).get_item(std::get<0>(out));
0211         }
0212 
0213 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0214         template <typename InputTuple, typename OutputTuple>
0215         static inline bool get_my_item(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0216             message_metainfo element_metainfo;
0217             bool res = std::get<0>(my_input).get_item(std::get<0>(out), element_metainfo);
0218             metainfo.merge(element_metainfo);
0219             return res;
0220         }
0221 #endif
0222 
0223         template<typename InputTuple, typename OutputTuple>
0224         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0225             return get_my_item(my_input, out);
0226         }
0227 
0228 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0229         template <typename InputTuple, typename OutputTuple>
0230         static inline bool get_items(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0231             return get_my_item(my_input, out, metainfo);
0232         }
0233 #endif
0234 
0235         template<typename InputTuple>
0236         static inline void reset_my_port(InputTuple &my_input) {
0237             std::get<0>(my_input).reset_port();
0238         }
0239 
0240         template<typename InputTuple>
0241         static inline void reset_ports(InputTuple& my_input) {
0242             reset_my_port(my_input);
0243         }
0244 
0245         template<typename InputTuple, typename KeyFuncTuple>
0246         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0247             std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
0248             std::get<0>(my_key_funcs) = nullptr;
0249         }
0250 
0251         template< typename KeyFuncTuple>
0252         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0253             __TBB_ASSERT(
0254                 std::get<0>(other_inputs).get_my_key_func(),
0255                 "key matching join node should not be instantiated without functors."
0256             );
0257             std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
0258         }
0259         template<typename InputTuple>
0260         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0261             std::get<0>(my_input).reset_receiver(f);
0262         }
0263     };  // join_helper<1>
0264 
0265     //! The two-phase join port
0266     template< typename T >
0267     class reserving_port : public receiver<T> {
0268     public:
0269         typedef T input_type;
0270         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0271 
0272     private:
0273         // ----------- Aggregator ------------
0274         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0275         };
0276         typedef reserving_port<T> class_type;
0277 
0278         class reserving_port_operation : public d1::aggregated_operation<reserving_port_operation> {
0279         public:
0280             char type;
0281             union {
0282                 T *my_arg;
0283                 predecessor_type *my_pred;
0284             };
0285 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0286             message_metainfo* metainfo;
0287 #endif
0288             reserving_port_operation(const T& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info)) :
0289                 type(char(t)), my_arg(const_cast<T*>(&e))
0290                 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
0291 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0292             reserving_port_operation(const T& e, op_type t)
0293                 : type(char(t)), my_arg(const_cast<T*>(&e)), metainfo(nullptr) {}
0294 #endif
0295             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0296                 my_pred(const_cast<predecessor_type *>(&s)) {}
0297             reserving_port_operation(op_type t) : type(char(t)) {}
0298         };
0299 
0300         typedef d1::aggregating_functor<class_type, reserving_port_operation> handler_type;
0301         friend class d1::aggregating_functor<class_type, reserving_port_operation>;
0302         d1::aggregator<handler_type, reserving_port_operation> my_aggregator;
0303 
0304         void handle_operations(reserving_port_operation* op_list) {
0305             reserving_port_operation *current;
0306             bool was_missing_predecessors = false;
0307             while(op_list) {
0308                 current = op_list;
0309                 op_list = op_list->next;
0310                 switch(current->type) {
0311                 case reg_pred:
0312                     was_missing_predecessors = my_predecessors.empty();
0313                     my_predecessors.add(*(current->my_pred));
0314                     if ( was_missing_predecessors ) {
0315                         (void) my_join->decrement_port_count(); // may try to forward
0316                     }
0317                     current->status.store( SUCCEEDED, std::memory_order_release);
0318                     break;
0319                 case rem_pred:
0320                     if ( !my_predecessors.empty() ) {
0321                         my_predecessors.remove(*(current->my_pred));
0322                         if ( my_predecessors.empty() ) // was the last predecessor
0323                             my_join->increment_port_count();
0324                     }
0325                     // TODO: consider returning failure if there were no predecessors to remove
0326                     current->status.store( SUCCEEDED, std::memory_order_release );
0327                     break;
0328                 case res_item:
0329                     if ( reserved ) {
0330                         current->status.store( FAILED, std::memory_order_release);
0331                     }
0332                     else {
0333                         bool reserve_result = false;
0334 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0335                         if (current->metainfo) {
0336                             reserve_result = my_predecessors.try_reserve(*(current->my_arg),
0337                                                                          *(current->metainfo));
0338                         } else
0339 #endif
0340                         {
0341                             reserve_result = my_predecessors.try_reserve(*(current->my_arg));
0342                         }
0343                         if (reserve_result) {
0344                             reserved = true;
0345                             current->status.store( SUCCEEDED, std::memory_order_release);
0346                         } else {
0347                             if ( my_predecessors.empty() ) {
0348                                 my_join->increment_port_count();
0349                             }
0350                             current->status.store( FAILED, std::memory_order_release);
0351                         }
0352                     }
0353                     break;
0354                 case rel_res:
0355                     reserved = false;
0356                     my_predecessors.try_release( );
0357                     current->status.store( SUCCEEDED, std::memory_order_release);
0358                     break;
0359                 case con_res:
0360                     reserved = false;
0361                     my_predecessors.try_consume( );
0362                     current->status.store( SUCCEEDED, std::memory_order_release);
0363                     break;
0364                 }
0365             }
0366         }
0367 
0368     protected:
0369         template< typename R, typename B > friend class run_and_put_task;
0370         template<typename X, typename Y> friend class broadcast_cache;
0371         template<typename X, typename Y> friend class round_robin_cache;
0372         graph_task* try_put_task( const T & ) override {
0373             return nullptr;
0374         }
0375 
0376 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0377     graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; }
0378 #endif
0379 
0380         graph& graph_reference() const override {
0381             return my_join->graph_ref;
0382         }
0383 
0384     public:
0385 
0386         //! Constructor
0387         reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
0388             my_aggregator.initialize_handler(handler_type(this));
0389         }
0390 
0391         // copy constructor
0392         reserving_port(const reserving_port& /* other */) = delete;
0393 
0394         void set_join_node_pointer(reserving_forwarding_base *join) {
0395             my_join = join;
0396         }
0397 
0398         //! Add a predecessor
0399         bool register_predecessor( predecessor_type &src ) override {
0400             reserving_port_operation op_data(src, reg_pred);
0401             my_aggregator.execute(&op_data);
0402             return op_data.status == SUCCEEDED;
0403         }
0404 
0405         //! Remove a predecessor
0406         bool remove_predecessor( predecessor_type &src ) override {
0407             reserving_port_operation op_data(src, rem_pred);
0408             my_aggregator.execute(&op_data);
0409             return op_data.status == SUCCEEDED;
0410         }
0411 
0412         //! Reserve an item from the port
0413         bool reserve( T &v ) {
0414             reserving_port_operation op_data(v, res_item);
0415             my_aggregator.execute(&op_data);
0416             return op_data.status == SUCCEEDED;
0417         }
0418 
0419 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0420         bool reserve( T& v, message_metainfo& metainfo ) {
0421             reserving_port_operation op_data(v, res_item, metainfo);
0422             my_aggregator.execute(&op_data);
0423             return op_data.status == SUCCEEDED;
0424         }
0425 #endif
0426 
0427         //! Release the port
0428         void release( ) {
0429             reserving_port_operation op_data(rel_res);
0430             my_aggregator.execute(&op_data);
0431         }
0432 
0433         //! Complete use of the port
0434         void consume( ) {
0435             reserving_port_operation op_data(con_res);
0436             my_aggregator.execute(&op_data);
0437         }
0438 
0439         void reset_receiver( reset_flags f) {
0440             if(f & rf_clear_edges) my_predecessors.clear();
0441             else
0442             my_predecessors.reset();
0443             reserved = false;
0444             __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0445         }
0446 
0447     private:
0448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0449         friend class get_graph_helper;
0450 #endif
0451 
0452         reserving_forwarding_base *my_join;
0453         reservable_predecessor_cache< T, null_mutex > my_predecessors;
0454         bool reserved;
0455     };  // reserving_port
0456 
0457     //! queueing join_port
0458     template<typename T>
0459     class queueing_port : public receiver<T>, public item_buffer<T> {
0460     public:
0461         typedef T input_type;
0462         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0463         typedef queueing_port<T> class_type;
0464 
0465     // ----------- Aggregator ------------
0466     private:
0467         enum op_type { get__item, res_port, try__put_task
0468         };
0469 
0470         class queueing_port_operation : public d1::aggregated_operation<queueing_port_operation> {
0471         public:
0472             char type;
0473             T my_val;
0474             T* my_arg;
0475             graph_task* bypass_t;
0476 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0477             message_metainfo* metainfo;
0478 #endif
0479             // constructor for value parameter
0480             queueing_port_operation(const T& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& info))
0481                 : type(char(t)), my_val(e), my_arg(nullptr)
0482                 , bypass_t(nullptr)
0483                 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(const_cast<message_metainfo*>(&info)))
0484             {}
0485             // constructor for pointer parameter
0486             queueing_port_operation(const T* p, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info)) :
0487                 type(char(t)), my_arg(const_cast<T*>(p))
0488                 , bypass_t(nullptr)
0489                 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info))
0490             {}
0491 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0492             queueing_port_operation(const T* p, op_type t)
0493                 : type(char(t)), my_arg(const_cast<T*>(p)), bypass_t(nullptr), metainfo(nullptr)
0494             {}
0495 #endif
0496             // constructor with no parameter
0497             queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr)
0498                 , bypass_t(nullptr)
0499                 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(nullptr))
0500             {}
0501         };
0502 
0503         typedef d1::aggregating_functor<class_type, queueing_port_operation> handler_type;
0504         friend class d1::aggregating_functor<class_type, queueing_port_operation>;
0505         d1::aggregator<handler_type, queueing_port_operation> my_aggregator;
0506 
0507         void handle_operations(queueing_port_operation* op_list) {
0508             queueing_port_operation *current;
0509             bool was_empty;
0510             while(op_list) {
0511                 current = op_list;
0512                 op_list = op_list->next;
0513                 switch(current->type) {
0514                 case try__put_task: {
0515                         graph_task* rtask = nullptr;
0516                         was_empty = this->buffer_empty();
0517 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0518                         __TBB_ASSERT(current->metainfo, nullptr);
0519                         this->push_back(current->my_val, *(current->metainfo));
0520 #else
0521                         this->push_back(current->my_val);
0522 #endif
0523                         if (was_empty) rtask = my_join->decrement_port_count(false);
0524                         else
0525                             rtask = SUCCESSFULLY_ENQUEUED;
0526                         current->bypass_t = rtask;
0527                         current->status.store( SUCCEEDED, std::memory_order_release);
0528                     }
0529                     break;
0530                 case get__item:
0531                     if(!this->buffer_empty()) {
0532                         __TBB_ASSERT(current->my_arg, nullptr);
0533                         *(current->my_arg) = this->front();
0534 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0535                         if (current->metainfo) {
0536                             *(current->metainfo) = this->front_metainfo();
0537                         }
0538 #endif
0539                         current->status.store( SUCCEEDED, std::memory_order_release);
0540                     }
0541                     else {
0542                         current->status.store( FAILED, std::memory_order_release);
0543                     }
0544                     break;
0545                 case res_port:
0546                     __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
0547                     this->destroy_front();
0548                     if(this->my_item_valid(this->my_head)) {
0549                         (void)my_join->decrement_port_count(true);
0550                     }
0551                     current->status.store( SUCCEEDED, std::memory_order_release);
0552                     break;
0553                 }
0554             }
0555         }
0556     // ------------ End Aggregator ---------------
0557 
0558     protected:
0559         template< typename R, typename B > friend class run_and_put_task;
0560         template<typename X, typename Y> friend class broadcast_cache;
0561         template<typename X, typename Y> friend class round_robin_cache;
0562 
0563     private:
0564         graph_task* try_put_task_impl(const T& v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
0565             queueing_port_operation op_data(v, try__put_task __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0566             my_aggregator.execute(&op_data);
0567             __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
0568             if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
0569             return op_data.bypass_t;
0570         }
0571 
0572     protected:
0573         graph_task* try_put_task(const T &v) override {
0574             return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0575         }
0576 
0577 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0578         graph_task* try_put_task(const T& v, const message_metainfo& metainfo) override {
0579             return try_put_task_impl(v, metainfo);
0580         }
0581 #endif
0582 
0583         graph& graph_reference() const override {
0584             return my_join->graph_ref;
0585         }
0586 
0587     public:
0588 
0589         //! Constructor
0590         queueing_port() : item_buffer<T>() {
0591             my_join = nullptr;
0592             my_aggregator.initialize_handler(handler_type(this));
0593         }
0594 
0595         //! copy constructor
0596         queueing_port(const queueing_port& /* other */) = delete;
0597 
0598         //! record parent for tallying available items
0599         void set_join_node_pointer(queueing_forwarding_base *join) {
0600             my_join = join;
0601         }
0602 
0603         bool get_item( T &v ) {
0604             queueing_port_operation op_data(&v, get__item);
0605             my_aggregator.execute(&op_data);
0606             return op_data.status == SUCCEEDED;
0607         }
0608 
0609 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0610         bool get_item( T& v, message_metainfo& metainfo ) {
0611             queueing_port_operation op_data(&v, get__item, metainfo);
0612             my_aggregator.execute(&op_data);
0613             return op_data.status == SUCCEEDED;
0614         }
0615 #endif
0616 
0617         // reset_port is called when item is accepted by successor, but
0618         // is initiated by join_node.
0619         void reset_port() {
0620             queueing_port_operation op_data(res_port);
0621             my_aggregator.execute(&op_data);
0622             return;
0623         }
0624 
0625         void reset_receiver(reset_flags) {
0626             item_buffer<T>::reset();
0627         }
0628 
0629     private:
0630 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0631         friend class get_graph_helper;
0632 #endif
0633 
0634         queueing_forwarding_base *my_join;
0635     };  // queueing_port
0636 
0637 #include "_flow_graph_tagged_buffer_impl.h"
0638 
//! Entry of the counting table: a key together with the value stored for it.
//! NOTE(review): my_value is presumably the number of ports that have seen my_key;
//! confirm against the code that updates the table.
template<typename K>
struct count_element {
    K my_key;
    size_t my_value;
};

// method to access the key in the counting table
// the ref has already been removed from K
template< typename K >
struct key_to_count_functor {
    typedef count_element<K> table_item_type;
    // Returns a reference to the key stored in v (no copy is made).
    // const-qualified: the functor has no state, so it can also be invoked through
    // a const object or reference.
    const K& operator()(const table_item_type& v) const { return v.my_key; }
};
0652 
0653     template <typename K, typename T, typename TtoK, typename KHash>
0654     struct key_matching_port_base {
0655 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0656         using type = metainfo_hash_buffer<K, T, TtoK, KHash>;
0657 #else
0658         using type = hash_buffer<K, T, TtoK, KHash>;
0659 #endif
0660     };
0661 
0662     // the ports can have only one template parameter.  We wrap the types needed in
0663     // a traits type
         //
         // key_matching_port: an input port that stores incoming items by key and reports each
         // arrival to a matching_forwarding_base<key_type> (my_join).  It is a receiver of
         // TraitsType::T and also derives from the buffer type selected by
         // key_matching_port_base<K, T, TtoK, KHash>::type.  Every buffer access made here
         // (insert_with_key / find_with_key / delete_with_key) is wrapped in a
         // key_matching_port_operation and submitted through my_aggregator, whose handler
         // passes the pending operations to handle_operations().
0664     template< class TraitsType >
0665     class key_matching_port :
0666         public receiver<typename TraitsType::T>,
0667         public key_matching_port_base< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0668                                        typename TraitsType::KHash >::type
0669     {
0670     public:
0671         typedef TraitsType traits;
0672         typedef key_matching_port<traits> class_type;
0673         typedef typename TraitsType::T input_type;
0674         typedef typename TraitsType::K key_type;
0675         typedef typename std::decay<key_type>::type noref_key_type;
0676         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0677         typedef typename TraitsType::TtoK type_to_key_func_type;
0678         typedef typename TraitsType::KHash hash_compare_type;
0679         typedef typename key_matching_port_base<key_type, input_type, type_to_key_func_type, hash_compare_type>::type buffer_type;
0680
0681     private:
0682 // ----------- Aggregator ------------
0683     private:
             // Requests understood by handle_operations():
             //   try__put  - insert my_val with insert_with_key
             //   get__item - copy the item stored under my_join->current_key into *my_arg
             //   res_port  - delete the entry stored under my_join->current_key
0684         enum op_type { try__put, get__item, res_port
0685         };
0686
             // One request submitted to my_aggregator.  The result is reported through the
             // inherited status field (SUCCEEDED / FAILED).
0687         class key_matching_port_operation : public d1::aggregated_operation<key_matching_port_operation> {
0688         public:
0689             char type;  // an op_type value, stored as char
0690             input_type my_val;  // item to insert; read only by the try__put handler
0691             input_type *my_arg;  // destination of the looked-up item; used only by get__item
0692 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0693             message_metainfo* metainfo = nullptr;  // non-owning; asserted non-null for try__put and get__item
0694 #endif
0695             // constructor for value parameter
0696             key_matching_port_operation(const input_type& e, op_type t
0697                                         __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& info))
0698                 : type(char(t)), my_val(e), my_arg(nullptr)
0699                   __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(const_cast<message_metainfo*>(&info))) {}
0700
0701             // constructor for pointer parameter
                 // (my_val is left default-initialized here)
0702             key_matching_port_operation(const input_type* p, op_type t
0703                                         __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info))
0704                 : type(char(t)), my_arg(const_cast<input_type*>(p))
0705                   __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
0706
0707             // constructor with no parameter
0708             key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
0709         };
0710
0711         typedef d1::aggregating_functor<class_type, key_matching_port_operation> handler_type;
0712         friend class d1::aggregating_functor<class_type, key_matching_port_operation>;
0713         d1::aggregator<handler_type, key_matching_port_operation> my_aggregator;
0714
             // Aggregator handler: walks the singly linked list of pending operations and publishes
             // each result through the operation's status field.  The next pointer is read before
             // the operation is handled.
             // NOTE(review): presumably an operation must not be touched once its status has been
             // stored, because its owner may then proceed - confirm against d1::aggregator.
0715         void handle_operations(key_matching_port_operation* op_list) {
0716             key_matching_port_operation *current;
0717             while(op_list) {
0718                 current = op_list;
0719                 op_list = op_list->next;
0720                 switch(current->type) {
0721                 case try__put: {
0722 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0723                         __TBB_ASSERT(current->metainfo, nullptr);
0724                         bool was_inserted = this->insert_with_key(current->my_val, *(current->metainfo));
0725 #else
0726                         bool was_inserted = this->insert_with_key(current->my_val);
0727 #endif
0728                         // return failure if a duplicate insertion occurs
0729                         current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
0730                     }
0731                     break;
0732                 case get__item: {
0733                     // use current_key from FE for item
0734                     __TBB_ASSERT(current->my_arg, nullptr);
0735 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0736                     __TBB_ASSERT(current->metainfo, nullptr);
0737                     bool find_result = this->find_with_key(my_join->current_key, *(current->my_arg),
0738                                                            *(current->metainfo));
0739 #else
0740                     bool find_result = this->find_with_key(my_join->current_key, *(current->my_arg));
0741 #endif
0742 #if TBB_USE_DEBUG
0743                     if (!find_result) {
0744                         __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0745                     }
0746 #else
0747                     tbb::detail::suppress_unused_warning(find_result);
0748 #endif
                         // The lookup result only feeds the debug assertion above; SUCCEEDED is
                         // reported whether or not the key was found.
0749                     current->status.store( SUCCEEDED, std::memory_order_release);
0750                     }
0751                     break;
0752                 case res_port:
0753                     // use current_key from FE for item
0754                     this->delete_with_key(my_join->current_key);
0755                     current->status.store( SUCCEEDED, std::memory_order_release);
0756                     break;
0757                 }
0758             }
0759         }
0760 // ------------ End Aggregator ---------------
0761     protected:
0762         template< typename R, typename B > friend class run_and_put_task;
0763         template<typename X, typename Y> friend class broadcast_cache;
0764         template<typename X, typename Y> friend class round_robin_cache;
0765     private:
             // Inserts v through the aggregator.  If the insert succeeded, my_join is told that
             // one more port holds an item for v's key (the key is obtained by calling the key
             // functor on v).  Returns the task handed back by increment_key_count, or
             // SUCCESSFULLY_ENQUEUED if it handed back none, or nullptr if the insert reported FAILED.
0766         graph_task* try_put_task_impl(const input_type& v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
0767             key_matching_port_operation op_data(v, try__put __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0768             graph_task* rtask = nullptr;
0769             my_aggregator.execute(&op_data);
0770             if(op_data.status == SUCCEEDED) {
0771                 rtask = my_join->increment_key_count((*(this->get_key_func()))(v)); // may spawn
0772                 // rtask has to reflect the return status of the try_put
0773                 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0774             }
0775             return rtask;
0776         }
0777     protected:
             // Overload without metainfo.
             // NOTE(review): __TBB_FLOW_GRAPH_METAINFO_ARG is defined elsewhere; it appears to
             // append its argument only when the try_put_and_wait preview is enabled, in which
             // case an empty message_metainfo is forwarded - confirm.
0778         graph_task* try_put_task(const input_type& v) override {
0779             return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0780         }
0781
0782 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0783         graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
0784             return try_put_task_impl(v, metainfo);
0785         }
0786 #endif
0787
             // Returns the graph of the owning join.  my_join is dereferenced unchecked, so
             // set_join_node_pointer() must have stored a non-null pointer first.
0788         graph& graph_reference() const override {
0789             return my_join->graph_ref;
0790         }
0791
0792     public:
0793
             // Starts detached (my_join == nullptr) and binds the aggregator handler to this port.
0794         key_matching_port() : receiver<input_type>(), buffer_type() {
0795             my_join = nullptr;
0796             my_aggregator.initialize_handler(handler_type(this));
0797         }
0798
0799         // copy constructor
0800         key_matching_port(const key_matching_port& /*other*/) = delete;
             // NOTE(review): an undefined __INTEL_COMPILER evaluates to 0 inside #if, so this
             // condition is also true for non-Intel compilers and the keyword is emitted there too.
0801 #if __INTEL_COMPILER <= 2021
0802         // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
0803         // class while the parent class has the virtual keyword for the destructor.
0804         virtual
0805 #endif
0806         ~key_matching_port() { }
0807
             // Stores join as the port's matching_forwarding_base<key_type>.  dynamic_cast yields
             // nullptr when join is not of that type; the result is not checked here.
0808         void set_join_node_pointer(forwarding_base *join) {
0809             my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0810         }
0811
0812         void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0813
0814         type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0815
             // Copies the item stored under my_join->current_key into v.  The get__item handler
             // always stores SUCCEEDED, so this returns true.
             // NOTE(review): if __TBB_FLOW_GRAPH_METAINFO_ARG adds a constructor parameter when the
             // preview is enabled, this two-argument construction has no matching constructor -
             // presumably this overload is then never instantiated; confirm.
0816         bool get_item( input_type &v ) {
0817             // aggregator uses current_key from FE for Key
0818             key_matching_port_operation op_data(&v, get__item);
0819             my_aggregator.execute(&op_data);
0820             return op_data.status == SUCCEEDED;
0821         }
0822
0823 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
             // Same as above; the handler additionally passes metainfo to find_with_key.
0824         bool get_item( input_type& v, message_metainfo& metainfo ) {
0825             // aggregator uses current_key from FE for Key
0826             key_matching_port_operation op_data(&v, get__item, metainfo);
0827             my_aggregator.execute(&op_data);
0828             return op_data.status == SUCCEEDED;
0829         }
0830 #endif
0831
0832         // reset_port is called when item is accepted by successor, but
0833         // is initiated by join_node.
             // It deletes the entry stored under my_join->current_key (res_port handler).
0834         void reset_port() {
0835             key_matching_port_operation op_data(res_port);
0836             my_aggregator.execute(&op_data);
0837             return;
0838         }
0839
             // Resets the underlying buffer directly (not through the aggregator); the
             // reset_flags argument is ignored.
0840         void reset_receiver(reset_flags ) {
0841             buffer_type::reset();
0842         }
0843
0844     private:
0845         // my_join forwarding base used to count number of inputs that
0846         // received key.
0847         matching_forwarding_base<key_type> *my_join;
0848     };  // key_matching_port
0849 
0850     using namespace graph_policy_namespace;  // NOTE(review): presumably supplies the policy tags (reserving, queueing, key_matching) used unqualified below - confirm
0851
         // Forward declaration: the front-ends below refer to join_node_base<...> as their
         // base_node_type (pointer member and forwarding-task argument) before it is defined.
0852     template<typename JP, typename InputTuple, typename OutputTuple>
0853     class join_node_base;
0854
0855     //! join_node_FE : implements input port policy
         // Only declared here; the specializations that follow cover the reserving, queueing
         // and key_matching<K,KHash> policies.
0856     template<typename JP, typename InputTuple, typename OutputTuple>
0857     class join_node_FE;
0858 
0859     template<typename InputTuple, typename OutputTuple>
         // Front-end for the reserving policy.  It owns the tuple of input ports and an atomic
         // counter, ports_with_no_inputs, that starts at N (the tuple size).  The ports adjust
         // the counter through increment_port_count() / decrement_port_count(); a tuple is only
         // attempted while the counter is zero.
0860     class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
0861     private:
0862         static const int N = std::tuple_size<OutputTuple>::value;
0863         typedef OutputTuple output_type;
0864         typedef InputTuple input_type;
0865         typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
0866     public:
             // my_node stays null until set_my_node() is called; every port is given a pointer
             // back to this front-end.
0867         join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
0868             ports_with_no_inputs = N;
0869             join_helper<N>::set_join_node_pointer(my_inputs, this);
0870         }
0871
             // Copying takes only the graph reference from other: my_node is null, the counter
             // restarts at N and the ports are default-constructed and re-pointed at this object.
0872         join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
0873             ports_with_no_inputs = N;
0874             join_helper<N>::set_join_node_pointer(my_inputs, this);
0875         }
0876
0877         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0878
0879        void increment_port_count() override {
0880             ++ports_with_no_inputs;
0881         }
0882
0883         // if all input_ports have predecessors, spawn forward to try and consume tuples
             // fetch_sub returns the value held before the decrement, so "== 1" means the counter
             // has just reached zero.  The forwarding task is spawned here rather than returned:
             // this function always returns nullptr.  *my_node is dereferenced, so set_my_node()
             // must have been called by then.
0884         graph_task* decrement_port_count() override {
0885             if(ports_with_no_inputs.fetch_sub(1) == 1) {
0886                 if(is_graph_active(this->graph_ref)) {
0887                     d1::small_object_allocator allocator{};
0888                     typedef forward_task_bypass<base_node_type> task_type;
0889                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0890                     spawn_in_graph_arena(this->graph_ref, *t);
0891                 }
0892             }
0893             return nullptr;
0894         }
0895
0896         input_type &input_ports() { return my_inputs; }
0897
0898     protected:
0899
0900         void reset(  reset_flags f) {
0901             // called outside of parallel contexts
0902             ports_with_no_inputs = N;
0903             join_helper<N>::reset_inputs(my_inputs, f);
0904         }
0905
0906         // all methods on input ports should be called under mutual exclusion from join_node_base.
0907
             // True when the counter is zero, i.e. no port is currently counted as lacking input.
0908         bool tuple_build_may_succeed() {
0909             return !ports_with_no_inputs;
0910         }
0911
             // Fails immediately while any port is still counted; otherwise the outcome is
             // whatever join_helper<N>::reserve reports for the ports and out.
0912         bool try_to_make_tuple(output_type &out) {
0913             if(ports_with_no_inputs) return false;
0914             return join_helper<N>::reserve(my_inputs, out);
0915         }
0916
0917 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0918         bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
0919             if (ports_with_no_inputs) return false;
0920             return join_helper<N>::reserve(my_inputs, out, metainfo);
0921         }
0922 #endif
0923
             // Outcome hooks: accepted -> consume the ports' reservations, rejected -> release them.
0924         void tuple_accepted() {
0925             join_helper<N>::consume_reservations(my_inputs);
0926         }
0927         void tuple_rejected() {
0928             join_helper<N>::release_reservations(my_inputs);
0929         }
0930
0931         input_type my_inputs;
0932         base_node_type *my_node;  // non-owning; target of the forwarding task
0933         std::atomic<std::size_t> ports_with_no_inputs;
0934     };  // join_node_FE<reserving, ... >
0935 
0936     template<typename InputTuple, typename OutputTuple>
         // Front-end for the queueing policy.  It owns the tuple of input ports and an atomic
         // counter, ports_with_no_items, that starts at N (the tuple size), is decremented
         // through decrement_port_count() and is set back to N whenever a tuple is accepted.
0937     class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
0938     public:
0939         static const int N = std::tuple_size<OutputTuple>::value;
0940         typedef OutputTuple output_type;
0941         typedef InputTuple input_type;
0942         typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
0943
             // my_node stays null until set_my_node() is called; every port is given a pointer
             // back to this front-end.
0944         join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
0945             ports_with_no_items = N;
0946             join_helper<N>::set_join_node_pointer(my_inputs, this);
0947         }
0948
             // Copying takes only the graph reference from other: my_node is null, the counter
             // restarts at N and the ports are default-constructed and re-pointed at this object.
0949         join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
0950             ports_with_no_items = N;
0951             join_helper<N>::set_join_node_pointer(my_inputs, this);
0952         }
0953
0954         // needed for forwarding
0955         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0956
0957         void reset_port_count() {
0958             ports_with_no_items = N;
0959         }
0960
0961         // if all input_ports have items, spawn forward to try and consume tuples
             // fetch_sub returns the value held before the decrement, so "== 1" means the counter
             // has just reached zero.  In that case, and only if the graph is active, a forwarding
             // task for *my_node is allocated: with handle_task == false it is returned to the
             // caller, otherwise it is spawned here and nullptr is returned.
0962         graph_task* decrement_port_count(bool handle_task) override
0963         {
0964             if(ports_with_no_items.fetch_sub(1) == 1) {
0965                 if(is_graph_active(this->graph_ref)) {
0966                     d1::small_object_allocator allocator{};
0967                     typedef forward_task_bypass<base_node_type> task_type;
0968                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0969                     if( !handle_task )
0970                         return t;
0971                     spawn_in_graph_arena(this->graph_ref, *t);
0972                 }
0973             }
0974             return nullptr;
0975         }
0976
0977         input_type &input_ports() { return my_inputs; }
0978
0979     protected:
0980
0981         void reset(  reset_flags f) {
0982             reset_port_count();
0983             join_helper<N>::reset_inputs(my_inputs, f );
0984         }
0985
0986         // all methods on input ports should be called under mutual exclusion from join_node_base.
0987
             // True when the counter is zero, i.e. no port is currently counted as lacking an item.
0988         bool tuple_build_may_succeed() {
0989             return !ports_with_no_items;
0990         }
0991
             // Fails immediately while any port is still counted; otherwise the outcome is
             // whatever join_helper<N>::get_items reports for the ports and out.
0992         bool try_to_make_tuple(output_type &out) {
0993             if(ports_with_no_items) return false;
0994             return join_helper<N>::get_items(my_inputs, out);
0995         }
0996
0997 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0998         bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
0999             if(ports_with_no_items) return false;
1000             return join_helper<N>::get_items(my_inputs, out, metainfo);
1001         }
1002 #endif
1003
             // The counter is set back to N before the ports are reset.
             // NOTE(review): presumably reset_ports makes ports that still hold items decrement
             // the counter again - confirm in the port implementation.
1004         void tuple_accepted() {
1005             reset_port_count();
1006             join_helper<N>::reset_ports(my_inputs);
1007         }
1008         void tuple_rejected() {
1009             // nothing to do.
1010         }
1011
1012         input_type my_inputs;
1013         base_node_type *my_node;  // non-owning; target of the forwarding task
1014         std::atomic<std::size_t> ports_with_no_items;
1015     };  // join_node_FE<queueing, ...>
1016 
1017     // key_matching join front-end.
         // Besides the input ports (my_inputs) it has two buffers as base classes: a hash_buffer
         // that maps each key to a count of increment_key_count() calls seen for it, and an
         // item_buffer of completed output tuples.  Accesses to both are submitted through
         // my_aggregator and carried out in handle_operations().
1018     template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1019     class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1020              // buffer of key value counts
1021               public hash_buffer<   // typedefed below to key_to_count_buffer_type
1022                   typename std::decay<K>::type&,        // force ref type on K
1023                   count_element<typename std::decay<K>::type>,
1024                   type_to_key_function_body<
1025                       count_element<typename std::decay<K>::type>,
1026                       typename std::decay<K>::type& >,
1027                   KHash >,
1028              // buffer of output items
1029              public item_buffer<OutputTuple> {
1030     public:
1031         static const int N = std::tuple_size<OutputTuple>::value;
1032         typedef OutputTuple output_type;
1033         typedef InputTuple input_type;
1034         typedef K key_type;
1035         typedef typename std::decay<key_type>::type unref_key_type;
1036         typedef KHash key_hash_compare;
1037         // must use K without ref.
1038         typedef count_element<unref_key_type> count_element_type;
1039         // method that lets us refer to the key of this type.
1040         typedef key_to_count_functor<unref_key_type> key_to_count_func;
1041         typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
1042         typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
1043         // this is the type of the special table that keeps track of the number of discrete
1044         // elements corresponding to each key that we've seen.
1045         typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
1046                  key_to_count_buffer_type;
1047         typedef item_buffer<output_type> output_buffer_type;
1048         typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1049         typedef matching_forwarding_base<key_type> forwarding_base_type;
1050
1051 // ----------- Aggregator ------------
1052         // the aggregator is only needed to serialize the access to the hash table.
1053         // and the output_buffer_type base class
1054     private:
             // Requests understood by handle_operations():
             //   res_count   - drop the front tuple of the output buffer (destroy_front)
             //   inc_count   - bump the count stored for my_val; on reaching N, build a tuple
             //   may_succeed - report whether the output buffer is non-empty
             //   try_make    - copy the front tuple (and its metainfo, if requested) to the caller
1055         enum op_type { res_count, inc_count, may_succeed, try_make };
1056         typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
1057
             // One request submitted to my_aggregator.  The result is reported through the
             // inherited status field and, for inc_count, through bypass_t.
1058         class key_matching_FE_operation : public d1::aggregated_operation<key_matching_FE_operation> {
1059         public:
1060             char type;  // an op_type value, stored as char
1061             unref_key_type my_val;  // key whose count is incremented (inc_count)
1062             output_type* my_output;  // destination of the front tuple (try_make)
1063             graph_task* bypass_t;  // task produced by fill_output_buffer, if any (inc_count)
1064 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1065             message_metainfo* metainfo = nullptr;  // optional; try_make fills it only when non-null
1066 #endif
1067             // constructor for value parameter
1068             key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
1069                  my_output(nullptr), bypass_t(nullptr) {}
1070             key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
1071 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1072             key_matching_FE_operation(output_type *p, op_type t, message_metainfo& info)
1073                 : type(char(t)), my_output(p), bypass_t(nullptr), metainfo(&info) {}
1074 #endif
1075             // constructor with no parameter
1076             key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
1077         };
1078
1079         typedef d1::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1080         friend class d1::aggregating_functor<class_type, key_matching_FE_operation>;
1081         d1::aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1082
1083         // called from aggregator, so serialized
1084         // returns a task pointer if a task would have been enqueued but we asked that
1085         // it be returned.  Otherwise returns nullptr.
             //
             // Steps: record t as current_key, remove t's entry from the count table, collect one
             // item per port into a tuple (the ports look it up under current_key), append the
             // tuple to the output buffer and finally reset the ports.  A forwarding task is
             // allocated only if the output buffer was empty on entry and the graph is active.
1086         graph_task* fill_output_buffer(unref_key_type &t) {
1087             output_type l_out;
1088             graph_task* rtask = nullptr;
1089             bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
1090             this->current_key = t;
1091             this->delete_with_key(this->current_key);   // remove the key
1092 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1093             message_metainfo metainfo;
1094 #endif
1095             if(join_helper<N>::get_items(my_inputs, l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {  //  <== call back
1096                 this->push_back(l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1097                 if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
1098                     d1::small_object_allocator allocator{};
1099                     typedef forward_task_bypass<base_node_type> task_type;
1100                     rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
1101                     do_fwd = false;  // not read again before returning
1102                 }
1103                 // retire the input values
1104                 join_helper<N>::reset_ports(my_inputs);  //  <== call back
1105             }
1106             else {
1107                 __TBB_ASSERT(false, "should have had something to push");
1108             }
1109             return rtask;
1110         }
1111
             // Aggregator handler: walks the singly linked list of pending operations; the next
             // pointer is read before each operation is handled and its status published.
1112         void handle_operations(key_matching_FE_operation* op_list) {
1113             key_matching_FE_operation *current;
1114             while(op_list) {
1115                 current = op_list;
1116                 op_list = op_list->next;
1117                 switch(current->type) {
1118                 case res_count:  // called from BE
1119                     {
1120                         this->destroy_front();
1121                         current->status.store( SUCCEEDED, std::memory_order_release);
1122                     }
1123                     break;
1124                 case inc_count: {  // called from input ports
1125                         count_element_type *p = nullptr;
1126                         unref_key_type &t = current->my_val;
                             // First sighting of this key: insert a zero count, then look it up
                             // again so that p points at the stored element.
1127                         if(!(this->find_ref_with_key(t,p))) {
1128                             count_element_type ev;
1129                             ev.my_key = t;
1130                             ev.my_value = 0;
1131                             this->insert_with_key(ev);
1132                             bool found = this->find_ref_with_key(t, p);
1133                             __TBB_ASSERT_EX(found, "should find key after inserting it");
1134                         }
                             // The N-th report for a key triggers tuple construction; any task it
                             // produces travels back to the caller in bypass_t.
1135                         if(++(p->my_value) == size_t(N)) {
1136                             current->bypass_t = fill_output_buffer(t);
1137                         }
1138                     }
1139                     current->status.store( SUCCEEDED, std::memory_order_release);
1140                     break;
1141                 case may_succeed:  // called from BE
1142                     current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
1143                     break;
1144                 case try_make:  // called from BE
1145                     if(this->buffer_empty()) {
1146                         current->status.store( FAILED, std::memory_order_release);
1147                     }
1148                     else {
                             // The front tuple is copied, not removed; removal happens on res_count.
1149                         *(current->my_output) = this->front();
1150 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1151                         if (current->metainfo) {
1152                             *(current->metainfo) = this->front_metainfo();
1153                         }
1154 #endif
1155                         current->status.store( SUCCEEDED, std::memory_order_release);
1156                     }
1157                     break;
1158                 }
1159             }
1160         }
1161 // ------------ End Aggregator ---------------
1162
1163     public:
             // Points every port back at this front-end, installs the user key functors from
             // TtoK_funcs on the ports and binds the aggregator handler.  The key functor for the
             // count table is allocated with new and passed to set_key_func.
             // NOTE(review): presumably the hash_buffer base takes ownership of cfb and deletes
             // it - confirm.
1164         template<typename FunctionTuple>
1165         join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
1166             join_helper<N>::set_join_node_pointer(my_inputs, this);
1167             join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1168             my_aggregator.initialize_handler(handler_type(this));
1169                     TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1170             this->set_key_func(cfb);
1171         }
1172
             // Copying takes only the graph reference and the ports' key functors from other;
             // both buffer bases are default-constructed and my_node is null.
1173         join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1174         output_buffer_type() {
1175             my_node = nullptr;
1176             join_helper<N>::set_join_node_pointer(my_inputs, this);
1177             join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1178             my_aggregator.initialize_handler(handler_type(this));
1179             TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1180             this->set_key_func(cfb);
1181         }
1182
1183         // needed for forwarding
1184         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1185
             // Despite its name, for this policy the call removes the front tuple from the output
             // buffer (res_count handler); no counter is involved.
1186         void reset_port_count() {  // called from BE
1187             key_matching_FE_operation op_data(res_count);
1188             my_aggregator.execute(&op_data);
1189             return;
1190         }
1191
1192         // if all input_ports have items, spawn forward to try and consume tuples
1193         // return a task if we are asked and did create one.
             // The key is copied into the operation; the returned pointer is whatever
             // fill_output_buffer produced, or nullptr if the count has not reached N.
1194         graph_task *increment_key_count(unref_key_type const & t) override {  // called from input_ports
1195             key_matching_FE_operation op_data(t, inc_count);
1196             my_aggregator.execute(&op_data);
1197             return op_data.bypass_t;
1198         }
1199
1200         input_type &input_ports() { return my_inputs; }
1201
1202     protected:
1203
             // Resets the ports and both buffer bases directly, without going through the aggregator.
1204         void reset(  reset_flags f ) {
1205             // called outside of parallel contexts
1206             join_helper<N>::reset_inputs(my_inputs, f);
1207
1208             key_to_count_buffer_type::reset();
1209             output_buffer_type::reset();
1210         }
1211
1212         // all methods on input ports should be called under mutual exclusion from join_node_base.
1213
             // True when the output buffer holds at least one completed tuple.
1214         bool tuple_build_may_succeed() {  // called from back-end
1215             key_matching_FE_operation op_data(may_succeed);
1216             my_aggregator.execute(&op_data);
1217             return op_data.status == SUCCEEDED;
1218         }
1219
1220         // cannot lock while calling back to input_ports.  current_key will only be set
1221         // and reset under the aggregator, so it will remain consistent.
             // Copies the front tuple of the output buffer into out; returns false if the buffer is empty.
1222         bool try_to_make_tuple(output_type &out) {
1223             key_matching_FE_operation op_data(&out,try_make);
1224             my_aggregator.execute(&op_data);
1225             return op_data.status == SUCCEEDED;
1226         }
1227
1228 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
             // As above, and additionally copies the front tuple's metainfo into metainfo.
1229         bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
1230             key_matching_FE_operation op_data(&out, try_make, metainfo);
1231             my_aggregator.execute(&op_data);
1232             return op_data.status == SUCCEEDED;
1233         }
1234 #endif
1235
1236         void tuple_accepted() {
1237             reset_port_count();  // reset current_key after ports reset.
1238         }
1239
1240         void tuple_rejected() {
1241             // nothing to do.
1242         }
1243
1244         input_type my_inputs;  // input ports
1245         base_node_type *my_node;  // non-owning; target of the forwarding task
1246     }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1247 
1248     //! join_node_base
1249     template<typename JP, typename InputTuple, typename OutputTuple>
1250     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1251                            public sender<OutputTuple> {
1252     protected:
1253         using graph_node::my_graph;
1254     public:
1255         typedef OutputTuple output_type;
1256 
1257         typedef typename sender<output_type>::successor_type successor_type;
1258         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1259         using input_ports_type::tuple_build_may_succeed;
1260         using input_ports_type::try_to_make_tuple;
1261         using input_ports_type::tuple_accepted;
1262         using input_ports_type::tuple_rejected;
1263 
1264     private:
1265         // ----------- Aggregator ------------
1266         enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1267         };
1268         typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1269 
1270         class join_node_base_operation : public d1::aggregated_operation<join_node_base_operation> {
1271         public:
1272             char type;
1273             union {
1274                 output_type *my_arg;
1275                 successor_type *my_succ;
1276             };
1277             graph_task* bypass_t;
1278 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1279             message_metainfo* metainfo;
1280 #endif
1281             join_node_base_operation(const output_type& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info))
1282                 : type(char(t)), my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr)
1283                   __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
1284 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1285             join_node_base_operation(const output_type& e, op_type t)
1286                 : type(char(t)), my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr), metainfo(nullptr) {}
1287 #endif
1288             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1289                 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1290             join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1291         };
1292 
1293         typedef d1::aggregating_functor<class_type, join_node_base_operation> handler_type;
1294         friend class d1::aggregating_functor<class_type, join_node_base_operation>;
1295         bool forwarder_busy;
1296         d1::aggregator<handler_type, join_node_base_operation> my_aggregator;
1297 
1298         void handle_operations(join_node_base_operation* op_list) {
1299             join_node_base_operation *current;
1300             while(op_list) {
1301                 current = op_list;
1302                 op_list = op_list->next;
1303                 switch(current->type) {
1304                 case reg_succ: {
1305                         my_successors.register_successor(*(current->my_succ));
1306                         if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1307                             d1::small_object_allocator allocator{};
1308                             typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1309                             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1310                             spawn_in_graph_arena(my_graph, *t);
1311                             forwarder_busy = true;
1312                         }
1313                         current->status.store( SUCCEEDED, std::memory_order_release);
1314                     }
1315                     break;
1316                 case rem_succ:
1317                     my_successors.remove_successor(*(current->my_succ));
1318                     current->status.store( SUCCEEDED, std::memory_order_release);
1319                     break;
1320                 case try__get:
1321                     if(tuple_build_may_succeed()) {
1322                         bool make_tuple_result = false;
1323 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1324                         if (current->metainfo) {
1325                             make_tuple_result = try_to_make_tuple(*(current->my_arg), *(current->metainfo));
1326                         } else
1327 #endif
1328                         {
1329                             make_tuple_result = try_to_make_tuple(*(current->my_arg));
1330                         }
1331                         if(make_tuple_result) {
1332 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1333                             if (current->metainfo) {
1334                                 // Since elements would be removed from queues while calling to tuple_accepted
1335                                 // together with corresponding message_metainfo objects
1336                                 // we need to prolong the wait until the successor would create a task for removed elements
1337                                 for (auto waiter : current->metainfo->waiters()) {
1338                                     waiter->reserve(1);
1339                                 }
1340                             }
1341 #endif
1342                             tuple_accepted();
1343                             current->status.store( SUCCEEDED, std::memory_order_release);
1344                         }
1345                         else current->status.store( FAILED, std::memory_order_release);
1346                     }
1347                     else current->status.store( FAILED, std::memory_order_release);
1348                     break;
1349                 case do_fwrd_bypass: {
1350                         bool build_succeeded;
1351                         graph_task *last_task = nullptr;
1352                         output_type out;
1353                         // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted
1354                         // are separate locked methods in the FE.  We could conceivably fetch the front
1355                         // of the FE queue, then be swapped out, have someone else consume the FE's
1356                         // object, then come back, forward, and then try to remove it from the queue
1357                         // again. Without reservation of the FE, the methods accessing it must be locked.
1358                         // We could remember the keys of the objects we forwarded, and then remove
1359                         // them from the input ports after forwarding is complete?
1360                         if(tuple_build_may_succeed()) {  // checks output queue of FE
1361                             do {
1362 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1363                                 message_metainfo metainfo;
1364 #endif
1365                                 // fetch front_end of queue
1366                                 build_succeeded = try_to_make_tuple(out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1367                                 if(build_succeeded) {
1368                                     graph_task *new_task =
1369                                         my_successors.try_put_task(out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1370                                     last_task = combine_tasks(my_graph, last_task, new_task);
1371                                     if(new_task) {
1372                                         tuple_accepted();
1373                                     }
1374                                     else {
1375                                         tuple_rejected();
1376                                         build_succeeded = false;
1377                                     }
1378                                 }
1379                             } while(build_succeeded);
1380                         }
1381                         current->bypass_t = last_task;
1382                         current->status.store( SUCCEEDED, std::memory_order_release);
1383                         forwarder_busy = false;
1384                     }
1385                     break;
1386                 }
1387             }
1388         }
1389         // ---------- end aggregator -----------
1390     public:
1391         join_node_base(graph &g)
                 // Constructs a join node in graph g: graph_node and input_ports_type are both built
                 // from g, forwarder_busy starts out false, and my_successors is constructed with a
                 // pointer to this node.
1392             : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1393         {
                 // Give the input-ports base a pointer to this node.
1394             input_ports_type::set_my_node(this);
                 // Install the aggregator handler, which is constructed from this node.
1395             my_aggregator.initialize_handler(handler_type(this));
1396         }
1397 
1398         join_node_base(const join_node_base& other) :
                 // Copy constructor.  The new node is attached to the graph that `other` belongs to
                 // (other.graph_node::my_graph) and input_ports_type is constructed from `other`.
                 // forwarder_busy is reset to false and my_successors is built fresh from `this`,
                 // i.e. the successor cache of `other` is not passed along.
1399             graph_node(other.graph_node::my_graph), input_ports_type(other),
1400             sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1401         {
                 // Same wiring as the other constructors: ports get a pointer to this node and the
                 // aggregator gets a handler constructed from this node.
1402             input_ports_type::set_my_node(this);
1403             my_aggregator.initialize_handler(handler_type(this));
1404         }
1405 
1406         template<typename FunctionTuple>
             // Constructs a join node in graph g and additionally passes f on to the
             // input_ports_type constructor.  In this file the key_matching unfolded_join_node
             // specializations call this overload with a std::tuple of
             // type_to_key_function_body<Ti, K>* pointers.  f is taken by value.
1407         join_node_base(graph &g, FunctionTuple f)
1408             : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1409         {
                 // Ports get a pointer to this node; the aggregator gets its handler.
1410             input_ports_type::set_my_node(this);
1411             my_aggregator.initialize_handler(handler_type(this));
1412         }
1413 
1414         bool register_successor(successor_type &r) override {
                 // Wraps r in a reg_succ operation and submits it to my_aggregator.
                 // Returns true when the operation's status reads SUCCEEDED after execute() returns.
1415             join_node_base_operation op_data(r, reg_succ);
1416             my_aggregator.execute(&op_data);
1417             return op_data.status == SUCCEEDED;
1418         }
1419 
1420         bool remove_successor( successor_type &r) override {
                 // Wraps r in a rem_succ operation and submits it to my_aggregator.  The rem_succ
                 // case of the handler calls my_successors.remove_successor() on the stored successor
                 // and then stores SUCCEEDED, which is the status tested here.
1421             join_node_base_operation op_data(r, rem_succ);
1422             my_aggregator.execute(&op_data);
1423             return op_data.status == SUCCEEDED;
1424         }
1425 
1426         bool try_get( output_type &v) override {
                 // Submits a try__get operation carrying v to my_aggregator.
                 // The try__get case of the handler stores SUCCEEDED only when
                 // tuple_build_may_succeed() holds and try_to_make_tuple() on the stored argument
                 // succeeds; in that case it also calls tuple_accepted().  Every other path stores
                 // FAILED, so this returns false.
1427             join_node_base_operation op_data(v, try__get);
1428             my_aggregator.execute(&op_data);
1429             return op_data.status == SUCCEEDED;
1430         }
1431 
1432 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1433         bool try_get( output_type &v, message_metainfo& metainfo) override {
                 // Metainfo-aware variant of try_get (compiled only under
                 // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT).  The operation carries both v and
                 // metainfo.  When the operation's metainfo is set, the try__get case of the handler
                 // calls the two-argument try_to_make_tuple() and, on success, calls reserve(1) on
                 // every waiter in metainfo->waiters() before tuple_accepted().
                 // Returns true when the operation's status reads SUCCEEDED.
1434             join_node_base_operation op_data(v, try__get, metainfo);
1435             my_aggregator.execute(&op_data);
1436             return op_data.status == SUCCEEDED;
1437         }
1438 #endif
1439 
1440     protected:
1441         void reset_node(reset_flags f) override {
                 // Always forwards the reset flags to the input ports.
1442             input_ports_type::reset(f);
                 // The successor cache is cleared only when the rf_clear_edges bit is set in f.
1443             if(f & rf_clear_edges) my_successors.clear();
1444         }
1445 
1446     private:
1447         broadcast_cache<output_type, null_rw_mutex> my_successors;
             // Successor cache of this node.  The aggregator handler adds and removes successors on
             // it and offers built tuples to it via try_put_task().
             // NOTE(review): null_rw_mutex presumably means the cache does no locking of its own and
             // relies on the aggregator for serialization - confirm.
1448 
1449         friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
             // forward_task() is private; the friend declaration above gives forward_task_bypass
             // access to it (presumably it is that task's body - confirm at its definition).
             //
             // Submits a do_fwrd_bypass operation to my_aggregator and returns the task left in
             // op_data.bypass_t.  In the do_fwrd_bypass case of the handler that task is the
             // combine_tasks() result over all tasks returned by my_successors.try_put_task() while
             // tuples could be built and were accepted.  It starts as nullptr, so nullptr is
             // returned when nothing was forwarded.
1450         graph_task *forward_task() {
1451             join_node_base_operation op_data(do_fwrd_bypass);
1452             my_aggregator.execute(&op_data);
1453             return op_data.bypass_t;
1454         }
1455 
1456     };  // join_node_base
1457 
1458     // join base class type generator
1459     template<int N, template<class> class PT, typename OutputTuple, typename JP>
         // Type generator: join_base<N, PT, OutputTuple, JP>::type is the join_node_base
         // instantiation for policy JP whose input-ports type is
         // wrap_tuple_elements<N, PT, OutputTuple>::type and whose output type is OutputTuple.
1460     struct join_base {
1461         typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1462     };
1463 
1464     template<int N, typename OutputTuple, typename K, typename KHash>
         // Specialization of join_base for key_matching_port ports with a key_matching<K,KHash>
         // policy.  Unlike the primary template, the ports type is produced by
         // wrap_key_tuple_elements, which also receives the key traits type.
1465     struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
             // Convenience aliases for the policy, its key type and its hash-compare type.
1466         typedef key_matching<K, KHash> key_traits_type;
1467         typedef K key_type;
1468         typedef KHash key_hash_compare;
1469         typedef join_node_base< key_traits_type,
1470                 // ports type
1471                 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1472                 OutputTuple > type;
1473     };
1474 
1475     //! unfolded_join_node : passes input_ports_type to join_node_base.  We build the input port type
1476     //  using tuple_element.  The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1477     //  and should match the typename.
1478 
1479     template<int M, template<class> class PT, typename OutputTuple, typename JP>
         // Primary unfolded_join_node: a thin layer over join_base<M,PT,OutputTuple,JP>::type that
         // publishes input_ports_type / output_type and forwards construction to the base.
1480     class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type {
1481     public:
1482         typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type;
1483         typedef OutputTuple output_type;
1484     private:
             // Spelled out here, but names the same join_node_base instantiation that the primary
             // join_base<M,PT,OutputTuple,JP>::type resolves to.
1485         typedef join_node_base<JP, input_ports_type, output_type > base_type;
1486     public:
             // Both constructors simply delegate to the corresponding base_type constructor.
1487         unfolded_join_node(graph &g) : base_type(g) {}
1488         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1489     };
1490 
1491 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1492     template <typename K, typename T>
         // Function object that obtains the key for a message by calling key_from_message<K>(t)
         // and returns it by value.  The key_matching unfolded_join_node specializations use it as
         // the per-port body in their graph-only constructors (preview feature).
1493     struct key_from_message_body {
1494         K operator()(const T& t) const {
1495             return key_from_message<K>(t);
1496         }
1497     };
1498     // Adds const to reference type
1499     template <typename K, typename T>
         // Partial specialization for reference key types (K&): the key is requested from
         // key_from_message as `const K&` and returned as `const K&` rather than as `K&`.
1500     struct key_from_message_body<K&,T> {
1501         const K& operator()(const T& t) const {
1502             return key_from_message<const K&>(t);
1503         }
1504     };
1505 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1506     // key_matching unfolded_join_node.  This must be a separate specialization because the constructors
1507     // differ.
1508 
1509     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 2 input ports.  Its constructors build a tuple of
         // per-port type_to_key_function_body pointers and hand it to the join_node_base
         // constructor that takes a function tuple.
1510     class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1511             join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1512         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1513         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1514     public:
1515         typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1516         typedef OutputTuple output_type;
1517     private:
             // Names the same join_node_base instantiation this class derives from.
1518         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1519         typedef type_to_key_function_body<T0, K> *f0_p;
1520         typedef type_to_key_function_body<T1, K> *f1_p;
1521         typedef std::tuple< f0_p, f1_p > func_initializer_type;
1522     public:
1523 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1524         unfolded_join_node(graph &g) : base_type(g,
1525                 func_initializer_type(
1526                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1527                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1528                     ) ) {
1529         }
1530 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1531         template<typename Body0, typename Body1>
1532         unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1533                 func_initializer_type(
1534                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1535                     new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1536                     ) ) {
                 // Compile-time check that OutputTuple really has 2 elements, one per body.
1537             static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1538         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1539         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1540     };
1541 
1542     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 3 input ports.  Its constructors build a tuple of
         // per-port type_to_key_function_body pointers and hand it to the join_node_base
         // constructor that takes a function tuple.
1543     class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1544             join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1545         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1546         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1547         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1548     public:
1549         typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1550         typedef OutputTuple output_type;
1551     private:
             // Names the same join_node_base instantiation this class derives from.
1552         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1553         typedef type_to_key_function_body<T0, K> *f0_p;
1554         typedef type_to_key_function_body<T1, K> *f1_p;
1555         typedef type_to_key_function_body<T2, K> *f2_p;
1556         typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1557     public:
1558 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1559         unfolded_join_node(graph &g) : base_type(g,
1560                 func_initializer_type(
1561                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1562                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1563                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1564                     ) ) {
1565         }
1566 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1567         template<typename Body0, typename Body1, typename Body2>
1568         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1569                 func_initializer_type(
1570                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1571                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1572                     new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1573                     ) ) {
                 // Compile-time check that OutputTuple really has 3 elements, one per body.
1574             static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1575         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1576         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1577     };
1578 
1579     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 4 input ports.  Its constructors build a tuple of
         // per-port type_to_key_function_body pointers and hand it to the join_node_base
         // constructor that takes a function tuple.
1580     class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1581             join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1582         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1583         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1584         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1585         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1586     public:
1587         typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1588         typedef OutputTuple output_type;
1589     private:
             // Names the same join_node_base instantiation this class derives from.
1590         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1591         typedef type_to_key_function_body<T0, K> *f0_p;
1592         typedef type_to_key_function_body<T1, K> *f1_p;
1593         typedef type_to_key_function_body<T2, K> *f2_p;
1594         typedef type_to_key_function_body<T3, K> *f3_p;
1595         typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1596     public:
1597 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1598         unfolded_join_node(graph &g) : base_type(g,
1599                 func_initializer_type(
1600                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1601                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1602                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1603                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1604                     ) ) {
1605         }
1606 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1607         template<typename Body0, typename Body1, typename Body2, typename Body3>
1608         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1609                 func_initializer_type(
1610                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1611                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1612                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1613                     new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1614                     ) ) {
                 // Compile-time check that OutputTuple really has 4 elements, one per body.
1615             static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1616         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1617         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1618     };
1619 
1620     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 5 input ports.  Its constructors build a tuple of
         // per-port type_to_key_function_body pointers and hand it to the join_node_base
         // constructor that takes a function tuple.
1621     class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1622             join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1623         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1624         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1625         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1626         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1627         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1628     public:
1629         typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1630         typedef OutputTuple output_type;
1631     private:
             // Names the same join_node_base instantiation this class derives from.
1632         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1633         typedef type_to_key_function_body<T0, K> *f0_p;
1634         typedef type_to_key_function_body<T1, K> *f1_p;
1635         typedef type_to_key_function_body<T2, K> *f2_p;
1636         typedef type_to_key_function_body<T3, K> *f3_p;
1637         typedef type_to_key_function_body<T4, K> *f4_p;
1638         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1639     public:
1640 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1641         unfolded_join_node(graph &g) : base_type(g,
1642                 func_initializer_type(
1643                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1644                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1645                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1646                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1647                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1648                     ) ) {
1649         }
1650 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1651         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1652         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1653                 func_initializer_type(
1654                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1655                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1656                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1657                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1658                     new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1659                     ) ) {
                 // Compile-time check that OutputTuple really has 5 elements, one per body.
1660             static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1661         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1662         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1663     };
1664 
1665 #if __TBB_VARIADIC_MAX >= 6
1666     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 6 input ports (compiled only when
         // __TBB_VARIADIC_MAX >= 6).  Its constructors build a tuple of per-port
         // type_to_key_function_body pointers and hand it to the join_node_base constructor that
         // takes a function tuple.
1667     class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1668             join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1669         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1670         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1671         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1672         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1673         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1674         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1675     public:
1676         typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1677         typedef OutputTuple output_type;
1678     private:
             // Names the same join_node_base instantiation this class derives from.
1679         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1680         typedef type_to_key_function_body<T0, K> *f0_p;
1681         typedef type_to_key_function_body<T1, K> *f1_p;
1682         typedef type_to_key_function_body<T2, K> *f2_p;
1683         typedef type_to_key_function_body<T3, K> *f3_p;
1684         typedef type_to_key_function_body<T4, K> *f4_p;
1685         typedef type_to_key_function_body<T5, K> *f5_p;
1686         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1687     public:
1688 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1689         unfolded_join_node(graph &g) : base_type(g,
1690                 func_initializer_type(
1691                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1692                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1693                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1694                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1695                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1696                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1697                     ) ) {
1698         }
1699 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1700         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1701         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1702                 : base_type(g, func_initializer_type(
1703                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1704                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1705                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1706                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1707                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1708                     new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1709                     ) ) {
                 // Compile-time check that OutputTuple really has 6 elements, one per body.
1710             static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1711         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1712         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1713     };
1714 #endif
1715 
1716 #if __TBB_VARIADIC_MAX >= 7
1717     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 7 input ports (compiled only when
         // __TBB_VARIADIC_MAX >= 7).  Its constructors build a tuple of per-port
         // type_to_key_function_body pointers and hand it to the join_node_base constructor that
         // takes a function tuple.
1718     class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1719             join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1720         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1721         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1722         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1723         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1724         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1725         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1726         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1727     public:
1728         typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1729         typedef OutputTuple output_type;
1730     private:
             // Names the same join_node_base instantiation this class derives from.
1731         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1732         typedef type_to_key_function_body<T0, K> *f0_p;
1733         typedef type_to_key_function_body<T1, K> *f1_p;
1734         typedef type_to_key_function_body<T2, K> *f2_p;
1735         typedef type_to_key_function_body<T3, K> *f3_p;
1736         typedef type_to_key_function_body<T4, K> *f4_p;
1737         typedef type_to_key_function_body<T5, K> *f5_p;
1738         typedef type_to_key_function_body<T6, K> *f6_p;
1739         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1740     public:
1741 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1742         unfolded_join_node(graph &g) : base_type(g,
1743                 func_initializer_type(
1744                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1745                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1746                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1747                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1748                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1749                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1750                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1751                     ) ) {
1752         }
1753 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1754         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1755                  typename Body5, typename Body6>
1756         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1757                 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1758                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1759                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1760                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1761                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1762                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1763                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1764                     new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1765                     ) ) {
                 // Compile-time check that OutputTuple really has 7 elements, one per body.
1766             static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1767         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1768         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1769     };
1770 #endif
1771 
1772 #if __TBB_VARIADIC_MAX >= 8
1773     template<typename OutputTuple, typename K, typename KHash>
         // key_matching unfolded_join_node with 8 input ports (compiled only when
         // __TBB_VARIADIC_MAX >= 8).  Its constructors build a tuple of per-port
         // type_to_key_function_body pointers and hand it to the join_node_base constructor that
         // takes a function tuple.
1774     class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1775             join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Ti is the i-th element type of OutputTuple.
1776         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1777         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1778         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1779         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1780         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1781         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1782         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1783         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1784     public:
1785         typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1786         typedef OutputTuple output_type;
1787     private:
             // Names the same join_node_base instantiation this class derives from.
1788         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // One pointer type per port, bundled into the tuple given to base_type.
1789         typedef type_to_key_function_body<T0, K> *f0_p;
1790         typedef type_to_key_function_body<T1, K> *f1_p;
1791         typedef type_to_key_function_body<T2, K> *f2_p;
1792         typedef type_to_key_function_body<T3, K> *f3_p;
1793         typedef type_to_key_function_body<T4, K> *f4_p;
1794         typedef type_to_key_function_body<T5, K> *f5_p;
1795         typedef type_to_key_function_body<T6, K> *f6_p;
1796         typedef type_to_key_function_body<T7, K> *f7_p;
1797         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1798     public:
1799 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Graph-only constructor (preview): every port uses key_from_message_body<K,Ti> as its body.
             // NOTE(review): the leaf objects are created with raw `new` and passed on as raw pointers;
             // who deletes them is not visible in this part of the file - presumably the ports.
1800         unfolded_join_node(graph &g) : base_type(g,
1801                 func_initializer_type(
1802                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1803                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1804                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1805                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1806                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1807                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1808                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1809                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1810                     ) ) {
1811         }
1812 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (by value); each body is wrapped in a
             // heap-allocated type_to_key_function_body_leaf.  Presumably body i maps a Ti message
             // to its key of type K - confirm against type_to_key_function_body.
1813         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1814                  typename Body5, typename Body6, typename Body7>
1815         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1816                 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1817                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1818                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1819                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1820                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1821                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1822                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1823                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1824                     new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1825                     ) ) {
                 // Compile-time check that OutputTuple really has 8 elements, one per body.
1826             static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1827         }
             // Copy construction is delegated entirely to the base_type copy constructor.
1828         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1829     };
1830 #endif
1831 
1832 #if __TBB_VARIADIC_MAX >= 9
1833     template<typename OutputTuple, typename K, typename KHash>
1834     class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1835             join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Partial specialization of unfolded_join_node for nine key_matching_port inputs under
             // the key_matching<K,KHash> policy.  It adds no members of its own: it only builds the
             // tuple of per-port key-function bodies and passes it to base_type.
             // NOTE(review): the class derives from join_base<...>::type but its constructors
             // initialize base_type (join_node_base<...>); presumably both name the same type --
             // confirm against the join_base definition.

             // Ti is the i-th element type of OutputTuple.
1836         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1837         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1838         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1839         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1840         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1841         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1842         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1843         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1844         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1845     public:
             // Input-port tuple type, as computed by wrap_key_tuple_elements for nine ports.
1846         typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1847         typedef OutputTuple output_type;
1848     private:
1849         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // fi_p: pointer to the abstract key-function body for port i
             // (presumably maps a Ti message to its key of type K -- TODO confirm).
1850         typedef type_to_key_function_body<T0, K> *f0_p;
1851         typedef type_to_key_function_body<T1, K> *f1_p;
1852         typedef type_to_key_function_body<T2, K> *f2_p;
1853         typedef type_to_key_function_body<T3, K> *f3_p;
1854         typedef type_to_key_function_body<T4, K> *f4_p;
1855         typedef type_to_key_function_body<T5, K> *f5_p;
1856         typedef type_to_key_function_body<T6, K> *f6_p;
1857         typedef type_to_key_function_body<T7, K> *f7_p;
1858         typedef type_to_key_function_body<T8, K> *f8_p;
             // Tuple of the nine body pointers, in port order, handed to the base_type constructor.
1859         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1860     public:
1861 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Body-less constructor, compiled only when the preview macro is enabled: every port
             // gets a heap-allocated leaf wrapping a default-constructed key_from_message_body<K,Ti>.
             // NOTE(review): the raw pointers are passed straight to base_type; ownership and
             // deletion are presumably handled there -- not visible in this block.
1862         unfolded_join_node(graph &g) : base_type(g,
1863                 func_initializer_type(
1864                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1865                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1866                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1867                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1868                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1869                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1870                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1871                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1872                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1873                     ) ) {
1874         }
1875 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (body i serves port i).  Each body is
             // received by value and wrapped in a heap-allocated type_to_key_function_body_leaf;
             // the nine pointers are forwarded to base_type in port order.
1876         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1877                  typename Body5, typename Body6, typename Body7, typename Body8>
1878         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1879                 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1880                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1881                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1882                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1883                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1884                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1885                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1886                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1887                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1888                     new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1889                     ) ) {
                 // Compile-time check that OutputTuple has exactly one element per supplied body.
1890             static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1891         }
             // Copy constructor: delegates entirely to the base_type copy constructor.
1892         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1893     };
1894 #endif
1895 
1896 #if __TBB_VARIADIC_MAX >= 10
1897     template<typename OutputTuple, typename K, typename KHash>
1898     class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1899             join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
             // Partial specialization of unfolded_join_node for ten key_matching_port inputs under
             // the key_matching<K,KHash> policy.  It adds no members of its own: it only builds the
             // tuple of per-port key-function bodies and passes it to base_type.
             // NOTE(review): the class derives from join_base<...>::type but its constructors
             // initialize base_type (join_node_base<...>); presumably both name the same type --
             // confirm against the join_base definition.

             // Ti is the i-th element type of OutputTuple.
1900         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1901         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1902         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1903         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1904         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1905         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1906         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1907         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1908         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1909         typedef typename std::tuple_element<9, OutputTuple>::type T9;
1910     public:
             // Input-port tuple type, as computed by wrap_key_tuple_elements for ten ports.
1911         typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1912         typedef OutputTuple output_type;
1913     private:
1914         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
             // fi_p: pointer to the abstract key-function body for port i
             // (presumably maps a Ti message to its key of type K -- TODO confirm).
1915         typedef type_to_key_function_body<T0, K> *f0_p;
1916         typedef type_to_key_function_body<T1, K> *f1_p;
1917         typedef type_to_key_function_body<T2, K> *f2_p;
1918         typedef type_to_key_function_body<T3, K> *f3_p;
1919         typedef type_to_key_function_body<T4, K> *f4_p;
1920         typedef type_to_key_function_body<T5, K> *f5_p;
1921         typedef type_to_key_function_body<T6, K> *f6_p;
1922         typedef type_to_key_function_body<T7, K> *f7_p;
1923         typedef type_to_key_function_body<T8, K> *f8_p;
1924         typedef type_to_key_function_body<T9, K> *f9_p;
             // Tuple of the ten body pointers, in port order, handed to the base_type constructor.
1925         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1926     public:
1927 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
             // Body-less constructor, compiled only when the preview macro is enabled: every port
             // gets a heap-allocated leaf wrapping a default-constructed key_from_message_body<K,Ti>.
             // NOTE(review): the raw pointers are passed straight to base_type; ownership and
             // deletion are presumably handled there -- not visible in this block.
1928         unfolded_join_node(graph &g) : base_type(g,
1929                 func_initializer_type(
1930                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1931                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1932                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1933                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1934                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1935                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1936                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1937                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1938                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1939                     new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1940                     ) ) {
1941         }
1942 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
             // Constructor taking one user body per port (body i serves port i).  Each body is
             // received by value and wrapped in a heap-allocated type_to_key_function_body_leaf;
             // the ten pointers are forwarded to base_type in port order.
1943         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1944             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1945         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1946                 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1947                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1948                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1949                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1950                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1951                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1952                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1953                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1954                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1955                     new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1956                     new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1957                     ) ) {
                 // Compile-time check that OutputTuple has exactly one element per supplied body.
1958             static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1959         }
             // Copy constructor: delegates entirely to the base_type copy constructor.
1960         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1961     };
1962 #endif
1963 
1964     //! Returns a reference to the N-th input port of the join node jn.
         //! N is a zero-based, compile-time index.  The result is std::get<N> applied to
         //! jn.input_ports(), typed as a reference to element N of JNT::input_ports_type.
         //! An out-of-range N fails at compile time in std::tuple_element / std::get.
1965     template<size_t N, typename JNT>
1966     typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1967         return std::get<N>(jn.input_ports());
1968     }
1969 
1970 #endif // __TBB__flow_graph_join_impl_H