File indexing completed on 2025-01-18 10:12:48
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024 namespace internal {
0025
0026 struct forwarding_base : tbb::internal::no_assign {
0027 forwarding_base(graph &g) : graph_ref(g) {}
0028 virtual ~forwarding_base() {}
0029
0030
0031 virtual task * decrement_port_count(bool handle_task) = 0;
0032 virtual void increment_port_count() = 0;
0033
0034 graph& graph_ref;
0035 };
0036
0037
0038
0039 template<typename KeyType>
0040 struct matching_forwarding_base : public forwarding_base {
0041 typedef typename tbb::internal::strip<KeyType>::type current_key_type;
0042 matching_forwarding_base(graph &g) : forwarding_base(g) { }
0043 virtual task * increment_key_count(current_key_type const & , bool ) = 0;
0044 current_key_type current_key;
0045 };
0046
0047 template< int N >
0048 struct join_helper {
0049
0050 template< typename TupleType, typename PortType >
0051 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0052 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
0053 join_helper<N-1>::set_join_node_pointer( my_input, port );
0054 }
0055 template< typename TupleType >
0056 static inline void consume_reservations( TupleType &my_input ) {
0057 tbb::flow::get<N-1>( my_input ).consume();
0058 join_helper<N-1>::consume_reservations( my_input );
0059 }
0060
0061 template< typename TupleType >
0062 static inline void release_my_reservation( TupleType &my_input ) {
0063 tbb::flow::get<N-1>( my_input ).release();
0064 }
0065
0066 template <typename TupleType>
0067 static inline void release_reservations( TupleType &my_input) {
0068 join_helper<N-1>::release_reservations(my_input);
0069 release_my_reservation(my_input);
0070 }
0071
0072 template< typename InputTuple, typename OutputTuple >
0073 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0074 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
0075 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0076 release_my_reservation( my_input );
0077 return false;
0078 }
0079 return true;
0080 }
0081
0082 template<typename InputTuple, typename OutputTuple>
0083 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0084 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) );
0085 return join_helper<N-1>::get_my_item(my_input, out) && res;
0086 }
0087
0088 template<typename InputTuple, typename OutputTuple>
0089 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0090 return get_my_item(my_input, out);
0091 }
0092
0093 template<typename InputTuple>
0094 static inline void reset_my_port(InputTuple &my_input) {
0095 join_helper<N-1>::reset_my_port(my_input);
0096 tbb::flow::get<N-1>(my_input).reset_port();
0097 }
0098
0099 template<typename InputTuple>
0100 static inline void reset_ports(InputTuple& my_input) {
0101 reset_my_port(my_input);
0102 }
0103
0104 template<typename InputTuple, typename KeyFuncTuple>
0105 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0106 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
0107 tbb::flow::get<N-1>(my_key_funcs) = NULL;
0108 join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0109 }
0110
0111 template< typename KeyFuncTuple>
0112 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0113 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
0114 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
0115 }
0116 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0117 }
0118
0119 template<typename InputTuple>
0120 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0121 join_helper<N-1>::reset_inputs(my_input, f);
0122 tbb::flow::get<N-1>(my_input).reset_receiver(f);
0123 }
0124
0125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0126 template<typename InputTuple>
0127 static inline void extract_inputs(InputTuple &my_input) {
0128 join_helper<N-1>::extract_inputs(my_input);
0129 tbb::flow::get<N-1>(my_input).extract_receiver();
0130 }
0131 #endif
0132 };
0133
0134 template< >
0135 struct join_helper<1> {
0136
0137 template< typename TupleType, typename PortType >
0138 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0139 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
0140 }
0141
0142 template< typename TupleType >
0143 static inline void consume_reservations( TupleType &my_input ) {
0144 tbb::flow::get<0>( my_input ).consume();
0145 }
0146
0147 template< typename TupleType >
0148 static inline void release_my_reservation( TupleType &my_input ) {
0149 tbb::flow::get<0>( my_input ).release();
0150 }
0151
0152 template<typename TupleType>
0153 static inline void release_reservations( TupleType &my_input) {
0154 release_my_reservation(my_input);
0155 }
0156
0157 template< typename InputTuple, typename OutputTuple >
0158 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0159 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
0160 }
0161
0162 template<typename InputTuple, typename OutputTuple>
0163 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0164 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
0165 }
0166
0167 template<typename InputTuple, typename OutputTuple>
0168 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0169 return get_my_item(my_input, out);
0170 }
0171
0172 template<typename InputTuple>
0173 static inline void reset_my_port(InputTuple &my_input) {
0174 tbb::flow::get<0>(my_input).reset_port();
0175 }
0176
0177 template<typename InputTuple>
0178 static inline void reset_ports(InputTuple& my_input) {
0179 reset_my_port(my_input);
0180 }
0181
0182 template<typename InputTuple, typename KeyFuncTuple>
0183 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0184 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
0185 tbb::flow::get<0>(my_key_funcs) = NULL;
0186 }
0187
0188 template< typename KeyFuncTuple>
0189 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0190 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
0191 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
0192 }
0193 }
0194 template<typename InputTuple>
0195 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0196 tbb::flow::get<0>(my_input).reset_receiver(f);
0197 }
0198
0199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0200 template<typename InputTuple>
0201 static inline void extract_inputs(InputTuple &my_input) {
0202 tbb::flow::get<0>(my_input).extract_receiver();
0203 }
0204 #endif
0205 };
0206
0207
0208 template< typename T >
0209 class reserving_port : public receiver<T> {
0210 public:
0211 typedef T input_type;
0212 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0214 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0215 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0216 #endif
0217 private:
0218
0219 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0221 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0222 #endif
0223 };
0224 typedef reserving_port<T> class_type;
0225
0226 class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
0227 public:
0228 char type;
0229 union {
0230 T *my_arg;
0231 predecessor_type *my_pred;
0232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0233 size_t cnt_val;
0234 predecessor_list_type *plist;
0235 #endif
0236 };
0237 reserving_port_operation(const T& e, op_type t) :
0238 type(char(t)), my_arg(const_cast<T*>(&e)) {}
0239 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0240 my_pred(const_cast<predecessor_type *>(&s)) {}
0241 reserving_port_operation(op_type t) : type(char(t)) {}
0242 };
0243
0244 typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
0245 friend class internal::aggregating_functor<class_type, reserving_port_operation>;
0246 aggregator<handler_type, reserving_port_operation> my_aggregator;
0247
0248 void handle_operations(reserving_port_operation* op_list) {
0249 reserving_port_operation *current;
0250 bool no_predecessors;
0251 while(op_list) {
0252 current = op_list;
0253 op_list = op_list->next;
0254 switch(current->type) {
0255 case reg_pred:
0256 no_predecessors = my_predecessors.empty();
0257 my_predecessors.add(*(current->my_pred));
0258 if ( no_predecessors ) {
0259 (void) my_join->decrement_port_count(true);
0260 }
0261 __TBB_store_with_release(current->status, SUCCEEDED);
0262 break;
0263 case rem_pred:
0264 my_predecessors.remove(*(current->my_pred));
0265 if(my_predecessors.empty()) my_join->increment_port_count();
0266 __TBB_store_with_release(current->status, SUCCEEDED);
0267 break;
0268 case res_item:
0269 if ( reserved ) {
0270 __TBB_store_with_release(current->status, FAILED);
0271 }
0272 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
0273 reserved = true;
0274 __TBB_store_with_release(current->status, SUCCEEDED);
0275 } else {
0276 if ( my_predecessors.empty() ) {
0277 my_join->increment_port_count();
0278 }
0279 __TBB_store_with_release(current->status, FAILED);
0280 }
0281 break;
0282 case rel_res:
0283 reserved = false;
0284 my_predecessors.try_release( );
0285 __TBB_store_with_release(current->status, SUCCEEDED);
0286 break;
0287 case con_res:
0288 reserved = false;
0289 my_predecessors.try_consume( );
0290 __TBB_store_with_release(current->status, SUCCEEDED);
0291 break;
0292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0293 case add_blt_pred:
0294 my_predecessors.internal_add_built_predecessor(*(current->my_pred));
0295 __TBB_store_with_release(current->status, SUCCEEDED);
0296 break;
0297 case del_blt_pred:
0298 my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
0299 __TBB_store_with_release(current->status, SUCCEEDED);
0300 break;
0301 case blt_pred_cnt:
0302 current->cnt_val = my_predecessors.predecessor_count();
0303 __TBB_store_with_release(current->status, SUCCEEDED);
0304 break;
0305 case blt_pred_cpy:
0306 my_predecessors.copy_predecessors(*(current->plist));
0307 __TBB_store_with_release(current->status, SUCCEEDED);
0308 break;
0309 #endif
0310 }
0311 }
0312 }
0313
0314 protected:
0315 template< typename R, typename B > friend class run_and_put_task;
0316 template<typename X, typename Y> friend class internal::broadcast_cache;
0317 template<typename X, typename Y> friend class internal::round_robin_cache;
0318 task *try_put_task( const T & ) __TBB_override {
0319 return NULL;
0320 }
0321
0322 graph& graph_reference() const __TBB_override {
0323 return my_join->graph_ref;
0324 }
0325
0326 public:
0327
0328
0329 reserving_port() : reserved(false) {
0330 my_join = NULL;
0331 my_predecessors.set_owner( this );
0332 my_aggregator.initialize_handler(handler_type(this));
0333 }
0334
0335
0336 reserving_port(const reserving_port& ) : receiver<T>() {
0337 reserved = false;
0338 my_join = NULL;
0339 my_predecessors.set_owner( this );
0340 my_aggregator.initialize_handler(handler_type(this));
0341 }
0342
0343 void set_join_node_pointer(forwarding_base *join) {
0344 my_join = join;
0345 }
0346
0347
0348 bool register_predecessor( predecessor_type &src ) __TBB_override {
0349 reserving_port_operation op_data(src, reg_pred);
0350 my_aggregator.execute(&op_data);
0351 return op_data.status == SUCCEEDED;
0352 }
0353
0354
0355 bool remove_predecessor( predecessor_type &src ) __TBB_override {
0356 reserving_port_operation op_data(src, rem_pred);
0357 my_aggregator.execute(&op_data);
0358 return op_data.status == SUCCEEDED;
0359 }
0360
0361
0362 bool reserve( T &v ) {
0363 reserving_port_operation op_data(v, res_item);
0364 my_aggregator.execute(&op_data);
0365 return op_data.status == SUCCEEDED;
0366 }
0367
0368
0369 void release( ) {
0370 reserving_port_operation op_data(rel_res);
0371 my_aggregator.execute(&op_data);
0372 }
0373
0374
0375 void consume( ) {
0376 reserving_port_operation op_data(con_res);
0377 my_aggregator.execute(&op_data);
0378 }
0379
0380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0381 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
0382 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
0383 reserving_port_operation op_data(src, add_blt_pred);
0384 my_aggregator.execute(&op_data);
0385 }
0386
0387 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
0388 reserving_port_operation op_data(src, del_blt_pred);
0389 my_aggregator.execute(&op_data);
0390 }
0391
0392 size_t predecessor_count() __TBB_override {
0393 reserving_port_operation op_data(blt_pred_cnt);
0394 my_aggregator.execute(&op_data);
0395 return op_data.cnt_val;
0396 }
0397
0398 void copy_predecessors(predecessor_list_type &l) __TBB_override {
0399 reserving_port_operation op_data(blt_pred_cpy);
0400 op_data.plist = &l;
0401 my_aggregator.execute(&op_data);
0402 }
0403
0404 void extract_receiver() {
0405 my_predecessors.built_predecessors().receiver_extract(*this);
0406 }
0407
0408 #endif
0409
0410 void reset_receiver( reset_flags f) __TBB_override {
0411 if(f & rf_clear_edges) my_predecessors.clear();
0412 else
0413 my_predecessors.reset();
0414 reserved = false;
0415 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0416 }
0417
0418 private:
0419 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0420 friend class get_graph_helper;
0421 #endif
0422
0423 forwarding_base *my_join;
0424 reservable_predecessor_cache< T, null_mutex > my_predecessors;
0425 bool reserved;
0426 };
0427
0428
0429 template<typename T>
0430 class queueing_port : public receiver<T>, public item_buffer<T> {
0431 public:
0432 typedef T input_type;
0433 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0434 typedef queueing_port<T> class_type;
0435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0436 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0437 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0438 #endif
0439
0440
0441 private:
0442 enum op_type { get__item, res_port, try__put_task
0443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0444 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0445 #endif
0446 };
0447
0448 class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
0449 public:
0450 char type;
0451 T my_val;
0452 T *my_arg;
0453 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0454 predecessor_type *pred;
0455 size_t cnt_val;
0456 predecessor_list_type *plist;
0457 #endif
0458 task * bypass_t;
0459
0460 queueing_port_operation(const T& e, op_type t) :
0461 type(char(t)), my_val(e)
0462 , bypass_t(NULL)
0463 {}
0464
0465 queueing_port_operation(const T* p, op_type t) :
0466 type(char(t)), my_arg(const_cast<T*>(p))
0467 , bypass_t(NULL)
0468 {}
0469
0470 queueing_port_operation(op_type t) : type(char(t))
0471 , bypass_t(NULL)
0472 {}
0473 };
0474
0475 typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
0476 friend class internal::aggregating_functor<class_type, queueing_port_operation>;
0477 aggregator<handler_type, queueing_port_operation> my_aggregator;
0478
0479 void handle_operations(queueing_port_operation* op_list) {
0480 queueing_port_operation *current;
0481 bool was_empty;
0482 while(op_list) {
0483 current = op_list;
0484 op_list = op_list->next;
0485 switch(current->type) {
0486 case try__put_task: {
0487 task *rtask = NULL;
0488 was_empty = this->buffer_empty();
0489 this->push_back(current->my_val);
0490 if (was_empty) rtask = my_join->decrement_port_count(false);
0491 else
0492 rtask = SUCCESSFULLY_ENQUEUED;
0493 current->bypass_t = rtask;
0494 __TBB_store_with_release(current->status, SUCCEEDED);
0495 }
0496 break;
0497 case get__item:
0498 if(!this->buffer_empty()) {
0499 *(current->my_arg) = this->front();
0500 __TBB_store_with_release(current->status, SUCCEEDED);
0501 }
0502 else {
0503 __TBB_store_with_release(current->status, FAILED);
0504 }
0505 break;
0506 case res_port:
0507 __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
0508 this->destroy_front();
0509 if(this->my_item_valid(this->my_head)) {
0510 (void)my_join->decrement_port_count(true);
0511 }
0512 __TBB_store_with_release(current->status, SUCCEEDED);
0513 break;
0514 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0515 case add_blt_pred:
0516 my_built_predecessors.add_edge(*(current->pred));
0517 __TBB_store_with_release(current->status, SUCCEEDED);
0518 break;
0519 case del_blt_pred:
0520 my_built_predecessors.delete_edge(*(current->pred));
0521 __TBB_store_with_release(current->status, SUCCEEDED);
0522 break;
0523 case blt_pred_cnt:
0524 current->cnt_val = my_built_predecessors.edge_count();
0525 __TBB_store_with_release(current->status, SUCCEEDED);
0526 break;
0527 case blt_pred_cpy:
0528 my_built_predecessors.copy_edges(*(current->plist));
0529 __TBB_store_with_release(current->status, SUCCEEDED);
0530 break;
0531 #endif
0532 }
0533 }
0534 }
0535
0536
0537 protected:
0538 template< typename R, typename B > friend class run_and_put_task;
0539 template<typename X, typename Y> friend class internal::broadcast_cache;
0540 template<typename X, typename Y> friend class internal::round_robin_cache;
0541 task *try_put_task(const T &v) __TBB_override {
0542 queueing_port_operation op_data(v, try__put_task);
0543 my_aggregator.execute(&op_data);
0544 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
0545 if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
0546 return op_data.bypass_t;
0547 }
0548
0549 graph& graph_reference() const __TBB_override {
0550 return my_join->graph_ref;
0551 }
0552
0553 public:
0554
0555
0556 queueing_port() : item_buffer<T>() {
0557 my_join = NULL;
0558 my_aggregator.initialize_handler(handler_type(this));
0559 }
0560
0561
0562 queueing_port(const queueing_port& ) : receiver<T>(), item_buffer<T>() {
0563 my_join = NULL;
0564 my_aggregator.initialize_handler(handler_type(this));
0565 }
0566
0567
0568 void set_join_node_pointer(forwarding_base *join) {
0569 my_join = join;
0570 }
0571
0572 bool get_item( T &v ) {
0573 queueing_port_operation op_data(&v, get__item);
0574 my_aggregator.execute(&op_data);
0575 return op_data.status == SUCCEEDED;
0576 }
0577
0578
0579
0580 void reset_port() {
0581 queueing_port_operation op_data(res_port);
0582 my_aggregator.execute(&op_data);
0583 return;
0584 }
0585
0586 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0587 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
0588
0589 void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
0590 queueing_port_operation op_data(add_blt_pred);
0591 op_data.pred = &p;
0592 my_aggregator.execute(&op_data);
0593 }
0594
0595 void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
0596 queueing_port_operation op_data(del_blt_pred);
0597 op_data.pred = &p;
0598 my_aggregator.execute(&op_data);
0599 }
0600
0601 size_t predecessor_count() __TBB_override {
0602 queueing_port_operation op_data(blt_pred_cnt);
0603 my_aggregator.execute(&op_data);
0604 return op_data.cnt_val;
0605 }
0606
0607 void copy_predecessors(predecessor_list_type &l) __TBB_override {
0608 queueing_port_operation op_data(blt_pred_cpy);
0609 op_data.plist = &l;
0610 my_aggregator.execute(&op_data);
0611 }
0612
0613 void extract_receiver() {
0614 item_buffer<T>::reset();
0615 my_built_predecessors.receiver_extract(*this);
0616 }
0617 #endif
0618
0619 void reset_receiver(reset_flags f) __TBB_override {
0620 tbb::internal::suppress_unused_warning(f);
0621 item_buffer<T>::reset();
0622 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0623 if (f & rf_clear_edges)
0624 my_built_predecessors.clear();
0625 #endif
0626 }
0627
0628 private:
0629 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0630 friend class get_graph_helper;
0631 #endif
0632
0633 forwarding_base *my_join;
0634 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0635 edge_container<predecessor_type> my_built_predecessors;
0636 #endif
0637 };
0638
0639 #include "_flow_graph_tagged_buffer_impl.h"
0640
0641 template<typename K>
0642 struct count_element {
0643 K my_key;
0644 size_t my_value;
0645 };
0646
0647
0648
0649 template< typename K >
0650 struct key_to_count_functor {
0651 typedef count_element<K> table_item_type;
0652 const K& operator()(const table_item_type& v) { return v.my_key; }
0653 };
0654
0655
0656
0657 template< class TraitsType >
0658 class key_matching_port :
0659 public receiver<typename TraitsType::T>,
0660 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0661 typename TraitsType::KHash > {
0662 public:
0663 typedef TraitsType traits;
0664 typedef key_matching_port<traits> class_type;
0665 typedef typename TraitsType::T input_type;
0666 typedef typename TraitsType::K key_type;
0667 typedef typename tbb::internal::strip<key_type>::type noref_key_type;
0668 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0669 typedef typename TraitsType::TtoK type_to_key_func_type;
0670 typedef typename TraitsType::KHash hash_compare_type;
0671 typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
0672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0673 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
0674 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0675 #endif
0676 private:
0677
0678 private:
0679 enum op_type { try__put, get__item, res_port
0680 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0681 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
0682 #endif
0683 };
0684
0685 class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
0686 public:
0687 char type;
0688 input_type my_val;
0689 input_type *my_arg;
0690 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0691 predecessor_type *pred;
0692 size_t cnt_val;
0693 predecessor_list_type *plist;
0694 #endif
0695
0696 key_matching_port_operation(const input_type& e, op_type t) :
0697 type(char(t)), my_val(e) {}
0698
0699 key_matching_port_operation(const input_type* p, op_type t) :
0700 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
0701
0702 key_matching_port_operation(op_type t) : type(char(t)) {}
0703 };
0704
0705 typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
0706 friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
0707 aggregator<handler_type, key_matching_port_operation> my_aggregator;
0708
0709 void handle_operations(key_matching_port_operation* op_list) {
0710 key_matching_port_operation *current;
0711 while(op_list) {
0712 current = op_list;
0713 op_list = op_list->next;
0714 switch(current->type) {
0715 case try__put: {
0716 bool was_inserted = this->insert_with_key(current->my_val);
0717
0718 __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
0719 }
0720 break;
0721 case get__item:
0722
0723 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
0724 __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0725 }
0726 __TBB_store_with_release(current->status, SUCCEEDED);
0727 break;
0728 case res_port:
0729
0730 this->delete_with_key(my_join->current_key);
0731 __TBB_store_with_release(current->status, SUCCEEDED);
0732 break;
0733 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0734 case add_blt_pred:
0735 my_built_predecessors.add_edge(*(current->pred));
0736 __TBB_store_with_release(current->status, SUCCEEDED);
0737 break;
0738 case del_blt_pred:
0739 my_built_predecessors.delete_edge(*(current->pred));
0740 __TBB_store_with_release(current->status, SUCCEEDED);
0741 break;
0742 case blt_pred_cnt:
0743 current->cnt_val = my_built_predecessors.edge_count();
0744 __TBB_store_with_release(current->status, SUCCEEDED);
0745 break;
0746 case blt_pred_cpy:
0747 my_built_predecessors.copy_edges(*(current->plist));
0748 __TBB_store_with_release(current->status, SUCCEEDED);
0749 break;
0750 #endif
0751 }
0752 }
0753 }
0754
0755 protected:
0756 template< typename R, typename B > friend class run_and_put_task;
0757 template<typename X, typename Y> friend class internal::broadcast_cache;
0758 template<typename X, typename Y> friend class internal::round_robin_cache;
0759 task *try_put_task(const input_type& v) __TBB_override {
0760 key_matching_port_operation op_data(v, try__put);
0761 task *rtask = NULL;
0762 my_aggregator.execute(&op_data);
0763 if(op_data.status == SUCCEEDED) {
0764 rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false);
0765
0766 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0767 }
0768 return rtask;
0769 }
0770
0771 graph& graph_reference() const __TBB_override {
0772 return my_join->graph_ref;
0773 }
0774
0775 public:
0776
0777 key_matching_port() : receiver<input_type>(), buffer_type() {
0778 my_join = NULL;
0779 my_aggregator.initialize_handler(handler_type(this));
0780 }
0781
0782
0783 key_matching_port(const key_matching_port& ) : receiver<input_type>(), buffer_type() {
0784 my_join = NULL;
0785 my_aggregator.initialize_handler(handler_type(this));
0786 }
0787
0788 ~key_matching_port() { }
0789
0790 void set_join_node_pointer(forwarding_base *join) {
0791 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0792 }
0793
0794 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0795
0796 type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0797
0798 bool get_item( input_type &v ) {
0799
0800 key_matching_port_operation op_data(&v, get__item);
0801 my_aggregator.execute(&op_data);
0802 return op_data.status == SUCCEEDED;
0803 }
0804
0805 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0806 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
0807
0808 void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
0809 key_matching_port_operation op_data(add_blt_pred);
0810 op_data.pred = &p;
0811 my_aggregator.execute(&op_data);
0812 }
0813
0814 void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
0815 key_matching_port_operation op_data(del_blt_pred);
0816 op_data.pred = &p;
0817 my_aggregator.execute(&op_data);
0818 }
0819
0820 size_t predecessor_count() __TBB_override {
0821 key_matching_port_operation op_data(blt_pred_cnt);
0822 my_aggregator.execute(&op_data);
0823 return op_data.cnt_val;
0824 }
0825
0826 void copy_predecessors(predecessor_list_type &l) __TBB_override {
0827 key_matching_port_operation op_data(blt_pred_cpy);
0828 op_data.plist = &l;
0829 my_aggregator.execute(&op_data);
0830 }
0831 #endif
0832
0833
0834
0835 void reset_port() {
0836 key_matching_port_operation op_data(res_port);
0837 my_aggregator.execute(&op_data);
0838 return;
0839 }
0840
0841 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0842 void extract_receiver() {
0843 buffer_type::reset();
0844 my_built_predecessors.receiver_extract(*this);
0845 }
0846 #endif
0847 void reset_receiver(reset_flags f ) __TBB_override {
0848 tbb::internal::suppress_unused_warning(f);
0849 buffer_type::reset();
0850 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0851 if (f & rf_clear_edges)
0852 my_built_predecessors.clear();
0853 #endif
0854 }
0855
0856 private:
0857
0858
0859 matching_forwarding_base<key_type> *my_join;
0860 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0861 edge_container<predecessor_type> my_built_predecessors;
0862 #endif
0863 };
0864
0865 using namespace graph_policy_namespace;
0866
0867 template<typename JP, typename InputTuple, typename OutputTuple>
0868 class join_node_base;
0869
0870
0871 template<typename JP, typename InputTuple, typename OutputTuple>
0872 class join_node_FE;
0873
0874 template<typename InputTuple, typename OutputTuple>
0875 class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
0876 public:
0877 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
0878 typedef OutputTuple output_type;
0879 typedef InputTuple input_type;
0880 typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type;
0881
0882 join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
0883 ports_with_no_inputs = N;
0884 join_helper<N>::set_join_node_pointer(my_inputs, this);
0885 }
0886
0887 join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
0888 ports_with_no_inputs = N;
0889 join_helper<N>::set_join_node_pointer(my_inputs, this);
0890 }
0891
0892 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0893
0894 void increment_port_count() __TBB_override {
0895 ++ports_with_no_inputs;
0896 }
0897
0898
0899 task * decrement_port_count(bool handle_task) __TBB_override {
0900 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
0901 if(internal::is_graph_active(this->graph_ref)) {
0902 task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
0903 forward_task_bypass<base_node_type>(*my_node);
0904 if(!handle_task) return rtask;
0905 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
0906 }
0907 }
0908 return NULL;
0909 }
0910
0911 input_type &input_ports() { return my_inputs; }
0912
0913 protected:
0914
0915 void reset( reset_flags f) {
0916
0917 ports_with_no_inputs = N;
0918 join_helper<N>::reset_inputs(my_inputs, f);
0919 }
0920
0921 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0922 void extract( ) {
0923
0924 ports_with_no_inputs = N;
0925 join_helper<N>::extract_inputs(my_inputs);
0926 }
0927 #endif
0928
0929
0930
0931 bool tuple_build_may_succeed() {
0932 return !ports_with_no_inputs;
0933 }
0934
0935 bool try_to_make_tuple(output_type &out) {
0936 if(ports_with_no_inputs) return false;
0937 return join_helper<N>::reserve(my_inputs, out);
0938 }
0939
0940 void tuple_accepted() {
0941 join_helper<N>::consume_reservations(my_inputs);
0942 }
0943 void tuple_rejected() {
0944 join_helper<N>::release_reservations(my_inputs);
0945 }
0946
0947 input_type my_inputs;
0948 base_node_type *my_node;
0949 atomic<size_t> ports_with_no_inputs;
0950 };
0951
0952 template<typename InputTuple, typename OutputTuple>
0953 class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
0954 public:
0955 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
0956 typedef OutputTuple output_type;
0957 typedef InputTuple input_type;
0958 typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type;
0959
0960 join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
0961 ports_with_no_items = N;
0962 join_helper<N>::set_join_node_pointer(my_inputs, this);
0963 }
0964
0965 join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
0966 ports_with_no_items = N;
0967 join_helper<N>::set_join_node_pointer(my_inputs, this);
0968 }
0969
0970
0971 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0972
0973 void reset_port_count() {
0974 ports_with_no_items = N;
0975 }
0976
0977
0978 task * decrement_port_count(bool handle_task) __TBB_override
0979 {
0980 if(ports_with_no_items.fetch_and_decrement() == 1) {
0981 if(internal::is_graph_active(this->graph_ref)) {
0982 task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
0983 forward_task_bypass <base_node_type>(*my_node);
0984 if(!handle_task) return rtask;
0985 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
0986 }
0987 }
0988 return NULL;
0989 }
0990
0991 void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); }
0992
0993 input_type &input_ports() { return my_inputs; }
0994
0995 protected:
0996
0997 void reset( reset_flags f) {
0998 reset_port_count();
0999 join_helper<N>::reset_inputs(my_inputs, f );
1000 }
1001
1002 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1003 void extract() {
1004 reset_port_count();
1005 join_helper<N>::extract_inputs(my_inputs);
1006 }
1007 #endif
1008
1009
1010 bool tuple_build_may_succeed() {
1011 return !ports_with_no_items;
1012 }
1013
1014 bool try_to_make_tuple(output_type &out) {
1015 if(ports_with_no_items) return false;
1016 return join_helper<N>::get_items(my_inputs, out);
1017 }
1018
1019 void tuple_accepted() {
1020 reset_port_count();
1021 join_helper<N>::reset_ports(my_inputs);
1022 }
1023 void tuple_rejected() {
1024
1025 }
1026
1027 input_type my_inputs;
1028 base_node_type *my_node;
1029 atomic<size_t> ports_with_no_items;
1030 };
1031
1032
1033 template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1034 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1035
1036 public hash_buffer<
1037 typename tbb::internal::strip<K>::type&,
1038 count_element<typename tbb::internal::strip<K>::type>,
1039 internal::type_to_key_function_body<
1040 count_element<typename tbb::internal::strip<K>::type>,
1041 typename tbb::internal::strip<K>::type& >,
1042 KHash >,
1043
1044 public item_buffer<OutputTuple> {
1045 public:
1046 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1047 typedef OutputTuple output_type;
1048 typedef InputTuple input_type;
1049 typedef K key_type;
1050 typedef typename tbb::internal::strip<key_type>::type unref_key_type;
1051 typedef KHash key_hash_compare;
1052
1053 typedef count_element<unref_key_type> count_element_type;
1054
1055 typedef key_to_count_functor<unref_key_type> key_to_count_func;
1056 typedef internal::type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
1057 typedef internal::type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
1058
1059
1060 typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
1061 key_to_count_buffer_type;
1062 typedef item_buffer<output_type> output_buffer_type;
1063 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type;
1064 typedef matching_forwarding_base<key_type> forwarding_base_type;
1065
1066
1067
1068
1069 private:
1070 enum op_type { res_count, inc_count, may_succeed, try_make };
1071 typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
1072
1073 class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1074 public:
1075 char type;
1076 unref_key_type my_val;
1077 output_type* my_output;
1078 task *bypass_t;
1079 bool enqueue_task;
1080
1081 key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1082 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1083 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1084 enqueue_task(true) {}
1085
1086 key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1087 };
1088
1089 typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1090 friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1091 aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1092
1093
1094
1095
1096 task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1097 output_type l_out;
1098 task *rtask = NULL;
1099 bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1100 this->current_key = t;
1101 this->delete_with_key(this->current_key);
1102 if(join_helper<N>::get_items(my_inputs, l_out)) {
1103 this->push_back(l_out);
1104 if(do_fwd) {
1105 rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1106 forward_task_bypass<base_node_type>(*my_node);
1107 if(handle_task) {
1108 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1109 rtask = NULL;
1110 }
1111 do_fwd = false;
1112 }
1113
1114 join_helper<N>::reset_ports(my_inputs);
1115 }
1116 else {
1117 __TBB_ASSERT(false, "should have had something to push");
1118 }
1119 return rtask;
1120 }
1121
1122 void handle_operations(key_matching_FE_operation* op_list) {
1123 key_matching_FE_operation *current;
1124 while(op_list) {
1125 current = op_list;
1126 op_list = op_list->next;
1127 switch(current->type) {
1128 case res_count:
1129 {
1130 this->destroy_front();
1131 __TBB_store_with_release(current->status, SUCCEEDED);
1132 }
1133 break;
1134 case inc_count: {
1135 count_element_type *p = 0;
1136 unref_key_type &t = current->my_val;
1137 bool do_enqueue = current->enqueue_task;
1138 if(!(this->find_ref_with_key(t,p))) {
1139 count_element_type ev;
1140 ev.my_key = t;
1141 ev.my_value = 0;
1142 this->insert_with_key(ev);
1143 if(!(this->find_ref_with_key(t,p))) {
1144 __TBB_ASSERT(false, "should find key after inserting it");
1145 }
1146 }
1147 if(++(p->my_value) == size_t(N)) {
1148 task *rtask = fill_output_buffer(t, true, do_enqueue);
1149 __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1150 current->bypass_t = rtask;
1151 }
1152 }
1153 __TBB_store_with_release(current->status, SUCCEEDED);
1154 break;
1155 case may_succeed:
1156 __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1157 break;
1158 case try_make:
1159 if(this->buffer_empty()) {
1160 __TBB_store_with_release(current->status, FAILED);
1161 }
1162 else {
1163 *(current->my_output) = this->front();
1164 __TBB_store_with_release(current->status, SUCCEEDED);
1165 }
1166 break;
1167 }
1168 }
1169 }
1170
1171
1172 public:
1173 template<typename FunctionTuple>
1174 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1175 join_helper<N>::set_join_node_pointer(my_inputs, this);
1176 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1177 my_aggregator.initialize_handler(handler_type(this));
1178 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1179 this->set_key_func(cfb);
1180 }
1181
1182 join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1183 output_buffer_type() {
1184 my_node = NULL;
1185 join_helper<N>::set_join_node_pointer(my_inputs, this);
1186 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1187 my_aggregator.initialize_handler(handler_type(this));
1188 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1189 this->set_key_func(cfb);
1190 }
1191
1192
1193 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1194
1195 void reset_port_count() {
1196 key_matching_FE_operation op_data(res_count);
1197 my_aggregator.execute(&op_data);
1198 return;
1199 }
1200
1201
1202
1203 task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override {
1204 key_matching_FE_operation op_data(t, handle_task, inc_count);
1205 my_aggregator.execute(&op_data);
1206 return op_data.bypass_t;
1207 }
1208
1209 task *decrement_port_count(bool ) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1210
1211 void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); }
1212
1213 input_type &input_ports() { return my_inputs; }
1214
1215 protected:
1216
1217 void reset( reset_flags f ) {
1218
1219 join_helper<N>::reset_inputs(my_inputs, f);
1220
1221 key_to_count_buffer_type::reset();
1222 output_buffer_type::reset();
1223 }
1224
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1226 void extract() {
1227
1228 join_helper<N>::extract_inputs(my_inputs);
1229 key_to_count_buffer_type::reset();
1230 output_buffer_type::reset();
1231
1232 }
1233 #endif
1234
1235
1236 bool tuple_build_may_succeed() {
1237 key_matching_FE_operation op_data(may_succeed);
1238 my_aggregator.execute(&op_data);
1239 return op_data.status == SUCCEEDED;
1240 }
1241
1242
1243
1244 bool try_to_make_tuple(output_type &out) {
1245 key_matching_FE_operation op_data(&out,try_make);
1246 my_aggregator.execute(&op_data);
1247 return op_data.status == SUCCEEDED;
1248 }
1249
1250 void tuple_accepted() {
1251 reset_port_count();
1252 }
1253
1254 void tuple_rejected() {
1255
1256 }
1257
1258 input_type my_inputs;
1259 base_node_type *my_node;
1260 };
1261
1262
1263 template<typename JP, typename InputTuple, typename OutputTuple>
1264 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1265 public sender<OutputTuple> {
1266 protected:
1267 using graph_node::my_graph;
1268 public:
1269 typedef OutputTuple output_type;
1270
1271 typedef typename sender<output_type>::successor_type successor_type;
1272 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1273 using input_ports_type::tuple_build_may_succeed;
1274 using input_ports_type::try_to_make_tuple;
1275 using input_ports_type::tuple_accepted;
1276 using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278 typedef typename sender<output_type>::built_successors_type built_successors_type;
1279 typedef typename sender<output_type>::successor_list_type successor_list_type;
1280 #endif
1281
1282 private:
1283
1284 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1286 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1287 #endif
1288 };
1289 typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1290
1291 class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1292 public:
1293 char type;
1294 union {
1295 output_type *my_arg;
1296 successor_type *my_succ;
1297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1298 size_t cnt_val;
1299 successor_list_type *slist;
1300 #endif
1301 };
1302 task *bypass_t;
1303 join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1304 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1305 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1306 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1307 join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1308 };
1309
1310 typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1311 friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1312 bool forwarder_busy;
1313 aggregator<handler_type, join_node_base_operation> my_aggregator;
1314
1315 void handle_operations(join_node_base_operation* op_list) {
1316 join_node_base_operation *current;
1317 while(op_list) {
1318 current = op_list;
1319 op_list = op_list->next;
1320 switch(current->type) {
1321 case reg_succ: {
1322 my_successors.register_successor(*(current->my_succ));
1323 if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
1324 task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1325 forward_task_bypass
1326 <join_node_base<JP,InputTuple,OutputTuple> >(*this);
1327 internal::spawn_in_graph_arena(my_graph, *rtask);
1328 forwarder_busy = true;
1329 }
1330 __TBB_store_with_release(current->status, SUCCEEDED);
1331 }
1332 break;
1333 case rem_succ:
1334 my_successors.remove_successor(*(current->my_succ));
1335 __TBB_store_with_release(current->status, SUCCEEDED);
1336 break;
1337 case try__get:
1338 if(tuple_build_may_succeed()) {
1339 if(try_to_make_tuple(*(current->my_arg))) {
1340 tuple_accepted();
1341 __TBB_store_with_release(current->status, SUCCEEDED);
1342 }
1343 else __TBB_store_with_release(current->status, FAILED);
1344 }
1345 else __TBB_store_with_release(current->status, FAILED);
1346 break;
1347 case do_fwrd_bypass: {
1348 bool build_succeeded;
1349 task *last_task = NULL;
1350 output_type out;
1351 if(tuple_build_may_succeed()) {
1352 do {
1353 build_succeeded = try_to_make_tuple(out);
1354 if(build_succeeded) {
1355 task *new_task = my_successors.try_put_task(out);
1356 last_task = combine_tasks(my_graph, last_task, new_task);
1357 if(new_task) {
1358 tuple_accepted();
1359 }
1360 else {
1361 tuple_rejected();
1362 build_succeeded = false;
1363 }
1364 }
1365 } while(build_succeeded);
1366 }
1367 current->bypass_t = last_task;
1368 __TBB_store_with_release(current->status, SUCCEEDED);
1369 forwarder_busy = false;
1370 }
1371 break;
1372 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1373 case add_blt_succ:
1374 my_successors.internal_add_built_successor(*(current->my_succ));
1375 __TBB_store_with_release(current->status, SUCCEEDED);
1376 break;
1377 case del_blt_succ:
1378 my_successors.internal_delete_built_successor(*(current->my_succ));
1379 __TBB_store_with_release(current->status, SUCCEEDED);
1380 break;
1381 case blt_succ_cnt:
1382 current->cnt_val = my_successors.successor_count();
1383 __TBB_store_with_release(current->status, SUCCEEDED);
1384 break;
1385 case blt_succ_cpy:
1386 my_successors.copy_successors(*(current->slist));
1387 __TBB_store_with_release(current->status, SUCCEEDED);
1388 break;
1389 #endif
1390 }
1391 }
1392 }
1393
1394 public:
1395 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1396 my_successors.set_owner(this);
1397 input_ports_type::set_my_node(this);
1398 my_aggregator.initialize_handler(handler_type(this));
1399 }
1400
1401 join_node_base(const join_node_base& other) :
1402 graph_node(other.graph_node::my_graph), input_ports_type(other),
1403 sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1404 my_successors.set_owner(this);
1405 input_ports_type::set_my_node(this);
1406 my_aggregator.initialize_handler(handler_type(this));
1407 }
1408
1409 template<typename FunctionTuple>
1410 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1411 my_successors.set_owner(this);
1412 input_ports_type::set_my_node(this);
1413 my_aggregator.initialize_handler(handler_type(this));
1414 }
1415
1416 bool register_successor(successor_type &r) __TBB_override {
1417 join_node_base_operation op_data(r, reg_succ);
1418 my_aggregator.execute(&op_data);
1419 return op_data.status == SUCCEEDED;
1420 }
1421
1422 bool remove_successor( successor_type &r) __TBB_override {
1423 join_node_base_operation op_data(r, rem_succ);
1424 my_aggregator.execute(&op_data);
1425 return op_data.status == SUCCEEDED;
1426 }
1427
1428 bool try_get( output_type &v) __TBB_override {
1429 join_node_base_operation op_data(v, try__get);
1430 my_aggregator.execute(&op_data);
1431 return op_data.status == SUCCEEDED;
1432 }
1433
1434 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1435 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1436
1437 void internal_add_built_successor( successor_type &r) __TBB_override {
1438 join_node_base_operation op_data(r, add_blt_succ);
1439 my_aggregator.execute(&op_data);
1440 }
1441
1442 void internal_delete_built_successor( successor_type &r) __TBB_override {
1443 join_node_base_operation op_data(r, del_blt_succ);
1444 my_aggregator.execute(&op_data);
1445 }
1446
1447 size_t successor_count() __TBB_override {
1448 join_node_base_operation op_data(blt_succ_cnt);
1449 my_aggregator.execute(&op_data);
1450 return op_data.cnt_val;
1451 }
1452
1453 void copy_successors(successor_list_type &l) __TBB_override {
1454 join_node_base_operation op_data(blt_succ_cpy);
1455 op_data.slist = &l;
1456 my_aggregator.execute(&op_data);
1457 }
1458 #endif
1459
1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1461 void extract() __TBB_override {
1462 input_ports_type::extract();
1463 my_successors.built_successors().sender_extract(*this);
1464 }
1465 #endif
1466
1467 protected:
1468
1469 void reset_node(reset_flags f) __TBB_override {
1470 input_ports_type::reset(f);
1471 if(f & rf_clear_edges) my_successors.clear();
1472 }
1473
1474 private:
1475 broadcast_cache<output_type, null_rw_mutex> my_successors;
1476
1477 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1478 task *forward_task() {
1479 join_node_base_operation op_data(do_fwrd_bypass);
1480 my_aggregator.execute(&op_data);
1481 return op_data.bypass_t;
1482 }
1483
1484 };
1485
1486
1487 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1488 struct join_base {
1489 typedef typename internal::join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1490 };
1491
1492 template<int N, typename OutputTuple, typename K, typename KHash>
1493 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1494 typedef key_matching<K, KHash> key_traits_type;
1495 typedef K key_type;
1496 typedef KHash key_hash_compare;
1497 typedef typename internal::join_node_base< key_traits_type,
1498
1499 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1500 OutputTuple > type;
1501 };
1502
1503
1504
1505
1506
1507 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1508 class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1509 public:
1510 typedef typename wrap_tuple_elements<N, PT, OutputTuple>::type input_ports_type;
1511 typedef OutputTuple output_type;
1512 private:
1513 typedef join_node_base<JP, input_ports_type, output_type > base_type;
1514 public:
1515 unfolded_join_node(graph &g) : base_type(g) {}
1516 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1517 };
1518
1519 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1520 template <typename K, typename T>
1521 struct key_from_message_body {
1522 K operator()(const T& t) const {
1523 using tbb::flow::key_from_message;
1524 return key_from_message<K>(t);
1525 }
1526 };
1527
1528 template <typename K, typename T>
1529 struct key_from_message_body<K&,T> {
1530 const K& operator()(const T& t) const {
1531 using tbb::flow::key_from_message;
1532 return key_from_message<const K&>(t);
1533 }
1534 };
1535 #endif
1536
1537
1538
1539 template<typename OutputTuple, typename K, typename KHash>
1540 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1541 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1542 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1543 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1544 public:
1545 typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1546 typedef OutputTuple output_type;
1547 private:
1548 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1549 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1550 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1551 typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1552 public:
1553 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1554 unfolded_join_node(graph &g) : base_type(g,
1555 func_initializer_type(
1556 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1557 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1558 ) ) {
1559 }
1560 #endif
1561 template<typename Body0, typename Body1>
1562 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1563 func_initializer_type(
1564 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1565 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1566 ) ) {
1567 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1568 }
1569 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1570 };
1571
1572 template<typename OutputTuple, typename K, typename KHash>
1573 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1574 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1575 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1576 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1577 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1578 public:
1579 typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1580 typedef OutputTuple output_type;
1581 private:
1582 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1583 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1584 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1585 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1586 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1587 public:
1588 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1589 unfolded_join_node(graph &g) : base_type(g,
1590 func_initializer_type(
1591 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1592 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1593 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1594 ) ) {
1595 }
1596 #endif
1597 template<typename Body0, typename Body1, typename Body2>
1598 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1599 func_initializer_type(
1600 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1601 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1602 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1603 ) ) {
1604 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1605 }
1606 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1607 };
1608
1609 template<typename OutputTuple, typename K, typename KHash>
1610 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1611 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1612 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1613 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1614 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1615 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1616 public:
1617 typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1618 typedef OutputTuple output_type;
1619 private:
1620 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1621 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1622 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1623 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1624 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1625 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1626 public:
1627 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1628 unfolded_join_node(graph &g) : base_type(g,
1629 func_initializer_type(
1630 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1631 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1632 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1633 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1634 ) ) {
1635 }
1636 #endif
1637 template<typename Body0, typename Body1, typename Body2, typename Body3>
1638 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1639 func_initializer_type(
1640 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1641 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1642 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1643 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1644 ) ) {
1645 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1646 }
1647 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1648 };
1649
1650 template<typename OutputTuple, typename K, typename KHash>
1651 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1652 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1653 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1654 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1655 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1656 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1657 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1658 public:
1659 typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1660 typedef OutputTuple output_type;
1661 private:
1662 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1663 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1664 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1665 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1666 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1667 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1668 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1669 public:
1670 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1671 unfolded_join_node(graph &g) : base_type(g,
1672 func_initializer_type(
1673 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1674 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1675 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1676 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1677 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1678 ) ) {
1679 }
1680 #endif
1681 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1682 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1683 func_initializer_type(
1684 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1685 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1686 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1687 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1688 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1689 ) ) {
1690 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1691 }
1692 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1693 };
1694
1695 #if __TBB_VARIADIC_MAX >= 6
1696 template<typename OutputTuple, typename K, typename KHash>
1697 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1698 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1699 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1700 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1701 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1702 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1703 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1704 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1705 public:
1706 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1707 typedef OutputTuple output_type;
1708 private:
1709 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1710 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1711 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1712 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1713 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1714 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1715 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1716 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1717 public:
1718 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1719 unfolded_join_node(graph &g) : base_type(g,
1720 func_initializer_type(
1721 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1722 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1723 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1724 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1725 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1726 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1727 ) ) {
1728 }
1729 #endif
1730 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1731 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1732 : base_type(g, func_initializer_type(
1733 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1734 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1735 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1736 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1737 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1738 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1739 ) ) {
1740 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1741 }
1742 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1743 };
1744 #endif
1745
1746 #if __TBB_VARIADIC_MAX >= 7
1747 template<typename OutputTuple, typename K, typename KHash>
1748 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1749 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1750 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1751 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1752 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1753 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1754 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1755 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1756 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1757 public:
1758 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1759 typedef OutputTuple output_type;
1760 private:
1761 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1762 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1763 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1764 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1765 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1766 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1767 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1768 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1769 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1770 public:
1771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1772 unfolded_join_node(graph &g) : base_type(g,
1773 func_initializer_type(
1774 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1775 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1776 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1777 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1778 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1779 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1780 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1781 ) ) {
1782 }
1783 #endif
1784 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1785 typename Body5, typename Body6>
1786 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1787 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1788 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1789 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1790 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1791 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1792 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1793 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1794 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1795 ) ) {
1796 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1797 }
1798 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1799 };
1800 #endif
1801
1802 #if __TBB_VARIADIC_MAX >= 8
1803 template<typename OutputTuple, typename K, typename KHash>
1804 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1805 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1806 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1807 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1808 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1809 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1810 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1811 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1812 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1813 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1814 public:
1815 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1816 typedef OutputTuple output_type;
1817 private:
1818 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1819 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1820 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1821 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1822 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1823 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1824 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1825 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1826 typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1827 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1828 public:
1829 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1830 unfolded_join_node(graph &g) : base_type(g,
1831 func_initializer_type(
1832 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1833 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1834 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1835 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1836 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1837 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1838 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1839 new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1840 ) ) {
1841 }
1842 #endif
1843 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1844 typename Body5, typename Body6, typename Body7>
1845 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1846 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1847 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1848 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1849 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1850 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1851 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1852 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1853 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1854 new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1855 ) ) {
1856 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1857 }
1858 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1859 };
1860 #endif
1861
1862 #if __TBB_VARIADIC_MAX >= 9
1863 template<typename OutputTuple, typename K, typename KHash>
1864 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1865 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1866 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1867 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1868 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1869 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1870 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1871 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1872 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1873 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1874 typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1875 public:
1876 typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1877 typedef OutputTuple output_type;
1878 private:
1879 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1880 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1881 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1882 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1883 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1884 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1885 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1886 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1887 typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1888 typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1889 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1890 public:
1891 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1892 unfolded_join_node(graph &g) : base_type(g,
1893 func_initializer_type(
1894 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1895 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1896 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1897 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1898 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1899 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1900 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1901 new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1902 new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1903 ) ) {
1904 }
1905 #endif
1906 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1907 typename Body5, typename Body6, typename Body7, typename Body8>
1908 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1909 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1910 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1911 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1912 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1913 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1914 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1915 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1916 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1917 new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1918 new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)
1919 ) ) {
1920 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1921 }
1922 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1923 };
1924 #endif
1925
1926 #if __TBB_VARIADIC_MAX >= 10
1927 template<typename OutputTuple, typename K, typename KHash>
1928 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1929 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1930 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1931 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1932 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1933 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1934 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1935 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1936 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1937 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1938 typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
1939 typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;
1940 public:
1941 typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1942 typedef OutputTuple output_type;
1943 private:
1944 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1945 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1946 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1947 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1948 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1949 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1950 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1951 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1952 typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1953 typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1954 typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
1955 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1956 public:
1957 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1958 unfolded_join_node(graph &g) : base_type(g,
1959 func_initializer_type(
1960 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1961 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1962 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1963 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1964 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1965 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1966 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1967 new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1968 new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1969 new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1970 ) ) {
1971 }
1972 #endif
1973 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1974 typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1975 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1976 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1977 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1978 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1979 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1980 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1981 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1982 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1983 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1984 new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1985 new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
1986 new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)
1987 ) ) {
1988 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1989 }
1990 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1991 };
1992 #endif
1993
1994
1995 template<size_t N, typename JNT>
1996 typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1997 return tbb::flow::get<N>(jn.input_ports());
1998 }
1999
2000 }
2001 #endif
2002