#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"

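// function_input_queue: FIFO buffer used by queueing function nodes to hold inputs that
// arrive while the node is already running at its concurrency limit.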
template< typename T, typename A >
class function_input_queue : public item_buffer<T,A> {
public:
    bool empty() const {
        return this->buffer_empty();
    }

    const T& front() const {
        return this->item_buffer<T, A>::front();
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    const message_metainfo& front_metainfo() const {
        return this->item_buffer<T,A>::front_metainfo();
    }
#endif

    void pop() {
        this->destroy_front();
    }

    bool push( T& t ) {
        return this->push_back( t );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    bool push( T& t, const message_metainfo& metainfo ) {
        return this->push_back(t, metainfo);
    }
#endif
};
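// function_input_base: input and scheduling machinery shared by function_node and
// multifunction_node. It caches predecessors, buffers or rejects inputs according to
// Policy, and, when my_max_concurrency is nonzero, limits the number of concurrently
// executing bodies. ImplType (CRTP) provides apply_body_impl_bypass, which invokes the body.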
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    enum op_type { reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex> predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    // Copy constructor: copies the configuration only; queued items, cached predecessors
    // and in-flight concurrency are not copied.
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    graph_task* try_put_task( const input_type& t ) override {
        return try_put_task_base(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task( const input_type& t, const message_metainfo& metainfo ) override {
        return try_put_task_base(t, metainfo);
    }
#endif

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:
    void reset_function_input_base( reset_flags f ) {
        my_concurrency = 0;
        if (my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;
    size_t my_concurrency;
    node_priority_t my_priority;
    const bool my_is_no_throw;
    input_queue_type *my_queue;
    predecessor_cache<input_type, null_mutex> my_predecessors;

    void reset_receiver( reset_flags f ) {
        if (f & rf_clear_edges) {
            my_predecessors.clear();
        } else {
            my_predecessors.reset();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    //! Called after a body finishes: reports the completion to the aggregator and returns
    //  any task that was postponed while the node was at its concurrency limit.
    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:
    friend class apply_body_task_bypass< class_type, input_type >;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    friend class apply_body_task_bypass< class_type, input_type, trackable_messages_graph_task >;
#endif
    friend class forward_task_bypass< class_type >;

    //! One queued request for the aggregator, which serializes all changes to node state.
    class operation_type : public d1::aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
        };
        graph_task* bypass_t;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        message_metainfo* metainfo;
#endif
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr)
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            , metainfo(nullptr)
#endif
        {}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        operation_type(const input_type& e, op_type t, const message_metainfo& info) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr),
            metainfo(const_cast<message_metainfo*>(&info)) {}
#endif
        operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
    };

    bool forwarder_busy;
    typedef d1::aggregating_functor<class_type, operation_type> handler_type;
    friend class d1::aggregating_functor<class_type, operation_type>;
    d1::aggregator< handler_type, operation_type > my_aggregator;

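    // Starts at most one postponed body invocation: takes the next buffered item (queueing
    // policy) or pulls one from a cached predecessor (rejecting policy), increments
    // my_concurrency, and returns the task that will run the body, or nullptr if no work
    // is available.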
    graph_task* perform_queued_requests() {
        graph_task* new_task = nullptr;
        if (my_queue) {
            if (!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front()
                                            __TBB_FLOW_GRAPH_METAINFO_ARG(my_queue->front_metainfo()));
                my_queue->pop();
            }
        }
        else {
            input_type i;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            message_metainfo metainfo;
#endif
            if (my_predecessors.get_item(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {
                ++my_concurrency;
                new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
            }
        }
        return new_task;
    }
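
    // Aggregator handler: drains the batched operation list and applies each request
    // (predecessor add/remove, body completion, put, forward, concurrency reservation)
    // serially, so node state is only ever mutated by one thread at a time.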
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
                case reg_pred:
                    my_predecessors.add(*(tmp->r));
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                    if (!forwarder_busy) {
                        forwarder_busy = true;
                        spawn_forward_task();
                    }
                    break;
                case rem_pred:
                    my_predecessors.remove(*(tmp->r));
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                    break;
                case app_body_bypass: {
                    tmp->bypass_t = nullptr;
                    __TBB_ASSERT(my_max_concurrency != 0, nullptr);
                    --my_concurrency;
                    if (my_concurrency < my_max_concurrency)
                        tmp->bypass_t = perform_queued_requests();
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                }
                break;
                case tryput_bypass: internal_try_put_task(tmp); break;
                case try_fwd: internal_forward(tmp); break;
                case occupy_concurrency:
                    if (my_concurrency < my_max_concurrency) {
                        ++my_concurrency;
                        tmp->status.store(SUCCEEDED, std::memory_order_release);
                    } else {
                        tmp->status.store(FAILED, std::memory_order_release);
                    }
                    break;
            }
        }
    }

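    // Handles an incoming item under the aggregator: if a concurrency slot is free, creates
    // the task that runs the body and returns it as the bypass task; otherwise buffers the
    // item when a queue is present (queueing policy) or reports failure (rejecting policy).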
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, nullptr);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task* new_task = create_body_task(*(op->elem)
                                                    __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo)));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)
                                               __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))) )
        {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            op->bypass_t = nullptr;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    void internal_forward(operation_type *op) {
        op->bypass_t = nullptr;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if (op->bypass_t) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

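    // Routes a put through the aggregator as a tryput_bypass operation; returns the
    // resulting task (possibly the SUCCESSFULLY_ENQUEUED sentinel) or nullptr if the
    // item was rejected.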
    graph_task* internal_try_put_bypass( const input_type& t
                                         __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
    {
        operation_type op_data(t, tryput_bypass __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        my_aggregator.execute(&op_data);
        if ( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return nullptr;
    }

    graph_task* try_put_task_base(const input_type& t
                                  __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
    {
        if ( my_is_no_throw )
            return try_put_task_impl(t, has_policy<lightweight, Policy>()
                                     __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        else
            return try_put_task_impl(t, std::false_type()
                                     __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
    }

    // Selected for no-throw lightweight bodies: execute the body on the caller's thread
    // whenever concurrency is unlimited or a slot can be reserved.
    graph_task* try_put_task_impl( const input_type& t, std::true_type
                                   __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
    {
        if ( my_max_concurrency == 0 ) {
            return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if ( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
            }
            return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        }
    }

    // Otherwise: create a task to run the body, or go through the aggregator when
    // concurrency is limited.
    graph_task* try_put_task_impl( const input_type& t, std::false_type
                                   __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
    {
        if ( my_max_concurrency == 0 ) {
            return create_body_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        } else {
            return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
        }
    }

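    // Delegates to the CRTP-derived class (ImplType), which invokes the user body and
    // forwards the result to the node's successors.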
    graph_task* apply_body_bypass( const input_type &i
                                   __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
    {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
    }

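    // Allocates the task that will invoke the node body on 'input'; returns nullptr if the
    // graph is no longer active. When try_put_and_wait support is enabled and metainfo is
    // non-empty, a trackable task type is used.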
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    template <typename Metainfo>
    graph_task* create_body_task( const input_type &input, Metainfo&& metainfo )
#else
    graph_task* create_body_task( const input_type &input )
#endif
    {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }

        d1::small_object_allocator allocator{};
        graph_task* t = nullptr;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
        if (!metainfo.empty()) {
            using task_type = apply_body_task_bypass<class_type, input_type, trackable_messages_graph_task>;
            t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority,
                                                std::forward<Metainfo>(metainfo));
        } else
#endif
        {
            using task_type = apply_body_task_bypass<class_type, input_type>;
            t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority);
        }
        return t;
    }

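    // Drains as much postponed work as the concurrency limit allows, combining the resulting
    // body tasks into a single task that is bypassed to the caller.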
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = nullptr;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if (op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        d1::small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        return t;
    }

    //! Spawns a task that calls forward_task().
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if (tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};
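// function_input: implements the input side of function_node. Stores the user body (plus a
// pristine copy used by reset) and applies it to each input, forwarding the result to the
// node's successors.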
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    template<typename Body>
    function_input( graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
        : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor: clones the original (pristine) body, not the possibly mutated one.
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ) {
    }

#if __INTEL_COMPILER <= 2021
    // The base class destructor is already virtual; the explicit keyword here only silences
    // a spurious diagnostic from older Intel compilers.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    output_type apply_body_impl( const input_type& i ) {
        fgt_begin_body( my_body );
        output_type v = tbb::detail::invoke(*my_body, i);
        fgt_end_body( my_body );
        return v;
    }

    //! Invokes the body on i, re-spawns any postponed work, and forwards the result to the
    //  successors; returns the successor task to be bypassed by the caller.
    graph_task* apply_body_impl_bypass( const input_type &i
                                        __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
    {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = nullptr;
        if ( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
        }
        if ( postponed_task ) {
            // Spawn the postponed task so other workers can pick it up instead of keeping it
            // on this thread as a bypass task.
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress "conditional expression is constant" */
#endif
        if (has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if (!successor_task) {
                // Report success anyway: this node's body has already been executed.
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:
    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if (f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type> &successors() = 0;
};
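// clear_element: recursively clears (and, with TBB_USE_ASSERT, checks emptiness of) the
// successor caches of each output port in a tuple of ports.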
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        if (std::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
#endif
};

template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<0>(p).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};
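// init_output_ports: builds the tuple of output ports, constructing each port from the
// graph reference.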
template <typename OutputTuple>
struct init_output_ports {
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
};

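// multifunction_input: implements the input side of multifunction_node. The body receives
// the input item together with the tuple of output ports and may put to any subset of them.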
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    template<typename Body>
    multifunction_input( graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
        : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
        , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
        , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
        , my_output_ports( init_output_ports<output_ports_type>::call(g, my_output_ports) ) {
    }

    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // The body puts to the output ports itself, so there is no single successor to forward
    // to here; only a postponed-task bypass (if any) needs to be returned.
    graph_task* apply_body_impl_bypass( const input_type &i
                                        __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) )
    {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = nullptr;
        if (base_type::my_max_concurrency != 0) {
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports() { return my_output_ports; }

protected:
    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if (f & rf_clear_edges) clear_element<N>::clear_this(my_output_ports);
        if (f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;
    multifunction_body_type *my_init_body;
    output_ports_type my_output_ports;
};

//! Returns a reference to the Nth output port of a node that exposes output_ports().
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    return std::get<N>(op.output_ports());
}

inline void check_task_and_spawn(graph& g, graph_task* t) {
    if (t && t != SUCCESSFULLY_ENQUEUED) {
        spawn_in_graph_arena(g, *t);
    }
}

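// emit_element: puts each element of a tuple to the corresponding port in a tuple of ports,
// spawning any task returned by the individual puts.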
template<int N>
struct emit_element {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g, t, p);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    template <typename TupleType, typename PortsType>
    static graph_task* emit_this(graph& g, const TupleType& t, PortsType& p,
                                 const message_metainfo& metainfo)
    {
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t), metainfo);
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g, t, p, metainfo);
    }
#endif
};

template<>
struct emit_element<1> {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    template <typename TupleType, typename PortsType>
    static graph_task* emit_this(graph& g, const TupleType& t, PortsType& ports,
                                 const message_metainfo& metainfo)
    {
        graph_task* last_task = std::get<0>(ports).try_put_task(std::get<0>(t), metainfo);
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
#endif
};
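// continue_input: implements the input side of continue_node. It inherits the predecessor
// signal counting from continue_receiver and runs the stored body once the expected number
// of continue_msg signals has arrived.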
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:
    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
        : continue_receiver( number_of_predecessors, a_priority )
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    continue_input( const continue_input& src ) : continue_receiver(src),
        my_graph_ref(src.my_graph_ref),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    void reset_receiver( reset_flags f ) override {
        continue_receiver::reset_receiver(f);
        if (f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:
    graph& my_graph_ref;
    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type> &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    friend class apply_body_task_bypass< class_type, continue_msg, trackable_messages_graph_task >;
#endif

    //! Applies the body and returns a task to forward the result to the successors.
    graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
    }

    //! Called once all predecessors have signaled: either runs the body inline
    //  (lightweight policy) or returns a task that will run it.
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* execute(const message_metainfo& metainfo) override {
#else
    graph_task* execute() override {
#endif
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress "conditional expression is constant" */
#endif
        if (has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
        }
        else {
            d1::small_object_allocator allocator{};
            graph_task* t = nullptr;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
            if (!metainfo.empty()) {
                using task_type = apply_body_task_bypass<class_type, continue_msg, trackable_messages_graph_task>;
                t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority, metainfo );
            } else
#endif
            {
                using task_type = apply_body_task_bypass<class_type, continue_msg>;
                t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            }
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
};
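// function_output: the sender side shared by function_node-like nodes; maintains a
// broadcast_cache of successors and the graph reference used to spawn tasks.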
template< typename Output >
class function_output : public sender<Output> {
public:
    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;

    function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
    function_output(const function_output& other) = delete;

    //! Adds a new successor to this node
    bool register_successor( successor_type &r ) override {
        successors().register_successor( r );
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        successors().remove_successor( r );
        return true;
    }

    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }

protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
};
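// multifunction_output: a single output port (as used by multifunction_node-style nodes);
// try_put broadcasts an item to the port's successors and spawns any resulting task in the
// graph arena.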
template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) {}
    multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

    bool try_put(const output_type &i) {
        graph_task *res = try_put_task(i);
        if ( !res ) return false;
        if ( res != SUCCESSFULLY_ENQUEUED ) {
            // A real task was returned by a successor; spawn it in the graph's arena.
            spawn_in_graph_arena(graph_reference(), *res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:
    graph_task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    graph_task* try_put_task(const output_type& i, const message_metainfo& metainfo) {
        return my_successors.try_put_task(i, metainfo);
    }
#endif

    template <int N> friend struct emit_element;
};
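// add_nodes_impl: helper for composite_node; reports each contained node to the flow graph
// tracing layer via fgt_alias_port.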
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    void *addr = const_cast<NodeType1 *>(&n1);
    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}

#endif // __TBB__flow_graph_node_impl_H