File indexing completed on 2025-12-18 10:24:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_join_impl_H
0018 #define __TBB__flow_graph_join_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024
0025
0026 struct forwarding_base : no_assign {
0027 forwarding_base(graph &g) : graph_ref(g) {}
0028 virtual ~forwarding_base() {}
0029 graph& graph_ref;
0030 };
0031
0032 struct queueing_forwarding_base : forwarding_base {
0033 using forwarding_base::forwarding_base;
0034
0035
0036 virtual graph_task* decrement_port_count(bool handle_task) = 0;
0037 };
0038
0039 struct reserving_forwarding_base : forwarding_base {
0040 using forwarding_base::forwarding_base;
0041
0042
0043 virtual graph_task* decrement_port_count() = 0;
0044 virtual void increment_port_count() = 0;
0045 };
0046
0047
0048
0049 template<typename KeyType>
0050 struct matching_forwarding_base : public forwarding_base {
0051 typedef typename std::decay<KeyType>::type current_key_type;
0052 matching_forwarding_base(graph &g) : forwarding_base(g) { }
0053 virtual graph_task* increment_key_count(current_key_type const & ) = 0;
0054 current_key_type current_key;
0055 };
0056
0057 template< int N >
0058 struct join_helper {
0059
0060 template< typename TupleType, typename PortType >
0061 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0062 std::get<N-1>( my_input ).set_join_node_pointer(port);
0063 join_helper<N-1>::set_join_node_pointer( my_input, port );
0064 }
0065 template< typename TupleType >
0066 static inline void consume_reservations( TupleType &my_input ) {
0067 std::get<N-1>( my_input ).consume();
0068 join_helper<N-1>::consume_reservations( my_input );
0069 }
0070
0071 template< typename TupleType >
0072 static inline void release_my_reservation( TupleType &my_input ) {
0073 std::get<N-1>( my_input ).release();
0074 }
0075
0076 template <typename TupleType>
0077 static inline void release_reservations( TupleType &my_input) {
0078 join_helper<N-1>::release_reservations(my_input);
0079 release_my_reservation(my_input);
0080 }
0081
0082 template< typename InputTuple, typename OutputTuple >
0083 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0084 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
0085 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
0086 release_my_reservation( my_input );
0087 return false;
0088 }
0089 return true;
0090 }
0091
0092 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0093 template <typename InputTuple, typename OutputTuple>
0094 static inline bool reserve(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0095 message_metainfo element_metainfo;
0096 if (!std::get<N - 1>(my_input).reserve(std::get<N - 1>(out), element_metainfo)) return false;
0097 if (!join_helper<N - 1>::reserve(my_input, out, metainfo)) {
0098 release_my_reservation(my_input);
0099 return false;
0100 }
0101 metainfo.merge(element_metainfo);
0102 return true;
0103
0104 }
0105 #endif
0106
0107 template<typename InputTuple, typename OutputTuple>
0108 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0109 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) );
0110 return join_helper<N-1>::get_my_item(my_input, out) && res;
0111 }
0112
0113 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0114 template <typename InputTuple, typename OutputTuple>
0115 static inline bool get_my_item(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0116 message_metainfo element_metainfo;
0117 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out), element_metainfo);
0118 metainfo.merge(element_metainfo);
0119 return join_helper<N-1>::get_my_item(my_input, out, metainfo) && res;
0120 }
0121 #endif
0122
0123 template<typename InputTuple, typename OutputTuple>
0124 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0125 return get_my_item(my_input, out);
0126 }
0127
0128 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0129 template <typename InputTuple, typename OutputTuple>
0130 static inline bool get_items(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0131 return get_my_item(my_input, out, metainfo);
0132 }
0133 #endif
0134
0135 template<typename InputTuple>
0136 static inline void reset_my_port(InputTuple &my_input) {
0137 join_helper<N-1>::reset_my_port(my_input);
0138 std::get<N-1>(my_input).reset_port();
0139 }
0140
0141 template<typename InputTuple>
0142 static inline void reset_ports(InputTuple& my_input) {
0143 reset_my_port(my_input);
0144 }
0145
0146 template<typename InputTuple, typename KeyFuncTuple>
0147 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0148 std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
0149 std::get<N-1>(my_key_funcs) = nullptr;
0150 join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
0151 }
0152
0153 template< typename KeyFuncTuple>
0154 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0155 __TBB_ASSERT(
0156 std::get<N-1>(other_inputs).get_my_key_func(),
0157 "key matching join node should not be instantiated without functors."
0158 );
0159 std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
0160 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
0161 }
0162
0163 template<typename InputTuple>
0164 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0165 join_helper<N-1>::reset_inputs(my_input, f);
0166 std::get<N-1>(my_input).reset_receiver(f);
0167 }
0168 };
0169
0170 template< >
0171 struct join_helper<1> {
0172
0173 template< typename TupleType, typename PortType >
0174 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
0175 std::get<0>( my_input ).set_join_node_pointer(port);
0176 }
0177
0178 template< typename TupleType >
0179 static inline void consume_reservations( TupleType &my_input ) {
0180 std::get<0>( my_input ).consume();
0181 }
0182
0183 template< typename TupleType >
0184 static inline void release_my_reservation( TupleType &my_input ) {
0185 std::get<0>( my_input ).release();
0186 }
0187
0188 template<typename TupleType>
0189 static inline void release_reservations( TupleType &my_input) {
0190 release_my_reservation(my_input);
0191 }
0192
0193 template< typename InputTuple, typename OutputTuple >
0194 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
0195 return std::get<0>( my_input ).reserve( std::get<0>( out ) );
0196 }
0197
0198 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0199 template <typename InputTuple, typename OutputTuple>
0200 static inline bool reserve(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0201 message_metainfo element_metainfo;
0202 bool result = std::get<0>(my_input).reserve(std::get<0>(out), element_metainfo);
0203 metainfo.merge(element_metainfo);
0204 return result;
0205 }
0206 #endif
0207
0208 template<typename InputTuple, typename OutputTuple>
0209 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
0210 return std::get<0>(my_input).get_item(std::get<0>(out));
0211 }
0212
0213 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0214 template <typename InputTuple, typename OutputTuple>
0215 static inline bool get_my_item(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0216 message_metainfo element_metainfo;
0217 bool res = std::get<0>(my_input).get_item(std::get<0>(out), element_metainfo);
0218 metainfo.merge(element_metainfo);
0219 return res;
0220 }
0221 #endif
0222
0223 template<typename InputTuple, typename OutputTuple>
0224 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
0225 return get_my_item(my_input, out);
0226 }
0227
0228 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0229 template <typename InputTuple, typename OutputTuple>
0230 static inline bool get_items(InputTuple& my_input, OutputTuple& out, message_metainfo& metainfo) {
0231 return get_my_item(my_input, out, metainfo);
0232 }
0233 #endif
0234
0235 template<typename InputTuple>
0236 static inline void reset_my_port(InputTuple &my_input) {
0237 std::get<0>(my_input).reset_port();
0238 }
0239
0240 template<typename InputTuple>
0241 static inline void reset_ports(InputTuple& my_input) {
0242 reset_my_port(my_input);
0243 }
0244
0245 template<typename InputTuple, typename KeyFuncTuple>
0246 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
0247 std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
0248 std::get<0>(my_key_funcs) = nullptr;
0249 }
0250
0251 template< typename KeyFuncTuple>
0252 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
0253 __TBB_ASSERT(
0254 std::get<0>(other_inputs).get_my_key_func(),
0255 "key matching join node should not be instantiated without functors."
0256 );
0257 std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
0258 }
0259 template<typename InputTuple>
0260 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
0261 std::get<0>(my_input).reset_receiver(f);
0262 }
0263 };
0264
0265
0266 template< typename T >
0267 class reserving_port : public receiver<T> {
0268 public:
0269 typedef T input_type;
0270 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0271
0272 private:
0273
0274 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
0275 };
0276 typedef reserving_port<T> class_type;
0277
0278 class reserving_port_operation : public d1::aggregated_operation<reserving_port_operation> {
0279 public:
0280 char type;
0281 union {
0282 T *my_arg;
0283 predecessor_type *my_pred;
0284 };
0285 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0286 message_metainfo* metainfo;
0287 #endif
0288 reserving_port_operation(const T& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info)) :
0289 type(char(t)), my_arg(const_cast<T*>(&e))
0290 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
0291 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0292 reserving_port_operation(const T& e, op_type t)
0293 : type(char(t)), my_arg(const_cast<T*>(&e)), metainfo(nullptr) {}
0294 #endif
0295 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
0296 my_pred(const_cast<predecessor_type *>(&s)) {}
0297 reserving_port_operation(op_type t) : type(char(t)) {}
0298 };
0299
0300 typedef d1::aggregating_functor<class_type, reserving_port_operation> handler_type;
0301 friend class d1::aggregating_functor<class_type, reserving_port_operation>;
0302 d1::aggregator<handler_type, reserving_port_operation> my_aggregator;
0303
0304 void handle_operations(reserving_port_operation* op_list) {
0305 reserving_port_operation *current;
0306 bool was_missing_predecessors = false;
0307 while(op_list) {
0308 current = op_list;
0309 op_list = op_list->next;
0310 switch(current->type) {
0311 case reg_pred:
0312 was_missing_predecessors = my_predecessors.empty();
0313 my_predecessors.add(*(current->my_pred));
0314 if ( was_missing_predecessors ) {
0315 (void) my_join->decrement_port_count();
0316 }
0317 current->status.store( SUCCEEDED, std::memory_order_release);
0318 break;
0319 case rem_pred:
0320 if ( !my_predecessors.empty() ) {
0321 my_predecessors.remove(*(current->my_pred));
0322 if ( my_predecessors.empty() )
0323 my_join->increment_port_count();
0324 }
0325
0326 current->status.store( SUCCEEDED, std::memory_order_release );
0327 break;
0328 case res_item:
0329 if ( reserved ) {
0330 current->status.store( FAILED, std::memory_order_release);
0331 }
0332 else {
0333 bool reserve_result = false;
0334 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0335 if (current->metainfo) {
0336 reserve_result = my_predecessors.try_reserve(*(current->my_arg),
0337 *(current->metainfo));
0338 } else
0339 #endif
0340 {
0341 reserve_result = my_predecessors.try_reserve(*(current->my_arg));
0342 }
0343 if (reserve_result) {
0344 reserved = true;
0345 current->status.store( SUCCEEDED, std::memory_order_release);
0346 } else {
0347 if ( my_predecessors.empty() ) {
0348 my_join->increment_port_count();
0349 }
0350 current->status.store( FAILED, std::memory_order_release);
0351 }
0352 }
0353 break;
0354 case rel_res:
0355 reserved = false;
0356 my_predecessors.try_release( );
0357 current->status.store( SUCCEEDED, std::memory_order_release);
0358 break;
0359 case con_res:
0360 reserved = false;
0361 my_predecessors.try_consume( );
0362 current->status.store( SUCCEEDED, std::memory_order_release);
0363 break;
0364 }
0365 }
0366 }
0367
0368 protected:
0369 template< typename R, typename B > friend class run_and_put_task;
0370 template<typename X, typename Y> friend class broadcast_cache;
0371 template<typename X, typename Y> friend class round_robin_cache;
0372 graph_task* try_put_task( const T & ) override {
0373 return nullptr;
0374 }
0375
0376 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0377 graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; }
0378 #endif
0379
0380 graph& graph_reference() const override {
0381 return my_join->graph_ref;
0382 }
0383
0384 public:
0385
0386
0387 reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
0388 my_aggregator.initialize_handler(handler_type(this));
0389 }
0390
0391
0392 reserving_port(const reserving_port& ) = delete;
0393
0394 void set_join_node_pointer(reserving_forwarding_base *join) {
0395 my_join = join;
0396 }
0397
0398
0399 bool register_predecessor( predecessor_type &src ) override {
0400 reserving_port_operation op_data(src, reg_pred);
0401 my_aggregator.execute(&op_data);
0402 return op_data.status == SUCCEEDED;
0403 }
0404
0405
0406 bool remove_predecessor( predecessor_type &src ) override {
0407 reserving_port_operation op_data(src, rem_pred);
0408 my_aggregator.execute(&op_data);
0409 return op_data.status == SUCCEEDED;
0410 }
0411
0412
0413 bool reserve( T &v ) {
0414 reserving_port_operation op_data(v, res_item);
0415 my_aggregator.execute(&op_data);
0416 return op_data.status == SUCCEEDED;
0417 }
0418
0419 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0420 bool reserve( T& v, message_metainfo& metainfo ) {
0421 reserving_port_operation op_data(v, res_item, metainfo);
0422 my_aggregator.execute(&op_data);
0423 return op_data.status == SUCCEEDED;
0424 }
0425 #endif
0426
0427
0428 void release( ) {
0429 reserving_port_operation op_data(rel_res);
0430 my_aggregator.execute(&op_data);
0431 }
0432
0433
0434 void consume( ) {
0435 reserving_port_operation op_data(con_res);
0436 my_aggregator.execute(&op_data);
0437 }
0438
0439 void reset_receiver( reset_flags f) {
0440 if(f & rf_clear_edges) my_predecessors.clear();
0441 else
0442 my_predecessors.reset();
0443 reserved = false;
0444 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
0445 }
0446
0447 private:
0448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0449 friend class get_graph_helper;
0450 #endif
0451
0452 reserving_forwarding_base *my_join;
0453 reservable_predecessor_cache< T, null_mutex > my_predecessors;
0454 bool reserved;
0455 };
0456
0457
// Input port used by the queueing join policy.
// Incoming items are appended to the inherited item_buffer<T>; the join node is
// told (via decrement_port_count) when the port goes from empty to non-empty.
// All buffer accesses are expressed as queueing_port_operation objects and
// executed by handle_operations() through my_aggregator.
template<typename T>
class queueing_port : public receiver<T>, public item_buffer<T> {
public:
    typedef T input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef queueing_port<T> class_type;


private:
    // Operation codes understood by handle_operations():
    //   get__item     - copy the front item to the caller
    //   res_port      - destroy the front item after a tuple was accepted
    //   try__put_task - append an incoming item
    enum op_type { get__item, res_port, try__put_task
    };

    // One request submitted to the aggregator.
    class queueing_port_operation : public d1::aggregated_operation<queueing_port_operation> {
    public:
        char type;             // op_type stored as char
        T my_val;              // copy of the item for try__put_task
        T* my_arg;             // destination for get__item
        graph_task* bypass_t;  // task (or sentinel) produced by try__put_task
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        message_metainfo* metainfo;  // source (put) or optional destination (get) of metainfo
#endif

        // Put request: stores a copy of e.
        queueing_port_operation(const T& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& info))
            : type(char(t)), my_val(e), my_arg(nullptr)
            , bypass_t(nullptr)
            __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(const_cast<message_metainfo*>(&info)))
        {}

        // Get request writing through p (my_val is default-constructed).
        queueing_port_operation(const T* p, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info)) :
            type(char(t)), my_arg(const_cast<T*>(p))
            , bypass_t(nullptr)
            __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info))
        {}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        // Get request that does not want metainfo.
        queueing_port_operation(const T* p, op_type t)
            : type(char(t)), my_arg(const_cast<T*>(p)), bypass_t(nullptr), metainfo(nullptr)
        {}
#endif

        // Argument-less request (res_port).
        queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr)
            , bypass_t(nullptr)
            __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(nullptr))
        {}
    };

    typedef d1::aggregating_functor<class_type, queueing_port_operation> handler_type;
    friend class d1::aggregating_functor<class_type, queueing_port_operation>;
    d1::aggregator<handler_type, queueing_port_operation> my_aggregator;

    // Executes a linked list of pending operations and publishes each result
    // through the operation's status field.
    void handle_operations(queueing_port_operation* op_list) {
        queueing_port_operation *current;
        bool was_empty;
        while(op_list) {
            current = op_list;
            // Read the link before storing the status of the current operation.
            op_list = op_list->next;
            switch(current->type) {
            case try__put_task: {
                    graph_task* rtask = nullptr;
                    was_empty = this->buffer_empty();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
                    __TBB_ASSERT(current->metainfo, nullptr);
                    this->push_back(current->my_val, *(current->metainfo));
#else
                    this->push_back(current->my_val);
#endif
                    // Only the empty -> non-empty transition is reported to the
                    // join node; the result may be a task or nullptr.
                    if (was_empty) rtask = my_join->decrement_port_count(false);
                    else
                        rtask = SUCCESSFULLY_ENQUEUED;
                    current->bypass_t = rtask;
                    current->status.store( SUCCEEDED, std::memory_order_release);
                }
                break;
            case get__item:
                // Copies the front item; removal happens later in res_port.
                if(!this->buffer_empty()) {
                    __TBB_ASSERT(current->my_arg, nullptr);
                    *(current->my_arg) = this->front();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
                    if (current->metainfo) {
                        *(current->metainfo) = this->front_metainfo();
                    }
#endif
                    current->status.store( SUCCEEDED, std::memory_order_release);
                }
                else {
                    current->status.store( FAILED, std::memory_order_release);
                }
                break;
            case res_port:
                __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
                this->destroy_front();
                // If another item is already queued the port is immediately
                // ready again, so report that to the join node.
                if(this->my_item_valid(this->my_head)) {
                    (void)my_join->decrement_port_count(true);
                }
                current->status.store( SUCCEEDED, std::memory_order_release);
                break;
            }
        }
    }


protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;

private:
    // Shared implementation of both try_put_task overloads: enqueues v and
    // returns the task handed back by the join node, or SUCCESSFULLY_ENQUEUED
    // when no task was produced.
    graph_task* try_put_task_impl(const T& v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
        queueing_port_operation op_data(v, try__put_task __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        my_aggregator.execute(&op_data);
        __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
        if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
        return op_data.bypass_t;
    }

protected:
    // Accepts an item; in preview mode an empty metainfo is attached.
    graph_task* try_put_task(const T &v) override {
        return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    // Accepts an item together with its metainfo.
    graph_task* try_put_task(const T& v, const message_metainfo& metainfo) override {
        return try_put_task_impl(v, metainfo);
    }
#endif

    // Graph of the owning join node; requires set_join_node_pointer() to have
    // been called.
    graph& graph_reference() const override {
        return my_join->graph_ref;
    }

public:

    // Creates a port that is not yet attached to a join node.
    queueing_port() : item_buffer<T>() {
        my_join = nullptr;
        my_aggregator.initialize_handler(handler_type(this));
    }

    // Ports are not copyable.
    queueing_port(const queueing_port& ) = delete;

    // Attaches the port to the join node it reports to.
    void set_join_node_pointer(queueing_forwarding_base *join) {
        my_join = join;
    }

    // Copies the front item into v; returns false if the buffer is empty.
    bool get_item( T &v ) {
        queueing_port_operation op_data(&v, get__item);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    // As get_item(T&), additionally copying the front item's metainfo.
    bool get_item( T& v, message_metainfo& metainfo ) {
        queueing_port_operation op_data(&v, get__item, metainfo);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }
#endif

    // Drops the front item (asserted to exist) once the tuple built from it
    // has been accepted.
    void reset_port() {
        queueing_port_operation op_data(res_port);
        my_aggregator.execute(&op_data);
        return;
    }

    // Resets the underlying item buffer; the flags are ignored.
    // Note: this bypasses the aggregator.
    void reset_receiver(reset_flags) {
        item_buffer<T>::reset();
    }

private:
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    friend class get_graph_helper;
#endif

    queueing_forwarding_base *my_join;  // owning join node (null until attached)
};
0636
0637 #include "_flow_graph_tagged_buffer_impl.h"
0638
// Table entry pairing a key with a counter.
template<typename K>
struct count_element {
    K my_key;         // key identifying the entry
    size_t my_value;  // counter associated with my_key
};

// Key extractor for count_element<K>: returns a reference to the entry's key.
// The call operator is const-qualified because the functor has no state; this
// lets it be invoked through const objects and references as well.
template< typename K >
struct key_to_count_functor {
    typedef count_element<K> table_item_type;

    // Returns a reference to v.my_key (valid as long as v is).
    const K& operator()(const table_item_type& v) const { return v.my_key; }
};
0652
0653 template <typename K, typename T, typename TtoK, typename KHash>
0654 struct key_matching_port_base {
0655 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0656 using type = metainfo_hash_buffer<K, T, TtoK, KHash>;
0657 #else
0658 using type = hash_buffer<K, T, TtoK, KHash>;
0659 #endif
0660 };
0661
0662
0663
0664 template< class TraitsType >
0665 class key_matching_port :
0666 public receiver<typename TraitsType::T>,
0667 public key_matching_port_base< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
0668 typename TraitsType::KHash >::type
0669 {
0670 public:
0671 typedef TraitsType traits;
0672 typedef key_matching_port<traits> class_type;
0673 typedef typename TraitsType::T input_type;
0674 typedef typename TraitsType::K key_type;
0675 typedef typename std::decay<key_type>::type noref_key_type;
0676 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0677 typedef typename TraitsType::TtoK type_to_key_func_type;
0678 typedef typename TraitsType::KHash hash_compare_type;
0679 typedef typename key_matching_port_base<key_type, input_type, type_to_key_func_type, hash_compare_type>::type buffer_type;
0680
0681 private:
0682
0683 private:
0684 enum op_type { try__put, get__item, res_port
0685 };
0686
0687 class key_matching_port_operation : public d1::aggregated_operation<key_matching_port_operation> {
0688 public:
0689 char type;
0690 input_type my_val;
0691 input_type *my_arg;
0692 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0693 message_metainfo* metainfo = nullptr;
0694 #endif
0695
0696 key_matching_port_operation(const input_type& e, op_type t
0697 __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& info))
0698 : type(char(t)), my_val(e), my_arg(nullptr)
0699 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(const_cast<message_metainfo*>(&info))) {}
0700
0701
0702 key_matching_port_operation(const input_type* p, op_type t
0703 __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info))
0704 : type(char(t)), my_arg(const_cast<input_type*>(p))
0705 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
0706
0707
0708 key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
0709 };
0710
0711 typedef d1::aggregating_functor<class_type, key_matching_port_operation> handler_type;
0712 friend class d1::aggregating_functor<class_type, key_matching_port_operation>;
0713 d1::aggregator<handler_type, key_matching_port_operation> my_aggregator;
0714
0715 void handle_operations(key_matching_port_operation* op_list) {
0716 key_matching_port_operation *current;
0717 while(op_list) {
0718 current = op_list;
0719 op_list = op_list->next;
0720 switch(current->type) {
0721 case try__put: {
0722 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0723 __TBB_ASSERT(current->metainfo, nullptr);
0724 bool was_inserted = this->insert_with_key(current->my_val, *(current->metainfo));
0725 #else
0726 bool was_inserted = this->insert_with_key(current->my_val);
0727 #endif
0728
0729 current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
0730 }
0731 break;
0732 case get__item: {
0733
0734 __TBB_ASSERT(current->my_arg, nullptr);
0735 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0736 __TBB_ASSERT(current->metainfo, nullptr);
0737 bool find_result = this->find_with_key(my_join->current_key, *(current->my_arg),
0738 *(current->metainfo));
0739 #else
0740 bool find_result = this->find_with_key(my_join->current_key, *(current->my_arg));
0741 #endif
0742 #if TBB_USE_DEBUG
0743 if (!find_result) {
0744 __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
0745 }
0746 #else
0747 tbb::detail::suppress_unused_warning(find_result);
0748 #endif
0749 current->status.store( SUCCEEDED, std::memory_order_release);
0750 }
0751 break;
0752 case res_port:
0753
0754 this->delete_with_key(my_join->current_key);
0755 current->status.store( SUCCEEDED, std::memory_order_release);
0756 break;
0757 }
0758 }
0759 }
0760
0761 protected:
0762 template< typename R, typename B > friend class run_and_put_task;
0763 template<typename X, typename Y> friend class broadcast_cache;
0764 template<typename X, typename Y> friend class round_robin_cache;
0765 private:
0766 graph_task* try_put_task_impl(const input_type& v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
0767 key_matching_port_operation op_data(v, try__put __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0768 graph_task* rtask = nullptr;
0769 my_aggregator.execute(&op_data);
0770 if(op_data.status == SUCCEEDED) {
0771 rtask = my_join->increment_key_count((*(this->get_key_func()))(v));
0772
0773 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
0774 }
0775 return rtask;
0776 }
0777 protected:
0778 graph_task* try_put_task(const input_type& v) override {
0779 return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0780 }
0781
0782 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0783 graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
0784 return try_put_task_impl(v, metainfo);
0785 }
0786 #endif
0787
0788 graph& graph_reference() const override {
0789 return my_join->graph_ref;
0790 }
0791
0792 public:
0793
0794 key_matching_port() : receiver<input_type>(), buffer_type() {
0795 my_join = nullptr;
0796 my_aggregator.initialize_handler(handler_type(this));
0797 }
0798
0799
0800 key_matching_port(const key_matching_port& ) = delete;
0801 #if __INTEL_COMPILER <= 2021
0802
0803
0804 virtual
0805 #endif
0806 ~key_matching_port() { }
0807
0808 void set_join_node_pointer(forwarding_base *join) {
0809 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
0810 }
0811
0812 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
0813
0814 type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
0815
0816 bool get_item( input_type &v ) {
0817
0818 key_matching_port_operation op_data(&v, get__item);
0819 my_aggregator.execute(&op_data);
0820 return op_data.status == SUCCEEDED;
0821 }
0822
0823 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0824 bool get_item( input_type& v, message_metainfo& metainfo ) {
0825
0826 key_matching_port_operation op_data(&v, get__item, metainfo);
0827 my_aggregator.execute(&op_data);
0828 return op_data.status == SUCCEEDED;
0829 }
0830 #endif
0831
0832
0833
0834 void reset_port() {
0835 key_matching_port_operation op_data(res_port);
0836 my_aggregator.execute(&op_data);
0837 return;
0838 }
0839
0840 void reset_receiver(reset_flags ) {
0841 buffer_type::reset();
0842 }
0843
0844 private:
0845
0846
0847 matching_forwarding_base<key_type> *my_join;
0848 };
0849
0850 using namespace graph_policy_namespace;
0851
0852 template<typename JP, typename InputTuple, typename OutputTuple>
0853 class join_node_base;
0854
0855
0856 template<typename JP, typename InputTuple, typename OutputTuple>
0857 class join_node_FE;
0858
0859 template<typename InputTuple, typename OutputTuple>
0860 class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
0861 private:
0862 static const int N = std::tuple_size<OutputTuple>::value;
0863 typedef OutputTuple output_type;
0864 typedef InputTuple input_type;
0865 typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type;
0866 public:
0867 join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
0868 ports_with_no_inputs = N;
0869 join_helper<N>::set_join_node_pointer(my_inputs, this);
0870 }
0871
0872 join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
0873 ports_with_no_inputs = N;
0874 join_helper<N>::set_join_node_pointer(my_inputs, this);
0875 }
0876
0877 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0878
0879 void increment_port_count() override {
0880 ++ports_with_no_inputs;
0881 }
0882
0883
0884 graph_task* decrement_port_count() override {
0885 if(ports_with_no_inputs.fetch_sub(1) == 1) {
0886 if(is_graph_active(this->graph_ref)) {
0887 d1::small_object_allocator allocator{};
0888 typedef forward_task_bypass<base_node_type> task_type;
0889 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0890 spawn_in_graph_arena(this->graph_ref, *t);
0891 }
0892 }
0893 return nullptr;
0894 }
0895
0896 input_type &input_ports() { return my_inputs; }
0897
0898 protected:
0899
0900 void reset( reset_flags f) {
0901
0902 ports_with_no_inputs = N;
0903 join_helper<N>::reset_inputs(my_inputs, f);
0904 }
0905
0906
0907
0908 bool tuple_build_may_succeed() {
0909 return !ports_with_no_inputs;
0910 }
0911
0912 bool try_to_make_tuple(output_type &out) {
0913 if(ports_with_no_inputs) return false;
0914 return join_helper<N>::reserve(my_inputs, out);
0915 }
0916
0917 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0918 bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
0919 if (ports_with_no_inputs) return false;
0920 return join_helper<N>::reserve(my_inputs, out, metainfo);
0921 }
0922 #endif
0923
0924 void tuple_accepted() {
0925 join_helper<N>::consume_reservations(my_inputs);
0926 }
0927 void tuple_rejected() {
0928 join_helper<N>::release_reservations(my_inputs);
0929 }
0930
0931 input_type my_inputs;
0932 base_node_type *my_node;
0933 std::atomic<std::size_t> ports_with_no_inputs;
0934 };
0935
0936 template<typename InputTuple, typename OutputTuple>
0937 class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
0938 public:
0939 static const int N = std::tuple_size<OutputTuple>::value;
0940 typedef OutputTuple output_type;
0941 typedef InputTuple input_type;
0942 typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type;
0943
0944 join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
0945 ports_with_no_items = N;
0946 join_helper<N>::set_join_node_pointer(my_inputs, this);
0947 }
0948
0949 join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
0950 ports_with_no_items = N;
0951 join_helper<N>::set_join_node_pointer(my_inputs, this);
0952 }
0953
0954
0955 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
0956
0957 void reset_port_count() {
0958 ports_with_no_items = N;
0959 }
0960
0961
0962 graph_task* decrement_port_count(bool handle_task) override
0963 {
0964 if(ports_with_no_items.fetch_sub(1) == 1) {
0965 if(is_graph_active(this->graph_ref)) {
0966 d1::small_object_allocator allocator{};
0967 typedef forward_task_bypass<base_node_type> task_type;
0968 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
0969 if( !handle_task )
0970 return t;
0971 spawn_in_graph_arena(this->graph_ref, *t);
0972 }
0973 }
0974 return nullptr;
0975 }
0976
0977 input_type &input_ports() { return my_inputs; }
0978
0979 protected:
0980
0981 void reset( reset_flags f) {
0982 reset_port_count();
0983 join_helper<N>::reset_inputs(my_inputs, f );
0984 }
0985
0986
0987
0988 bool tuple_build_may_succeed() {
0989 return !ports_with_no_items;
0990 }
0991
0992 bool try_to_make_tuple(output_type &out) {
0993 if(ports_with_no_items) return false;
0994 return join_helper<N>::get_items(my_inputs, out);
0995 }
0996
0997 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0998 bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
0999 if(ports_with_no_items) return false;
1000 return join_helper<N>::get_items(my_inputs, out, metainfo);
1001 }
1002 #endif
1003
1004 void tuple_accepted() {
1005 reset_port_count();
1006 join_helper<N>::reset_ports(my_inputs);
1007 }
1008 void tuple_rejected() {
1009
1010 }
1011
1012 input_type my_inputs;
1013 base_node_type *my_node;
1014 std::atomic<std::size_t> ports_with_no_items;
1015 };
1016
1017
1018 template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1019 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1020
1021 public hash_buffer<
1022 typename std::decay<K>::type&,
1023 count_element<typename std::decay<K>::type>,
1024 type_to_key_function_body<
1025 count_element<typename std::decay<K>::type>,
1026 typename std::decay<K>::type& >,
1027 KHash >,
1028
1029 public item_buffer<OutputTuple> {
1030 public:
1031 static const int N = std::tuple_size<OutputTuple>::value;
1032 typedef OutputTuple output_type;
1033 typedef InputTuple input_type;
1034 typedef K key_type;
1035 typedef typename std::decay<key_type>::type unref_key_type;
1036 typedef KHash key_hash_compare;
1037
1038 typedef count_element<unref_key_type> count_element_type;
1039
1040 typedef key_to_count_functor<unref_key_type> key_to_count_func;
1041 typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
1042 typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
1043
1044
1045 typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
1046 key_to_count_buffer_type;
1047 typedef item_buffer<output_type> output_buffer_type;
1048 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type;
1049 typedef matching_forwarding_base<key_type> forwarding_base_type;
1050
1051
1052
1053
1054 private:
1055 enum op_type { res_count, inc_count, may_succeed, try_make };
1056 typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
1057
1058 class key_matching_FE_operation : public d1::aggregated_operation<key_matching_FE_operation> {
1059 public:
1060 char type;
1061 unref_key_type my_val;
1062 output_type* my_output;
1063 graph_task* bypass_t;
1064 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1065 message_metainfo* metainfo = nullptr;
1066 #endif
1067
1068 key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
1069 my_output(nullptr), bypass_t(nullptr) {}
1070 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
1071 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1072 key_matching_FE_operation(output_type *p, op_type t, message_metainfo& info)
1073 : type(char(t)), my_output(p), bypass_t(nullptr), metainfo(&info) {}
1074 #endif
1075
1076 key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
1077 };
1078
1079 typedef d1::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1080 friend class d1::aggregating_functor<class_type, key_matching_FE_operation>;
1081 d1::aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1082
1083
1084
1085
1086 graph_task* fill_output_buffer(unref_key_type &t) {
1087 output_type l_out;
1088 graph_task* rtask = nullptr;
1089 bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
1090 this->current_key = t;
1091 this->delete_with_key(this->current_key);
1092 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1093 message_metainfo metainfo;
1094 #endif
1095 if(join_helper<N>::get_items(my_inputs, l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {
1096 this->push_back(l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1097 if(do_fwd) {
1098 d1::small_object_allocator allocator{};
1099 typedef forward_task_bypass<base_node_type> task_type;
1100 rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
1101 do_fwd = false;
1102 }
1103
1104 join_helper<N>::reset_ports(my_inputs);
1105 }
1106 else {
1107 __TBB_ASSERT(false, "should have had something to push");
1108 }
1109 return rtask;
1110 }
1111
1112 void handle_operations(key_matching_FE_operation* op_list) {
1113 key_matching_FE_operation *current;
1114 while(op_list) {
1115 current = op_list;
1116 op_list = op_list->next;
1117 switch(current->type) {
1118 case res_count:
1119 {
1120 this->destroy_front();
1121 current->status.store( SUCCEEDED, std::memory_order_release);
1122 }
1123 break;
1124 case inc_count: {
1125 count_element_type *p = nullptr;
1126 unref_key_type &t = current->my_val;
1127 if(!(this->find_ref_with_key(t,p))) {
1128 count_element_type ev;
1129 ev.my_key = t;
1130 ev.my_value = 0;
1131 this->insert_with_key(ev);
1132 bool found = this->find_ref_with_key(t, p);
1133 __TBB_ASSERT_EX(found, "should find key after inserting it");
1134 }
1135 if(++(p->my_value) == size_t(N)) {
1136 current->bypass_t = fill_output_buffer(t);
1137 }
1138 }
1139 current->status.store( SUCCEEDED, std::memory_order_release);
1140 break;
1141 case may_succeed:
1142 current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
1143 break;
1144 case try_make:
1145 if(this->buffer_empty()) {
1146 current->status.store( FAILED, std::memory_order_release);
1147 }
1148 else {
1149 *(current->my_output) = this->front();
1150 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1151 if (current->metainfo) {
1152 *(current->metainfo) = this->front_metainfo();
1153 }
1154 #endif
1155 current->status.store( SUCCEEDED, std::memory_order_release);
1156 }
1157 break;
1158 }
1159 }
1160 }
1161
1162
1163 public:
1164 template<typename FunctionTuple>
1165 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
1166 join_helper<N>::set_join_node_pointer(my_inputs, this);
1167 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1168 my_aggregator.initialize_handler(handler_type(this));
1169 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1170 this->set_key_func(cfb);
1171 }
1172
1173 join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1174 output_buffer_type() {
1175 my_node = nullptr;
1176 join_helper<N>::set_join_node_pointer(my_inputs, this);
1177 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1178 my_aggregator.initialize_handler(handler_type(this));
1179 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1180 this->set_key_func(cfb);
1181 }
1182
1183
1184 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1185
1186 void reset_port_count() {
1187 key_matching_FE_operation op_data(res_count);
1188 my_aggregator.execute(&op_data);
1189 return;
1190 }
1191
1192
1193
1194 graph_task *increment_key_count(unref_key_type const & t) override {
1195 key_matching_FE_operation op_data(t, inc_count);
1196 my_aggregator.execute(&op_data);
1197 return op_data.bypass_t;
1198 }
1199
1200 input_type &input_ports() { return my_inputs; }
1201
1202 protected:
1203
1204 void reset( reset_flags f ) {
1205
1206 join_helper<N>::reset_inputs(my_inputs, f);
1207
1208 key_to_count_buffer_type::reset();
1209 output_buffer_type::reset();
1210 }
1211
1212
1213
1214 bool tuple_build_may_succeed() {
1215 key_matching_FE_operation op_data(may_succeed);
1216 my_aggregator.execute(&op_data);
1217 return op_data.status == SUCCEEDED;
1218 }
1219
1220
1221
1222 bool try_to_make_tuple(output_type &out) {
1223 key_matching_FE_operation op_data(&out,try_make);
1224 my_aggregator.execute(&op_data);
1225 return op_data.status == SUCCEEDED;
1226 }
1227
1228 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1229 bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
1230 key_matching_FE_operation op_data(&out, try_make, metainfo);
1231 my_aggregator.execute(&op_data);
1232 return op_data.status == SUCCEEDED;
1233 }
1234 #endif
1235
1236 void tuple_accepted() {
1237 reset_port_count();
1238 }
1239
1240 void tuple_rejected() {
1241
1242 }
1243
1244 input_type my_inputs;
1245 base_node_type *my_node;
1246 };
1247
1248
1249 template<typename JP, typename InputTuple, typename OutputTuple>
1250 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1251 public sender<OutputTuple> {
1252 protected:
1253 using graph_node::my_graph;
1254 public:
1255 typedef OutputTuple output_type;
1256
1257 typedef typename sender<output_type>::successor_type successor_type;
1258 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1259 using input_ports_type::tuple_build_may_succeed;
1260 using input_ports_type::try_to_make_tuple;
1261 using input_ports_type::tuple_accepted;
1262 using input_ports_type::tuple_rejected;
1263
1264 private:
1265
1266 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1267 };
1268 typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1269
1270 class join_node_base_operation : public d1::aggregated_operation<join_node_base_operation> {
1271 public:
1272 char type;
1273 union {
1274 output_type *my_arg;
1275 successor_type *my_succ;
1276 };
1277 graph_task* bypass_t;
1278 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1279 message_metainfo* metainfo;
1280 #endif
1281 join_node_base_operation(const output_type& e, op_type t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info))
1282 : type(char(t)), my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr)
1283 __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}
1284 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1285 join_node_base_operation(const output_type& e, op_type t)
1286 : type(char(t)), my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr), metainfo(nullptr) {}
1287 #endif
1288 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1289 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1290 join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1291 };
1292
1293 typedef d1::aggregating_functor<class_type, join_node_base_operation> handler_type;
1294 friend class d1::aggregating_functor<class_type, join_node_base_operation>;
1295 bool forwarder_busy;
1296 d1::aggregator<handler_type, join_node_base_operation> my_aggregator;
1297
1298 void handle_operations(join_node_base_operation* op_list) {
1299 join_node_base_operation *current;
1300 while(op_list) {
1301 current = op_list;
1302 op_list = op_list->next;
1303 switch(current->type) {
1304 case reg_succ: {
1305 my_successors.register_successor(*(current->my_succ));
1306 if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1307 d1::small_object_allocator allocator{};
1308 typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1309 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1310 spawn_in_graph_arena(my_graph, *t);
1311 forwarder_busy = true;
1312 }
1313 current->status.store( SUCCEEDED, std::memory_order_release);
1314 }
1315 break;
1316 case rem_succ:
1317 my_successors.remove_successor(*(current->my_succ));
1318 current->status.store( SUCCEEDED, std::memory_order_release);
1319 break;
1320 case try__get:
1321 if(tuple_build_may_succeed()) {
1322 bool make_tuple_result = false;
1323 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1324 if (current->metainfo) {
1325 make_tuple_result = try_to_make_tuple(*(current->my_arg), *(current->metainfo));
1326 } else
1327 #endif
1328 {
1329 make_tuple_result = try_to_make_tuple(*(current->my_arg));
1330 }
1331 if(make_tuple_result) {
1332 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1333 if (current->metainfo) {
1334
1335
1336
1337 for (auto waiter : current->metainfo->waiters()) {
1338 waiter->reserve(1);
1339 }
1340 }
1341 #endif
1342 tuple_accepted();
1343 current->status.store( SUCCEEDED, std::memory_order_release);
1344 }
1345 else current->status.store( FAILED, std::memory_order_release);
1346 }
1347 else current->status.store( FAILED, std::memory_order_release);
1348 break;
1349 case do_fwrd_bypass: {
1350 bool build_succeeded;
1351 graph_task *last_task = nullptr;
1352 output_type out;
1353
1354
1355
1356
1357
1358
1359
1360 if(tuple_build_may_succeed()) {
1361 do {
1362 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1363 message_metainfo metainfo;
1364 #endif
1365
1366 build_succeeded = try_to_make_tuple(out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1367 if(build_succeeded) {
1368 graph_task *new_task =
1369 my_successors.try_put_task(out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
1370 last_task = combine_tasks(my_graph, last_task, new_task);
1371 if(new_task) {
1372 tuple_accepted();
1373 }
1374 else {
1375 tuple_rejected();
1376 build_succeeded = false;
1377 }
1378 }
1379 } while(build_succeeded);
1380 }
1381 current->bypass_t = last_task;
1382 current->status.store( SUCCEEDED, std::memory_order_release);
1383 forwarder_busy = false;
1384 }
1385 break;
1386 }
1387 }
1388 }
1389
1390 public:
1391 join_node_base(graph &g)
1392 : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1393 {
1394 input_ports_type::set_my_node(this);
1395 my_aggregator.initialize_handler(handler_type(this));
1396 }
1397
1398 join_node_base(const join_node_base& other) :
1399 graph_node(other.graph_node::my_graph), input_ports_type(other),
1400 sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1401 {
1402 input_ports_type::set_my_node(this);
1403 my_aggregator.initialize_handler(handler_type(this));
1404 }
1405
1406 template<typename FunctionTuple>
1407 join_node_base(graph &g, FunctionTuple f)
1408 : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1409 {
1410 input_ports_type::set_my_node(this);
1411 my_aggregator.initialize_handler(handler_type(this));
1412 }
1413
1414 bool register_successor(successor_type &r) override {
1415 join_node_base_operation op_data(r, reg_succ);
1416 my_aggregator.execute(&op_data);
1417 return op_data.status == SUCCEEDED;
1418 }
1419
1420 bool remove_successor( successor_type &r) override {
1421 join_node_base_operation op_data(r, rem_succ);
1422 my_aggregator.execute(&op_data);
1423 return op_data.status == SUCCEEDED;
1424 }
1425
1426 bool try_get( output_type &v) override {
1427 join_node_base_operation op_data(v, try__get);
1428 my_aggregator.execute(&op_data);
1429 return op_data.status == SUCCEEDED;
1430 }
1431
1432 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
1433 bool try_get( output_type &v, message_metainfo& metainfo) override {
1434 join_node_base_operation op_data(v, try__get, metainfo);
1435 my_aggregator.execute(&op_data);
1436 return op_data.status == SUCCEEDED;
1437 }
1438 #endif
1439
1440 protected:
1441 void reset_node(reset_flags f) override {
1442 input_ports_type::reset(f);
1443 if(f & rf_clear_edges) my_successors.clear();
1444 }
1445
1446 private:
1447 broadcast_cache<output_type, null_rw_mutex> my_successors;
1448
1449 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1450 graph_task *forward_task() {
1451 join_node_base_operation op_data(do_fwrd_bypass);
1452 my_aggregator.execute(&op_data);
1453 return op_data.bypass_t;
1454 }
1455
1456 };
1457
1458
1459 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1460 struct join_base {
1461 typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1462 };
1463
1464 template<int N, typename OutputTuple, typename K, typename KHash>
1465 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1466 typedef key_matching<K, KHash> key_traits_type;
1467 typedef K key_type;
1468 typedef KHash key_hash_compare;
1469 typedef join_node_base< key_traits_type,
1470
1471 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1472 OutputTuple > type;
1473 };
1474
1475
1476
1477
1478
1479 template<int M, template<class> class PT, typename OutputTuple, typename JP>
1480 class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type {
1481 public:
1482 typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type;
1483 typedef OutputTuple output_type;
1484 private:
1485 typedef join_node_base<JP, input_ports_type, output_type > base_type;
1486 public:
1487 unfolded_join_node(graph &g) : base_type(g) {}
1488 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1489 };
1490
1491 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1492 template <typename K, typename T>
1493 struct key_from_message_body {
1494 K operator()(const T& t) const {
1495 return key_from_message<K>(t);
1496 }
1497 };
1498
1499 template <typename K, typename T>
1500 struct key_from_message_body<K&,T> {
1501 const K& operator()(const T& t) const {
1502 return key_from_message<const K&>(t);
1503 }
1504 };
1505 #endif
1506
1507
1508
1509 template<typename OutputTuple, typename K, typename KHash>
1510 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1511 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1512 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1513 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1514 public:
1515 typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1516 typedef OutputTuple output_type;
1517 private:
1518 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1519 typedef type_to_key_function_body<T0, K> *f0_p;
1520 typedef type_to_key_function_body<T1, K> *f1_p;
1521 typedef std::tuple< f0_p, f1_p > func_initializer_type;
1522 public:
1523 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1524 unfolded_join_node(graph &g) : base_type(g,
1525 func_initializer_type(
1526 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1527 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1528 ) ) {
1529 }
1530 #endif
1531 template<typename Body0, typename Body1>
1532 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1533 func_initializer_type(
1534 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1535 new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1536 ) ) {
1537 static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1538 }
1539 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1540 };
1541
1542 template<typename OutputTuple, typename K, typename KHash>
1543 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1544 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1545 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1546 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1547 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1548 public:
1549 typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1550 typedef OutputTuple output_type;
1551 private:
1552 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1553 typedef type_to_key_function_body<T0, K> *f0_p;
1554 typedef type_to_key_function_body<T1, K> *f1_p;
1555 typedef type_to_key_function_body<T2, K> *f2_p;
1556 typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1557 public:
1558 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1559 unfolded_join_node(graph &g) : base_type(g,
1560 func_initializer_type(
1561 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1562 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1563 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1564 ) ) {
1565 }
1566 #endif
1567 template<typename Body0, typename Body1, typename Body2>
1568 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1569 func_initializer_type(
1570 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1571 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1572 new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1573 ) ) {
1574 static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1575 }
1576 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1577 };
1578
1579 template<typename OutputTuple, typename K, typename KHash>
1580 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1581 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1582 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1583 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1584 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1585 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1586 public:
1587 typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1588 typedef OutputTuple output_type;
1589 private:
1590 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1591 typedef type_to_key_function_body<T0, K> *f0_p;
1592 typedef type_to_key_function_body<T1, K> *f1_p;
1593 typedef type_to_key_function_body<T2, K> *f2_p;
1594 typedef type_to_key_function_body<T3, K> *f3_p;
1595 typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1596 public:
1597 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1598 unfolded_join_node(graph &g) : base_type(g,
1599 func_initializer_type(
1600 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1601 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1602 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1603 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1604 ) ) {
1605 }
1606 #endif
1607 template<typename Body0, typename Body1, typename Body2, typename Body3>
1608 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1609 func_initializer_type(
1610 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1611 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1612 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1613 new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1614 ) ) {
1615 static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1616 }
1617 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1618 };
1619
1620 template<typename OutputTuple, typename K, typename KHash>
1621 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1622 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1623 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1624 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1625 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1626 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1627 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1628 public:
1629 typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1630 typedef OutputTuple output_type;
1631 private:
1632 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1633 typedef type_to_key_function_body<T0, K> *f0_p;
1634 typedef type_to_key_function_body<T1, K> *f1_p;
1635 typedef type_to_key_function_body<T2, K> *f2_p;
1636 typedef type_to_key_function_body<T3, K> *f3_p;
1637 typedef type_to_key_function_body<T4, K> *f4_p;
1638 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1639 public:
1640 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1641 unfolded_join_node(graph &g) : base_type(g,
1642 func_initializer_type(
1643 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1644 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1645 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1646 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1647 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1648 ) ) {
1649 }
1650 #endif
1651 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1652 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1653 func_initializer_type(
1654 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1655 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1656 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1657 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1658 new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1659 ) ) {
1660 static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1661 }
1662 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1663 };
1664
1665 #if __TBB_VARIADIC_MAX >= 6
1666 template<typename OutputTuple, typename K, typename KHash>
1667 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1668 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1669 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1670 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1671 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1672 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1673 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1674 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1675 public:
1676 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1677 typedef OutputTuple output_type;
1678 private:
1679 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1680 typedef type_to_key_function_body<T0, K> *f0_p;
1681 typedef type_to_key_function_body<T1, K> *f1_p;
1682 typedef type_to_key_function_body<T2, K> *f2_p;
1683 typedef type_to_key_function_body<T3, K> *f3_p;
1684 typedef type_to_key_function_body<T4, K> *f4_p;
1685 typedef type_to_key_function_body<T5, K> *f5_p;
1686 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1687 public:
1688 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1689 unfolded_join_node(graph &g) : base_type(g,
1690 func_initializer_type(
1691 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1692 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1693 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1694 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1695 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1696 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1697 ) ) {
1698 }
1699 #endif
1700 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1701 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1702 : base_type(g, func_initializer_type(
1703 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1704 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1705 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1706 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1707 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1708 new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1709 ) ) {
1710 static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1711 }
1712 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1713 };
1714 #endif
1715
1716 #if __TBB_VARIADIC_MAX >= 7
1717 template<typename OutputTuple, typename K, typename KHash>
1718 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1719 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1720 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1721 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1722 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1723 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1724 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1725 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1726 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1727 public:
1728 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1729 typedef OutputTuple output_type;
1730 private:
1731 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1732 typedef type_to_key_function_body<T0, K> *f0_p;
1733 typedef type_to_key_function_body<T1, K> *f1_p;
1734 typedef type_to_key_function_body<T2, K> *f2_p;
1735 typedef type_to_key_function_body<T3, K> *f3_p;
1736 typedef type_to_key_function_body<T4, K> *f4_p;
1737 typedef type_to_key_function_body<T5, K> *f5_p;
1738 typedef type_to_key_function_body<T6, K> *f6_p;
1739 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1740 public:
1741 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1742 unfolded_join_node(graph &g) : base_type(g,
1743 func_initializer_type(
1744 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1745 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1746 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1747 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1748 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1749 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1750 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1751 ) ) {
1752 }
1753 #endif
1754 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1755 typename Body5, typename Body6>
1756 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1757 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1758 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1759 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1760 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1761 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1762 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1763 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1764 new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1765 ) ) {
1766 static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1767 }
1768 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1769 };
1770 #endif
1771
// Partial specialization of unfolded_join_node for 8 ports using key_matching_port
// and the key_matching<K,KHash> policy. Compiled only when __TBB_VARIADIC_MAX >= 8.
1772 #if __TBB_VARIADIC_MAX >= 8
1773 template<typename OutputTuple, typename K, typename KHash>
1774 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1775 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T7: element types of OutputTuple at indices 0..7.
1776 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1777 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1778 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1779 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1780 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1781 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1782 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1783 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1784 public:
// input_ports_type: the port tuple produced by wrap_key_tuple_elements for 8 elements.
// output_type: the tuple type given as OutputTuple.
1785 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1786 typedef OutputTuple output_type;
1787 private:
// base_type is what every constructor below initializes.
// NOTE(review): presumably the same type as join_base<8,...>::type, the declared base — confirm.
1788 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the type_to_key_function_body that maps a TN value to a key K.
1789 typedef type_to_key_function_body<T0, K> *f0_p;
1790 typedef type_to_key_function_body<T1, K> *f1_p;
1791 typedef type_to_key_function_body<T2, K> *f2_p;
1792 typedef type_to_key_function_body<T3, K> *f3_p;
1793 typedef type_to_key_function_body<T4, K> *f4_p;
1794 typedef type_to_key_function_body<T5, K> *f5_p;
1795 typedef type_to_key_function_body<T6, K> *f6_p;
1796 typedef type_to_key_function_body<T7, K> *f7_p;
// Tuple of the eight body pointers, passed to base_type as a single argument.
1797 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1798 public:
// Constructor without user bodies; available only when
// __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING is enabled. For each port N it allocates a
// type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>.
// NOTE(review): the leaves are created with raw new and handed over as raw pointers; who
// deletes them is not visible in this block — confirm in join_node_base.
1799 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1800 unfolded_join_node(graph &g) : base_type(g,
1801 func_initializer_type(
1802 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1803 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1804 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1805 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1806 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1807 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1808 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1809 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1810 ) ) {
1811 }
1812 #endif
// Constructor taking one key-extraction body per port (body0..body7, received by value).
// Each bodyN is wrapped in a heap-allocated type_to_key_function_body_leaf<TN, K, BodyN>
// and the eight pointers are forwarded to base_type together with the graph.
// The static_assert rejects instantiation when OutputTuple does not have exactly 8 elements.
1813 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1814 typename Body5, typename Body6, typename Body7>
1815 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1816 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1817 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1818 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1819 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1820 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1821 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1822 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1823 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1824 new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1825 ) ) {
1826 static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1827 }
// Copy constructor: delegates entirely to base_type's copy constructor.
1828 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1829 };
1830 #endif
1831
// Partial specialization of unfolded_join_node for 9 ports using key_matching_port
// and the key_matching<K,KHash> policy. Compiled only when __TBB_VARIADIC_MAX >= 9.
1832 #if __TBB_VARIADIC_MAX >= 9
1833 template<typename OutputTuple, typename K, typename KHash>
1834 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1835 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T8: element types of OutputTuple at indices 0..8.
1836 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1837 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1838 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1839 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1840 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1841 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1842 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1843 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1844 typedef typename std::tuple_element<8, OutputTuple>::type T8;
1845 public:
// input_ports_type: the port tuple produced by wrap_key_tuple_elements for 9 elements.
// output_type: the tuple type given as OutputTuple.
1846 typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1847 typedef OutputTuple output_type;
1848 private:
// base_type is what every constructor below initializes.
// NOTE(review): presumably the same type as join_base<9,...>::type, the declared base — confirm.
1849 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the type_to_key_function_body that maps a TN value to a key K.
1850 typedef type_to_key_function_body<T0, K> *f0_p;
1851 typedef type_to_key_function_body<T1, K> *f1_p;
1852 typedef type_to_key_function_body<T2, K> *f2_p;
1853 typedef type_to_key_function_body<T3, K> *f3_p;
1854 typedef type_to_key_function_body<T4, K> *f4_p;
1855 typedef type_to_key_function_body<T5, K> *f5_p;
1856 typedef type_to_key_function_body<T6, K> *f6_p;
1857 typedef type_to_key_function_body<T7, K> *f7_p;
1858 typedef type_to_key_function_body<T8, K> *f8_p;
// Tuple of the nine body pointers, passed to base_type as a single argument.
1859 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1860 public:
// Constructor without user bodies; available only when
// __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING is enabled. For each port N it allocates a
// type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>.
// NOTE(review): the leaves are created with raw new and handed over as raw pointers; who
// deletes them is not visible in this block — confirm in join_node_base.
1861 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1862 unfolded_join_node(graph &g) : base_type(g,
1863 func_initializer_type(
1864 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1865 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1866 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1867 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1868 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1869 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1870 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1871 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1872 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1873 ) ) {
1874 }
1875 #endif
// Constructor taking one key-extraction body per port (body0..body8, received by value).
// Each bodyN is wrapped in a heap-allocated type_to_key_function_body_leaf<TN, K, BodyN>
// and the nine pointers are forwarded to base_type together with the graph.
// The static_assert rejects instantiation when OutputTuple does not have exactly 9 elements.
1876 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1877 typename Body5, typename Body6, typename Body7, typename Body8>
1878 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1879 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1880 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1881 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1882 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1883 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1884 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1885 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1886 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1887 new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1888 new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1889 ) ) {
1890 static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1891 }
// Copy constructor: delegates entirely to base_type's copy constructor.
1892 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1893 };
1894 #endif
1895
// Partial specialization of unfolded_join_node for 10 ports using key_matching_port
// and the key_matching<K,KHash> policy. Compiled only when __TBB_VARIADIC_MAX >= 10.
1896 #if __TBB_VARIADIC_MAX >= 10
1897 template<typename OutputTuple, typename K, typename KHash>
1898 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1899 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// T0..T9: element types of OutputTuple at indices 0..9.
1900 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1901 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1902 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1903 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1904 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1905 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1906 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1907 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1908 typedef typename std::tuple_element<8, OutputTuple>::type T8;
1909 typedef typename std::tuple_element<9, OutputTuple>::type T9;
1910 public:
// input_ports_type: the port tuple produced by wrap_key_tuple_elements for 10 elements.
// output_type: the tuple type given as OutputTuple.
1911 typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1912 typedef OutputTuple output_type;
1913 private:
// base_type is what every constructor below initializes.
// NOTE(review): presumably the same type as join_base<10,...>::type, the declared base — confirm.
1914 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the type_to_key_function_body that maps a TN value to a key K.
1915 typedef type_to_key_function_body<T0, K> *f0_p;
1916 typedef type_to_key_function_body<T1, K> *f1_p;
1917 typedef type_to_key_function_body<T2, K> *f2_p;
1918 typedef type_to_key_function_body<T3, K> *f3_p;
1919 typedef type_to_key_function_body<T4, K> *f4_p;
1920 typedef type_to_key_function_body<T5, K> *f5_p;
1921 typedef type_to_key_function_body<T6, K> *f6_p;
1922 typedef type_to_key_function_body<T7, K> *f7_p;
1923 typedef type_to_key_function_body<T8, K> *f8_p;
1924 typedef type_to_key_function_body<T9, K> *f9_p;
// Tuple of the ten body pointers, passed to base_type as a single argument.
1925 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1926 public:
// Constructor without user bodies; available only when
// __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING is enabled. For each port N it allocates a
// type_to_key_function_body_leaf wrapping a default-constructed key_from_message_body<K,TN>.
// NOTE(review): the leaves are created with raw new and handed over as raw pointers; who
// deletes them is not visible in this block — confirm in join_node_base.
1927 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1928 unfolded_join_node(graph &g) : base_type(g,
1929 func_initializer_type(
1930 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1931 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1932 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1933 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1934 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1935 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1936 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1937 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1938 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1939 new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1940 ) ) {
1941 }
1942 #endif
// Constructor taking one key-extraction body per port (body0..body9, received by value).
// Each bodyN is wrapped in a heap-allocated type_to_key_function_body_leaf<TN, K, BodyN>
// and the ten pointers are forwarded to base_type together with the graph.
// The static_assert rejects instantiation when OutputTuple does not have exactly 10 elements.
1943 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1944 typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1945 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1946 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1947 new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1948 new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1949 new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1950 new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1951 new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1952 new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1953 new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1954 new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1955 new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1956 new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1957 ) ) {
1958 static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1959 }
// Copy constructor: delegates entirely to base_type's copy constructor.
1960 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1961 };
1962 #endif
1963
1964
// Returns a reference to the N-th element of the tuple yielded by jn.input_ports().
//   N   - compile-time index into JNT::input_ports_type.
//   JNT - any type exposing an input_ports_type typedef and an input_ports() member.
// The result is whatever std::get<N> yields, bound to an lvalue reference; this requires
// input_ports() to return an lvalue tuple (presumably a reference to the node's own ports).
1965 template<size_t N, typename JNT>
1966 typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1967 return std::get<N>(jn.input_ports());
1968 }
1969
1970 #endif