File indexing completed on 2025-07-30 08:46:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024
0025
0026 struct forwarding_base : no_assign {
0027 forwarding_base(graph &g) : graph_ref(g) {}
0028 virtual ~forwarding_base() {}
0029 graph& graph_ref;
0030 };
0031
0032 struct queueing_forwarding_base : forwarding_base {
0033 using forwarding_base::forwarding_base;
0034
0035
0036 virtual graph_task* decrement_port_count(bool handle_task) = 0;
0037 };
0038
0039 struct reserving_forwarding_base : forwarding_base {
0040 using forwarding_base::forwarding_base;
0041
0042
0043 virtual graph_task* decrement_port_count() = 0;
0044 virtual void increment_port_count() = 0;
0045 };
0046
0047
0048
0049 template<typename KeyType>
0050 struct matching_forwarding_base : public forwarding_base {
0051 typedef typename std::decay<KeyType>::type current_key_type;
0052 matching_forwarding_base(graph &g) : forwarding_base(g) { }
0053 virtual graph_task* increment_key_count(current_key_type const & ) = 0;
0054 current_key_type current_key;
0055 };
0056
0057 template< int N >
0058 struct join_helper {
0059
0060 template< typename TupleType, typename PortType >
0061 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0062 std::get<N-1>( my_input ).set_join_node_pointer(port);
0063 join_helper<N-1>::set_join_node_pointer( my_input, port );
0064 }
0065 template< typename TupleType >
0066 static inline void consume_reservations( TupleType &my_input ) {
0067 std::get<N-1>( my_input ).consume();
0068 join_helper<N-1>::consume_reservations( my_input );
0069 }
0070
0071 template< typename TupleType >
0072 static inline void release_my_reservation( TupleType &my_input ) {
0073 std::get<N-1>( my_input ).release();
0074 }
0075
0076 template <typename TupleType>
0077 static inline void release_reservations( TupleType &my_input) {
0078 join_helper<N-1>::release_reservations(my_input);
0079 release_my_reservation(my_input);
0080 }
0081
0082 template< typename InputTuple, typename OutputTuple >
0083 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0084 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
0085 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0086 release_my_reservation( my_input );
0087 return false;
0088 }
0089 return true;
0090 }
0091
0092 template<typename InputTuple, typename OutputTuple>
0093 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0094 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) );
0095 return join_helper<N-1>::get_my_item(my_input, out) && res;
0096 }
0097
0098 template<typename InputTuple, typename OutputTuple>
0099 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0100 return get_my_item(my_input, out);
0101 }
0102
0103 template<typename InputTuple>
0104 static inline void reset_my_port(InputTuple &my_input) {
0105 join_helper<N-1>::reset_my_port(my_input);
0106 std::get<N-1>(my_input).reset_port();
0107 }
0108
0109 template<typename InputTuple>
0110 static inline void reset_ports(InputTuple& my_input) {
0111 reset_my_port(my_input);
0112 }
0113
0114 template<typename InputTuple, typename KeyFuncTuple>
0115 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0116 std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
0117 std::get<N-1>(my_key_funcs) = nullptr;
0118 join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0119 }
0120
0121 template< typename KeyFuncTuple>
0122 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0123 __TBB_ASSERT(
0124 std::get<N-1>(other_inputs).get_my_key_func(),
0125 "key matching join node should not be instantiated without functors."
0126 );
0127 std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
0128 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0129 }
0130
0131 template<typename InputTuple>
0132 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0133 join_helper<N-1>::reset_inputs(my_input, f);
0134 std::get<N-1>(my_input).reset_receiver(f);
0135 }
0136 };
0137
0138 template< >
0139 struct join_helper<1> {
0140
0141 template< typename TupleType, typename PortType >
0142 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0143 std::get<0>( my_input ).set_join_node_pointer(port);
0144 }
0145
0146 template< typename TupleType >
0147 static inline void consume_reservations( TupleType &my_input ) {
0148 std::get<0>( my_input ).consume();
0149 }
0150
0151 template< typename TupleType >
0152 static inline void release_my_reservation( TupleType &my_input ) {
0153 std::get<0>( my_input ).release();
0154 }
0155
0156 template<typename TupleType>
0157 static inline void release_reservations( TupleType &my_input) {
0158 release_my_reservation(my_input);
0159 }
0160
0161 template< typename InputTuple, typename OutputTuple >
0162 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0163 return std::get<0>( my_input ).reserve( std::get<0>( out ) );
0164 }
0165
0166 template<typename InputTuple, typename OutputTuple>
0167 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0168 return std::get<0>(my_input).get_item(std::get<0>(out));
0169 }
0170
0171 template<typename InputTuple, typename OutputTuple>
0172 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0173 return get_my_item(my_input, out);
0174 }
0175
0176 template<typename InputTuple>
0177 static inline void reset_my_port(InputTuple &my_input) {
0178 std::get<0>(my_input).reset_port();
0179 }
0180
0181 template<typename InputTuple>
0182 static inline void reset_ports(InputTuple& my_input) {
0183 reset_my_port(my_input);
0184 }
0185
0186 template<typename InputTuple, typename KeyFuncTuple>
0187 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0188 std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
0189 std::get<0>(my_key_funcs) = nullptr;
0190 }
0191
0192 template< typename KeyFuncTuple>
0193 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0194 __TBB_ASSERT(
0195 std::get<0>(other_inputs).get_my_key_func(),
0196 "key matching join node should not be instantiated without functors."
0197 );
0198 std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
0199 }
0200 template<typename InputTuple>
0201 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0202 std::get<0>(my_input).reset_receiver(f);
0203 }
0204 };
0205
0206
0207 template< typename T >
0208 class reserving_port : public receiver<T> {
0209 public:
0210 typedef T input_type;
0211 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0212
0213 private:
0214
0215 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0216 };
0217 typedef reserving_port<T> class_type;
0218
0219 class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
0220 public:
0221 char type;
0222 union {
0223 T *my_arg;
0224 predecessor_type *my_pred;
0225 };
0226 reserving_port_operation(const T& e, op_type t) :
0227 type(char(t)), my_arg(const_cast<T*>(&e)) {}
0228 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0229 my_pred(const_cast<predecessor_type *>(&s)) {}
0230 reserving_port_operation(op_type t) : type(char(t)) {}
0231 };
0232
0233 typedef aggregating_functor<class_type, reserving_port_operation> handler_type;
0234 friend class aggregating_functor<class_type, reserving_port_operation>;
0235 aggregator<handler_type, reserving_port_operation> my_aggregator;
0236
0237 void handle_operations(reserving_port_operation* op_list) {
0238 reserving_port_operation *current;
0239 bool was_missing_predecessors = false;
0240 while(op_list) {
0241 current = op_list;
0242 op_list = op_list->next;
0243 switch(current->type) {
0244 case reg_pred:
0245 was_missing_predecessors = my_predecessors.empty();
0246 my_predecessors.add(*(current->my_pred));
0247 if ( was_missing_predecessors ) {
0248 (void) my_join->decrement_port_count();
0249 }
0250 current->status.store( SUCCEEDED, std::memory_order_release);
0251 break;
0252 case rem_pred:
0253 if ( !my_predecessors.empty() ) {
0254 my_predecessors.remove(*(current->my_pred));
0255 if ( my_predecessors.empty() )
0256 my_join->increment_port_count();
0257 }
0258
0259 current->status.store( SUCCEEDED, std::memory_order_release );
0260 break;
0261 case res_item:
0262 if ( reserved ) {
0263 current->status.store( FAILED, std::memory_order_release);
0264 }
0265 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
0266 reserved = true;
0267 current->status.store( SUCCEEDED, std::memory_order_release);
0268 } else {
0269 if ( my_predecessors.empty() ) {
0270 my_join->increment_port_count();
0271 }
0272 current->status.store( FAILED, std::memory_order_release);
0273 }
0274 break;
0275 case rel_res:
0276 reserved = false;
0277 my_predecessors.try_release( );
0278 current->status.store( SUCCEEDED, std::memory_order_release);
0279 break;
0280 case con_res:
0281 reserved = false;
0282 my_predecessors.try_consume( );
0283 current->status.store( SUCCEEDED, std::memory_order_release);
0284 break;
0285 }
0286 }
0287 }
0288
0289 protected:
0290 template< typename R, typename B > friend class run_and_put_task;
0291 template<typename X, typename Y> friend class broadcast_cache;
0292 template<typename X, typename Y> friend class round_robin_cache;
0293 graph_task* try_put_task( const T & ) override {
0294 return nullptr;
0295 }
0296
0297 graph& graph_reference() const override {
0298 return my_join->graph_ref;
0299 }
0300
0301 public:
0302
0303
0304 reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
0305 my_aggregator.initialize_handler(handler_type(this));
0306 }
0307
0308
0309 reserving_port(const reserving_port& ) = delete;
0310
0311 void set_join_node_pointer(reserving_forwarding_base *join) {
0312 my_join = join;
0313 }
0314
0315
0316 bool register_predecessor( predecessor_type &src ) override {
0317 reserving_port_operation op_data(src, reg_pred);
0318 my_aggregator.execute(&op_data);
0319 return op_data.status == SUCCEEDED;
0320 }
0321
0322
0323 bool remove_predecessor( predecessor_type &src ) override {
0324 reserving_port_operation op_data(src, rem_pred);
0325 my_aggregator.execute(&op_data);
0326 return op_data.status == SUCCEEDED;
0327 }
0328
0329
0330 bool reserve( T &v ) {
0331 reserving_port_operation op_data(v, res_item);
0332 my_aggregator.execute(&op_data);
0333 return op_data.status == SUCCEEDED;
0334 }
0335
0336
0337 void release( ) {
0338 reserving_port_operation op_data(rel_res);
0339 my_aggregator.execute(&op_data);
0340 }
0341
0342
0343 void consume( ) {
0344 reserving_port_operation op_data(con_res);
0345 my_aggregator.execute(&op_data);
0346 }
0347
0348 void reset_receiver( reset_flags f) {
0349 if(f & rf_clear_edges) my_predecessors.clear();
0350 else
0351 my_predecessors.reset();
0352 reserved = false;
0353 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0354 }
0355
0356 private:
0357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0358 friend class get_graph_helper;
0359 #endif
0360
0361 reserving_forwarding_base *my_join;
0362 reservable_predecessor_cache< T, null_mutex > my_predecessors;
0363 bool reserved;
0364 };
0365
0366
0367 template<typename T>
0368 class queueing_port : public receiver<T>, public item_buffer<T> {
0369 public:
0370 typedef T input_type;
0371 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0372 typedef queueing_port<T> class_type;
0373
0374
0375 private:
0376 enum op_type { get__item, res_port, try__put_task
0377 };
0378
0379 class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
0380 public:
0381 char type;
0382 T my_val;
0383 T* my_arg;
0384 graph_task* bypass_t;
0385
0386 queueing_port_operation(const T& e, op_type t) :
0387 type(char(t)), my_val(e), my_arg(nullptr)
0388 , bypass_t(nullptr)
0389 {}
0390
0391 queueing_port_operation(const T* p, op_type t) :
0392 type(char(t)), my_arg(const_cast<T*>(p))
0393 , bypass_t(nullptr)
0394 {}
0395
0396 queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr)
0397 , bypass_t(nullptr)
0398 {}
0399 };
0400
0401 typedef aggregating_functor<class_type, queueing_port_operation> handler_type;
0402 friend class aggregating_functor<class_type, queueing_port_operation>;
0403 aggregator<handler_type, queueing_port_operation> my_aggregator;
0404
0405 void handle_operations(queueing_port_operation* op_list) {
0406 queueing_port_operation *current;
0407 bool was_empty;
0408 while(op_list) {
0409 current = op_list;
0410 op_list = op_list->next;
0411 switch(current->type) {
0412 case try__put_task: {
0413 graph_task* rtask = nullptr;
0414 was_empty = this->buffer_empty();
0415 this->push_back(current->my_val);
0416 if (was_empty) rtask = my_join->decrement_port_count(false);
0417 else
0418 rtask = SUCCESSFULLY_ENQUEUED;
0419 current->bypass_t = rtask;
0420 current->status.store( SUCCEEDED, std::memory_order_release);
0421 }
0422 break;
0423 case get__item:
0424 if(!this->buffer_empty()) {
0425 __TBB_ASSERT(current->my_arg, nullptr);
0426 *(current->my_arg) = this->front();
0427 current->status.store( SUCCEEDED, std::memory_order_release);
0428 }
0429 else {
0430 current->status.store( FAILED, std::memory_order_release);
0431 }
0432 break;
0433 case res_port:
0434 __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
0435 this->destroy_front();
0436 if(this->my_item_valid(this->my_head)) {
0437 (void)my_join->decrement_port_count(true);
0438 }
0439 current->status.store( SUCCEEDED, std::memory_order_release);
0440 break;
0441 }
0442 }
0443 }
0444
0445
0446 protected:
0447 template< typename R, typename B > friend class run_and_put_task;
0448 template<typename X, typename Y> friend class broadcast_cache;
0449 template<typename X, typename Y> friend class round_robin_cache;
0450 graph_task* try_put_task(const T &v) override {
0451 queueing_port_operation op_data(v, try__put_task);
0452 my_aggregator.execute(&op_data);
0453 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
0454 if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
0455 return op_data.bypass_t;
0456 }
0457
0458 graph& graph_reference() const override {
0459 return my_join->graph_ref;
0460 }
0461
0462 public:
0463
0464
0465 queueing_port() : item_buffer<T>() {
0466 my_join = nullptr;
0467 my_aggregator.initialize_handler(handler_type(this));
0468 }
0469
0470
0471 queueing_port(const queueing_port& ) = delete;
0472
0473
0474 void set_join_node_pointer(queueing_forwarding_base *join) {
0475 my_join = join;
0476 }
0477
0478 bool get_item( T &v ) {
0479 queueing_port_operation op_data(&v, get__item);
0480 my_aggregator.execute(&op_data);
0481 return op_data.status == SUCCEEDED;
0482 }
0483
0484
0485
0486 void reset_port() {
0487 queueing_port_operation op_data(res_port);
0488 my_aggregator.execute(&op_data);
0489 return;
0490 }
0491
0492 void reset_receiver(reset_flags) {
0493 item_buffer<T>::reset();
0494 }
0495
0496 private:
0497 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0498 friend class get_graph_helper;
0499 #endif
0500
0501 queueing_forwarding_base *my_join;
0502 };
0503
0504 #include "_flow_graph_tagged_buffer_impl.h"
0505
0506 template<typename K>
0507 struct count_element {
0508 K my_key;
0509 size_t my_value;
0510 };
0511
0512
0513
0514 template< typename K >
0515 struct key_to_count_functor {
0516 typedef count_element<K> table_item_type;
0517 const K& operator()(const table_item_type& v) { return v.my_key; }
0518 };
0519
0520
0521
0522 template< class TraitsType >
0523 class key_matching_port :
0524 public receiver<typename TraitsType::T>,
0525 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0526 typename TraitsType::KHash > {
0527 public:
0528 typedef TraitsType traits;
0529 typedef key_matching_port<traits> class_type;
0530 typedef typename TraitsType::T input_type;
0531 typedef typename TraitsType::K key_type;
0532 typedef typename std::decay<key_type>::type noref_key_type;
0533 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0534 typedef typename TraitsType::TtoK type_to_key_func_type;
0535 typedef typename TraitsType::KHash hash_compare_type;
0536 typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
0537
0538 private:
0539
0540 private:
0541 enum op_type { try__put, get__item, res_port
0542 };
0543
0544 class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
0545 public:
0546 char type;
0547 input_type my_val;
0548 input_type *my_arg;
0549
0550 key_matching_port_operation(const input_type& e, op_type t) :
0551 type(char(t)), my_val(e), my_arg(nullptr) {}
0552
0553 key_matching_port_operation(const input_type* p, op_type t) :
0554 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
0555
0556 key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
0557 };
0558
0559 typedef aggregating_functor<class_type, key_matching_port_operation> handler_type;
0560 friend class aggregating_functor<class_type, key_matching_port_operation>;
0561 aggregator<handler_type, key_matching_port_operation> my_aggregator;
0562
0563 void handle_operations(key_matching_port_operation* op_list) {
0564 key_matching_port_operation *current;
0565 while(op_list) {
0566 current = op_list;
0567 op_list = op_list->next;
0568 switch(current->type) {
0569 case try__put: {
0570 bool was_inserted = this->insert_with_key(current->my_val);
0571
0572 current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
0573 }
0574 break;
0575 case get__item:
0576
0577 __TBB_ASSERT(current->my_arg, nullptr);
0578 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
0579 __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0580 }
0581 current->status.store( SUCCEEDED, std::memory_order_release);
0582 break;
0583 case res_port:
0584
0585 this->delete_with_key(my_join->current_key);
0586 current->status.store( SUCCEEDED, std::memory_order_release);
0587 break;
0588 }
0589 }
0590 }
0591
0592 protected:
0593 template< typename R, typename B > friend class run_and_put_task;
0594 template<typename X, typename Y> friend class broadcast_cache;
0595 template<typename X, typename Y> friend class round_robin_cache;
0596 graph_task* try_put_task(const input_type& v) override {
0597 key_matching_port_operation op_data(v, try__put);
0598 graph_task* rtask = nullptr;
0599 my_aggregator.execute(&op_data);
0600 if(op_data.status == SUCCEEDED) {
0601 rtask = my_join->increment_key_count((*(this->get_key_func()))(v));
0602
0603 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0604 }
0605 return rtask;
0606 }
0607
0608 graph& graph_reference() const override {
0609 return my_join->graph_ref;
0610 }
0611
0612 public:
0613
0614 key_matching_port() : receiver<input_type>(), buffer_type() {
0615 my_join = nullptr;
0616 my_aggregator.initialize_handler(handler_type(this));
0617 }
0618
0619
0620 key_matching_port(const key_matching_port& ) = delete;
0621 #if __INTEL_COMPILER <= 2021
0622
0623
0624 virtual
0625 #endif
0626 ~key_matching_port() { }
0627
0628 void set_join_node_pointer(forwarding_base *join) {
0629 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0630 }
0631
0632 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0633
0634 type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0635
0636 bool get_item( input_type &v ) {
0637
0638 key_matching_port_operation op_data(&v, get__item);
0639 my_aggregator.execute(&op_data);
0640 return op_data.status == SUCCEEDED;
0641 }
0642
0643
0644
0645 void reset_port() {
0646 key_matching_port_operation op_data(res_port);
0647 my_aggregator.execute(&op_data);
0648 return;
0649 }
0650
0651 void reset_receiver(reset_flags ) {
0652 buffer_type::reset();
0653 }
0654
0655 private:
0656
0657
0658 matching_forwarding_base<key_type> *my_join;
0659 };
0660
0661 using namespace graph_policy_namespace;
0662
0663 template<typename JP, typename InputTuple, typename OutputTuple>
0664 class join_node_base;
0665
0666
0667 template<typename JP, typename InputTuple, typename OutputTuple>
0668 class join_node_FE;
0669
0670 template<typename InputTuple, typename OutputTuple>
0671 class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
0672 private:
0673 static const int N = std::tuple_size<OutputTuple>::value;
0674 typedef OutputTuple output_type;
0675 typedef InputTuple input_type;
0676 typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type;
0677 public:
0678 join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
0679 ports_with_no_inputs = N;
0680 join_helper<N>::set_join_node_pointer(my_inputs, this);
0681 }
0682
0683 join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
0684 ports_with_no_inputs = N;
0685 join_helper<N>::set_join_node_pointer(my_inputs, this);
0686 }
0687
0688 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0689
0690 void increment_port_count() override {
0691 ++ports_with_no_inputs;
0692 }
0693
0694
0695 graph_task* decrement_port_count() override {
0696 if(ports_with_no_inputs.fetch_sub(1) == 1) {
0697 if(is_graph_active(this->graph_ref)) {
0698 small_object_allocator allocator{};
0699 typedef forward_task_bypass<base_node_type> task_type;
0700 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0701 graph_ref.reserve_wait();
0702 spawn_in_graph_arena(this->graph_ref, *t);
0703 }
0704 }
0705 return nullptr;
0706 }
0707
0708 input_type &input_ports() { return my_inputs; }
0709
0710 protected:
0711
0712 void reset( reset_flags f) {
0713
0714 ports_with_no_inputs = N;
0715 join_helper<N>::reset_inputs(my_inputs, f);
0716 }
0717
0718
0719
0720 bool tuple_build_may_succeed() {
0721 return !ports_with_no_inputs;
0722 }
0723
0724 bool try_to_make_tuple(output_type &out) {
0725 if(ports_with_no_inputs) return false;
0726 return join_helper<N>::reserve(my_inputs, out);
0727 }
0728
0729 void tuple_accepted() {
0730 join_helper<N>::consume_reservations(my_inputs);
0731 }
0732 void tuple_rejected() {
0733 join_helper<N>::release_reservations(my_inputs);
0734 }
0735
0736 input_type my_inputs;
0737 base_node_type *my_node;
0738 std::atomic<std::size_t> ports_with_no_inputs;
0739 };
0740
0741 template<typename InputTuple, typename OutputTuple>
0742 class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
0743 public:
0744 static const int N = std::tuple_size<OutputTuple>::value;
0745 typedef OutputTuple output_type;
0746 typedef InputTuple input_type;
0747 typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type;
0748
0749 join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
0750 ports_with_no_items = N;
0751 join_helper<N>::set_join_node_pointer(my_inputs, this);
0752 }
0753
0754 join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
0755 ports_with_no_items = N;
0756 join_helper<N>::set_join_node_pointer(my_inputs, this);
0757 }
0758
0759
0760 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0761
0762 void reset_port_count() {
0763 ports_with_no_items = N;
0764 }
0765
0766
0767 graph_task* decrement_port_count(bool handle_task) override
0768 {
0769 if(ports_with_no_items.fetch_sub(1) == 1) {
0770 if(is_graph_active(this->graph_ref)) {
0771 small_object_allocator allocator{};
0772 typedef forward_task_bypass<base_node_type> task_type;
0773 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0774 graph_ref.reserve_wait();
0775 if( !handle_task )
0776 return t;
0777 spawn_in_graph_arena(this->graph_ref, *t);
0778 }
0779 }
0780 return nullptr;
0781 }
0782
0783 input_type &input_ports() { return my_inputs; }
0784
0785 protected:
0786
0787 void reset( reset_flags f) {
0788 reset_port_count();
0789 join_helper<N>::reset_inputs(my_inputs, f );
0790 }
0791
0792
0793
0794 bool tuple_build_may_succeed() {
0795 return !ports_with_no_items;
0796 }
0797
0798 bool try_to_make_tuple(output_type &out) {
0799 if(ports_with_no_items) return false;
0800 return join_helper<N>::get_items(my_inputs, out);
0801 }
0802
0803 void tuple_accepted() {
0804 reset_port_count();
0805 join_helper<N>::reset_ports(my_inputs);
0806 }
0807 void tuple_rejected() {
0808
0809 }
0810
0811 input_type my_inputs;
0812 base_node_type *my_node;
0813 std::atomic<std::size_t> ports_with_no_items;
0814 };
0815
0816
0817 template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
0818 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
0819
0820 public hash_buffer<
0821 typename std::decay<K>::type&,
0822 count_element<typename std::decay<K>::type>,
0823 type_to_key_function_body<
0824 count_element<typename std::decay<K>::type>,
0825 typename std::decay<K>::type& >,
0826 KHash >,
0827
0828 public item_buffer<OutputTuple> {
0829 public:
0830 static const int N = std::tuple_size<OutputTuple>::value;
0831 typedef OutputTuple output_type;
0832 typedef InputTuple input_type;
0833 typedef K key_type;
0834 typedef typename std::decay<key_type>::type unref_key_type;
0835 typedef KHash key_hash_compare;
0836
0837 typedef count_element<unref_key_type> count_element_type;
0838
0839 typedef key_to_count_functor<unref_key_type> key_to_count_func;
0840 typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
0841 typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
0842
0843
0844 typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
0845 key_to_count_buffer_type;
0846 typedef item_buffer<output_type> output_buffer_type;
0847 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type;
0848 typedef matching_forwarding_base<key_type> forwarding_base_type;
0849
0850
0851
0852
0853 private:
0854 enum op_type { res_count, inc_count, may_succeed, try_make };
0855 typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
0856
0857 class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
0858 public:
0859 char type;
0860 unref_key_type my_val;
0861 output_type* my_output;
0862 graph_task* bypass_t;
0863
0864 key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
0865 my_output(nullptr), bypass_t(nullptr) {}
0866 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
0867
0868 key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
0869 };
0870
0871 typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type;
0872 friend class aggregating_functor<class_type, key_matching_FE_operation>;
0873 aggregator<handler_type, key_matching_FE_operation> my_aggregator;
0874
0875
0876
0877
0878 graph_task* fill_output_buffer(unref_key_type &t) {
0879 output_type l_out;
0880 graph_task* rtask = nullptr;
0881 bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
0882 this->current_key = t;
0883 this->delete_with_key(this->current_key);
0884 if(join_helper<N>::get_items(my_inputs, l_out)) {
0885 this->push_back(l_out);
0886 if(do_fwd) {
0887 small_object_allocator allocator{};
0888 typedef forward_task_bypass<base_node_type> task_type;
0889 rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
0890 this->graph_ref.reserve_wait();
0891 do_fwd = false;
0892 }
0893
0894 join_helper<N>::reset_ports(my_inputs);
0895 }
0896 else {
0897 __TBB_ASSERT(false, "should have had something to push");
0898 }
0899 return rtask;
0900 }
0901
0902 void handle_operations(key_matching_FE_operation* op_list) {
0903 key_matching_FE_operation *current;
0904 while(op_list) {
0905 current = op_list;
0906 op_list = op_list->next;
0907 switch(current->type) {
0908 case res_count:
0909 {
0910 this->destroy_front();
0911 current->status.store( SUCCEEDED, std::memory_order_release);
0912 }
0913 break;
0914 case inc_count: {
0915 count_element_type *p = nullptr;
0916 unref_key_type &t = current->my_val;
0917 if(!(this->find_ref_with_key(t,p))) {
0918 count_element_type ev;
0919 ev.my_key = t;
0920 ev.my_value = 0;
0921 this->insert_with_key(ev);
0922 bool found = this->find_ref_with_key(t, p);
0923 __TBB_ASSERT_EX(found, "should find key after inserting it");
0924 }
0925 if(++(p->my_value) == size_t(N)) {
0926 current->bypass_t = fill_output_buffer(t);
0927 }
0928 }
0929 current->status.store( SUCCEEDED, std::memory_order_release);
0930 break;
0931 case may_succeed:
0932 current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
0933 break;
0934 case try_make:
0935 if(this->buffer_empty()) {
0936 current->status.store( FAILED, std::memory_order_release);
0937 }
0938 else {
0939 *(current->my_output) = this->front();
0940 current->status.store( SUCCEEDED, std::memory_order_release);
0941 }
0942 break;
0943 }
0944 }
0945 }
0946
0947
0948 public:
0949 template<typename FunctionTuple>
0950 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
0951 join_helper<N>::set_join_node_pointer(my_inputs, this);
0952 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
0953 my_aggregator.initialize_handler(handler_type(this));
0954 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
0955 this->set_key_func(cfb);
0956 }
0957
0958 join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
0959 output_buffer_type() {
0960 my_node = nullptr;
0961 join_helper<N>::set_join_node_pointer(my_inputs, this);
0962 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
0963 my_aggregator.initialize_handler(handler_type(this));
0964 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
0965 this->set_key_func(cfb);
0966 }
0967
0968
0969 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0970
0971 void reset_port_count() {
0972 key_matching_FE_operation op_data(res_count);
0973 my_aggregator.execute(&op_data);
0974 return;
0975 }
0976
0977
0978
0979 graph_task *increment_key_count(unref_key_type const & t) override {
0980 key_matching_FE_operation op_data(t, inc_count);
0981 my_aggregator.execute(&op_data);
0982 return op_data.bypass_t;
0983 }
0984
0985 input_type &input_ports() { return my_inputs; }
0986
0987 protected:
0988
0989 void reset( reset_flags f ) {
0990
0991 join_helper<N>::reset_inputs(my_inputs, f);
0992
0993 key_to_count_buffer_type::reset();
0994 output_buffer_type::reset();
0995 }
0996
0997
0998
0999 bool tuple_build_may_succeed() {
1000 key_matching_FE_operation op_data(may_succeed);
1001 my_aggregator.execute(&op_data);
1002 return op_data.status == SUCCEEDED;
1003 }
1004
1005
1006
1007 bool try_to_make_tuple(output_type &out) {
1008 key_matching_FE_operation op_data(&out,try_make);
1009 my_aggregator.execute(&op_data);
1010 return op_data.status == SUCCEEDED;
1011 }
1012
1013 void tuple_accepted() {
1014 reset_port_count();
1015 }
1016
1017 void tuple_rejected() {
1018
1019 }
1020
1021 input_type my_inputs;
1022 base_node_type *my_node;
1023 };
1024
1025
1026 template<typename JP, typename InputTuple, typename OutputTuple>
1027 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1028 public sender<OutputTuple> {
1029 protected:
1030 using graph_node::my_graph;
1031 public:
1032 typedef OutputTuple output_type;
1033
1034 typedef typename sender<output_type>::successor_type successor_type;
1035 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1036 using input_ports_type::tuple_build_may_succeed;
1037 using input_ports_type::try_to_make_tuple;
1038 using input_ports_type::tuple_accepted;
1039 using input_ports_type::tuple_rejected;
1040
1041 private:
1042
1043 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1044 };
1045 typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1046
1047 class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1048 public:
1049 char type;
1050 union {
1051 output_type *my_arg;
1052 successor_type *my_succ;
1053 };
1054 graph_task* bypass_t;
1055 join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1056 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {}
1057 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1058 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1059 join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1060 };
1061
1062 typedef aggregating_functor<class_type, join_node_base_operation> handler_type;
1063 friend class aggregating_functor<class_type, join_node_base_operation>;
1064 bool forwarder_busy;
1065 aggregator<handler_type, join_node_base_operation> my_aggregator;
1066
1067 void handle_operations(join_node_base_operation* op_list) {
1068 join_node_base_operation *current;
1069 while(op_list) {
1070 current = op_list;
1071 op_list = op_list->next;
1072 switch(current->type) {
1073 case reg_succ: {
1074 my_successors.register_successor(*(current->my_succ));
1075 if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1076 small_object_allocator allocator{};
1077 typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1078 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1079 my_graph.reserve_wait();
1080 spawn_in_graph_arena(my_graph, *t);
1081 forwarder_busy = true;
1082 }
1083 current->status.store( SUCCEEDED, std::memory_order_release);
1084 }
1085 break;
1086 case rem_succ:
1087 my_successors.remove_successor(*(current->my_succ));
1088 current->status.store( SUCCEEDED, std::memory_order_release);
1089 break;
1090 case try__get:
1091 if(tuple_build_may_succeed()) {
1092 if(try_to_make_tuple(*(current->my_arg))) {
1093 tuple_accepted();
1094 current->status.store( SUCCEEDED, std::memory_order_release);
1095 }
1096 else current->status.store( FAILED, std::memory_order_release);
1097 }
1098 else current->status.store( FAILED, std::memory_order_release);
1099 break;
1100 case do_fwrd_bypass: {
1101 bool build_succeeded;
1102 graph_task *last_task = nullptr;
1103 output_type out;
1104
1105
1106
1107
1108
1109
1110
1111 if(tuple_build_may_succeed()) {
1112 do {
1113 build_succeeded = try_to_make_tuple(out);
1114 if(build_succeeded) {
1115 graph_task *new_task = my_successors.try_put_task(out);
1116 last_task = combine_tasks(my_graph, last_task, new_task);
1117 if(new_task) {
1118 tuple_accepted();
1119 }
1120 else {
1121 tuple_rejected();
1122 build_succeeded = false;
1123 }
1124 }
1125 } while(build_succeeded);
1126 }
1127 current->bypass_t = last_task;
1128 current->status.store( SUCCEEDED, std::memory_order_release);
1129 forwarder_busy = false;
1130 }
1131 break;
1132 }
1133 }
1134 }
1135
1136 public:
1137 join_node_base(graph &g)
1138 : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1139 {
1140 input_ports_type::set_my_node(this);
1141 my_aggregator.initialize_handler(handler_type(this));
1142 }
1143
1144 join_node_base(const join_node_base& other) :
1145 graph_node(other.graph_node::my_graph), input_ports_type(other),
1146 sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1147 {
1148 input_ports_type::set_my_node(this);
1149 my_aggregator.initialize_handler(handler_type(this));
1150 }
1151
1152 template<typename FunctionTuple>
1153 join_node_base(graph &g, FunctionTuple f)
1154 : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1155 {
1156 input_ports_type::set_my_node(this);
1157 my_aggregator.initialize_handler(handler_type(this));
1158 }
1159
1160 bool register_successor(successor_type &r) override {
1161 join_node_base_operation op_data(r, reg_succ);
1162 my_aggregator.execute(&op_data);
1163 return op_data.status == SUCCEEDED;
1164 }
1165
1166 bool remove_successor( successor_type &r) override {
1167 join_node_base_operation op_data(r, rem_succ);
1168 my_aggregator.execute(&op_data);
1169 return op_data.status == SUCCEEDED;
1170 }
1171
1172 bool try_get( output_type &v) override {
1173 join_node_base_operation op_data(v, try__get);
1174 my_aggregator.execute(&op_data);
1175 return op_data.status == SUCCEEDED;
1176 }
1177
1178 protected:
1179 void reset_node(reset_flags f) override {
1180 input_ports_type::reset(f);
1181 if(f & rf_clear_edges) my_successors.clear();
1182 }
1183
1184 private:
1185 broadcast_cache<output_type, null_rw_mutex> my_successors;
1186
1187 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1188 graph_task *forward_task() {
1189 join_node_base_operation op_data(do_fwrd_bypass);
1190 my_aggregator.execute(&op_data);
1191 return op_data.bypass_t;
1192 }
1193
1194 };
1195
1196
1197 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1198 struct join_base {
1199 typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1200 };
1201
1202 template<int N, typename OutputTuple, typename K, typename KHash>
1203 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1204 typedef key_matching<K, KHash> key_traits_type;
1205 typedef K key_type;
1206 typedef KHash key_hash_compare;
1207 typedef join_node_base< key_traits_type,
1208
1209 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1210 OutputTuple > type;
1211 };
1212
1213
1214
1215
1216
1217 template<int M, template<class> class PT, typename OutputTuple, typename JP>
1218 class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type {
1219 public:
1220 typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type;
1221 typedef OutputTuple output_type;
1222 private:
1223 typedef join_node_base<JP, input_ports_type, output_type > base_type;
1224 public:
1225 unfolded_join_node(graph &g) : base_type(g) {}
1226 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1227 };
1228
1229 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1230 template <typename K, typename T>
1231 struct key_from_message_body {
1232 K operator()(const T& t) const {
1233 return key_from_message<K>(t);
1234 }
1235 };
1236
1237 template <typename K, typename T>
1238 struct key_from_message_body<K&,T> {
1239 const K& operator()(const T& t) const {
1240 return key_from_message<const K&>(t);
1241 }
1242 };
1243 #endif
1244
1245
1246
1247 template<typename OutputTuple, typename K, typename KHash>
1248 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1249 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1250 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1251 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1252 public:
1253 typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1254 typedef OutputTuple output_type;
1255 private:
1256 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1257 typedef type_to_key_function_body<T0, K> *f0_p;
1258 typedef type_to_key_function_body<T1, K> *f1_p;
1259 typedef std::tuple< f0_p, f1_p > func_initializer_type;
1260 public:
1261 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1262 unfolded_join_node(graph &g) : base_type(g,
1263 func_initializer_type(
1264 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1265 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1266 ) ) {
1267 }
1268 #endif
1269 template<typename Body0, typename Body1>
1270 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1271 func_initializer_type(
1272 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1273 new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1274 ) ) {
1275 static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1276 }
1277 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1278 };
1279
1280 template<typename OutputTuple, typename K, typename KHash>
1281 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1282 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1283 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1284 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1285 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1286 public:
1287 typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1288 typedef OutputTuple output_type;
1289 private:
1290 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1291 typedef type_to_key_function_body<T0, K> *f0_p;
1292 typedef type_to_key_function_body<T1, K> *f1_p;
1293 typedef type_to_key_function_body<T2, K> *f2_p;
1294 typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1295 public:
1296 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1297 unfolded_join_node(graph &g) : base_type(g,
1298 func_initializer_type(
1299 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1300 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1301 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1302 ) ) {
1303 }
1304 #endif
1305 template<typename Body0, typename Body1, typename Body2>
1306 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1307 func_initializer_type(
1308 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1309 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1310 new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1311 ) ) {
1312 static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1313 }
1314 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1315 };
1316
1317 template<typename OutputTuple, typename K, typename KHash>
1318 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1319 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1320 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1321 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1322 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1323 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1324 public:
1325 typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1326 typedef OutputTuple output_type;
1327 private:
1328 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1329 typedef type_to_key_function_body<T0, K> *f0_p;
1330 typedef type_to_key_function_body<T1, K> *f1_p;
1331 typedef type_to_key_function_body<T2, K> *f2_p;
1332 typedef type_to_key_function_body<T3, K> *f3_p;
1333 typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1334 public:
1335 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1336 unfolded_join_node(graph &g) : base_type(g,
1337 func_initializer_type(
1338 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1339 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1340 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1341 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1342 ) ) {
1343 }
1344 #endif
1345 template<typename Body0, typename Body1, typename Body2, typename Body3>
1346 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1347 func_initializer_type(
1348 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1349 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1350 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1351 new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1352 ) ) {
1353 static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1354 }
1355 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1356 };
1357
1358 template<typename OutputTuple, typename K, typename KHash>
1359 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1360 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1361 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1362 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1363 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1364 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1365 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1366 public:
1367 typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1368 typedef OutputTuple output_type;
1369 private:
1370 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1371 typedef type_to_key_function_body<T0, K> *f0_p;
1372 typedef type_to_key_function_body<T1, K> *f1_p;
1373 typedef type_to_key_function_body<T2, K> *f2_p;
1374 typedef type_to_key_function_body<T3, K> *f3_p;
1375 typedef type_to_key_function_body<T4, K> *f4_p;
1376 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1377 public:
1378 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1379 unfolded_join_node(graph &g) : base_type(g,
1380 func_initializer_type(
1381 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1382 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1383 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1384 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1385 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1386 ) ) {
1387 }
1388 #endif
1389 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1390 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1391 func_initializer_type(
1392 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1393 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1394 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1395 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1396 new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1397 ) ) {
1398 static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1399 }
1400 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1401 };
1402
1403 #if __TBB_VARIADIC_MAX >= 6
1404 template<typename OutputTuple, typename K, typename KHash>
1405 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1406 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1407 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1408 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1409 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1410 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1411 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1412 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1413 public:
1414 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1415 typedef OutputTuple output_type;
1416 private:
1417 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1418 typedef type_to_key_function_body<T0, K> *f0_p;
1419 typedef type_to_key_function_body<T1, K> *f1_p;
1420 typedef type_to_key_function_body<T2, K> *f2_p;
1421 typedef type_to_key_function_body<T3, K> *f3_p;
1422 typedef type_to_key_function_body<T4, K> *f4_p;
1423 typedef type_to_key_function_body<T5, K> *f5_p;
1424 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1425 public:
1426 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1427 unfolded_join_node(graph &g) : base_type(g,
1428 func_initializer_type(
1429 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1430 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1431 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1432 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1433 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1434 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1435 ) ) {
1436 }
1437 #endif
1438 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1439 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1440 : base_type(g, func_initializer_type(
1441 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1442 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1443 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1444 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1445 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1446 new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1447 ) ) {
1448 static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1449 }
1450 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1451 };
1452 #endif
1453
1454 #if __TBB_VARIADIC_MAX >= 7
1455 template<typename OutputTuple, typename K, typename KHash>
1456 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1457 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1458 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1459 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1460 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1461 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1462 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1463 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1464 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1465 public:
1466 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1467 typedef OutputTuple output_type;
1468 private:
1469 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1470 typedef type_to_key_function_body<T0, K> *f0_p;
1471 typedef type_to_key_function_body<T1, K> *f1_p;
1472 typedef type_to_key_function_body<T2, K> *f2_p;
1473 typedef type_to_key_function_body<T3, K> *f3_p;
1474 typedef type_to_key_function_body<T4, K> *f4_p;
1475 typedef type_to_key_function_body<T5, K> *f5_p;
1476 typedef type_to_key_function_body<T6, K> *f6_p;
1477 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1478 public:
1479 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1480 unfolded_join_node(graph &g) : base_type(g,
1481 func_initializer_type(
1482 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1483 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1484 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1485 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1486 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1487 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1488 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1489 ) ) {
1490 }
1491 #endif
1492 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1493 typename Body5, typename Body6>
1494 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1495 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1496 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1497 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1498 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1499 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1500 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1501 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1502 new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1503 ) ) {
1504 static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1505 }
1506 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1507 };
1508 #endif
1509
1510 #if __TBB_VARIADIC_MAX >= 8
1511 template<typename OutputTuple, typename K, typename KHash>
1512 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1513 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1514 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1515 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1516 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1517 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1518 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1519 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1520 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1521 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1522 public:
1523 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1524 typedef OutputTuple output_type;
1525 private:
1526 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1527 typedef type_to_key_function_body<T0, K> *f0_p;
1528 typedef type_to_key_function_body<T1, K> *f1_p;
1529 typedef type_to_key_function_body<T2, K> *f2_p;
1530 typedef type_to_key_function_body<T3, K> *f3_p;
1531 typedef type_to_key_function_body<T4, K> *f4_p;
1532 typedef type_to_key_function_body<T5, K> *f5_p;
1533 typedef type_to_key_function_body<T6, K> *f6_p;
1534 typedef type_to_key_function_body<T7, K> *f7_p;
1535 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1536 public:
1537 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1538 unfolded_join_node(graph &g) : base_type(g,
1539 func_initializer_type(
1540 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1541 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1542 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1543 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1544 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1545 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1546 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1547 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1548 ) ) {
1549 }
1550 #endif
1551 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1552 typename Body5, typename Body6, typename Body7>
1553 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1554 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1555 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1556 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1557 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1558 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1559 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1560 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1561 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1562 new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1563 ) ) {
1564 static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1565 }
1566 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1567 };
1568 #endif
1569
1570 #if __TBB_VARIADIC_MAX >= 9
1571 template<typename OutputTuple, typename K, typename KHash>
1572 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1573 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1574 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1575 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1576 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1577 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1578 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1579 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1580 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1581 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1582 typedef typename std::tuple_element<8, OutputTuple>::type T8;
1583 public:
1584 typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1585 typedef OutputTuple output_type;
1586 private:
1587 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1588 typedef type_to_key_function_body<T0, K> *f0_p;
1589 typedef type_to_key_function_body<T1, K> *f1_p;
1590 typedef type_to_key_function_body<T2, K> *f2_p;
1591 typedef type_to_key_function_body<T3, K> *f3_p;
1592 typedef type_to_key_function_body<T4, K> *f4_p;
1593 typedef type_to_key_function_body<T5, K> *f5_p;
1594 typedef type_to_key_function_body<T6, K> *f6_p;
1595 typedef type_to_key_function_body<T7, K> *f7_p;
1596 typedef type_to_key_function_body<T8, K> *f8_p;
1597 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1598 public:
1599 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1600 unfolded_join_node(graph &g) : base_type(g,
1601 func_initializer_type(
1602 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1603 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1604 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1605 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1606 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1607 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1608 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1609 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1610 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1611 ) ) {
1612 }
1613 #endif
1614 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1615 typename Body5, typename Body6, typename Body7, typename Body8>
1616 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1617 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1618 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1619 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1620 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1621 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1622 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1623 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1624 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1625 new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1626 new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1627 ) ) {
1628 static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1629 }
1630 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1631 };
1632 #endif
1633
1634 #if __TBB_VARIADIC_MAX >= 10
1635 template<typename OutputTuple, typename K, typename KHash>
1636 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1637 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1638 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1639 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1640 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1641 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1642 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1643 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1644 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1645 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1646 typedef typename std::tuple_element<8, OutputTuple>::type T8;
1647 typedef typename std::tuple_element<9, OutputTuple>::type T9;
1648 public:
1649 typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1650 typedef OutputTuple output_type;
1651 private:
1652 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1653 typedef type_to_key_function_body<T0, K> *f0_p;
1654 typedef type_to_key_function_body<T1, K> *f1_p;
1655 typedef type_to_key_function_body<T2, K> *f2_p;
1656 typedef type_to_key_function_body<T3, K> *f3_p;
1657 typedef type_to_key_function_body<T4, K> *f4_p;
1658 typedef type_to_key_function_body<T5, K> *f5_p;
1659 typedef type_to_key_function_body<T6, K> *f6_p;
1660 typedef type_to_key_function_body<T7, K> *f7_p;
1661 typedef type_to_key_function_body<T8, K> *f8_p;
1662 typedef type_to_key_function_body<T9, K> *f9_p;
1663 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1664 public:
1665 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1666 unfolded_join_node(graph &g) : base_type(g,
1667 func_initializer_type(
1668 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1669 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1670 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1671 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1672 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1673 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1674 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1675 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1676 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1677 new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1678 ) ) {
1679 }
1680 #endif
1681 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1682 typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1683 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1684 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1685 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1686 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1687 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1688 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1689 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1690 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1691 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1692 new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1693 new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1694 new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1695 ) ) {
1696 static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1697 }
1698 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1699 };
1700 #endif
1701
1702
1703 template<size_t N, typename JNT>
1704 typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1705 return std::get<N>(jn.input_ports());
1706 }
1707
1708 #endif