Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-30 08:46:17

0001 /*
0002     Copyright (c) 2005-2022 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included into namespace tbb::detail::d1
0025 
0026     struct forwarding_base : no_assign {
0027         forwarding_base(graph &g) : graph_ref(g) {}
0028         virtual ~forwarding_base() {}
0029         graph& graph_ref;
0030     };
0031 
0032     struct queueing_forwarding_base : forwarding_base {
0033         using forwarding_base::forwarding_base;
0034         // decrement_port_count may create a forwarding task.  If we cannot handle the task
0035         // ourselves, ask decrement_port_count to deal with it.
0036         virtual graph_task* decrement_port_count(bool handle_task) = 0;
0037     };
0038 
0039     struct reserving_forwarding_base : forwarding_base {
0040         using forwarding_base::forwarding_base;
0041         // decrement_port_count may create a forwarding task.  If we cannot handle the task
0042         // ourselves, ask decrement_port_count to deal with it.
0043         virtual graph_task* decrement_port_count() = 0;
0044         virtual void increment_port_count() = 0;
0045     };
0046 
0047     // specialization that lets us keep a copy of the current_key for building results.
0048     // KeyType can be a reference type.
0049     template<typename KeyType>
0050     struct matching_forwarding_base : public forwarding_base {
0051         typedef typename std::decay<KeyType>::type current_key_type;
0052         matching_forwarding_base(graph &g) : forwarding_base(g) { }
0053         virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0;
0054         current_key_type current_key; // so ports can refer to FE's desired items
0055     };
0056 
0057     template< int N >
0058     struct join_helper {
0059 
0060         template< typename TupleType, typename PortType >
0061         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0062             std::get<N-1>( my_input ).set_join_node_pointer(port);
0063             join_helper<N-1>::set_join_node_pointer( my_input, port );
0064         }
0065         template< typename TupleType >
0066         static inline void consume_reservations( TupleType &my_input ) {
0067             std::get<N-1>( my_input ).consume();
0068             join_helper<N-1>::consume_reservations( my_input );
0069         }
0070 
0071         template< typename TupleType >
0072         static inline void release_my_reservation( TupleType &my_input ) {
0073             std::get<N-1>( my_input ).release();
0074         }
0075 
0076         template <typename TupleType>
0077         static inline void release_reservations( TupleType &my_input) {
0078             join_helper<N-1>::release_reservations(my_input);
0079             release_my_reservation(my_input);
0080         }
0081 
0082         template< typename InputTuple, typename OutputTuple >
0083         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0084             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
0085             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0086                 release_my_reservation( my_input );
0087                 return false;
0088             }
0089             return true;
0090         }
0091 
0092         template<typename InputTuple, typename OutputTuple>
0093         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0094             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
0095             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
0096         }
0097 
0098         template<typename InputTuple, typename OutputTuple>
0099         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0100             return get_my_item(my_input, out);
0101         }
0102 
0103         template<typename InputTuple>
0104         static inline void reset_my_port(InputTuple &my_input) {
0105             join_helper<N-1>::reset_my_port(my_input);
0106             std::get<N-1>(my_input).reset_port();
0107         }
0108 
0109         template<typename InputTuple>
0110         static inline void reset_ports(InputTuple& my_input) {
0111             reset_my_port(my_input);
0112         }
0113 
0114         template<typename InputTuple, typename KeyFuncTuple>
0115         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0116             std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
0117             std::get<N-1>(my_key_funcs) = nullptr;
0118             join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0119         }
0120 
0121         template< typename KeyFuncTuple>
0122         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0123             __TBB_ASSERT(
0124                 std::get<N-1>(other_inputs).get_my_key_func(),
0125                 "key matching join node should not be instantiated without functors."
0126             );
0127             std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
0128             join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0129         }
0130 
0131         template<typename InputTuple>
0132         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0133             join_helper<N-1>::reset_inputs(my_input, f);
0134             std::get<N-1>(my_input).reset_receiver(f);
0135         }
0136     };  // join_helper<N>
0137 
0138     template< >
0139     struct join_helper<1> {
0140 
0141         template< typename TupleType, typename PortType >
0142         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0143             std::get<0>( my_input ).set_join_node_pointer(port);
0144         }
0145 
0146         template< typename TupleType >
0147         static inline void consume_reservations( TupleType &my_input ) {
0148             std::get<0>( my_input ).consume();
0149         }
0150 
0151         template< typename TupleType >
0152         static inline void release_my_reservation( TupleType &my_input ) {
0153             std::get<0>( my_input ).release();
0154         }
0155 
0156         template<typename TupleType>
0157         static inline void release_reservations( TupleType &my_input) {
0158             release_my_reservation(my_input);
0159         }
0160 
0161         template< typename InputTuple, typename OutputTuple >
0162         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0163             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
0164         }
0165 
0166         template<typename InputTuple, typename OutputTuple>
0167         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0168             return std::get<0>(my_input).get_item(std::get<0>(out));
0169         }
0170 
0171         template<typename InputTuple, typename OutputTuple>
0172         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0173             return get_my_item(my_input, out);
0174         }
0175 
0176         template<typename InputTuple>
0177         static inline void reset_my_port(InputTuple &my_input) {
0178             std::get<0>(my_input).reset_port();
0179         }
0180 
0181         template<typename InputTuple>
0182         static inline void reset_ports(InputTuple& my_input) {
0183             reset_my_port(my_input);
0184         }
0185 
0186         template<typename InputTuple, typename KeyFuncTuple>
0187         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0188             std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
0189             std::get<0>(my_key_funcs) = nullptr;
0190         }
0191 
0192         template< typename KeyFuncTuple>
0193         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0194             __TBB_ASSERT(
0195                 std::get<0>(other_inputs).get_my_key_func(),
0196                 "key matching join node should not be instantiated without functors."
0197             );
0198             std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
0199         }
0200         template<typename InputTuple>
0201         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0202             std::get<0>(my_input).reset_receiver(f);
0203         }
0204     };  // join_helper<1>
0205 
0206     //! The two-phase join port
0207     template< typename T >
0208     class reserving_port : public receiver<T> {
0209     public:
0210         typedef T input_type;
0211         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0212 
0213     private:
0214         // ----------- Aggregator ------------
0215         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0216         };
0217         typedef reserving_port<T> class_type;
0218 
0219         class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
0220         public:
0221             char type;
0222             union {
0223                 T *my_arg;
0224                 predecessor_type *my_pred;
0225             };
0226             reserving_port_operation(const T& e, op_type t) :
0227                 type(char(t)), my_arg(const_cast<T*>(&e)) {}
0228             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0229                 my_pred(const_cast<predecessor_type *>(&s)) {}
0230             reserving_port_operation(op_type t) : type(char(t)) {}
0231         };
0232 
0233         typedef aggregating_functor<class_type, reserving_port_operation> handler_type;
0234         friend class aggregating_functor<class_type, reserving_port_operation>;
0235         aggregator<handler_type, reserving_port_operation> my_aggregator;
0236 
0237         void handle_operations(reserving_port_operation* op_list) {
0238             reserving_port_operation *current;
0239             bool was_missing_predecessors = false;
0240             while(op_list) {
0241                 current = op_list;
0242                 op_list = op_list->next;
0243                 switch(current->type) {
0244                 case reg_pred:
0245                     was_missing_predecessors = my_predecessors.empty();
0246                     my_predecessors.add(*(current->my_pred));
0247                     if ( was_missing_predecessors ) {
0248                         (void) my_join->decrement_port_count(); // may try to forward
0249                     }
0250                     current->status.store( SUCCEEDED, std::memory_order_release);
0251                     break;
0252                 case rem_pred:
0253                     if ( !my_predecessors.empty() ) {
0254                         my_predecessors.remove(*(current->my_pred));
0255                         if ( my_predecessors.empty() ) // was the last predecessor
0256                             my_join->increment_port_count();
0257                     }
0258                     // TODO: consider returning failure if there were no predecessors to remove
0259                     current->status.store( SUCCEEDED, std::memory_order_release );
0260                     break;
0261                 case res_item:
0262                     if ( reserved ) {
0263                         current->status.store( FAILED, std::memory_order_release);
0264                     }
0265                     else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
0266                         reserved = true;
0267                         current->status.store( SUCCEEDED, std::memory_order_release);
0268                     } else {
0269                         if ( my_predecessors.empty() ) {
0270                             my_join->increment_port_count();
0271                         }
0272                         current->status.store( FAILED, std::memory_order_release);
0273                     }
0274                     break;
0275                 case rel_res:
0276                     reserved = false;
0277                     my_predecessors.try_release( );
0278                     current->status.store( SUCCEEDED, std::memory_order_release);
0279                     break;
0280                 case con_res:
0281                     reserved = false;
0282                     my_predecessors.try_consume( );
0283                     current->status.store( SUCCEEDED, std::memory_order_release);
0284                     break;
0285                 }
0286             }
0287         }
0288 
0289     protected:
0290         template< typename R, typename B > friend class run_and_put_task;
0291         template<typename X, typename Y> friend class broadcast_cache;
0292         template<typename X, typename Y> friend class round_robin_cache;
0293         graph_task* try_put_task( const T & ) override {
0294             return nullptr;
0295         }
0296 
0297         graph& graph_reference() const override {
0298             return my_join->graph_ref;
0299         }
0300 
0301     public:
0302 
0303         //! Constructor
0304         reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
0305             my_aggregator.initialize_handler(handler_type(this));
0306         }
0307 
0308         // copy constructor
0309         reserving_port(const reserving_port& /* other */) = delete;
0310 
0311         void set_join_node_pointer(reserving_forwarding_base *join) {
0312             my_join = join;
0313         }
0314 
0315         //! Add a predecessor
0316         bool register_predecessor( predecessor_type &src ) override {
0317             reserving_port_operation op_data(src, reg_pred);
0318             my_aggregator.execute(&op_data);
0319             return op_data.status == SUCCEEDED;
0320         }
0321 
0322         //! Remove a predecessor
0323         bool remove_predecessor( predecessor_type &src ) override {
0324             reserving_port_operation op_data(src, rem_pred);
0325             my_aggregator.execute(&op_data);
0326             return op_data.status == SUCCEEDED;
0327         }
0328 
0329         //! Reserve an item from the port
0330         bool reserve( T &v ) {
0331             reserving_port_operation op_data(v, res_item);
0332             my_aggregator.execute(&op_data);
0333             return op_data.status == SUCCEEDED;
0334         }
0335 
0336         //! Release the port
0337         void release( ) {
0338             reserving_port_operation op_data(rel_res);
0339             my_aggregator.execute(&op_data);
0340         }
0341 
0342         //! Complete use of the port
0343         void consume( ) {
0344             reserving_port_operation op_data(con_res);
0345             my_aggregator.execute(&op_data);
0346         }
0347 
0348         void reset_receiver( reset_flags f) {
0349             if(f & rf_clear_edges) my_predecessors.clear();
0350             else
0351             my_predecessors.reset();
0352             reserved = false;
0353             __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0354         }
0355 
0356     private:
0357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0358         friend class get_graph_helper;
0359 #endif
0360 
0361         reserving_forwarding_base *my_join;
0362         reservable_predecessor_cache< T, null_mutex > my_predecessors;
0363         bool reserved;
0364     };  // reserving_port
0365 
0366     //! queueing join_port
0367     template<typename T>
0368     class queueing_port : public receiver<T>, public item_buffer<T> {
0369     public:
0370         typedef T input_type;
0371         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0372         typedef queueing_port<T> class_type;
0373 
0374     // ----------- Aggregator ------------
0375     private:
0376         enum op_type { get__item, res_port, try__put_task
0377         };
0378 
0379         class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
0380         public:
0381             char type;
0382             T my_val;
0383             T* my_arg;
0384             graph_task* bypass_t;
0385             // constructor for value parameter
0386             queueing_port_operation(const T& e, op_type t) :
0387                 type(char(t)), my_val(e), my_arg(nullptr)
0388                 , bypass_t(nullptr)
0389             {}
0390             // constructor for pointer parameter
0391             queueing_port_operation(const T* p, op_type t) :
0392                 type(char(t)), my_arg(const_cast<T*>(p))
0393                 , bypass_t(nullptr)
0394             {}
0395             // constructor with no parameter
0396             queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr)
0397                 , bypass_t(nullptr)
0398             {}
0399         };
0400 
0401         typedef aggregating_functor<class_type, queueing_port_operation> handler_type;
0402         friend class aggregating_functor<class_type, queueing_port_operation>;
0403         aggregator<handler_type, queueing_port_operation> my_aggregator;
0404 
0405         void handle_operations(queueing_port_operation* op_list) {
0406             queueing_port_operation *current;
0407             bool was_empty;
0408             while(op_list) {
0409                 current = op_list;
0410                 op_list = op_list->next;
0411                 switch(current->type) {
0412                 case try__put_task: {
0413                         graph_task* rtask = nullptr;
0414                         was_empty = this->buffer_empty();
0415                         this->push_back(current->my_val);
0416                         if (was_empty) rtask = my_join->decrement_port_count(false);
0417                         else
0418                             rtask = SUCCESSFULLY_ENQUEUED;
0419                         current->bypass_t = rtask;
0420                         current->status.store( SUCCEEDED, std::memory_order_release);
0421                     }
0422                     break;
0423                 case get__item:
0424                     if(!this->buffer_empty()) {
0425                         __TBB_ASSERT(current->my_arg, nullptr);
0426                         *(current->my_arg) = this->front();
0427                         current->status.store( SUCCEEDED, std::memory_order_release);
0428                     }
0429                     else {
0430                         current->status.store( FAILED, std::memory_order_release);
0431                     }
0432                     break;
0433                 case res_port:
0434                     __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
0435                     this->destroy_front();
0436                     if(this->my_item_valid(this->my_head)) {
0437                         (void)my_join->decrement_port_count(true);
0438                     }
0439                     current->status.store( SUCCEEDED, std::memory_order_release);
0440                     break;
0441                 }
0442             }
0443         }
0444     // ------------ End Aggregator ---------------
0445 
0446     protected:
0447         template< typename R, typename B > friend class run_and_put_task;
0448         template<typename X, typename Y> friend class broadcast_cache;
0449         template<typename X, typename Y> friend class round_robin_cache;
0450         graph_task* try_put_task(const T &v) override {
0451             queueing_port_operation op_data(v, try__put_task);
0452             my_aggregator.execute(&op_data);
0453             __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
0454             if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
0455             return op_data.bypass_t;
0456         }
0457 
0458         graph& graph_reference() const override {
0459             return my_join->graph_ref;
0460         }
0461 
0462     public:
0463 
0464         //! Constructor
0465         queueing_port() : item_buffer<T>() {
0466             my_join = nullptr;
0467             my_aggregator.initialize_handler(handler_type(this));
0468         }
0469 
0470         //! copy constructor
0471         queueing_port(const queueing_port& /* other */) = delete;
0472 
0473         //! record parent for tallying available items
0474         void set_join_node_pointer(queueing_forwarding_base *join) {
0475             my_join = join;
0476         }
0477 
0478         bool get_item( T &v ) {
0479             queueing_port_operation op_data(&v, get__item);
0480             my_aggregator.execute(&op_data);
0481             return op_data.status == SUCCEEDED;
0482         }
0483 
0484         // reset_port is called when item is accepted by successor, but
0485         // is initiated by join_node.
0486         void reset_port() {
0487             queueing_port_operation op_data(res_port);
0488             my_aggregator.execute(&op_data);
0489             return;
0490         }
0491 
0492         void reset_receiver(reset_flags) {
0493             item_buffer<T>::reset();
0494         }
0495 
0496     private:
0497 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0498         friend class get_graph_helper;
0499 #endif
0500 
0501         queueing_forwarding_base *my_join;
0502     };  // queueing_port
0503 
0504 #include "_flow_graph_tagged_buffer_impl.h"
0505 
0506     template<typename K>
0507     struct count_element {
0508         K my_key;
0509         size_t my_value;
0510     };
0511 
0512     // method to access the key in the counting table
0513     // the ref has already been removed from K
0514     template< typename K >
0515     struct key_to_count_functor {
0516         typedef count_element<K> table_item_type;
0517         const K& operator()(const table_item_type& v) { return v.my_key; }
0518     };
0519 
0520     // the ports can have only one template parameter.  We wrap the types needed in
0521     // a traits type
0522     template< class TraitsType >
0523     class key_matching_port :
0524         public receiver<typename TraitsType::T>,
0525         public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0526                 typename TraitsType::KHash > {
0527     public:
0528         typedef TraitsType traits;
0529         typedef key_matching_port<traits> class_type;
0530         typedef typename TraitsType::T input_type;
0531         typedef typename TraitsType::K key_type;
0532         typedef typename std::decay<key_type>::type noref_key_type;
0533         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0534         typedef typename TraitsType::TtoK type_to_key_func_type;
0535         typedef typename TraitsType::KHash hash_compare_type;
0536         typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
0537 
0538     private:
0539 // ----------- Aggregator ------------
0540     private:
0541         enum op_type { try__put, get__item, res_port
0542         };
0543 
0544         class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
0545         public:
0546             char type;
0547             input_type my_val;
0548             input_type *my_arg;
0549             // constructor for value parameter
0550             key_matching_port_operation(const input_type& e, op_type t) :
0551                 type(char(t)), my_val(e), my_arg(nullptr) {}
0552             // constructor for pointer parameter
0553             key_matching_port_operation(const input_type* p, op_type t) :
0554                 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
0555             // constructor with no parameter
0556             key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
0557         };
0558 
0559         typedef aggregating_functor<class_type, key_matching_port_operation> handler_type;
0560         friend class aggregating_functor<class_type, key_matching_port_operation>;
0561         aggregator<handler_type, key_matching_port_operation> my_aggregator;
0562 
0563         void handle_operations(key_matching_port_operation* op_list) {
0564             key_matching_port_operation *current;
0565             while(op_list) {
0566                 current = op_list;
0567                 op_list = op_list->next;
0568                 switch(current->type) {
0569                 case try__put: {
0570                         bool was_inserted = this->insert_with_key(current->my_val);
0571                         // return failure if a duplicate insertion occurs
0572                         current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
0573                     }
0574                     break;
0575                 case get__item:
0576                     // use current_key from FE for item
0577                     __TBB_ASSERT(current->my_arg, nullptr);
0578                     if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
0579                         __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0580                     }
0581                     current->status.store( SUCCEEDED, std::memory_order_release);
0582                     break;
0583                 case res_port:
0584                     // use current_key from FE for item
0585                     this->delete_with_key(my_join->current_key);
0586                     current->status.store( SUCCEEDED, std::memory_order_release);
0587                     break;
0588                 }
0589             }
0590         }
0591 // ------------ End Aggregator ---------------
0592     protected:
0593         template< typename R, typename B > friend class run_and_put_task;
0594         template<typename X, typename Y> friend class broadcast_cache;
0595         template<typename X, typename Y> friend class round_robin_cache;
0596         graph_task* try_put_task(const input_type& v) override {
0597             key_matching_port_operation op_data(v, try__put);
0598             graph_task* rtask = nullptr;
0599             my_aggregator.execute(&op_data);
0600             if(op_data.status == SUCCEEDED) {
0601                 rtask = my_join->increment_key_count((*(this->get_key_func()))(v));  // may spawn
0602                 // rtask has to reflect the return status of the try_put
0603                 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0604             }
0605             return rtask;
0606         }
0607 
0608         graph& graph_reference() const override {
0609             return my_join->graph_ref;
0610         }
0611 
0612     public:
0613 
0614         key_matching_port() : receiver<input_type>(), buffer_type() {
0615             my_join = nullptr;
0616             my_aggregator.initialize_handler(handler_type(this));
0617         }
0618 
0619         // copy constructor
0620         key_matching_port(const key_matching_port& /*other*/) = delete;
0621 #if __INTEL_COMPILER <= 2021
0622         // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
0623         // class while the parent class has the virtual keyword for the destrocutor.
0624         virtual
0625 #endif
0626         ~key_matching_port() { }
0627 
0628         void set_join_node_pointer(forwarding_base *join) {
0629             my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0630         }
0631 
0632         void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0633 
0634         type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0635 
0636         bool get_item( input_type &v ) {
0637             // aggregator uses current_key from FE for Key
0638             key_matching_port_operation op_data(&v, get__item);
0639             my_aggregator.execute(&op_data);
0640             return op_data.status == SUCCEEDED;
0641         }
0642 
0643         // reset_port is called when item is accepted by successor, but
0644         // is initiated by join_node.
0645         void reset_port() {
0646             key_matching_port_operation op_data(res_port);
0647             my_aggregator.execute(&op_data);
0648             return;
0649         }
0650 
0651         void reset_receiver(reset_flags ) {
0652             buffer_type::reset();
0653         }
0654 
0655     private:
0656         // my_join forwarding base used to count number of inputs that
0657         // received key.
0658         matching_forwarding_base<key_type> *my_join;
0659     };  // key_matching_port
0660 
0661     using namespace graph_policy_namespace;
0662 
0663     template<typename JP, typename InputTuple, typename OutputTuple>
0664     class join_node_base;
0665 
0666     //! join_node_FE : implements input port policy
0667     template<typename JP, typename InputTuple, typename OutputTuple>
0668     class join_node_FE;
0669 
0670     template<typename InputTuple, typename OutputTuple>
0671     class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
0672     private:
0673         static const int N = std::tuple_size<OutputTuple>::value;
0674         typedef OutputTuple output_type;
0675         typedef InputTuple input_type;
0676         typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
0677     public:
0678         join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
0679             ports_with_no_inputs = N;
0680             join_helper<N>::set_join_node_pointer(my_inputs, this);
0681         }
0682 
0683         join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
0684             ports_with_no_inputs = N;
0685             join_helper<N>::set_join_node_pointer(my_inputs, this);
0686         }
0687 
0688         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0689 
0690        void increment_port_count() override {
0691             ++ports_with_no_inputs;
0692         }
0693 
0694         // if all input_ports have predecessors, spawn forward to try and consume tuples
0695         graph_task* decrement_port_count() override {
0696             if(ports_with_no_inputs.fetch_sub(1) == 1) {
0697                 if(is_graph_active(this->graph_ref)) {
0698                     small_object_allocator allocator{};
0699                     typedef forward_task_bypass<base_node_type> task_type;
0700                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0701                     graph_ref.reserve_wait();
0702                     spawn_in_graph_arena(this->graph_ref, *t);
0703                 }
0704             }
0705             return nullptr;
0706         }
0707 
0708         input_type &input_ports() { return my_inputs; }
0709 
0710     protected:
0711 
0712         void reset(  reset_flags f) {
0713             // called outside of parallel contexts
0714             ports_with_no_inputs = N;
0715             join_helper<N>::reset_inputs(my_inputs, f);
0716         }
0717 
0718         // all methods on input ports should be called under mutual exclusion from join_node_base.
0719 
0720         bool tuple_build_may_succeed() {
0721             return !ports_with_no_inputs;
0722         }
0723 
0724         bool try_to_make_tuple(output_type &out) {
0725             if(ports_with_no_inputs) return false;
0726             return join_helper<N>::reserve(my_inputs, out);
0727         }
0728 
0729         void tuple_accepted() {
0730             join_helper<N>::consume_reservations(my_inputs);
0731         }
0732         void tuple_rejected() {
0733             join_helper<N>::release_reservations(my_inputs);
0734         }
0735 
0736         input_type my_inputs;
0737         base_node_type *my_node;
0738         std::atomic<std::size_t> ports_with_no_inputs;
0739     };  // join_node_FE<reserving, ... >
0740 
0741     template<typename InputTuple, typename OutputTuple>
0742     class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
0743     public:
0744         static const int N = std::tuple_size<OutputTuple>::value;
0745         typedef OutputTuple output_type;
0746         typedef InputTuple input_type;
0747         typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
0748 
0749         join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
0750             ports_with_no_items = N;
0751             join_helper<N>::set_join_node_pointer(my_inputs, this);
0752         }
0753 
0754         join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
0755             ports_with_no_items = N;
0756             join_helper<N>::set_join_node_pointer(my_inputs, this);
0757         }
0758 
0759         // needed for forwarding
0760         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0761 
0762         void reset_port_count() {
0763             ports_with_no_items = N;
0764         }
0765 
0766         // if all input_ports have items, spawn forward to try and consume tuples
0767         graph_task* decrement_port_count(bool handle_task) override
0768         {
0769             if(ports_with_no_items.fetch_sub(1) == 1) {
0770                 if(is_graph_active(this->graph_ref)) {
0771                     small_object_allocator allocator{};
0772                     typedef forward_task_bypass<base_node_type> task_type;
0773                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0774                     graph_ref.reserve_wait();
0775                     if( !handle_task )
0776                         return t;
0777                     spawn_in_graph_arena(this->graph_ref, *t);
0778                 }
0779             }
0780             return nullptr;
0781         }
0782 
0783         input_type &input_ports() { return my_inputs; }
0784 
0785     protected:
0786 
0787         void reset(  reset_flags f) {
0788             reset_port_count();
0789             join_helper<N>::reset_inputs(my_inputs, f );
0790         }
0791 
0792         // all methods on input ports should be called under mutual exclusion from join_node_base.
0793 
0794         bool tuple_build_may_succeed() {
0795             return !ports_with_no_items;
0796         }
0797 
0798         bool try_to_make_tuple(output_type &out) {
0799             if(ports_with_no_items) return false;
0800             return join_helper<N>::get_items(my_inputs, out);
0801         }
0802 
0803         void tuple_accepted() {
0804             reset_port_count();
0805             join_helper<N>::reset_ports(my_inputs);
0806         }
0807         void tuple_rejected() {
0808             // nothing to do.
0809         }
0810 
0811         input_type my_inputs;
0812         base_node_type *my_node;
0813         std::atomic<std::size_t> ports_with_no_items;
0814     };  // join_node_FE<queueing, ...>
0815 
0816     // key_matching join front-end.
0817     template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
0818     class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
0819              // buffer of key value counts
0820               public hash_buffer<   // typedefed below to key_to_count_buffer_type
0821                   typename std::decay<K>::type&,        // force ref type on K
0822                   count_element<typename std::decay<K>::type>,
0823                   type_to_key_function_body<
0824                       count_element<typename std::decay<K>::type>,
0825                       typename std::decay<K>::type& >,
0826                   KHash >,
0827              // buffer of output items
0828              public item_buffer<OutputTuple> {
0829     public:
0830         static const int N = std::tuple_size<OutputTuple>::value;
0831         typedef OutputTuple output_type;
0832         typedef InputTuple input_type;
0833         typedef K key_type;
0834         typedef typename std::decay<key_type>::type unref_key_type;
0835         typedef KHash key_hash_compare;
0836         // must use K without ref.
0837         typedef count_element<unref_key_type> count_element_type;
0838         // method that lets us refer to the key of this type.
0839         typedef key_to_count_functor<unref_key_type> key_to_count_func;
0840         typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
0841         typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
0842         // this is the type of the special table that keeps track of the number of discrete
0843         // elements corresponding to each key that we've seen.
0844         typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
0845                  key_to_count_buffer_type;
0846         typedef item_buffer<output_type> output_buffer_type;
0847         typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
0848         typedef matching_forwarding_base<key_type> forwarding_base_type;
0849 
0850 // ----------- Aggregator ------------
0851         // the aggregator is only needed to serialize the access to the hash table.
0852         // and the output_buffer_type base class
0853     private:
0854         enum op_type { res_count, inc_count, may_succeed, try_make };
0855         typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
0856 
0857         class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
0858         public:
0859             char type;
0860             unref_key_type my_val;
0861             output_type* my_output;
0862             graph_task* bypass_t;
0863             // constructor for value parameter
0864             key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
0865                  my_output(nullptr), bypass_t(nullptr) {}
0866             key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
0867             // constructor with no parameter
0868             key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
0869         };
0870 
0871         typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type;
0872         friend class aggregating_functor<class_type, key_matching_FE_operation>;
0873         aggregator<handler_type, key_matching_FE_operation> my_aggregator;
0874 
0875         // called from aggregator, so serialized
0876         // returns a task pointer if the a task would have been enqueued but we asked that
0877         // it be returned.  Otherwise returns nullptr.
0878         graph_task* fill_output_buffer(unref_key_type &t) {
0879             output_type l_out;
0880             graph_task* rtask = nullptr;
0881             bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
0882             this->current_key = t;
0883             this->delete_with_key(this->current_key);   // remove the key
0884             if(join_helper<N>::get_items(my_inputs, l_out)) {  //  <== call back
0885                 this->push_back(l_out);
0886                 if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
0887                     small_object_allocator allocator{};
0888                     typedef forward_task_bypass<base_node_type> task_type;
0889                     rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
0890                     this->graph_ref.reserve_wait();
0891                     do_fwd = false;
0892                 }
0893                 // retire the input values
0894                 join_helper<N>::reset_ports(my_inputs);  //  <== call back
0895             }
0896             else {
0897                 __TBB_ASSERT(false, "should have had something to push");
0898             }
0899             return rtask;
0900         }
0901 
0902         void handle_operations(key_matching_FE_operation* op_list) {
0903             key_matching_FE_operation *current;
0904             while(op_list) {
0905                 current = op_list;
0906                 op_list = op_list->next;
0907                 switch(current->type) {
0908                 case res_count:  // called from BE
0909                     {
0910                         this->destroy_front();
0911                         current->status.store( SUCCEEDED, std::memory_order_release);
0912                     }
0913                     break;
0914                 case inc_count: {  // called from input ports
0915                         count_element_type *p = nullptr;
0916                         unref_key_type &t = current->my_val;
0917                         if(!(this->find_ref_with_key(t,p))) {
0918                             count_element_type ev;
0919                             ev.my_key = t;
0920                             ev.my_value = 0;
0921                             this->insert_with_key(ev);
0922                             bool found = this->find_ref_with_key(t, p);
0923                             __TBB_ASSERT_EX(found, "should find key after inserting it");
0924                         }
0925                         if(++(p->my_value) == size_t(N)) {
0926                             current->bypass_t = fill_output_buffer(t);
0927                         }
0928                     }
0929                     current->status.store( SUCCEEDED, std::memory_order_release);
0930                     break;
0931                 case may_succeed:  // called from BE
0932                     current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
0933                     break;
0934                 case try_make:  // called from BE
0935                     if(this->buffer_empty()) {
0936                         current->status.store( FAILED, std::memory_order_release);
0937                     }
0938                     else {
0939                         *(current->my_output) = this->front();
0940                         current->status.store( SUCCEEDED, std::memory_order_release);
0941                     }
0942                     break;
0943                 }
0944             }
0945         }
0946 // ------------ End Aggregator ---------------
0947 
0948     public:
0949         template<typename FunctionTuple>
0950         join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
0951             join_helper<N>::set_join_node_pointer(my_inputs, this);
0952             join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
0953             my_aggregator.initialize_handler(handler_type(this));
0954                     TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
0955             this->set_key_func(cfb);
0956         }
0957 
0958         join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
0959         output_buffer_type() {
0960             my_node = nullptr;
0961             join_helper<N>::set_join_node_pointer(my_inputs, this);
0962             join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
0963             my_aggregator.initialize_handler(handler_type(this));
0964             TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
0965             this->set_key_func(cfb);
0966         }
0967 
0968         // needed for forwarding
0969         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0970 
0971         void reset_port_count() {  // called from BE
0972             key_matching_FE_operation op_data(res_count);
0973             my_aggregator.execute(&op_data);
0974             return;
0975         }
0976 
0977         // if all input_ports have items, spawn forward to try and consume tuples
0978         // return a task if we are asked and did create one.
0979         graph_task *increment_key_count(unref_key_type const & t) override {  // called from input_ports
0980             key_matching_FE_operation op_data(t, inc_count);
0981             my_aggregator.execute(&op_data);
0982             return op_data.bypass_t;
0983         }
0984 
0985         input_type &input_ports() { return my_inputs; }
0986 
0987     protected:
0988 
0989         void reset(  reset_flags f ) {
0990             // called outside of parallel contexts
0991             join_helper<N>::reset_inputs(my_inputs, f);
0992 
0993             key_to_count_buffer_type::reset();
0994             output_buffer_type::reset();
0995         }
0996 
0997         // all methods on input ports should be called under mutual exclusion from join_node_base.
0998 
0999         bool tuple_build_may_succeed() {  // called from back-end
1000             key_matching_FE_operation op_data(may_succeed);
1001             my_aggregator.execute(&op_data);
1002             return op_data.status == SUCCEEDED;
1003         }
1004 
1005         // cannot lock while calling back to input_ports.  current_key will only be set
1006         // and reset under the aggregator, so it will remain consistent.
1007         bool try_to_make_tuple(output_type &out) {
1008             key_matching_FE_operation op_data(&out,try_make);
1009             my_aggregator.execute(&op_data);
1010             return op_data.status == SUCCEEDED;
1011         }
1012 
1013         void tuple_accepted() {
1014             reset_port_count();  // reset current_key after ports reset.
1015         }
1016 
1017         void tuple_rejected() {
1018             // nothing to do.
1019         }
1020 
1021         input_type my_inputs;  // input ports
1022         base_node_type *my_node;
1023     }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1024 
1025     //! join_node_base
1026     template<typename JP, typename InputTuple, typename OutputTuple>
1027     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1028                            public sender<OutputTuple> {
1029     protected:
1030         using graph_node::my_graph;
1031     public:
1032         typedef OutputTuple output_type;
1033 
1034         typedef typename sender<output_type>::successor_type successor_type;
1035         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1036         using input_ports_type::tuple_build_may_succeed;
1037         using input_ports_type::try_to_make_tuple;
1038         using input_ports_type::tuple_accepted;
1039         using input_ports_type::tuple_rejected;
1040 
1041     private:
1042         // ----------- Aggregator ------------
1043         enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1044         };
1045         typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1046 
1047         class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1048         public:
1049             char type;
1050             union {
1051                 output_type *my_arg;
1052                 successor_type *my_succ;
1053             };
1054             graph_task* bypass_t;
1055             join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1056                 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {}
1057             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1058                 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1059             join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1060         };
1061 
1062         typedef aggregating_functor<class_type, join_node_base_operation> handler_type;
1063         friend class aggregating_functor<class_type, join_node_base_operation>;
1064         bool forwarder_busy;
1065         aggregator<handler_type, join_node_base_operation> my_aggregator;
1066 
1067         void handle_operations(join_node_base_operation* op_list) {
1068             join_node_base_operation *current;
1069             while(op_list) {
1070                 current = op_list;
1071                 op_list = op_list->next;
1072                 switch(current->type) {
1073                 case reg_succ: {
1074                         my_successors.register_successor(*(current->my_succ));
1075                         if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1076                             small_object_allocator allocator{};
1077                             typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1078                             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1079                             my_graph.reserve_wait();
1080                             spawn_in_graph_arena(my_graph, *t);
1081                             forwarder_busy = true;
1082                         }
1083                         current->status.store( SUCCEEDED, std::memory_order_release);
1084                     }
1085                     break;
1086                 case rem_succ:
1087                     my_successors.remove_successor(*(current->my_succ));
1088                     current->status.store( SUCCEEDED, std::memory_order_release);
1089                     break;
1090                 case try__get:
1091                     if(tuple_build_may_succeed()) {
1092                         if(try_to_make_tuple(*(current->my_arg))) {
1093                             tuple_accepted();
1094                             current->status.store( SUCCEEDED, std::memory_order_release);
1095                         }
1096                         else current->status.store( FAILED, std::memory_order_release);
1097                     }
1098                     else current->status.store( FAILED, std::memory_order_release);
1099                     break;
1100                 case do_fwrd_bypass: {
1101                         bool build_succeeded;
1102                         graph_task *last_task = nullptr;
1103                         output_type out;
1104                         // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted
1105                         // are separate locked methods in the FE.  We could conceivably fetch the front
1106                         // of the FE queue, then be swapped out, have someone else consume the FE's
1107                         // object, then come back, forward, and then try to remove it from the queue
1108                         // again. Without reservation of the FE, the methods accessing it must be locked.
1109                         // We could remember the keys of the objects we forwarded, and then remove
1110                         // them from the input ports after forwarding is complete?
1111                         if(tuple_build_may_succeed()) {  // checks output queue of FE
1112                             do {
1113                                 build_succeeded = try_to_make_tuple(out);  // fetch front_end of queue
1114                                 if(build_succeeded) {
1115                                     graph_task *new_task = my_successors.try_put_task(out);
1116                                     last_task = combine_tasks(my_graph, last_task, new_task);
1117                                     if(new_task) {
1118                                         tuple_accepted();
1119                                     }
1120                                     else {
1121                                         tuple_rejected();
1122                                         build_succeeded = false;
1123                                     }
1124                                 }
1125                             } while(build_succeeded);
1126                         }
1127                         current->bypass_t = last_task;
1128                         current->status.store( SUCCEEDED, std::memory_order_release);
1129                         forwarder_busy = false;
1130                     }
1131                     break;
1132                 }
1133             }
1134         }
1135         // ---------- end aggregator -----------
1136     public:
1137         join_node_base(graph &g)
1138             : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1139         {
1140             input_ports_type::set_my_node(this);
1141             my_aggregator.initialize_handler(handler_type(this));
1142         }
1143 
1144         join_node_base(const join_node_base& other) :
1145             graph_node(other.graph_node::my_graph), input_ports_type(other),
1146             sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1147         {
1148             input_ports_type::set_my_node(this);
1149             my_aggregator.initialize_handler(handler_type(this));
1150         }
1151 
1152         template<typename FunctionTuple>
1153         join_node_base(graph &g, FunctionTuple f)
1154             : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1155         {
1156             input_ports_type::set_my_node(this);
1157             my_aggregator.initialize_handler(handler_type(this));
1158         }
1159 
1160         bool register_successor(successor_type &r) override {
1161             join_node_base_operation op_data(r, reg_succ);
1162             my_aggregator.execute(&op_data);
1163             return op_data.status == SUCCEEDED;
1164         }
1165 
1166         bool remove_successor( successor_type &r) override {
1167             join_node_base_operation op_data(r, rem_succ);
1168             my_aggregator.execute(&op_data);
1169             return op_data.status == SUCCEEDED;
1170         }
1171 
1172         bool try_get( output_type &v) override {
1173             join_node_base_operation op_data(v, try__get);
1174             my_aggregator.execute(&op_data);
1175             return op_data.status == SUCCEEDED;
1176         }
1177 
1178     protected:
1179         void reset_node(reset_flags f) override {
1180             input_ports_type::reset(f);
1181             if(f & rf_clear_edges) my_successors.clear();
1182         }
1183 
1184     private:
1185         broadcast_cache<output_type, null_rw_mutex> my_successors;
1186 
1187         friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1188         graph_task *forward_task() {
1189             join_node_base_operation op_data(do_fwrd_bypass);
1190             my_aggregator.execute(&op_data);
1191             return op_data.bypass_t;
1192         }
1193 
1194     };  // join_node_base
1195 
1196     // join base class type generator
1197     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1198     struct join_base {
1199         typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1200     };
1201 
1202     template<int N, typename OutputTuple, typename K, typename KHash>
1203     struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1204         typedef key_matching<K, KHash> key_traits_type;
1205         typedef K key_type;
1206         typedef KHash key_hash_compare;
1207         typedef join_node_base< key_traits_type,
1208                 // ports type
1209                 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1210                 OutputTuple > type;
1211     };
1212 
1213     //! unfolded_join_node : passes input_ports_type to join_node_base.  We build the input port type
1214     //  using tuple_element.  The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1215     //  and should match the typename.
1216 
1217     template<int M, template<class> class PT, typename OutputTuple, typename JP>
1218     class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type {
1219     public:
1220         typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type;
1221         typedef OutputTuple output_type;
1222     private:
1223         typedef join_node_base<JP, input_ports_type, output_type > base_type;
1224     public:
1225         unfolded_join_node(graph &g) : base_type(g) {}
1226         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1227     };
1228 
1229 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1230     template <typename K, typename T>
1231     struct key_from_message_body {
1232         K operator()(const T& t) const {
1233             return key_from_message<K>(t);
1234         }
1235     };
1236     // Adds const to reference type
1237     template <typename K, typename T>
1238     struct key_from_message_body<K&,T> {
1239         const K& operator()(const T& t) const {
1240             return key_from_message<const K&>(t);
1241         }
1242     };
1243 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1244     // key_matching unfolded_join_node.  This must be a separate specialization because the constructors
1245     // differ.
1246 
1247     template<typename OutputTuple, typename K, typename KHash>
1248     class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1249             join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1250         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1251         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1252     public:
1253         typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1254         typedef OutputTuple output_type;
1255     private:
1256         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1257         typedef type_to_key_function_body<T0, K> *f0_p;
1258         typedef type_to_key_function_body<T1, K> *f1_p;
1259         typedef std::tuple< f0_p, f1_p > func_initializer_type;
1260     public:
1261 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1262         unfolded_join_node(graph &g) : base_type(g,
1263                 func_initializer_type(
1264                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1265                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1266                     ) ) {
1267         }
1268 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1269         template<typename Body0, typename Body1>
1270         unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1271                 func_initializer_type(
1272                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1273                     new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1274                     ) ) {
1275             static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1276         }
1277         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1278     };
1279 
1280     template<typename OutputTuple, typename K, typename KHash>
1281     class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1282             join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1283         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1284         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1285         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1286     public:
1287         typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1288         typedef OutputTuple output_type;
1289     private:
1290         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1291         typedef type_to_key_function_body<T0, K> *f0_p;
1292         typedef type_to_key_function_body<T1, K> *f1_p;
1293         typedef type_to_key_function_body<T2, K> *f2_p;
1294         typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1295     public:
1296 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1297         unfolded_join_node(graph &g) : base_type(g,
1298                 func_initializer_type(
1299                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1300                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1301                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1302                     ) ) {
1303         }
1304 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1305         template<typename Body0, typename Body1, typename Body2>
1306         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1307                 func_initializer_type(
1308                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1309                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1310                     new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1311                     ) ) {
1312             static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1313         }
1314         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1315     };
1316 
1317     template<typename OutputTuple, typename K, typename KHash>
1318     class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1319             join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1320         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1321         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1322         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1323         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1324     public:
1325         typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1326         typedef OutputTuple output_type;
1327     private:
1328         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1329         typedef type_to_key_function_body<T0, K> *f0_p;
1330         typedef type_to_key_function_body<T1, K> *f1_p;
1331         typedef type_to_key_function_body<T2, K> *f2_p;
1332         typedef type_to_key_function_body<T3, K> *f3_p;
1333         typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1334     public:
1335 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1336         unfolded_join_node(graph &g) : base_type(g,
1337                 func_initializer_type(
1338                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1339                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1340                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1341                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1342                     ) ) {
1343         }
1344 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1345         template<typename Body0, typename Body1, typename Body2, typename Body3>
1346         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1347                 func_initializer_type(
1348                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1349                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1350                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1351                     new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1352                     ) ) {
1353             static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1354         }
1355         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1356     };
1357 
1358     template<typename OutputTuple, typename K, typename KHash>
1359     class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1360             join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1361         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1362         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1363         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1364         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1365         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1366     public:
1367         typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1368         typedef OutputTuple output_type;
1369     private:
1370         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1371         typedef type_to_key_function_body<T0, K> *f0_p;
1372         typedef type_to_key_function_body<T1, K> *f1_p;
1373         typedef type_to_key_function_body<T2, K> *f2_p;
1374         typedef type_to_key_function_body<T3, K> *f3_p;
1375         typedef type_to_key_function_body<T4, K> *f4_p;
1376         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1377     public:
1378 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1379         unfolded_join_node(graph &g) : base_type(g,
1380                 func_initializer_type(
1381                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1382                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1383                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1384                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1385                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1386                     ) ) {
1387         }
1388 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1389         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1390         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1391                 func_initializer_type(
1392                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1393                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1394                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1395                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1396                     new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1397                     ) ) {
1398             static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1399         }
1400         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1401     };
1402 
1403 #if __TBB_VARIADIC_MAX >= 6
1404     template<typename OutputTuple, typename K, typename KHash>
1405     class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1406             join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1407         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1408         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1409         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1410         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1411         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1412         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1413     public:
1414         typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1415         typedef OutputTuple output_type;
1416     private:
1417         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1418         typedef type_to_key_function_body<T0, K> *f0_p;
1419         typedef type_to_key_function_body<T1, K> *f1_p;
1420         typedef type_to_key_function_body<T2, K> *f2_p;
1421         typedef type_to_key_function_body<T3, K> *f3_p;
1422         typedef type_to_key_function_body<T4, K> *f4_p;
1423         typedef type_to_key_function_body<T5, K> *f5_p;
1424         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1425     public:
1426 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1427         unfolded_join_node(graph &g) : base_type(g,
1428                 func_initializer_type(
1429                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1430                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1431                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1432                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1433                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1434                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1435                     ) ) {
1436         }
1437 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1438         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1439         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1440                 : base_type(g, func_initializer_type(
1441                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1442                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1443                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1444                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1445                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1446                     new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1447                     ) ) {
1448             static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1449         }
1450         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1451     };
1452 #endif
1453 
1454 #if __TBB_VARIADIC_MAX >= 7
1455     template<typename OutputTuple, typename K, typename KHash>
1456     class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1457             join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1458         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1459         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1460         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1461         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1462         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1463         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1464         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1465     public:
1466         typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1467         typedef OutputTuple output_type;
1468     private:
1469         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1470         typedef type_to_key_function_body<T0, K> *f0_p;
1471         typedef type_to_key_function_body<T1, K> *f1_p;
1472         typedef type_to_key_function_body<T2, K> *f2_p;
1473         typedef type_to_key_function_body<T3, K> *f3_p;
1474         typedef type_to_key_function_body<T4, K> *f4_p;
1475         typedef type_to_key_function_body<T5, K> *f5_p;
1476         typedef type_to_key_function_body<T6, K> *f6_p;
1477         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1478     public:
1479 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1480         unfolded_join_node(graph &g) : base_type(g,
1481                 func_initializer_type(
1482                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1483                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1484                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1485                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1486                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1487                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1488                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1489                     ) ) {
1490         }
1491 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1492         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1493                  typename Body5, typename Body6>
1494         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1495                 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1496                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1497                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1498                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1499                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1500                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1501                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1502                     new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1503                     ) ) {
1504             static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1505         }
1506         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1507     };
1508 #endif
1509 
1510 #if __TBB_VARIADIC_MAX >= 8
1511     template<typename OutputTuple, typename K, typename KHash>
1512     class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1513             join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1514         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1515         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1516         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1517         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1518         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1519         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1520         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1521         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1522     public:
1523         typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1524         typedef OutputTuple output_type;
1525     private:
1526         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1527         typedef type_to_key_function_body<T0, K> *f0_p;
1528         typedef type_to_key_function_body<T1, K> *f1_p;
1529         typedef type_to_key_function_body<T2, K> *f2_p;
1530         typedef type_to_key_function_body<T3, K> *f3_p;
1531         typedef type_to_key_function_body<T4, K> *f4_p;
1532         typedef type_to_key_function_body<T5, K> *f5_p;
1533         typedef type_to_key_function_body<T6, K> *f6_p;
1534         typedef type_to_key_function_body<T7, K> *f7_p;
1535         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1536     public:
1537 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1538         unfolded_join_node(graph &g) : base_type(g,
1539                 func_initializer_type(
1540                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1541                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1542                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1543                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1544                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1545                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1546                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1547                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1548                     ) ) {
1549         }
1550 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1551         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1552                  typename Body5, typename Body6, typename Body7>
1553         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1554                 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1555                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1556                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1557                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1558                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1559                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1560                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1561                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1562                     new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1563                     ) ) {
1564             static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1565         }
1566         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1567     };
1568 #endif
1569 
1570 #if __TBB_VARIADIC_MAX >= 9
1571     template<typename OutputTuple, typename K, typename KHash>
1572     class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1573             join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1574         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1575         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1576         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1577         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1578         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1579         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1580         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1581         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1582         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1583     public:
1584         typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1585         typedef OutputTuple output_type;
1586     private:
1587         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1588         typedef type_to_key_function_body<T0, K> *f0_p;
1589         typedef type_to_key_function_body<T1, K> *f1_p;
1590         typedef type_to_key_function_body<T2, K> *f2_p;
1591         typedef type_to_key_function_body<T3, K> *f3_p;
1592         typedef type_to_key_function_body<T4, K> *f4_p;
1593         typedef type_to_key_function_body<T5, K> *f5_p;
1594         typedef type_to_key_function_body<T6, K> *f6_p;
1595         typedef type_to_key_function_body<T7, K> *f7_p;
1596         typedef type_to_key_function_body<T8, K> *f8_p;
1597         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1598     public:
1599 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1600         unfolded_join_node(graph &g) : base_type(g,
1601                 func_initializer_type(
1602                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1603                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1604                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1605                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1606                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1607                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1608                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1609                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1610                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1611                     ) ) {
1612         }
1613 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1614         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1615                  typename Body5, typename Body6, typename Body7, typename Body8>
1616         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1617                 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1618                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1619                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1620                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1621                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1622                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1623                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1624                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1625                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1626                     new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1627                     ) ) {
1628             static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1629         }
1630         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1631     };
1632 #endif
1633 
1634 #if __TBB_VARIADIC_MAX >= 10
1635     template<typename OutputTuple, typename K, typename KHash>
1636     class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1637             join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1638         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1639         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1640         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1641         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1642         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1643         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1644         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1645         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1646         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1647         typedef typename std::tuple_element<9, OutputTuple>::type T9;
1648     public:
1649         typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1650         typedef OutputTuple output_type;
1651     private:
1652         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1653         typedef type_to_key_function_body<T0, K> *f0_p;
1654         typedef type_to_key_function_body<T1, K> *f1_p;
1655         typedef type_to_key_function_body<T2, K> *f2_p;
1656         typedef type_to_key_function_body<T3, K> *f3_p;
1657         typedef type_to_key_function_body<T4, K> *f4_p;
1658         typedef type_to_key_function_body<T5, K> *f5_p;
1659         typedef type_to_key_function_body<T6, K> *f6_p;
1660         typedef type_to_key_function_body<T7, K> *f7_p;
1661         typedef type_to_key_function_body<T8, K> *f8_p;
1662         typedef type_to_key_function_body<T9, K> *f9_p;
1663         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1664     public:
1665 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1666         unfolded_join_node(graph &g) : base_type(g,
1667                 func_initializer_type(
1668                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1669                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1670                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1671                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1672                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1673                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1674                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1675                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1676                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1677                     new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1678                     ) ) {
1679         }
1680 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1681         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1682             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1683         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1684                 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1685                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1686                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1687                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1688                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1689                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1690                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1691                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1692                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1693                     new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1694                     new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1695                     ) ) {
1696             static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1697         }
1698         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1699     };
1700 #endif
1701 
1702     //! templated function to refer to input ports of the join node
1703     template<size_t N, typename JNT>
1704     typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1705         return std::get<N>(jn.input_ports());
1706     }
1707 
1708 #endif // __TBB__flow_graph_join_impl_H