/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"

template< typename T, typename A >
class function_input_queue : public item_buffer<T,A> {
public:
    bool empty() const {
        return this->buffer_empty();
    }

    const T& front() const {
        return this->item_buffer<T, A>::front();
    }

    void pop() {
        this->destroy_front();
    }

    bool push( T& t ) {
        return this->push_back( t );
    }
};

//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
    };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    //! Constructor for function_input_base
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

    //! Destructor
    // The queue is allocated by the constructor for {multi}function_node.
    // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
    // This would be an interface-breaking change.
    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    graph_task* try_put_task( const input_type& t) override {
        if ( my_is_no_throw )
            return try_put_task_impl(t, has_policy<lightweight, Policy>());
        else
            return try_put_task_impl(t, std::false_type());
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;
    size_t my_concurrency;
    node_priority_t my_priority;
    const bool my_is_no_throw;
    input_queue_type *my_queue;
    predecessor_cache<input_type, null_mutex > my_predecessors;

    void reset_receiver( reset_flags f) {
        if( f & rf_clear_edges) my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
        };
        graph_task* bypass_t;
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {}
        operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
    };

    bool forwarder_busy;
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

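    // Pops one postponed input, either from the internal queue (queueing policy) or from a
    // cached predecessor (rejecting policy). On success it takes a concurrency slot and
    // returns a freshly allocated body task; otherwise it returns nullptr.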
    graph_task* perform_queued_requests() {
        graph_task* new_task = nullptr;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());

                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
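    // Aggregator handler: drains the batched operation list while this thread has exclusive
    // access to the node state, handling predecessor registration/removal, forwarding,
    // concurrency occupation, and bypass puts.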
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                tmp->bypass_t = nullptr;
                __TBB_ASSERT(my_max_concurrency != 0, nullptr);
                --my_concurrency;
                if(my_concurrency<my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp);  break;
            case try_fwd: internal_forward(tmp);  break;
            case occupy_concurrency:
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, nullptr);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            op->bypass_t = nullptr;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    void internal_forward(operation_type *op) {
        op->bypass_t = nullptr;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return nullptr;
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! allocates a task to apply a body
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! This is executed by an enqueued task, the "forwarder"
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = nullptr;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if(tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};  // function_input_base
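
// A minimal sketch (illustrative only, not part of the library) of the CRTP contract described
// above: the derived ImplType must provide apply_body_impl_bypass(), which runs the body for one
// input and returns either a task to bypass-schedule or SUCCESSFULLY_ENQUEUED. The names
// pass_through_input and process() below are hypothetical.
//
//     template <typename Input, typename Policy, typename A>
//     class pass_through_input
//         : public function_input_base<Input, Policy, A, pass_through_input<Input, Policy, A>> {
//         using base = function_input_base<Input, Policy, A, pass_through_input>;
//     public:
//         pass_through_input(graph& g, size_t max_concurrency, node_priority_t p)
//             : base(g, max_concurrency, p, /*is_no_throw=*/true) {}
//
//         // Mirrors multifunction_input below: run the body, then ask the base for any
//         // postponed work so the concurrency slot is released.
//         graph_task* apply_body_impl_bypass(const Input& i) {
//             process(i);                                        // hypothetical body work
//             graph_task* postponed = nullptr;
//             if (this->my_max_concurrency != 0)
//                 postponed = this->try_get_postponed_task(i);
//             return postponed ? postponed : SUCCESSFULLY_ENQUEUED;
//         }
//     };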

//! Implements methods for a function node that takes a type Input as input and sends
//  a type Output to its successors.
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy,A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ) {
    }
#if __INTEL_COMPILER <= 2021
    // Suppress a superfluous diagnostic about the missing virtual keyword on the destructor of a
    // derived class whose parent class already declares a virtual destructor.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    output_type apply_body_impl( const input_type& i) {
        // An extra copy is needed here to capture the body execution separately from the try_put.
        fgt_begin_body( my_body );
        output_type v = tbb::detail::invoke(*my_body, i);
        fgt_end_body( my_body );
        return v;
    }

    //TODO: consider moving into the base class
    graph_task* apply_body_impl_bypass( const input_type &i) {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = nullptr;
        if( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
        }
        if( postponed_task ) {
            // make the task available for other workers since we do not know successors'
            // execution policy
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if(!successor_task) {
                // Return confirmative status since current
                // node's body has been executed anyway
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:

    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;
    function_body_type *my_init_body;
    virtual broadcast_cache<output_type > &successors() = 0;

};  // function_input
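
// Usage sketch (illustrative only): the public tbb::flow::function_node combines this
// function_input with a function_output. A typical instantiation looks like:
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> doubler(
//         g, tbb::flow::unlimited, [](int v) { return 2 * v; });
//     doubler.try_put(21);
//     g.wait_for_all();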


// helper templates to clear the successor edges of the output ports of a multifunction_node
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        if(std::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
#endif
};

template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<0>(p).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};

template <typename OutputTuple>
struct init_output_ports {
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
}; // struct init_output_ports

//! Implements methods for a function node that takes a type Input as input
//  and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
      , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
    }

    //! Copy constructor
    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // for multifunction nodes we do not have a single successor as such.  So we just tell
    // the task we were successful.
    //TODO: consider moving common parts with implementation in function_input into separate function
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = nullptr;
        if(base_type::my_max_concurrency != 0) {
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports(){ return my_output_ports; }

protected:

    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
        if(f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;
    multifunction_body_type *my_init_body;
    output_ports_type my_output_ports;

};  // multifunction_input

// template to refer to an output port of a multifunction_node
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    return std::get<N>(op.output_ports());
}
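
// Usage sketch (illustrative only): with the public tbb::flow::multifunction_node the body
// writes results through std::get<N>(ports), and output_port<N>() above is how edges are
// attached to a particular port:
//
//     using mf_node_t = tbb::flow::multifunction_node<int, std::tuple<int, int>>;
//     tbb::flow::graph g;
//     mf_node_t router(g, tbb::flow::unlimited,
//         [](const int& v, mf_node_t::output_ports_type& ports) {
//             if (v % 2 == 0) std::get<0>(ports).try_put(v);   // evens to port 0
//             else            std::get<1>(ports).try_put(v);   // odds to port 1
//         });
//     tbb::flow::queue_node<int> evens(g);
//     tbb::flow::make_edge(tbb::flow::output_port<0>(router), evens);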

inline void check_task_and_spawn(graph& g, graph_task* t) {
    if (t && t != SUCCESSFULLY_ENQUEUED) {
        spawn_in_graph_arena(g, *t);
    }
}

// helper structs for split_node
template<int N>
struct emit_element {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        // TODO: consider collecting all the tasks in a task_list and spawning them all at once
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g,t,p);
    }
};

template<>
struct emit_element<1> {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
};
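
// Usage sketch (illustrative only): the public tbb::flow::split_node uses emit_element to
// forward each member of an incoming tuple to the matching output port:
//
//     tbb::flow::graph g;
//     tbb::flow::split_node<std::tuple<int, float>> splitter(g);
//     tbb::flow::queue_node<int> ints(g);
//     tbb::flow::queue_node<float> floats(g);
//     tbb::flow::make_edge(tbb::flow::output_port<0>(splitter), ints);
//     tbb::flow::make_edge(tbb::flow::output_port<1>(splitter), floats);
//     splitter.try_put(std::make_tuple(1, 2.5f));
//     g.wait_for_all();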

//! Implements methods for an executable node that takes continue_msg as input
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:

    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(/*number_of_predecessors=*/0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
      : continue_receiver( number_of_predecessors, a_priority )
      , my_graph_ref(g)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    continue_input( const continue_input& src ) : continue_receiver(src),
                                                  my_graph_ref(src.my_graph_ref),
                                                  my_body( src.my_init_body->clone() ),
                                                  my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    void reset_receiver( reset_flags f) override {
        continue_receiver::reset_receiver(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    //! Applies the body to the provided input
    graph_task* apply_body_bypass( input_type ) {
        // An extra copy is needed here to capture the body execution separately from the try_put.
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    graph_task* execute() override {
        if(!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            small_object_allocator allocator{};
            typedef apply_body_task_bypass<class_type, continue_msg> task_type;
            graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            graph_reference().reserve_wait();
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
};  // continue_input
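
// Usage sketch (illustrative only): the public tbb::flow::continue_node is built on
// continue_input; its body runs once every predecessor has sent a continue_msg:
//
//     tbb::flow::graph g;
//     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
//     tbb::flow::continue_node<tbb::flow::continue_msg> step(
//         g, [](const tbb::flow::continue_msg&) { return tbb::flow::continue_msg(); });
//     tbb::flow::make_edge(start, step);
//     start.try_put(tbb::flow::continue_msg());
//     g.wait_for_all();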

//! Implements methods for both executable and function nodes that put Output to their successors
template< typename Output >
class function_output : public sender<Output> {
public:

    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;

    function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
    function_output(const function_output& other) = delete;

    //! Adds a new successor to this node
    bool register_successor( successor_type &r ) override {
        successors().register_successor( r );
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        successors().remove_successor( r );
        return true;
    }

    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }
protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
};  // function_output

template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) {}
    multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

    bool try_put(const output_type &i) {
        graph_task *res = try_put_task(i);
        if( !res ) return false;
        if( res != SUCCESSFULLY_ENQUEUED ) {
            // wrapping in task_arena::execute() is not needed since the method is called from
            // inside task::execute()
            spawn_in_graph_arena(graph_reference(), *res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:

    graph_task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    template <int N> friend struct emit_element;

};  // multifunction_output

// composite_node
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    void *addr = const_cast<NodeType1 *>(&n1);

    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}
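
// Usage sketch (illustrative only, assuming the documented composite_node interface):
// add_nodes_impl backs composite_node's add_nodes()/add_visible_nodes(), which register the
// inner nodes with the flow-graph tracing machinery via fgt_alias_port:
//
//     class plus_one_t : public tbb::flow::composite_node<std::tuple<int>, std::tuple<int>> {
//         using base_t = tbb::flow::composite_node<std::tuple<int>, std::tuple<int>>;
//         tbb::flow::function_node<int, int> inner;
//     public:
//         plus_one_t(tbb::flow::graph& g)
//             : base_t(g), inner(g, tbb::flow::unlimited, [](int v) { return v + 1; }) {
//             set_external_ports(input_ports_type(inner), output_ports_type(inner));
//             add_visible_nodes(inner);   // ends up in add_nodes_impl(..., /*visible=*/true, ...)
//         }
//     };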

#endif // __TBB__flow_graph_node_impl_H