#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"

namespace internal {

using tbb::internal::aggregated_operation;
using tbb::internal::aggregating_functor;
using tbb::internal::aggregator;

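//! A simple FIFO of input items built on item_buffer; used by nodes with the
//  queueing policy to hold inputs that arrive while the node is at its
//  concurrency limit.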
template< typename T, typename A >
class function_input_queue : public item_buffer<T,A> {
public:
    bool empty() const {
        return this->buffer_empty();
    }

    const T& front() const {
        return this->item_buffer<T, A>::front();
    }

    bool pop( T& t ) {
        return this->pop_front( t );
    }

    void pop() {
        this->destroy_front();
    }

    bool push( T& t ) {
        return this->push_back( t );
    }
};

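//! Input and scheduling logic shared by function_node and multifunction_node.
//  All state changes are serialized through an aggregator; the actual body
//  invocation is delegated to the derived ImplType via apply_body_impl_bypass.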
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, tbb::internal::no_assign {
    enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        , add_blt_pred, del_blt_pred,
        blt_pred_cnt, blt_pred_cpy
#endif
    };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename tbb::internal::allocator_rebind<A, input_queue_type>::type queue_allocator_type;
    __TBB_STATIC_ASSERT(!((internal::has_policy<queueing, Policy>::value) && (internal::has_policy<rejecting, Policy>::value)),
                        "queueing and rejecting policies can't be specified simultaneously");

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
#endif

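    //! Constructor: sets up the node for graph g with the given concurrency
    //  limit; an input queue is allocated unless the rejecting policy is used.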
    function_input_base(
        graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority)
    ) : my_graph_ref(g), my_max_concurrency(max_concurrency)
      , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
      , my_queue(!internal::has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
      , forwarder_busy(false)
    {
        my_predecessors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
    }

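    //! Copy constructor: copies the graph reference, concurrency limit, and
    //  priority, but not the cached predecessors, queued items, or current
    //  concurrency count.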
    function_input_base( const function_input_base& src)
        : receiver<Input>(), tbb::internal::no_assign()
        , my_graph_ref(src.my_graph_ref), my_max_concurrency(src.my_max_concurrency)
        , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
        , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
    {
        my_predecessors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
    }

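    //! Destructor: releases the input queue if one was allocated.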
    virtual ~function_input_base() {
        if ( my_queue ) delete my_queue;
    }

    task* try_put_task( const input_type& t) __TBB_override {
        return try_put_task_impl(t, internal::has_policy<lightweight, Policy>());
    }

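    //! Adds src to the set of cached predecessors.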
    bool register_predecessor( predecessor_type &src ) __TBB_override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

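    //! Removes src from the set of cached predecessors.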
    bool remove_predecessor( predecessor_type &src ) __TBB_override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
        operation_type op_data(add_blt_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
    }

    void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
        operation_type op_data(del_blt_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
    }

    size_t predecessor_count() __TBB_override {
        operation_type op_data(blt_pred_cnt);
        my_aggregator.execute(&op_data);
        return op_data.cnt_val;
    }

    void copy_predecessors(predecessor_list_type &v) __TBB_override {
        operation_type op_data(blt_pred_cpy);
        op_data.predv = &v;
        my_aggregator.execute(&op_data);
    }

    built_predecessors_type &built_predecessors() __TBB_override {
        return my_predecessors.built_predecessors();
    }
#endif

protected:

    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;
    size_t my_concurrency;
    __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
    input_queue_type *my_queue;
    predecessor_cache<input_type, null_mutex > my_predecessors;

    void reset_receiver( reset_flags f) __TBB_override {
        if( f & rf_clear_edges) my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const __TBB_override {
        return my_graph_ref;
    }

    task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

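    //! An operation record processed by the aggregator. All mutating requests
    //  (predecessor changes, puts, forwarding) are funneled through these so
    //  the node's state is modified by only one thread at a time.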
    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            size_t cnt_val;
            predecessor_list_type *predv;
#endif
        };
        tbb::task *bypass_t;
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)) {}
        operation_type(op_type t) : type(char(t)), r(NULL) {}
    };

    bool forwarder_busy;
    typedef internal::aggregating_functor<class_type, operation_type> handler_type;
    friend class internal::aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

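    //! If an item is available (queued, or obtainable from a predecessor),
    //  claims a concurrency slot and returns a body task for it; otherwise NULL.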
    task* perform_queued_requests() {
        task* new_task = NULL;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());
                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
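
    //! Aggregator handler: processes the batched operation records, updating
    //  predecessors, concurrency, and the queue under mutual exclusion.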
    void handle_operations(operation_type *op_list) {
        operation_type *tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                __TBB_store_with_release(tmp->status, SUCCEEDED);
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                __TBB_store_with_release(tmp->status, SUCCEEDED);
                break;
            case app_body_bypass: {
                tmp->bypass_t = NULL;
                __TBB_ASSERT(my_max_concurrency != 0, NULL);
                --my_concurrency;
                if(my_concurrency < my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                __TBB_store_with_release(tmp->status, SUCCEEDED);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp); break;
            case try_fwd: internal_forward(tmp); break;
            case occupy_concurrency:
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    __TBB_store_with_release(tmp->status, SUCCEEDED);
                } else {
                    __TBB_store_with_release(tmp->status, FAILED);
                }
                break;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            case add_blt_pred: {
                my_predecessors.internal_add_built_predecessor(*(tmp->r));
                __TBB_store_with_release(tmp->status, SUCCEEDED);
            }
                break;
            case del_blt_pred:
                my_predecessors.internal_delete_built_predecessor(*(tmp->r));
                __TBB_store_with_release(tmp->status, SUCCEEDED);
                break;
            case blt_pred_cnt:
                tmp->cnt_val = my_predecessors.predecessor_count();
                __TBB_store_with_release(tmp->status, SUCCEEDED);
                break;
            case blt_pred_cpy:
                my_predecessors.copy_predecessors( *(tmp->predv) );
                __TBB_store_with_release(tmp->status, SUCCEEDED);
                break;
#endif
            }
        }
    }

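    //! Handles a tryput_bypass request: creates a body task immediately if a
    //  concurrency slot is free, otherwise queues the item (queueing policy)
    //  or reports failure.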
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, NULL);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            __TBB_store_with_release(op->status, SUCCEEDED);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            __TBB_store_with_release(op->status, SUCCEEDED);
        } else {
            op->bypass_t = NULL;
            __TBB_store_with_release(op->status, FAILED);
        }
    }

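    //! Handles a try_fwd request: tries to pull one more item for execution;
    //  clears forwarder_busy when nothing could be forwarded.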
    void internal_forward(operation_type *op) {
        op->bypass_t = NULL;
        if (my_concurrency < my_max_concurrency || !my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            __TBB_store_with_release(op->status, SUCCEEDED);
        else {
            forwarder_busy = false;
            __TBB_store_with_release(op->status, FAILED);
        }
    }

    task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == internal::SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return NULL;
    }

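    //! try_put_task dispatch for the lightweight policy: execute the body on
    //  the calling thread whenever the concurrency limit allows it; otherwise
    //  fall back to the regular aggregator path.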
    task* try_put_task_impl( const input_type& t, tbb::internal::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == internal::SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    task* try_put_task_impl( const input_type& t, tbb::internal::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    task * apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

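    //! Allocates a task that will apply the body to input, or returns NULL if
    //  the graph is no longer active.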
    inline task * create_body_task( const input_type &input ) {
        return (internal::is_graph_active(my_graph_ref)) ?
            new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
                apply_body_task_bypass < class_type, input_type >(
                    *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
            : NULL;
    }

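    //! Repeatedly asks the aggregator to forward items until no more can be
    //  taken, combining the produced body tasks into a single bypass task.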
    task* forward_task() {
        operation_type op_data(try_fwd);
        task* rval = NULL;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline task *create_forward_task() {
        return (internal::is_graph_active(my_graph_ref)) ?
            new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
                forward_task_bypass< class_type >( __TBB_FLOW_GRAPH_PRIORITY_ARG1(*this, my_priority) )
            : NULL;
    }

    inline void spawn_forward_task() {
        task* tp = create_forward_task();
        if(tp) {
            internal::spawn_in_graph_arena(graph_reference(), *tp);
        }
    }
};

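//! Implements the input side of function_node: owns the user body (plus a
//  pristine copy used for reset) and turns each input into an output that is
//  broadcast to successors.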
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy,A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
    ) : base_type(g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(max_concurrency, priority))
      , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ) {
    }

    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    output_type apply_body_impl( const input_type& i) {
        tbb::internal::fgt_begin_body( my_body );
        output_type v = (*my_body)(i);
        tbb::internal::fgt_end_body( my_body );
        return v;
    }

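    //! Runs the body on input i, forwards the result to successors, and, when a
    //  concurrency limit is set, retrieves any postponed work that can now run.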
    task * apply_body_impl_bypass( const input_type &i) {
        output_type v = apply_body_impl(i);
#if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
        task* successor_task = successors().try_put_task(v);
#endif
        task* postponed_task = NULL;
        if( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
        }
#if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
        graph& g = base_type::my_graph_ref;
        return combine_tasks(g, successor_task, postponed_task);
#else
        if( postponed_task ) {
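            // Make the postponed work available to other workers; since the
            // successors' execution policy is unknown, do not bypass it.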
            internal::spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)
#endif
        if(internal::has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if(!successor_task) {
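                // Report success even without a successor task: this node's
                // body has already been executed.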
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
#endif
    }

protected:

    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;
    function_body_type *my_init_body;
    virtual broadcast_cache<output_type > &successors() = 0;

};

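//! Recursively clears (and checks emptiness of) the successor caches of every
//  output port in a multifunction_node's port tuple.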
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)tbb::flow::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
    template<typename P> static bool this_empty(P &p) {
        if(tbb::flow::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
};

template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)tbb::flow::get<0>(p).successors().clear();
    }
    template<typename P> static bool this_empty(P &p) {
        return tbb::flow::get<0>(p).successors().empty();
    }
};

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
template<int N> struct extract_element {
    template<typename P> static void extract_this(P &p) {
        (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
        extract_element<N-1>::extract_this(p);
    }
};

template<> struct extract_element<1> {
    template<typename P> static void extract_this(P &p) {
        (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
    }
};
#endif

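//! Constructs the output-port tuple of a multifunction_node, passing the graph
//  to every port. The variadic form is used when available; otherwise explicit
//  overloads cover tuples up to __TBB_VARIADIC_MAX elements.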
template <typename OutputTuple>
struct init_output_ports {
#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
    template <typename... Args>
    static OutputTuple call(graph& g, const tbb::flow::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
#else
    template <typename T1>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1>&) {
        return OutputTuple(T1(g));
    }

    template <typename T1, typename T2>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2>&) {
        return OutputTuple(T1(g), T2(g));
    }

    template <typename T1, typename T2, typename T3>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3>&) {
        return OutputTuple(T1(g), T2(g), T3(g));
    }

    template <typename T1, typename T2, typename T3, typename T4>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g));
    }

    template <typename T1, typename T2, typename T3, typename T4, typename T5>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g));
    }
#if __TBB_VARIADIC_MAX >= 6
    template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
    static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5, T6>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g));
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template <typename T1, typename T2, typename T3, typename T4,
              typename T5, typename T6, typename T7>
    static OutputTuple call(graph& g,
                            const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g));
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template <typename T1, typename T2, typename T3, typename T4,
              typename T5, typename T6, typename T7, typename T8>
    static OutputTuple call(graph& g,
                            const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g));
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template <typename T1, typename T2, typename T3, typename T4,
              typename T5, typename T6, typename T7, typename T8, typename T9>
    static OutputTuple call(graph& g,
                            const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g));
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template <typename T1, typename T2, typename T3, typename T4, typename T5,
              typename T6, typename T7, typename T8, typename T9, typename T10>
    static OutputTuple call(graph& g,
                            const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>&) {
        return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g), T10(g));
    }
#endif
#endif
};

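//! Implements the input side of multifunction_node: the body receives the input
//  item together with the tuple of output ports and decides what to emit where.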
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = tbb::flow::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    template<typename Body>
    multifunction_input(graph &g, size_t max_concurrency,
                        __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
    ) : base_type(g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(max_concurrency, priority))
      , my_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_init_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)) {
    }

    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    task * apply_body_impl_bypass( const input_type &i) {
        tbb::internal::fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        tbb::internal::fgt_end_body( my_body );
        task* ttask = NULL;
        if(base_type::my_max_concurrency != 0) {
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports(){ return my_output_ports; }

protected:
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    void extract() {
        extract_element<N>::extract_this(my_output_ports);
    }
#endif

    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_clear_edges) clear_element<N>::clear_this(my_output_ports);
        if(f & rf_reset_bodies) {
            multifunction_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;
    multifunction_body_type *my_init_body;
    output_ports_type my_output_ports;

};

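//! Free helper that returns a reference to the Nth output port of any type
//  exposing output_ports(), e.g. a multifunction_node. Illustrative use only;
//  mf_node and int_receiver below are hypothetical names:
//      make_edge(output_port<0>(mf_node), int_receiver);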
template<size_t N, typename MOP>
typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    return tbb::flow::get<N>(op.output_ports());
}

inline void check_task_and_spawn(graph& g, task* t) {
    if (t && t != SUCCESSFULLY_ENQUEUED) {
        internal::spawn_in_graph_arena(g, *t);
    }
}

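//! Pushes each element of tuple t to the matching port in p, spawning any task
//  returned by the put; the <1> specialization ends the recursion.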
template<int N>
struct emit_element {
    template<typename T, typename P>
    static task* emit_this(graph& g, const T &t, P &p) {
        task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g,t,p);
    }
};

template<>
struct emit_element<1> {
    template<typename T, typename P>
    static task* emit_this(graph& g, const T &t, P &p) {
        task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
};

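//! Implements the input side of continue_node: counts continue_msg signals from
//  predecessors (via continue_receiver) and runs the body once the expected
//  number has arrived.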
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:

    typedef continue_msg input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    template< typename Body >
    continue_input( graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority) )
        : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(0, priority))
        , my_graph_ref(g)
        , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
    ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
      , my_graph_ref(g)
      , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
    { }

    continue_input( const continue_input& src ) : continue_receiver(src),
        my_graph_ref(src.my_graph_ref),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    void reset_receiver( reset_flags f) __TBB_override {
        continue_receiver::reset_receiver(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    task *apply_body_bypass( input_type ) {
        tbb::internal::fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        tbb::internal::fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    task* execute() __TBB_override {
        if(!internal::is_graph_active(my_graph_ref)) {
            return NULL;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)
#endif
        if(internal::has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
                apply_body_task_bypass< class_type, continue_msg >(
                    *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
        }
    }

    graph& graph_reference() const __TBB_override {
        return my_graph_ref;
    }
};

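//! Implements the output side of function_node and continue_node: maintains the
//  broadcast cache of successors to which results are pushed.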
template< typename Output >
class function_output : public sender<Output> {
public:

    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif

    function_output( graph& g) : my_graph_ref(g) { my_successors.set_owner(this); }
    function_output(const function_output & other) : sender<output_type>(), my_graph_ref(other.my_graph_ref) {
        my_successors.set_owner(this);
    }

    bool register_successor( successor_type &r ) __TBB_override {
        successors().register_successor( r );
        return true;
    }

    bool remove_successor( successor_type &r ) __TBB_override {
        successors().remove_successor( r );
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }

    void internal_add_built_successor( successor_type &r) __TBB_override {
        successors().internal_add_built_successor( r );
    }

    void internal_delete_built_successor( successor_type &r) __TBB_override {
        successors().internal_delete_built_successor( r );
    }

    size_t successor_count() __TBB_override {
        return successors().successor_count();
    }

    void copy_successors( successor_list_type &v) __TBB_override {
        successors().copy_successors(v);
    }
#endif

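    //! Sends an item to all current successors and returns whatever task the
    //  broadcast cache produced.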
    task *try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }

protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
};

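//! Output port type used by multifunction_node: adds try_put so the body can
//  push results to a specific port while the node is executing.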
template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) { my_successors.set_owner(this); }
    multifunction_output( const multifunction_output& other) : base_type(other.my_graph_ref) { my_successors.set_owner(this); }

    bool try_put(const output_type &i) {
        task *res = try_put_task(i);
        if(!res) return false;
        if(res != SUCCESSFULLY_ENQUEUED) {
            FLOW_SPAWN(*res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:

    task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    template <int N> friend struct emit_element;

};

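//! Recursively registers each passed node with the tracing layer via
//  fgt_alias_port; the empty overload terminates the recursion.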
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    void *addr = const_cast<NodeType1 *>(&n1);
    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}
#endif

}  // namespace internal

#endif // __TBB__flow_graph_node_impl_H