Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:48

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 namespace internal {
0025 
0026     struct forwarding_base : tbb::internal::no_assign {
0027         forwarding_base(graph &g) : graph_ref(g) {}
0028         virtual ~forwarding_base() {}
0029         // decrement_port_count may create a forwarding task.  If we cannot handle the task
0030         // ourselves, ask decrement_port_count to deal with it.
0031         virtual task * decrement_port_count(bool handle_task) = 0;
0032         virtual void increment_port_count() = 0;
0033         // moved here so input ports can queue tasks
0034         graph& graph_ref;
0035     };
0036 
0037     // specialization that lets us keep a copy of the current_key for building results.
0038     // KeyType can be a reference type.
0039     template<typename KeyType>
0040     struct matching_forwarding_base : public forwarding_base {
0041         typedef typename tbb::internal::strip<KeyType>::type current_key_type;
0042         matching_forwarding_base(graph &g) : forwarding_base(g) { }
0043         virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
0044         current_key_type current_key; // so ports can refer to FE's desired items
0045     };
0046 
0047     template< int N >
0048     struct join_helper {
0049 
0050         template< typename TupleType, typename PortType >
0051         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0052             tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
0053             join_helper<N-1>::set_join_node_pointer( my_input, port );
0054         }
0055         template< typename TupleType >
0056         static inline void consume_reservations( TupleType &my_input ) {
0057             tbb::flow::get<N-1>( my_input ).consume();
0058             join_helper<N-1>::consume_reservations( my_input );
0059         }
0060 
0061         template< typename TupleType >
0062         static inline void release_my_reservation( TupleType &my_input ) {
0063             tbb::flow::get<N-1>( my_input ).release();
0064         }
0065 
0066         template <typename TupleType>
0067         static inline void release_reservations( TupleType &my_input) {
0068             join_helper<N-1>::release_reservations(my_input);
0069             release_my_reservation(my_input);
0070         }
0071 
0072         template< typename InputTuple, typename OutputTuple >
0073         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0074             if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
0075             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0076                 release_my_reservation( my_input );
0077                 return false;
0078             }
0079             return true;
0080         }
0081 
0082         template<typename InputTuple, typename OutputTuple>
0083         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0084             bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
0085             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
0086         }
0087 
0088         template<typename InputTuple, typename OutputTuple>
0089         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0090             return get_my_item(my_input, out);
0091         }
0092 
0093         template<typename InputTuple>
0094         static inline void reset_my_port(InputTuple &my_input) {
0095             join_helper<N-1>::reset_my_port(my_input);
0096             tbb::flow::get<N-1>(my_input).reset_port();
0097         }
0098 
0099         template<typename InputTuple>
0100         static inline void reset_ports(InputTuple& my_input) {
0101             reset_my_port(my_input);
0102         }
0103 
0104         template<typename InputTuple, typename KeyFuncTuple>
0105         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0106             tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
0107             tbb::flow::get<N-1>(my_key_funcs) = NULL;
0108             join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0109         }
0110 
0111         template< typename KeyFuncTuple>
0112         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0113             if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
0114                 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
0115             }
0116             join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0117         }
0118 
0119         template<typename InputTuple>
0120         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0121             join_helper<N-1>::reset_inputs(my_input, f);
0122             tbb::flow::get<N-1>(my_input).reset_receiver(f);
0123         }
0124 
0125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0126         template<typename InputTuple>
0127         static inline void extract_inputs(InputTuple &my_input) {
0128             join_helper<N-1>::extract_inputs(my_input);
0129             tbb::flow::get<N-1>(my_input).extract_receiver();
0130         }
0131 #endif
0132     };  // join_helper<N>
0133 
0134     template< >
0135     struct join_helper<1> {
0136 
0137         template< typename TupleType, typename PortType >
0138         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0139             tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
0140         }
0141 
0142         template< typename TupleType >
0143         static inline void consume_reservations( TupleType &my_input ) {
0144             tbb::flow::get<0>( my_input ).consume();
0145         }
0146 
0147         template< typename TupleType >
0148         static inline void release_my_reservation( TupleType &my_input ) {
0149             tbb::flow::get<0>( my_input ).release();
0150         }
0151 
0152         template<typename TupleType>
0153         static inline void release_reservations( TupleType &my_input) {
0154             release_my_reservation(my_input);
0155         }
0156 
0157         template< typename InputTuple, typename OutputTuple >
0158         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0159             return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
0160         }
0161 
0162         template<typename InputTuple, typename OutputTuple>
0163         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0164             return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
0165         }
0166 
0167         template<typename InputTuple, typename OutputTuple>
0168         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0169             return get_my_item(my_input, out);
0170         }
0171 
0172         template<typename InputTuple>
0173         static inline void reset_my_port(InputTuple &my_input) {
0174             tbb::flow::get<0>(my_input).reset_port();
0175         }
0176 
0177         template<typename InputTuple>
0178         static inline void reset_ports(InputTuple& my_input) {
0179             reset_my_port(my_input);
0180         }
0181 
0182         template<typename InputTuple, typename KeyFuncTuple>
0183         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0184             tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
0185             tbb::flow::get<0>(my_key_funcs) = NULL;
0186         }
0187 
0188         template< typename KeyFuncTuple>
0189         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0190             if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
0191                 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
0192             }
0193         }
0194         template<typename InputTuple>
0195         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0196             tbb::flow::get<0>(my_input).reset_receiver(f);
0197         }
0198 
0199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0200         template<typename InputTuple>
0201         static inline void extract_inputs(InputTuple &my_input) {
0202             tbb::flow::get<0>(my_input).extract_receiver();
0203         }
0204 #endif
0205     };  // join_helper<1>
0206 
0207     //! The two-phase join port
0208     template< typename T >
0209     class reserving_port : public receiver<T> {
0210     public:
0211         typedef T input_type;
0212         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0214         typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0215         typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0216 #endif
0217     private:
0218         // ----------- Aggregator ------------
0219         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0221             , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0222 #endif
0223         };
0224         typedef reserving_port<T> class_type;
0225 
0226         class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
0227         public:
0228             char type;
0229             union {
0230                 T *my_arg;
0231                 predecessor_type *my_pred;
0232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0233                 size_t cnt_val;
0234                 predecessor_list_type *plist;
0235 #endif
0236             };
0237             reserving_port_operation(const T& e, op_type t) :
0238                 type(char(t)), my_arg(const_cast<T*>(&e)) {}
0239             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0240                 my_pred(const_cast<predecessor_type *>(&s)) {}
0241             reserving_port_operation(op_type t) : type(char(t)) {}
0242         };
0243 
0244         typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
0245         friend class internal::aggregating_functor<class_type, reserving_port_operation>;
0246         aggregator<handler_type, reserving_port_operation> my_aggregator;
0247 
0248         void handle_operations(reserving_port_operation* op_list) {
0249             reserving_port_operation *current;
0250             bool no_predecessors;
0251             while(op_list) {
0252                 current = op_list;
0253                 op_list = op_list->next;
0254                 switch(current->type) {
0255                 case reg_pred:
0256                     no_predecessors = my_predecessors.empty();
0257                     my_predecessors.add(*(current->my_pred));
0258                     if ( no_predecessors ) {
0259                         (void) my_join->decrement_port_count(true); // may try to forward
0260                     }
0261                     __TBB_store_with_release(current->status, SUCCEEDED);
0262                     break;
0263                 case rem_pred:
0264                     my_predecessors.remove(*(current->my_pred));
0265                     if(my_predecessors.empty()) my_join->increment_port_count();
0266                     __TBB_store_with_release(current->status, SUCCEEDED);
0267                     break;
0268                 case res_item:
0269                     if ( reserved ) {
0270                         __TBB_store_with_release(current->status, FAILED);
0271                     }
0272                     else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
0273                         reserved = true;
0274                         __TBB_store_with_release(current->status, SUCCEEDED);
0275                     } else {
0276                         if ( my_predecessors.empty() ) {
0277                             my_join->increment_port_count();
0278                         }
0279                         __TBB_store_with_release(current->status, FAILED);
0280                     }
0281                     break;
0282                 case rel_res:
0283                     reserved = false;
0284                     my_predecessors.try_release( );
0285                     __TBB_store_with_release(current->status, SUCCEEDED);
0286                     break;
0287                 case con_res:
0288                     reserved = false;
0289                     my_predecessors.try_consume( );
0290                     __TBB_store_with_release(current->status, SUCCEEDED);
0291                     break;
0292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0293                 case add_blt_pred:
0294                     my_predecessors.internal_add_built_predecessor(*(current->my_pred));
0295                     __TBB_store_with_release(current->status, SUCCEEDED);
0296                     break;
0297                 case del_blt_pred:
0298                     my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
0299                     __TBB_store_with_release(current->status, SUCCEEDED);
0300                     break;
0301                 case blt_pred_cnt:
0302                     current->cnt_val = my_predecessors.predecessor_count();
0303                     __TBB_store_with_release(current->status, SUCCEEDED);
0304                     break;
0305                 case blt_pred_cpy:
0306                     my_predecessors.copy_predecessors(*(current->plist));
0307                     __TBB_store_with_release(current->status, SUCCEEDED);
0308                     break;
0309 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0310                 }
0311             }
0312         }
0313 
0314     protected:
0315         template< typename R, typename B > friend class run_and_put_task;
0316         template<typename X, typename Y> friend class internal::broadcast_cache;
0317         template<typename X, typename Y> friend class internal::round_robin_cache;
0318         task *try_put_task( const T & ) __TBB_override {
0319             return NULL;
0320         }
0321 
0322         graph& graph_reference() const __TBB_override {
0323             return my_join->graph_ref;
0324         }
0325 
0326     public:
0327 
0328         //! Constructor
0329         reserving_port() : reserved(false) {
0330             my_join = NULL;
0331             my_predecessors.set_owner( this );
0332             my_aggregator.initialize_handler(handler_type(this));
0333         }
0334 
0335         // copy constructor
0336         reserving_port(const reserving_port& /* other */) : receiver<T>() {
0337             reserved = false;
0338             my_join = NULL;
0339             my_predecessors.set_owner( this );
0340             my_aggregator.initialize_handler(handler_type(this));
0341         }
0342 
0343         void set_join_node_pointer(forwarding_base *join) {
0344             my_join = join;
0345         }
0346 
0347         //! Add a predecessor
0348         bool register_predecessor( predecessor_type &src ) __TBB_override {
0349             reserving_port_operation op_data(src, reg_pred);
0350             my_aggregator.execute(&op_data);
0351             return op_data.status == SUCCEEDED;
0352         }
0353 
0354         //! Remove a predecessor
0355         bool remove_predecessor( predecessor_type &src ) __TBB_override {
0356             reserving_port_operation op_data(src, rem_pred);
0357             my_aggregator.execute(&op_data);
0358             return op_data.status == SUCCEEDED;
0359         }
0360 
0361         //! Reserve an item from the port
0362         bool reserve( T &v ) {
0363             reserving_port_operation op_data(v, res_item);
0364             my_aggregator.execute(&op_data);
0365             return op_data.status == SUCCEEDED;
0366         }
0367 
0368         //! Release the port
0369         void release( ) {
0370             reserving_port_operation op_data(rel_res);
0371             my_aggregator.execute(&op_data);
0372         }
0373 
0374         //! Complete use of the port
0375         void consume( ) {
0376             reserving_port_operation op_data(con_res);
0377             my_aggregator.execute(&op_data);
0378         }
0379 
0380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0381         built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
0382         void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
0383             reserving_port_operation op_data(src, add_blt_pred);
0384             my_aggregator.execute(&op_data);
0385         }
0386 
0387         void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
0388             reserving_port_operation op_data(src, del_blt_pred);
0389             my_aggregator.execute(&op_data);
0390         }
0391 
0392         size_t predecessor_count() __TBB_override {
0393             reserving_port_operation op_data(blt_pred_cnt);
0394             my_aggregator.execute(&op_data);
0395             return op_data.cnt_val;
0396         }
0397 
0398         void copy_predecessors(predecessor_list_type &l) __TBB_override {
0399             reserving_port_operation op_data(blt_pred_cpy);
0400             op_data.plist = &l;
0401             my_aggregator.execute(&op_data);
0402         }
0403 
0404         void extract_receiver() {
0405             my_predecessors.built_predecessors().receiver_extract(*this);
0406         }
0407 
0408 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0409 
0410         void reset_receiver( reset_flags f) __TBB_override {
0411             if(f & rf_clear_edges) my_predecessors.clear();
0412             else
0413             my_predecessors.reset();
0414             reserved = false;
0415             __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0416         }
0417 
0418     private:
0419 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0420         friend class get_graph_helper;
0421 #endif
0422 
0423         forwarding_base *my_join;
0424         reservable_predecessor_cache< T, null_mutex > my_predecessors;
0425         bool reserved;
0426     };  // reserving_port
0427 
0428     //! queueing join_port
0429     template<typename T>
0430     class queueing_port : public receiver<T>, public item_buffer<T> {
0431     public:
0432         typedef T input_type;
0433         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0434         typedef queueing_port<T> class_type;
0435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0436         typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0437         typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0438 #endif
0439 
0440     // ----------- Aggregator ------------
0441     private:
0442         enum op_type { get__item, res_port, try__put_task
0443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0444             , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0445 #endif
0446         };
0447 
0448         class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
0449         public:
0450             char type;
0451             T my_val;
0452             T *my_arg;
0453 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0454             predecessor_type *pred;
0455             size_t cnt_val;
0456             predecessor_list_type *plist;
0457 #endif
0458             task * bypass_t;
0459             // constructor for value parameter
0460             queueing_port_operation(const T& e, op_type t) :
0461                 type(char(t)), my_val(e)
0462                 , bypass_t(NULL)
0463             {}
0464             // constructor for pointer parameter
0465             queueing_port_operation(const T* p, op_type t) :
0466                 type(char(t)), my_arg(const_cast<T*>(p))
0467                 , bypass_t(NULL)
0468             {}
0469             // constructor with no parameter
0470             queueing_port_operation(op_type t) : type(char(t))
0471                 , bypass_t(NULL)
0472             {}
0473         };
0474 
0475         typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
0476         friend class internal::aggregating_functor<class_type, queueing_port_operation>;
0477         aggregator<handler_type, queueing_port_operation> my_aggregator;
0478 
0479         void handle_operations(queueing_port_operation* op_list) {
0480             queueing_port_operation *current;
0481             bool was_empty;
0482             while(op_list) {
0483                 current = op_list;
0484                 op_list = op_list->next;
0485                 switch(current->type) {
0486                 case try__put_task: {
0487                         task *rtask = NULL;
0488                         was_empty = this->buffer_empty();
0489                         this->push_back(current->my_val);
0490                         if (was_empty) rtask = my_join->decrement_port_count(false);
0491                         else
0492                             rtask = SUCCESSFULLY_ENQUEUED;
0493                         current->bypass_t = rtask;
0494                         __TBB_store_with_release(current->status, SUCCEEDED);
0495                     }
0496                     break;
0497                 case get__item:
0498                     if(!this->buffer_empty()) {
0499                         *(current->my_arg) = this->front();
0500                         __TBB_store_with_release(current->status, SUCCEEDED);
0501                     }
0502                     else {
0503                         __TBB_store_with_release(current->status, FAILED);
0504                     }
0505                     break;
0506                 case res_port:
0507                     __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
0508                     this->destroy_front();
0509                     if(this->my_item_valid(this->my_head)) {
0510                         (void)my_join->decrement_port_count(true);
0511                     }
0512                     __TBB_store_with_release(current->status, SUCCEEDED);
0513                     break;
0514 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0515                 case add_blt_pred:
0516                     my_built_predecessors.add_edge(*(current->pred));
0517                     __TBB_store_with_release(current->status, SUCCEEDED);
0518                     break;
0519                 case del_blt_pred:
0520                     my_built_predecessors.delete_edge(*(current->pred));
0521                     __TBB_store_with_release(current->status, SUCCEEDED);
0522                     break;
0523                 case blt_pred_cnt:
0524                     current->cnt_val = my_built_predecessors.edge_count();
0525                     __TBB_store_with_release(current->status, SUCCEEDED);
0526                     break;
0527                 case blt_pred_cpy:
0528                     my_built_predecessors.copy_edges(*(current->plist));
0529                     __TBB_store_with_release(current->status, SUCCEEDED);
0530                     break;
0531 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0532                 }
0533             }
0534         }
0535     // ------------ End Aggregator ---------------
0536 
0537     protected:
0538         template< typename R, typename B > friend class run_and_put_task;
0539         template<typename X, typename Y> friend class internal::broadcast_cache;
0540         template<typename X, typename Y> friend class internal::round_robin_cache;
0541         task *try_put_task(const T &v) __TBB_override {
0542             queueing_port_operation op_data(v, try__put_task);
0543             my_aggregator.execute(&op_data);
0544             __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
0545             if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
0546             return op_data.bypass_t;
0547         }
0548 
0549         graph& graph_reference() const __TBB_override {
0550             return my_join->graph_ref;
0551         }
0552 
0553     public:
0554 
0555         //! Constructor
0556         queueing_port() : item_buffer<T>() {
0557             my_join = NULL;
0558             my_aggregator.initialize_handler(handler_type(this));
0559         }
0560 
0561         //! copy constructor
0562         queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
0563             my_join = NULL;
0564             my_aggregator.initialize_handler(handler_type(this));
0565         }
0566 
0567         //! record parent for tallying available items
0568         void set_join_node_pointer(forwarding_base *join) {
0569             my_join = join;
0570         }
0571 
0572         bool get_item( T &v ) {
0573             queueing_port_operation op_data(&v, get__item);
0574             my_aggregator.execute(&op_data);
0575             return op_data.status == SUCCEEDED;
0576         }
0577 
0578         // reset_port is called when item is accepted by successor, but
0579         // is initiated by join_node.
0580         void reset_port() {
0581             queueing_port_operation op_data(res_port);
0582             my_aggregator.execute(&op_data);
0583             return;
0584         }
0585 
0586 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0587         built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
0588 
0589         void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
0590             queueing_port_operation op_data(add_blt_pred);
0591             op_data.pred = &p;
0592             my_aggregator.execute(&op_data);
0593         }
0594 
0595         void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
0596             queueing_port_operation op_data(del_blt_pred);
0597             op_data.pred = &p;
0598             my_aggregator.execute(&op_data);
0599         }
0600 
0601         size_t predecessor_count() __TBB_override {
0602             queueing_port_operation op_data(blt_pred_cnt);
0603             my_aggregator.execute(&op_data);
0604             return op_data.cnt_val;
0605         }
0606 
0607         void copy_predecessors(predecessor_list_type &l) __TBB_override {
0608             queueing_port_operation op_data(blt_pred_cpy);
0609             op_data.plist = &l;
0610             my_aggregator.execute(&op_data);
0611         }
0612 
0613         void extract_receiver() {
0614             item_buffer<T>::reset();
0615             my_built_predecessors.receiver_extract(*this);
0616         }
0617 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0618 
0619         void reset_receiver(reset_flags f) __TBB_override {
0620             tbb::internal::suppress_unused_warning(f);
0621             item_buffer<T>::reset();
0622 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0623             if (f & rf_clear_edges)
0624                 my_built_predecessors.clear();
0625 #endif
0626         }
0627 
0628     private:
0629 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0630         friend class get_graph_helper;
0631 #endif
0632 
0633         forwarding_base *my_join;
0634 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0635         edge_container<predecessor_type> my_built_predecessors;
0636 #endif
0637     };  // queueing_port
0638 
0639 #include "_flow_graph_tagged_buffer_impl.h"
0640 
0641     template<typename K>
0642     struct count_element {
0643         K my_key;
0644         size_t my_value;
0645     };
0646 
0647     // method to access the key in the counting table
0648     // the ref has already been removed from K
0649     template< typename K >
0650     struct key_to_count_functor {
0651         typedef count_element<K> table_item_type;
0652         const K& operator()(const table_item_type& v) { return v.my_key; }
0653     };
0654 
0655     // the ports can have only one template parameter.  We wrap the types needed in
0656     // a traits type
0657     template< class TraitsType >
0658     class key_matching_port :
0659         public receiver<typename TraitsType::T>,
0660         public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0661                 typename TraitsType::KHash > {
0662     public:
0663         typedef TraitsType traits;
0664         typedef key_matching_port<traits> class_type;
0665         typedef typename TraitsType::T input_type;
0666         typedef typename TraitsType::K key_type;
0667         typedef typename tbb::internal::strip<key_type>::type noref_key_type;
0668         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0669         typedef typename TraitsType::TtoK type_to_key_func_type;
0670         typedef typename TraitsType::KHash hash_compare_type;
0671         typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
0672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0673         typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0674         typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0675 #endif
0676     private:
0677 // ----------- Aggregator ------------
0678     private:
0679         enum op_type { try__put, get__item, res_port
0680 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0681            , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0682 #endif
0683         };
0684 
0685         class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
0686         public:
0687             char type;
0688             input_type my_val;
0689             input_type *my_arg;
0690 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0691             predecessor_type *pred;
0692             size_t cnt_val;
0693             predecessor_list_type *plist;
0694 #endif
0695             // constructor for value parameter
0696             key_matching_port_operation(const input_type& e, op_type t) :
0697                 type(char(t)), my_val(e) {}
0698             // constructor for pointer parameter
0699             key_matching_port_operation(const input_type* p, op_type t) :
0700                 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
0701             // constructor with no parameter
0702             key_matching_port_operation(op_type t) : type(char(t)) {}
0703         };
0704 
0705         typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
0706         friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
0707         aggregator<handler_type, key_matching_port_operation> my_aggregator;
0708 
0709         void handle_operations(key_matching_port_operation* op_list) {
0710             key_matching_port_operation *current;
0711             while(op_list) {
0712                 current = op_list;
0713                 op_list = op_list->next;
0714                 switch(current->type) {
0715                 case try__put: {
0716                         bool was_inserted = this->insert_with_key(current->my_val);
0717                         // return failure if a duplicate insertion occurs
0718                         __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
0719                     }
0720                     break;
0721                 case get__item:
0722                     // use current_key from FE for item
0723                     if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
0724                         __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0725                     }
0726                     __TBB_store_with_release(current->status, SUCCEEDED);
0727                     break;
0728                 case res_port:
0729                     // use current_key from FE for item
0730                     this->delete_with_key(my_join->current_key);
0731                     __TBB_store_with_release(current->status, SUCCEEDED);
0732                     break;
0733 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0734                 case add_blt_pred:
0735                     my_built_predecessors.add_edge(*(current->pred));
0736                     __TBB_store_with_release(current->status, SUCCEEDED);
0737                     break;
0738                 case del_blt_pred:
0739                     my_built_predecessors.delete_edge(*(current->pred));
0740                     __TBB_store_with_release(current->status, SUCCEEDED);
0741                     break;
0742                 case blt_pred_cnt:
0743                     current->cnt_val = my_built_predecessors.edge_count();
0744                     __TBB_store_with_release(current->status, SUCCEEDED);
0745                     break;
0746                 case blt_pred_cpy:
0747                     my_built_predecessors.copy_edges(*(current->plist));
0748                     __TBB_store_with_release(current->status, SUCCEEDED);
0749                     break;
0750 #endif
0751                 }
0752             }
0753         }
0754 // ------------ End Aggregator ---------------
0755     protected:
0756         template< typename R, typename B > friend class run_and_put_task;
0757         template<typename X, typename Y> friend class internal::broadcast_cache;
0758         template<typename X, typename Y> friend class internal::round_robin_cache;
0759         task *try_put_task(const input_type& v) __TBB_override {
0760             key_matching_port_operation op_data(v, try__put);
0761             task *rtask = NULL;
0762             my_aggregator.execute(&op_data);
0763             if(op_data.status == SUCCEEDED) {
0764                 rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false);  // may spawn
0765                 // rtask has to reflect the return status of the try_put
0766                 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0767             }
0768             return rtask;
0769         }
0770 
0771         graph& graph_reference() const __TBB_override {
0772             return my_join->graph_ref;
0773         }
0774 
0775     public:
0776 
0777         key_matching_port() : receiver<input_type>(), buffer_type() {
0778             my_join = NULL;
0779             my_aggregator.initialize_handler(handler_type(this));
0780         }
0781 
0782         // copy constructor
0783         key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
0784             my_join = NULL;
0785             my_aggregator.initialize_handler(handler_type(this));
0786         }
0787 
0788         ~key_matching_port() { }
0789 
0790         void set_join_node_pointer(forwarding_base *join) {
0791             my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0792         }
0793 
0794         void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0795 
0796         type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0797 
0798         bool get_item( input_type &v ) {
0799             // aggregator uses current_key from FE for Key
0800             key_matching_port_operation op_data(&v, get__item);
0801             my_aggregator.execute(&op_data);
0802             return op_data.status == SUCCEEDED;
0803         }
0804 
0805 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0806         built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
0807 
0808         void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
0809             key_matching_port_operation op_data(add_blt_pred);
0810             op_data.pred = &p;
0811             my_aggregator.execute(&op_data);
0812         }
0813 
0814         void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
0815             key_matching_port_operation op_data(del_blt_pred);
0816             op_data.pred = &p;
0817             my_aggregator.execute(&op_data);
0818         }
0819 
0820         size_t predecessor_count() __TBB_override {
0821             key_matching_port_operation op_data(blt_pred_cnt);
0822             my_aggregator.execute(&op_data);
0823             return op_data.cnt_val;
0824         }
0825 
0826         void copy_predecessors(predecessor_list_type &l) __TBB_override {
0827             key_matching_port_operation op_data(blt_pred_cpy);
0828             op_data.plist = &l;
0829             my_aggregator.execute(&op_data);
0830         }
0831 #endif
0832 
0833         // reset_port is called when item is accepted by successor, but
0834         // is initiated by join_node.
0835         void reset_port() {
0836             key_matching_port_operation op_data(res_port);
0837             my_aggregator.execute(&op_data);
0838             return;
0839         }
0840 
0841 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0842         void extract_receiver() {
0843             buffer_type::reset();
0844             my_built_predecessors.receiver_extract(*this);
0845         }
0846 #endif
0847         void reset_receiver(reset_flags f ) __TBB_override {
0848             tbb::internal::suppress_unused_warning(f);
0849             buffer_type::reset();
0850 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0851            if (f & rf_clear_edges)
0852               my_built_predecessors.clear();
0853 #endif
0854         }
0855 
0856     private:
0857         // my_join forwarding base used to count number of inputs that
0858         // received key.
0859         matching_forwarding_base<key_type> *my_join;
0860 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0861         edge_container<predecessor_type> my_built_predecessors;
0862 #endif
0863     };  // key_matching_port
0864 
0865     using namespace graph_policy_namespace;
0866 
0867     template<typename JP, typename InputTuple, typename OutputTuple>
0868     class join_node_base;
0869 
0870     //! join_node_FE : implements input port policy
0871     template<typename JP, typename InputTuple, typename OutputTuple>
0872     class join_node_FE;
0873 
0874     template<typename InputTuple, typename OutputTuple>
0875     class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
0876     public:
0877         static const int N = tbb::flow::tuple_size<OutputTuple>::value;
0878         typedef OutputTuple output_type;
0879         typedef InputTuple input_type;
0880         typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
0881 
0882         join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
0883             ports_with_no_inputs = N;
0884             join_helper<N>::set_join_node_pointer(my_inputs, this);
0885         }
0886 
0887         join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
0888             ports_with_no_inputs = N;
0889             join_helper<N>::set_join_node_pointer(my_inputs, this);
0890         }
0891 
0892         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0893 
0894        void increment_port_count() __TBB_override {
0895             ++ports_with_no_inputs;
0896         }
0897 
0898         // if all input_ports have predecessors, spawn forward to try and consume tuples
0899         task * decrement_port_count(bool handle_task) __TBB_override {
0900             if(ports_with_no_inputs.fetch_and_decrement() == 1) {
0901                 if(internal::is_graph_active(this->graph_ref)) {
0902                     task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
0903                         forward_task_bypass<base_node_type>(*my_node);
0904                     if(!handle_task) return rtask;
0905                     internal::spawn_in_graph_arena(this->graph_ref, *rtask);
0906                 }
0907             }
0908             return NULL;
0909         }
0910 
0911         input_type &input_ports() { return my_inputs; }
0912 
0913     protected:
0914 
0915         void reset(  reset_flags f) {
0916             // called outside of parallel contexts
0917             ports_with_no_inputs = N;
0918             join_helper<N>::reset_inputs(my_inputs, f);
0919         }
0920 
0921 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0922         void extract( ) {
0923             // called outside of parallel contexts
0924             ports_with_no_inputs = N;
0925             join_helper<N>::extract_inputs(my_inputs);
0926         }
0927 #endif
0928 
0929         // all methods on input ports should be called under mutual exclusion from join_node_base.
0930 
0931         bool tuple_build_may_succeed() {
0932             return !ports_with_no_inputs;
0933         }
0934 
0935         bool try_to_make_tuple(output_type &out) {
0936             if(ports_with_no_inputs) return false;
0937             return join_helper<N>::reserve(my_inputs, out);
0938         }
0939 
0940         void tuple_accepted() {
0941             join_helper<N>::consume_reservations(my_inputs);
0942         }
0943         void tuple_rejected() {
0944             join_helper<N>::release_reservations(my_inputs);
0945         }
0946 
0947         input_type my_inputs;
0948         base_node_type *my_node;
0949         atomic<size_t> ports_with_no_inputs;
0950     };  // join_node_FE<reserving, ... >
0951 
0952     template<typename InputTuple, typename OutputTuple>
0953     class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
0954     public:
0955         static const int N = tbb::flow::tuple_size<OutputTuple>::value;
0956         typedef OutputTuple output_type;
0957         typedef InputTuple input_type;
0958         typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
0959 
0960         join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
0961             ports_with_no_items = N;
0962             join_helper<N>::set_join_node_pointer(my_inputs, this);
0963         }
0964 
0965         join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
0966             ports_with_no_items = N;
0967             join_helper<N>::set_join_node_pointer(my_inputs, this);
0968         }
0969 
0970         // needed for forwarding
0971         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0972 
0973         void reset_port_count() {
0974             ports_with_no_items = N;
0975         }
0976 
0977         // if all input_ports have items, spawn forward to try and consume tuples
0978         task * decrement_port_count(bool handle_task) __TBB_override
0979         {
0980             if(ports_with_no_items.fetch_and_decrement() == 1) {
0981                 if(internal::is_graph_active(this->graph_ref)) {
0982                     task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
0983                         forward_task_bypass <base_node_type>(*my_node);
0984                     if(!handle_task) return rtask;
0985                     internal::spawn_in_graph_arena(this->graph_ref, *rtask);
0986                 }
0987             }
0988             return NULL;
0989         }
0990 
0991         void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); }  // should never be called
0992 
0993         input_type &input_ports() { return my_inputs; }
0994 
0995     protected:
0996 
0997         void reset(  reset_flags f) {
0998             reset_port_count();
0999             join_helper<N>::reset_inputs(my_inputs, f );
1000         }
1001 
1002 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1003         void extract() {
1004             reset_port_count();
1005             join_helper<N>::extract_inputs(my_inputs);
1006         }
1007 #endif
1008         // all methods on input ports should be called under mutual exclusion from join_node_base.
1009 
1010         bool tuple_build_may_succeed() {
1011             return !ports_with_no_items;
1012         }
1013 
1014         bool try_to_make_tuple(output_type &out) {
1015             if(ports_with_no_items) return false;
1016             return join_helper<N>::get_items(my_inputs, out);
1017         }
1018 
1019         void tuple_accepted() {
1020             reset_port_count();
1021             join_helper<N>::reset_ports(my_inputs);
1022         }
1023         void tuple_rejected() {
1024             // nothing to do.
1025         }
1026 
1027         input_type my_inputs;
1028         base_node_type *my_node;
1029         atomic<size_t> ports_with_no_items;
1030     };  // join_node_FE<queueing, ...>
1031 
1032     // key_matching join front-end.
1033     template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1034     class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1035              // buffer of key value counts
1036               public hash_buffer<   // typedefed below to key_to_count_buffer_type
1037                   typename tbb::internal::strip<K>::type&,        // force ref type on K
1038                   count_element<typename tbb::internal::strip<K>::type>,
1039                   internal::type_to_key_function_body<
1040                       count_element<typename tbb::internal::strip<K>::type>,
1041                       typename tbb::internal::strip<K>::type& >,
1042                   KHash >,
1043              // buffer of output items
1044              public item_buffer<OutputTuple> {
1045     public:
1046         static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1047         typedef OutputTuple output_type;
1048         typedef InputTuple input_type;
1049         typedef K key_type;
1050         typedef typename tbb::internal::strip<key_type>::type unref_key_type;
1051         typedef KHash key_hash_compare;
1052         // must use K without ref.
1053         typedef count_element<unref_key_type> count_element_type;
1054         // method that lets us refer to the key of this type.
1055         typedef key_to_count_functor<unref_key_type> key_to_count_func;
1056         typedef internal::type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
1057         typedef internal::type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
1058         // this is the type of the special table that keeps track of the number of discrete
1059         // elements corresponding to each key that we've seen.
1060         typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
1061                  key_to_count_buffer_type;
1062         typedef item_buffer<output_type> output_buffer_type;
1063         typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1064         typedef matching_forwarding_base<key_type> forwarding_base_type;
1065 
1066 // ----------- Aggregator ------------
1067         // the aggregator is only needed to serialize the access to the hash table.
1068         // and the output_buffer_type base class
1069     private:
1070         enum op_type { res_count, inc_count, may_succeed, try_make };
1071         typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
1072 
1073         class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1074         public:
1075             char type;
1076             unref_key_type my_val;
1077             output_type* my_output;
1078             task *bypass_t;
1079             bool enqueue_task;
1080             // constructor for value parameter
1081             key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1082                  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1083             key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1084                  enqueue_task(true) {}
1085             // constructor with no parameter
1086             key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1087         };
1088 
1089         typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1090         friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1091         aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1092 
1093         // called from aggregator, so serialized
1094         // returns a task pointer if the a task would have been enqueued but we asked that
1095         // it be returned.  Otherwise returns NULL.
1096         task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1097             output_type l_out;
1098             task *rtask = NULL;
1099             bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1100             this->current_key = t;
1101             this->delete_with_key(this->current_key);   // remove the key
1102             if(join_helper<N>::get_items(my_inputs, l_out)) {  //  <== call back
1103                 this->push_back(l_out);
1104                 if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
1105                     rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1106                         forward_task_bypass<base_node_type>(*my_node);
1107                     if(handle_task) {
1108                         internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1109                         rtask = NULL;
1110                     }
1111                     do_fwd = false;
1112                 }
1113                 // retire the input values
1114                 join_helper<N>::reset_ports(my_inputs);  //  <== call back
1115             }
1116             else {
1117                 __TBB_ASSERT(false, "should have had something to push");
1118             }
1119             return rtask;
1120         }
1121 
1122         void handle_operations(key_matching_FE_operation* op_list) {
1123             key_matching_FE_operation *current;
1124             while(op_list) {
1125                 current = op_list;
1126                 op_list = op_list->next;
1127                 switch(current->type) {
1128                 case res_count:  // called from BE
1129                     {
1130                         this->destroy_front();
1131                         __TBB_store_with_release(current->status, SUCCEEDED);
1132                     }
1133                     break;
1134                 case inc_count: {  // called from input ports
1135                         count_element_type *p = 0;
1136                         unref_key_type &t = current->my_val;
1137                         bool do_enqueue = current->enqueue_task;
1138                         if(!(this->find_ref_with_key(t,p))) {
1139                             count_element_type ev;
1140                             ev.my_key = t;
1141                             ev.my_value = 0;
1142                             this->insert_with_key(ev);
1143                             if(!(this->find_ref_with_key(t,p))) {
1144                                 __TBB_ASSERT(false, "should find key after inserting it");
1145                             }
1146                         }
1147                         if(++(p->my_value) == size_t(N)) {
1148                             task *rtask = fill_output_buffer(t, true, do_enqueue);
1149                             __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1150                             current->bypass_t = rtask;
1151                         }
1152                     }
1153                     __TBB_store_with_release(current->status, SUCCEEDED);
1154                     break;
1155                 case may_succeed:  // called from BE
1156                     __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1157                     break;
1158                 case try_make:  // called from BE
1159                     if(this->buffer_empty()) {
1160                         __TBB_store_with_release(current->status, FAILED);
1161                     }
1162                     else {
1163                         *(current->my_output) = this->front();
1164                         __TBB_store_with_release(current->status, SUCCEEDED);
1165                     }
1166                     break;
1167                 }
1168             }
1169         }
1170 // ------------ End Aggregator ---------------
1171 
1172     public:
1173         template<typename FunctionTuple>
1174         join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1175             join_helper<N>::set_join_node_pointer(my_inputs, this);
1176             join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1177             my_aggregator.initialize_handler(handler_type(this));
1178                     TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1179             this->set_key_func(cfb);
1180         }
1181 
1182         join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1183         output_buffer_type() {
1184             my_node = NULL;
1185             join_helper<N>::set_join_node_pointer(my_inputs, this);
1186             join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1187             my_aggregator.initialize_handler(handler_type(this));
1188             TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1189             this->set_key_func(cfb);
1190         }
1191 
1192         // needed for forwarding
1193         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1194 
1195         void reset_port_count() {  // called from BE
1196             key_matching_FE_operation op_data(res_count);
1197             my_aggregator.execute(&op_data);
1198             return;
1199         }
1200 
1201         // if all input_ports have items, spawn forward to try and consume tuples
1202         // return a task if we are asked and did create one.
1203         task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override {  // called from input_ports
1204             key_matching_FE_operation op_data(t, handle_task, inc_count);
1205             my_aggregator.execute(&op_data);
1206             return op_data.bypass_t;
1207         }
1208 
1209         task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1210 
1211         void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); }  // should never be called
1212 
1213         input_type &input_ports() { return my_inputs; }
1214 
1215     protected:
1216 
1217         void reset(  reset_flags f ) {
1218             // called outside of parallel contexts
1219             join_helper<N>::reset_inputs(my_inputs, f);
1220 
1221             key_to_count_buffer_type::reset();
1222             output_buffer_type::reset();
1223         }
1224 
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1226         void extract() {
1227             // called outside of parallel contexts
1228             join_helper<N>::extract_inputs(my_inputs);
1229             key_to_count_buffer_type::reset();  // have to reset the tag counts
1230             output_buffer_type::reset();  // also the queue of outputs
1231             // my_node->current_tag = NO_TAG;
1232         }
1233 #endif
1234         // all methods on input ports should be called under mutual exclusion from join_node_base.
1235 
1236         bool tuple_build_may_succeed() {  // called from back-end
1237             key_matching_FE_operation op_data(may_succeed);
1238             my_aggregator.execute(&op_data);
1239             return op_data.status == SUCCEEDED;
1240         }
1241 
1242         // cannot lock while calling back to input_ports.  current_key will only be set
1243         // and reset under the aggregator, so it will remain consistent.
1244         bool try_to_make_tuple(output_type &out) {
1245             key_matching_FE_operation op_data(&out,try_make);
1246             my_aggregator.execute(&op_data);
1247             return op_data.status == SUCCEEDED;
1248         }
1249 
1250         void tuple_accepted() {
1251             reset_port_count();  // reset current_key after ports reset.
1252         }
1253 
1254         void tuple_rejected() {
1255             // nothing to do.
1256         }
1257 
1258         input_type my_inputs;  // input ports
1259         base_node_type *my_node;
1260     }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1261 
1262     //! join_node_base
1263     template<typename JP, typename InputTuple, typename OutputTuple>
1264     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1265                            public sender<OutputTuple> {
1266     protected:
1267         using graph_node::my_graph;
1268     public:
1269         typedef OutputTuple output_type;
1270 
1271         typedef typename sender<output_type>::successor_type successor_type;
1272         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1273         using input_ports_type::tuple_build_may_succeed;
1274         using input_ports_type::try_to_make_tuple;
1275         using input_ports_type::tuple_accepted;
1276         using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278         typedef typename sender<output_type>::built_successors_type built_successors_type;
1279         typedef typename sender<output_type>::successor_list_type successor_list_type;
1280 #endif
1281 
1282     private:
1283         // ----------- Aggregator ------------
1284         enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1286             , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1287 #endif
1288         };
1289         typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1290 
1291         class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1292         public:
1293             char type;
1294             union {
1295                 output_type *my_arg;
1296                 successor_type *my_succ;
1297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1298                 size_t cnt_val;
1299                 successor_list_type *slist;
1300 #endif
1301             };
1302             task *bypass_t;
1303             join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1304                 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1305             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1306                 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1307             join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1308         };
1309 
1310         typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1311         friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1312         bool forwarder_busy;
1313         aggregator<handler_type, join_node_base_operation> my_aggregator;
1314 
1315         void handle_operations(join_node_base_operation* op_list) {
1316             join_node_base_operation *current;
1317             while(op_list) {
1318                 current = op_list;
1319                 op_list = op_list->next;
1320                 switch(current->type) {
1321                 case reg_succ: {
1322                         my_successors.register_successor(*(current->my_succ));
1323                         if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
1324                             task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1325                                     forward_task_bypass
1326                                     <join_node_base<JP,InputTuple,OutputTuple> >(*this);
1327                             internal::spawn_in_graph_arena(my_graph, *rtask);
1328                             forwarder_busy = true;
1329                         }
1330                         __TBB_store_with_release(current->status, SUCCEEDED);
1331                     }
1332                     break;
1333                 case rem_succ:
1334                     my_successors.remove_successor(*(current->my_succ));
1335                     __TBB_store_with_release(current->status, SUCCEEDED);
1336                     break;
1337                 case try__get:
1338                     if(tuple_build_may_succeed()) {
1339                         if(try_to_make_tuple(*(current->my_arg))) {
1340                             tuple_accepted();
1341                             __TBB_store_with_release(current->status, SUCCEEDED);
1342                         }
1343                         else __TBB_store_with_release(current->status, FAILED);
1344                     }
1345                     else __TBB_store_with_release(current->status, FAILED);
1346                     break;
1347                 case do_fwrd_bypass: {
1348                         bool build_succeeded;
1349                         task *last_task = NULL;
1350                         output_type out;
1351                         if(tuple_build_may_succeed()) {  // checks output queue of FE
1352                             do {
1353                                 build_succeeded = try_to_make_tuple(out);  // fetch front_end of queue
1354                                 if(build_succeeded) {
1355                                     task *new_task = my_successors.try_put_task(out);
1356                                     last_task = combine_tasks(my_graph, last_task, new_task);
1357                                     if(new_task) {
1358                                         tuple_accepted();
1359                                     }
1360                                     else {
1361                                         tuple_rejected();
1362                                         build_succeeded = false;
1363                                     }
1364                                 }
1365                             } while(build_succeeded);
1366                         }
1367                         current->bypass_t = last_task;
1368                         __TBB_store_with_release(current->status, SUCCEEDED);
1369                         forwarder_busy = false;
1370                     }
1371                     break;
1372 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1373                 case add_blt_succ:
1374                     my_successors.internal_add_built_successor(*(current->my_succ));
1375                     __TBB_store_with_release(current->status, SUCCEEDED);
1376                     break;
1377                 case del_blt_succ:
1378                     my_successors.internal_delete_built_successor(*(current->my_succ));
1379                     __TBB_store_with_release(current->status, SUCCEEDED);
1380                     break;
1381                 case blt_succ_cnt:
1382                     current->cnt_val = my_successors.successor_count();
1383                     __TBB_store_with_release(current->status, SUCCEEDED);
1384                     break;
1385                 case blt_succ_cpy:
1386                     my_successors.copy_successors(*(current->slist));
1387                     __TBB_store_with_release(current->status, SUCCEEDED);
1388                     break;
1389 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1390                 }
1391             }
1392         }
1393         // ---------- end aggregator -----------
1394     public:
1395         join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1396             my_successors.set_owner(this);
1397             input_ports_type::set_my_node(this);
1398             my_aggregator.initialize_handler(handler_type(this));
1399         }
1400 
1401         join_node_base(const join_node_base& other) :
1402             graph_node(other.graph_node::my_graph), input_ports_type(other),
1403             sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1404             my_successors.set_owner(this);
1405             input_ports_type::set_my_node(this);
1406             my_aggregator.initialize_handler(handler_type(this));
1407         }
1408 
1409         template<typename FunctionTuple>
1410         join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1411             my_successors.set_owner(this);
1412             input_ports_type::set_my_node(this);
1413             my_aggregator.initialize_handler(handler_type(this));
1414         }
1415 
1416         bool register_successor(successor_type &r) __TBB_override {
1417             join_node_base_operation op_data(r, reg_succ);
1418             my_aggregator.execute(&op_data);
1419             return op_data.status == SUCCEEDED;
1420         }
1421 
1422         bool remove_successor( successor_type &r) __TBB_override {
1423             join_node_base_operation op_data(r, rem_succ);
1424             my_aggregator.execute(&op_data);
1425             return op_data.status == SUCCEEDED;
1426         }
1427 
1428         bool try_get( output_type &v) __TBB_override {
1429             join_node_base_operation op_data(v, try__get);
1430             my_aggregator.execute(&op_data);
1431             return op_data.status == SUCCEEDED;
1432         }
1433 
1434 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1435         built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1436 
1437         void internal_add_built_successor( successor_type &r) __TBB_override {
1438             join_node_base_operation op_data(r, add_blt_succ);
1439             my_aggregator.execute(&op_data);
1440         }
1441 
1442         void internal_delete_built_successor( successor_type &r) __TBB_override {
1443             join_node_base_operation op_data(r, del_blt_succ);
1444             my_aggregator.execute(&op_data);
1445         }
1446 
1447         size_t successor_count() __TBB_override {
1448             join_node_base_operation op_data(blt_succ_cnt);
1449             my_aggregator.execute(&op_data);
1450             return op_data.cnt_val;
1451         }
1452 
1453         void copy_successors(successor_list_type &l) __TBB_override {
1454             join_node_base_operation op_data(blt_succ_cpy);
1455             op_data.slist = &l;
1456             my_aggregator.execute(&op_data);
1457         }
1458 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1459 
1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1461         void extract() __TBB_override {
1462             input_ports_type::extract();
1463             my_successors.built_successors().sender_extract(*this);
1464         }
1465 #endif
1466 
1467     protected:
1468 
1469         void reset_node(reset_flags f) __TBB_override {
1470             input_ports_type::reset(f);
1471             if(f & rf_clear_edges) my_successors.clear();
1472         }
1473 
1474     private:
1475         broadcast_cache<output_type, null_rw_mutex> my_successors;
1476 
1477         friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1478         task *forward_task() {
1479             join_node_base_operation op_data(do_fwrd_bypass);
1480             my_aggregator.execute(&op_data);
1481             return op_data.bypass_t;
1482         }
1483 
1484     };  // join_node_base
1485 
1486     // join base class type generator
1487     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1488     struct join_base {
1489         typedef typename internal::join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1490     };
1491 
1492     template<int N, typename OutputTuple, typename K, typename KHash>
1493     struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1494         typedef key_matching<K, KHash> key_traits_type;
1495         typedef K key_type;
1496         typedef KHash key_hash_compare;
1497         typedef typename internal::join_node_base< key_traits_type,
1498                 // ports type
1499                 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1500                 OutputTuple > type;
1501     };
1502 
1503     //! unfolded_join_node : passes input_ports_type to join_node_base.  We build the input port type
1504     //  using tuple_element.  The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1505     //  and should match the typename.
1506 
1507     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1508     class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1509     public:
1510         typedef typename wrap_tuple_elements<N, PT, OutputTuple>::type input_ports_type;
1511         typedef OutputTuple output_type;
1512     private:
1513         typedef join_node_base<JP, input_ports_type, output_type > base_type;
1514     public:
1515         unfolded_join_node(graph &g) : base_type(g) {}
1516         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1517     };
1518 
1519 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1520     template <typename K, typename T>
1521     struct key_from_message_body {
1522         K operator()(const T& t) const {
1523             using tbb::flow::key_from_message;
1524             return key_from_message<K>(t);
1525         }
1526     };
1527     // Adds const to reference type
1528     template <typename K, typename T>
1529     struct key_from_message_body<K&,T> {
1530         const K& operator()(const T& t) const {
1531             using tbb::flow::key_from_message;
1532             return key_from_message<const K&>(t);
1533         }
1534     };
1535 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1536     // key_matching unfolded_join_node.  This must be a separate specialization because the constructors
1537     // differ.
1538 
1539     template<typename OutputTuple, typename K, typename KHash>
1540     class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1541             join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1542         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1543         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1544     public:
1545         typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1546         typedef OutputTuple output_type;
1547     private:
1548         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1549         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1550         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1551         typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1552     public:
1553 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1554         unfolded_join_node(graph &g) : base_type(g,
1555                 func_initializer_type(
1556                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1557                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1558                     ) ) {
1559         }
1560 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1561         template<typename Body0, typename Body1>
1562         unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1563                 func_initializer_type(
1564                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1565                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1566                     ) ) {
1567             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1568         }
1569         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1570     };
1571 
1572     template<typename OutputTuple, typename K, typename KHash>
1573     class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1574             join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1575         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1576         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1577         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1578     public:
1579         typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1580         typedef OutputTuple output_type;
1581     private:
1582         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1583         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1584         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1585         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1586         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1587     public:
1588 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1589         unfolded_join_node(graph &g) : base_type(g,
1590                 func_initializer_type(
1591                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1592                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1593                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1594                     ) ) {
1595         }
1596 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1597         template<typename Body0, typename Body1, typename Body2>
1598         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1599                 func_initializer_type(
1600                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1601                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1602                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1603                     ) ) {
1604             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1605         }
1606         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1607     };
1608 
1609     template<typename OutputTuple, typename K, typename KHash>
1610     class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1611             join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1612         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1613         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1614         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1615         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1616     public:
1617         typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1618         typedef OutputTuple output_type;
1619     private:
1620         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1621         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1622         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1623         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1624         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1625         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1626     public:
1627 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1628         unfolded_join_node(graph &g) : base_type(g,
1629                 func_initializer_type(
1630                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1631                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1632                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1633                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1634                     ) ) {
1635         }
1636 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1637         template<typename Body0, typename Body1, typename Body2, typename Body3>
1638         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1639                 func_initializer_type(
1640                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1641                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1642                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1643                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1644                     ) ) {
1645             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1646         }
1647         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1648     };
1649 
1650     template<typename OutputTuple, typename K, typename KHash>
1651     class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1652             join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1653         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1654         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1655         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1656         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1657         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1658     public:
1659         typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1660         typedef OutputTuple output_type;
1661     private:
1662         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1663         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1664         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1665         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1666         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1667         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1668         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1669     public:
1670 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1671         unfolded_join_node(graph &g) : base_type(g,
1672                 func_initializer_type(
1673                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1674                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1675                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1676                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1677                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1678                     ) ) {
1679         }
1680 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1681         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1682         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1683                 func_initializer_type(
1684                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1685                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1686                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1687                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1688                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1689                     ) ) {
1690             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1691         }
1692         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1693     };
1694 
1695 #if __TBB_VARIADIC_MAX >= 6
1696     template<typename OutputTuple, typename K, typename KHash>
1697     class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1698             join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1699         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1700         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1701         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1702         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1703         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1704         typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1705     public:
1706         typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1707         typedef OutputTuple output_type;
1708     private:
1709         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1710         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1711         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1712         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1713         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1714         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1715         typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1716         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1717     public:
1718 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1719         unfolded_join_node(graph &g) : base_type(g,
1720                 func_initializer_type(
1721                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1722                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1723                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1724                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1725                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1726                     new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1727                     ) ) {
1728         }
1729 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1730         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1731         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1732                 : base_type(g, func_initializer_type(
1733                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1734                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1735                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1736                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1737                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1738                     new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1739                     ) ) {
1740             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1741         }
1742         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1743     };
1744 #endif
1745 
1746 #if __TBB_VARIADIC_MAX >= 7
1747     template<typename OutputTuple, typename K, typename KHash>
1748     class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1749             join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1750         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1751         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1752         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1753         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1754         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1755         typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1756         typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1757     public:
1758         typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1759         typedef OutputTuple output_type;
1760     private:
1761         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1762         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1763         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1764         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1765         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1766         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1767         typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1768         typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1769         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1770     public:
1771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1772         unfolded_join_node(graph &g) : base_type(g,
1773                 func_initializer_type(
1774                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1775                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1776                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1777                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1778                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1779                     new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1780                     new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1781                     ) ) {
1782         }
1783 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1784         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1785                  typename Body5, typename Body6>
1786         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1787                 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1788                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1789                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1790                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1791                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1792                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1793                     new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1794                     new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1795                     ) ) {
1796             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1797         }
1798         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1799     };
1800 #endif
1801 
1802 #if __TBB_VARIADIC_MAX >= 8
1803     template<typename OutputTuple, typename K, typename KHash>
1804     class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1805             join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1806         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1807         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1808         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1809         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1810         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1811         typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1812         typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1813         typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1814     public:
1815         typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1816         typedef OutputTuple output_type;
1817     private:
1818         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1819         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1820         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1821         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1822         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1823         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1824         typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1825         typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1826         typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1827         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1828     public:
1829 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1830         unfolded_join_node(graph &g) : base_type(g,
1831                 func_initializer_type(
1832                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1833                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1834                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1835                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1836                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1837                     new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1838                     new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1839                     new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1840                     ) ) {
1841         }
1842 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1843         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1844                  typename Body5, typename Body6, typename Body7>
1845         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1846                 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1847                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1848                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1849                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1850                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1851                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1852                     new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1853                     new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1854                     new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1855                     ) ) {
1856             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1857         }
1858         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1859     };
1860 #endif
1861 
1862 #if __TBB_VARIADIC_MAX >= 9
1863     template<typename OutputTuple, typename K, typename KHash>
1864     class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1865             join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1866         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1867         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1868         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1869         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1870         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1871         typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1872         typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1873         typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1874         typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1875     public:
1876         typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1877         typedef OutputTuple output_type;
1878     private:
1879         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1880         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1881         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1882         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1883         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1884         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1885         typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1886         typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1887         typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1888         typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1889         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1890     public:
1891 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1892         unfolded_join_node(graph &g) : base_type(g,
1893                 func_initializer_type(
1894                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1895                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1896                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1897                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1898                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1899                     new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1900                     new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1901                     new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1902                     new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1903                     ) ) {
1904         }
1905 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1906         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1907                  typename Body5, typename Body6, typename Body7, typename Body8>
1908         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1909                 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1910                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1911                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1912                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1913                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1914                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1915                     new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1916                     new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1917                     new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1918                     new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)
1919                     ) ) {
1920             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1921         }
1922         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1923     };
1924 #endif
1925 
1926 #if __TBB_VARIADIC_MAX >= 10
1927     template<typename OutputTuple, typename K, typename KHash>
1928     class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1929             join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1930         typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1931         typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1932         typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1933         typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1934         typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1935         typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1936         typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1937         typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1938         typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1939         typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;
1940     public:
1941         typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1942         typedef OutputTuple output_type;
1943     private:
1944         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1945         typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1946         typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1947         typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1948         typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1949         typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1950         typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1951         typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1952         typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1953         typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1954         typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
1955         typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1956     public:
1957 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1958         unfolded_join_node(graph &g) : base_type(g,
1959                 func_initializer_type(
1960                     new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1961                     new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1962                     new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1963                     new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1964                     new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1965                     new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1966                     new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1967                     new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1968                     new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1969                     new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1970                     ) ) {
1971         }
1972 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1973         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1974             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1975         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1976                 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1977                     new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1978                     new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1979                     new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1980                     new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1981                     new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1982                     new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1983                     new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1984                     new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1985                     new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
1986                     new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)
1987                     ) ) {
1988             __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1989         }
1990         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1991     };
1992 #endif
1993 
1994     //! templated function to refer to input ports of the join node
1995     template<size_t N, typename JNT>
1996     typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1997         return tbb::flow::get<N>(jn.input_ports());
1998     }
1999 
2000 }
2001 #endif // __TBB__flow_graph_join_impl_H
2002