Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:18

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_node_impl_H
0018 #define __TBB__flow_graph_node_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 #include "_flow_graph_item_buffer_impl.h"
0025 
0026 template< typename T, typename A >
0027 class function_input_queue : public item_buffer<T,A> {
0028 public:
0029     bool empty() const {
0030         return this->buffer_empty();
0031     }
0032 
0033     const T& front() const {
0034         return this->item_buffer<T, A>::front();
0035     }
0036 
0037 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0038     const message_metainfo& front_metainfo() const {
0039         return this->item_buffer<T,A>::front_metainfo();
0040     }
0041 #endif
0042 
0043     void pop() {
0044         this->destroy_front();
0045     }
0046 
0047     bool push( T& t ) {
0048         return this->push_back( t );
0049     }
0050 
0051 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0052     bool push( T& t, const message_metainfo& metainfo ) {
0053         return this->push_back(t, metainfo);
0054     }
0055 #endif
0056 };
0057 
0058 //! Input and scheduling for a function node that takes a type Input as input
0059 //  The only up-ref is apply_body_impl, which should implement the function
0060 //  call and any handling of the result.
0061 template< typename Input, typename Policy, typename A, typename ImplType >
0062 class function_input_base : public receiver<Input>, no_assign {
0063     enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
0064     };
0065     typedef function_input_base<Input, Policy, A, ImplType> class_type;
0066 
0067 public:
0068 
0069     //! The input type of this receiver
0070     typedef Input input_type;
0071     typedef typename receiver<input_type>::predecessor_type predecessor_type;
0072     typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
0073     typedef function_input_queue<input_type, A> input_queue_type;
0074     typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
0075     static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");
0076 
0077     //! Constructor for function_input_base
0078     function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
0079         : my_graph_ref(g), my_max_concurrency(max_concurrency)
0080         , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
0081         , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
0082         , my_predecessors(this)
0083         , forwarder_busy(false)
0084     {
0085         my_aggregator.initialize_handler(handler_type(this));
0086     }
0087 
0088     //! Copy constructor
0089     function_input_base( const function_input_base& src )
0090         : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}
0091 
0092     //! Destructor
0093     // The queue is allocated by the constructor for {multi}function_node.
0094     // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
0095     // This would be an interface-breaking change.
0096     virtual ~function_input_base() {
0097         delete my_queue;
0098         my_queue = nullptr;
0099     }
0100 
0101     graph_task* try_put_task( const input_type& t) override {
0102         return try_put_task_base(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0103     }
0104 
0105 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0106     graph_task* try_put_task( const input_type& t, const message_metainfo& metainfo ) override {
0107         return try_put_task_base(t, metainfo);
0108     }
0109 #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0110 
0111     //! Adds src to the list of cached predecessors.
0112     bool register_predecessor( predecessor_type &src ) override {
0113         operation_type op_data(reg_pred);
0114         op_data.r = &src;
0115         my_aggregator.execute(&op_data);
0116         return true;
0117     }
0118 
0119     //! Removes src from the list of cached predecessors.
0120     bool remove_predecessor( predecessor_type &src ) override {
0121         operation_type op_data(rem_pred);
0122         op_data.r = &src;
0123         my_aggregator.execute(&op_data);
0124         return true;
0125     }
0126 
0127 protected:
0128 
0129     void reset_function_input_base( reset_flags f) {
0130         my_concurrency = 0;
0131         if(my_queue) {
0132             my_queue->reset();
0133         }
0134         reset_receiver(f);
0135         forwarder_busy = false;
0136     }
0137 
0138     graph& my_graph_ref;
0139     const size_t my_max_concurrency;
0140     size_t my_concurrency;
0141     node_priority_t my_priority;
0142     const bool my_is_no_throw;
0143     input_queue_type *my_queue;
0144     predecessor_cache<input_type, null_mutex > my_predecessors;
0145 
0146     void reset_receiver( reset_flags f) {
0147         if( f & rf_clear_edges) my_predecessors.clear();
0148         else
0149             my_predecessors.reset();
0150         __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
0151     }
0152 
0153     graph& graph_reference() const override {
0154         return my_graph_ref;
0155     }
0156 
0157     graph_task* try_get_postponed_task(const input_type& i) {
0158         operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
0159         my_aggregator.execute(&op_data);
0160         return op_data.bypass_t;
0161     }
0162 
0163 private:
0164 
0165     friend class apply_body_task_bypass< class_type, input_type >;
0166 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0167     friend class apply_body_task_bypass< class_type, input_type, trackable_messages_graph_task >;
0168 #endif
0169     friend class forward_task_bypass< class_type >;
0170 
0171     class operation_type : public d1::aggregated_operation< operation_type > {
0172     public:
0173         char type;
0174         union {
0175             input_type *elem;
0176             predecessor_type *r;
0177         };
0178         graph_task* bypass_t;
0179 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0180         message_metainfo* metainfo;
0181 #endif
0182         operation_type(const input_type& e, op_type t) :
0183             type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr)
0184 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0185             , metainfo(nullptr)
0186 #endif
0187         {}
0188 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0189         operation_type(const input_type& e, op_type t, const message_metainfo& info) :
0190             type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr),
0191             metainfo(const_cast<message_metainfo*>(&info)) {}
0192 #endif
0193         operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
0194     };
0195 
0196     bool forwarder_busy;
0197     typedef d1::aggregating_functor<class_type, operation_type> handler_type;
0198     friend class d1::aggregating_functor<class_type, operation_type>;
0199     d1::aggregator< handler_type, operation_type > my_aggregator;
0200 
0201     graph_task* perform_queued_requests() {
0202         graph_task* new_task = nullptr;
0203         if(my_queue) {
0204             if(!my_queue->empty()) {
0205                 ++my_concurrency;
0206                 // TODO: consider removing metainfo from the queue using move semantics to avoid
0207                 // ref counter increase
0208                 new_task = create_body_task(my_queue->front()
0209                                             __TBB_FLOW_GRAPH_METAINFO_ARG(my_queue->front_metainfo()));
0210 
0211                 my_queue->pop();
0212             }
0213         }
0214         else {
0215             input_type i;
0216 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0217             message_metainfo metainfo;
0218 #endif
0219             if(my_predecessors.get_item(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {
0220                 ++my_concurrency;
0221                 new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
0222             }
0223         }
0224         return new_task;
0225     }
0226     void handle_operations(operation_type *op_list) {
0227         operation_type* tmp;
0228         while (op_list) {
0229             tmp = op_list;
0230             op_list = op_list->next;
0231             switch (tmp->type) {
0232             case reg_pred:
0233                 my_predecessors.add(*(tmp->r));
0234                 tmp->status.store(SUCCEEDED, std::memory_order_release);
0235                 if (!forwarder_busy) {
0236                     forwarder_busy = true;
0237                     spawn_forward_task();
0238                 }
0239                 break;
0240             case rem_pred:
0241                 my_predecessors.remove(*(tmp->r));
0242                 tmp->status.store(SUCCEEDED, std::memory_order_release);
0243                 break;
0244             case app_body_bypass: {
0245                 tmp->bypass_t = nullptr;
0246                 __TBB_ASSERT(my_max_concurrency != 0, nullptr);
0247                 --my_concurrency;
0248                 if(my_concurrency<my_max_concurrency)
0249                     tmp->bypass_t = perform_queued_requests();
0250                 tmp->status.store(SUCCEEDED, std::memory_order_release);
0251             }
0252                 break;
0253             case tryput_bypass: internal_try_put_task(tmp);  break;
0254             case try_fwd: internal_forward(tmp);  break;
0255             case occupy_concurrency:
0256                 if (my_concurrency < my_max_concurrency) {
0257                     ++my_concurrency;
0258                     tmp->status.store(SUCCEEDED, std::memory_order_release);
0259                 } else {
0260                     tmp->status.store(FAILED, std::memory_order_release);
0261                 }
0262                 break;
0263             }
0264         }
0265     }
0266 
0267     //! Put to the node, but return the task instead of enqueueing it
0268     void internal_try_put_task(operation_type *op) {
0269         __TBB_ASSERT(my_max_concurrency != 0, nullptr);
0270         if (my_concurrency < my_max_concurrency) {
0271             ++my_concurrency;
0272             graph_task* new_task = create_body_task(*(op->elem)
0273                                                     __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo)));
0274             op->bypass_t = new_task;
0275             op->status.store(SUCCEEDED, std::memory_order_release);
0276         } else if ( my_queue && my_queue->push(*(op->elem)
0277                     __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))) )
0278         {
0279             op->bypass_t = SUCCESSFULLY_ENQUEUED;
0280             op->status.store(SUCCEEDED, std::memory_order_release);
0281         } else {
0282             op->bypass_t = nullptr;
0283             op->status.store(FAILED, std::memory_order_release);
0284         }
0285     }
0286 
0287     //! Creates tasks for postponed messages if available and if concurrency allows
0288     void internal_forward(operation_type *op) {
0289         op->bypass_t = nullptr;
0290         if (my_concurrency < my_max_concurrency)
0291             op->bypass_t = perform_queued_requests();
0292         if(op->bypass_t)
0293             op->status.store(SUCCEEDED, std::memory_order_release);
0294         else {
0295             forwarder_busy = false;
0296             op->status.store(FAILED, std::memory_order_release);
0297         }
0298     }
0299 
0300     graph_task* internal_try_put_bypass( const input_type& t
0301                                          __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0302     {
0303         operation_type op_data(t, tryput_bypass __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0304         my_aggregator.execute(&op_data);
0305         if( op_data.status == SUCCEEDED ) {
0306             return op_data.bypass_t;
0307         }
0308         return nullptr;
0309     }
0310 
0311     graph_task* try_put_task_base(const input_type& t
0312                                   __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0313     {
0314         if ( my_is_no_throw )
0315             return try_put_task_impl(t, has_policy<lightweight, Policy>()
0316                                      __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0317         else
0318             return try_put_task_impl(t, std::false_type()
0319                                      __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0320     }
0321 
0322     graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type
0323                                    __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0324     {
0325         if( my_max_concurrency == 0 ) {
0326             return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0327         } else {
0328             operation_type check_op(t, occupy_concurrency);
0329             my_aggregator.execute(&check_op);
0330             if( check_op.status == SUCCEEDED ) {
0331                 return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0332             }
0333             return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0334         }
0335     }
0336 
0337     graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type
0338                                    __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0339     {
0340         if( my_max_concurrency == 0 ) {
0341             return create_body_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0342         } else {
0343             return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0344         }
0345     }
0346 
0347     //! Applies the body to the provided input
0348     //  then decides if more work is available
0349     graph_task* apply_body_bypass( const input_type &i
0350                                    __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0351 
0352     {
0353         return static_cast<ImplType *>(this)->apply_body_impl_bypass(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0354     }
0355 
0356     //! allocates a task to apply a body
0357 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0358     template <typename Metainfo>
0359     graph_task* create_body_task( const input_type &input, Metainfo&& metainfo )
0360 #else
0361     graph_task* create_body_task( const input_type &input )
0362 #endif
0363     {
0364         if (!is_graph_active(my_graph_ref)) {
0365             return nullptr;
0366         }
0367         // TODO revamp: extract helper for common graph task allocation part
0368         d1::small_object_allocator allocator{};
0369         graph_task* t = nullptr;
0370 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0371         if (!metainfo.empty()) {
0372             using task_type = apply_body_task_bypass<class_type, input_type, trackable_messages_graph_task>;
0373             t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority, std::forward<Metainfo>(metainfo));
0374         } else
0375 #endif
0376         {
0377             using task_type = apply_body_task_bypass<class_type, input_type>;
0378             t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority);
0379         }
0380         return t;
0381     }
0382 
0383     //! This is executed by an enqueued task, the "forwarder"
0384     graph_task* forward_task() {
0385         operation_type op_data(try_fwd);
0386         graph_task* rval = nullptr;
0387         do {
0388             op_data.status = WAIT;
0389             my_aggregator.execute(&op_data);
0390             if(op_data.status == SUCCEEDED) {
0391                 graph_task* ttask = op_data.bypass_t;
0392                 __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
0393                 rval = combine_tasks(my_graph_ref, rval, ttask);
0394             }
0395         } while (op_data.status == SUCCEEDED);
0396         return rval;
0397     }
0398 
0399     inline graph_task* create_forward_task() {
0400         if (!is_graph_active(my_graph_ref)) {
0401             return nullptr;
0402         }
0403         d1::small_object_allocator allocator{};
0404         typedef forward_task_bypass<class_type> task_type;
0405         graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
0406         return t;
0407     }
0408 
0409     //! Spawns a task that calls forward()
0410     inline void spawn_forward_task() {
0411         graph_task* tp = create_forward_task();
0412         if(tp) {
0413             spawn_in_graph_arena(graph_reference(), *tp);
0414         }
0415     }
0416 
0417     node_priority_t priority() const override { return my_priority; }
0418 };  // function_input_base
0419 
0420 //! Implements methods for a function node that takes a type Input as input and sends
0421 //  a type Output to its successors.
0422 template< typename Input, typename Output, typename Policy, typename A>
0423 class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
0424 public:
0425     typedef Input input_type;
0426     typedef Output output_type;
0427     typedef function_body<input_type, output_type> function_body_type;
0428     typedef function_input<Input, Output, Policy,A> my_class;
0429     typedef function_input_base<Input, Policy, A, my_class> base_type;
0430     typedef function_input_queue<input_type, A> input_queue_type;
0431 
0432     // constructor
0433     template<typename Body>
0434     function_input(
0435         graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
0436       : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
0437       , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0438       , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
0439     }
0440 
0441     //! Copy constructor
0442     function_input( const function_input& src ) :
0443         base_type(src),
0444         my_body( src.my_init_body->clone() ),
0445         my_init_body(src.my_init_body->clone() ) {
0446     }
0447 #if __INTEL_COMPILER <= 2021
0448     // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
0449     // class while the parent class has the virtual keyword for the destrocutor.
0450     virtual
0451 #endif
0452     ~function_input() {
0453         delete my_body;
0454         delete my_init_body;
0455     }
0456 
0457     template< typename Body >
0458     Body copy_function_object() {
0459         function_body_type &body_ref = *this->my_body;
0460         return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0461     }
0462 
0463     output_type apply_body_impl( const input_type& i) {
0464         // There is an extra copied needed to capture the
0465         // body execution without the try_put
0466         fgt_begin_body( my_body );
0467         output_type v = tbb::detail::invoke(*my_body, i);
0468         fgt_end_body( my_body );
0469         return v;
0470     }
0471 
0472     //TODO: consider moving into the base class
0473     graph_task* apply_body_impl_bypass( const input_type &i
0474                                         __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0475     {
0476         output_type v = apply_body_impl(i);
0477         graph_task* postponed_task = nullptr;
0478         if( base_type::my_max_concurrency != 0 ) {
0479             postponed_task = base_type::try_get_postponed_task(i);
0480             __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
0481         }
0482         if( postponed_task ) {
0483             // make the task available for other workers since we do not know successors'
0484             // execution policy
0485             spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
0486         }
0487         graph_task* successor_task = successors().try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0488 #if _MSC_VER && !__INTEL_COMPILER
0489 #pragma warning (push)
0490 #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
0491 #endif
0492         if(has_policy<lightweight, Policy>::value) {
0493 #if _MSC_VER && !__INTEL_COMPILER
0494 #pragma warning (pop)
0495 #endif
0496             if(!successor_task) {
0497                 // Return confirmative status since current
0498                 // node's body has been executed anyway
0499                 successor_task = SUCCESSFULLY_ENQUEUED;
0500             }
0501         }
0502         return successor_task;
0503     }
0504 
0505 protected:
0506 
0507     void reset_function_input(reset_flags f) {
0508         base_type::reset_function_input_base(f);
0509         if(f & rf_reset_bodies) {
0510             function_body_type *tmp = my_init_body->clone();
0511             delete my_body;
0512             my_body = tmp;
0513         }
0514     }
0515 
0516     function_body_type *my_body;
0517     function_body_type *my_init_body;
0518     virtual broadcast_cache<output_type > &successors() = 0;
0519 
0520 };  // function_input
0521 
0522 
// Helper templates that clear the successor edges of the output ports of a multifunction_node.
// clear_element<N> handles port N-1 and then recurses down to the clear_element<1> base case.
template<int N> struct clear_element {
    //! Clears the successors of ports N-1, N-2, ..., 0 (in that order).
    template<typename Ports> static void clear_this(Ports& ports) {
        (void)std::get<N-1>(ports).successors().clear();
        clear_element<N-1>::clear_this(ports);
    }
#if TBB_USE_ASSERT
    //! True when ports 0..N-1 all have no successors.
    template<typename Ports> static bool this_empty(Ports& ports) {
        return std::get<N-1>(ports).successors().empty()
            && clear_element<N-1>::this_empty(ports);
    }
#endif
};

// Recursion terminator: only port 0 is left.
template<> struct clear_element<1> {
    template<typename Ports> static void clear_this(Ports& ports) {
        (void)std::get<0>(ports).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename Ports> static bool this_empty(Ports& ports) {
        return std::get<0>(ports).successors().empty();
    }
#endif
};
0548 
0549 template <typename OutputTuple>
0550 struct init_output_ports {
0551     template <typename... Args>
0552     static OutputTuple call(graph& g, const std::tuple<Args...>&) {
0553         return OutputTuple(Args(g)...);
0554     }
0555 }; // struct init_output_ports
0556 
0557 //! Implements methods for a function node that takes a type Input as input
0558 //  and has a tuple of output ports specified.
0559 template< typename Input, typename OutputPortSet, typename Policy, typename A>
0560 class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
0561 public:
0562     static const int N = std::tuple_size<OutputPortSet>::value;
0563     typedef Input input_type;
0564     typedef OutputPortSet output_ports_type;
0565     typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
0566     typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
0567     typedef function_input_base<Input, Policy, A, my_class> base_type;
0568     typedef function_input_queue<input_type, A> input_queue_type;
0569 
0570     // constructor
0571     template<typename Body>
0572     multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
0573       : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
0574       , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0575       , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0576       , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
0577     }
0578 
0579     //! Copy constructor
0580     multifunction_input( const multifunction_input& src ) :
0581         base_type(src),
0582         my_body( src.my_init_body->clone() ),
0583         my_init_body(src.my_init_body->clone() ),
0584         my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
0585     }
0586 
0587     ~multifunction_input() {
0588         delete my_body;
0589         delete my_init_body;
0590     }
0591 
0592     template< typename Body >
0593     Body copy_function_object() {
0594         multifunction_body_type &body_ref = *this->my_body;
0595         return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
0596     }
0597 
0598     // for multifunction nodes we do not have a single successor as such.  So we just tell
0599     // the task we were successful.
0600     //TODO: consider moving common parts with implementation in function_input into separate function
0601     graph_task* apply_body_impl_bypass( const input_type &i
0602                                         __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) )
0603     {
0604         fgt_begin_body( my_body );
0605         (*my_body)(i, my_output_ports);
0606         fgt_end_body( my_body );
0607         graph_task* ttask = nullptr;
0608         if(base_type::my_max_concurrency != 0) {
0609             ttask = base_type::try_get_postponed_task(i);
0610         }
0611         return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
0612     }
0613 
0614     output_ports_type &output_ports(){ return my_output_ports; }
0615 
0616 protected:
0617 
0618     void reset(reset_flags f) {
0619         base_type::reset_function_input_base(f);
0620         if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
0621         if(f & rf_reset_bodies) {
0622             multifunction_body_type* tmp = my_init_body->clone();
0623             delete my_body;
0624             my_body = tmp;
0625         }
0626         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
0627     }
0628 
0629     multifunction_body_type *my_body;
0630     multifunction_body_type *my_init_body;
0631     output_ports_type my_output_ports;
0632 
0633 };  // multifunction_input
0634 
// Gives access to the N-th output port of a multifunction_node (or of any object that
// exposes output_ports_type and output_ports()).
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    typename MOP::output_ports_type& all_ports = op.output_ports();
    return std::get<N>(all_ports);
}
0640 
0641 inline void check_task_and_spawn(graph& g, graph_task* t) {
0642     if (t && t != SUCCESSFULLY_ENQUEUED) {
0643         spawn_in_graph_arena(g, *t);
0644     }
0645 }
0646 
0647 // helper structs for split_node
0648 template<int N>
0649 struct emit_element {
0650     template<typename T, typename P>
0651     static graph_task* emit_this(graph& g, const T &t, P &p) {
0652         // TODO: consider to collect all the tasks in task_list and spawn them all at once
0653         graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
0654         check_task_and_spawn(g, last_task);
0655         return emit_element<N-1>::emit_this(g,t,p);
0656     }
0657 
0658 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0659     template <typename TupleType, typename PortsType>
0660     static graph_task* emit_this(graph& g, const TupleType& t, PortsType& p,
0661                                  const message_metainfo& metainfo)
0662     {
0663         // TODO: consider to collect all the tasks in task_list and spawn them all at once
0664         graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t), metainfo);
0665         check_task_and_spawn(g, last_task);
0666         return emit_element<N-1>::emit_this(g, t, p, metainfo);
0667     }
0668 #endif
0669 };
0670 
0671 template<>
0672 struct emit_element<1> {
0673     template<typename T, typename P>
0674     static graph_task* emit_this(graph& g, const T &t, P &p) {
0675         graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
0676         check_task_and_spawn(g, last_task);
0677         return SUCCESSFULLY_ENQUEUED;
0678     }
0679 
0680 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0681     template <typename TupleType, typename PortsType>
0682     static graph_task* emit_this(graph& g, const TupleType& t, PortsType& ports,
0683                                  const message_metainfo& metainfo)
0684     {
0685         graph_task* last_task = std::get<0>(ports).try_put_task(std::get<0>(t), metainfo);
0686         check_task_and_spawn(g, last_task);
0687         return SUCCESSFULLY_ENQUEUED;
0688     }
0689 #endif
0690 };
0691 
0692 //! Implements methods for an executable node that takes continue_msg as input
0693 template< typename Output, typename Policy>
0694 class continue_input : public continue_receiver {
0695 public:
0696 
0697     //! The input type of this receiver
0698     typedef continue_msg input_type;
0699 
0700     //! The output type of this receiver
0701     typedef Output output_type;
0702     typedef function_body<input_type, output_type> function_body_type;
0703     typedef continue_input<output_type, Policy> class_type;
0704 
0705     template< typename Body >
0706     continue_input( graph &g, Body& body, node_priority_t a_priority )
0707         : continue_receiver(/*number_of_predecessors=*/0, a_priority)
0708         , my_graph_ref(g)
0709         , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0710         , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
0711     { }
0712 
0713     template< typename Body >
0714     continue_input( graph &g, int number_of_predecessors,
0715                     Body& body, node_priority_t a_priority )
0716       : continue_receiver( number_of_predecessors, a_priority )
0717       , my_graph_ref(g)
0718       , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0719       , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
0720     { }
0721 
0722     continue_input( const continue_input& src ) : continue_receiver(src),
0723                                                   my_graph_ref(src.my_graph_ref),
0724                                                   my_body( src.my_init_body->clone() ),
0725                                                   my_init_body( src.my_init_body->clone() ) {}
0726 
0727     ~continue_input() {
0728         delete my_body;
0729         delete my_init_body;
0730     }
0731 
0732     template< typename Body >
0733     Body copy_function_object() {
0734         function_body_type &body_ref = *my_body;
0735         return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0736     }
0737 
0738     void reset_receiver( reset_flags f) override {
0739         continue_receiver::reset_receiver(f);
0740         if(f & rf_reset_bodies) {
0741             function_body_type *tmp = my_init_body->clone();
0742             delete my_body;
0743             my_body = tmp;
0744         }
0745     }
0746 
0747 protected:
0748 
0749     graph& my_graph_ref;
0750     function_body_type *my_body;
0751     function_body_type *my_init_body;
0752 
0753     virtual broadcast_cache<output_type > &successors() = 0;
0754 
0755     friend class apply_body_task_bypass< class_type, continue_msg >;
0756 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0757     friend class apply_body_task_bypass< class_type, continue_msg, trackable_messages_graph_task >;
0758 #endif
0759 
0760     //! Applies the body to the provided input
0761     graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
0762         // There is an extra copied needed to capture the
0763         // body execution without the try_put
0764         fgt_begin_body( my_body );
0765         output_type v = (*my_body)( continue_msg() );
0766         fgt_end_body( my_body );
0767         return successors().try_put_task( v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
0768     }
0769 
0770 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0771     graph_task* execute(const message_metainfo& metainfo) override {
0772 #else
0773     graph_task* execute() override {
0774 #endif
0775         if(!is_graph_active(my_graph_ref)) {
0776             return nullptr;
0777         }
0778 #if _MSC_VER && !__INTEL_COMPILER
0779 #pragma warning (push)
0780 #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
0781 #endif
0782         if(has_policy<lightweight, Policy>::value) {
0783 #if _MSC_VER && !__INTEL_COMPILER
0784 #pragma warning (pop)
0785 #endif
0786             return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
0787         }
0788         else {
0789             d1::small_object_allocator allocator{};
0790             graph_task* t = nullptr;
0791 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0792             if (!metainfo.empty()) {
0793                 using task_type = apply_body_task_bypass<class_type, continue_msg, trackable_messages_graph_task>;
0794                 t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority, metainfo );
0795             } else
0796 #endif
0797             {
0798                 using task_type = apply_body_task_bypass<class_type, continue_msg>;
0799                 t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
0800             }
0801             return t;
0802         }
0803     }
0804 
0805     graph& graph_reference() const override {
0806         return my_graph_ref;
0807     }
0808 };  // continue_input
0809 
0810 //! Implements methods for both executable and function nodes that puts Output to its successors
0811 template< typename Output >
0812 class function_output : public sender<Output> {
0813 public:
0814 
0815     template<int N> friend struct clear_element;
0816     typedef Output output_type;
0817     typedef typename sender<output_type>::successor_type successor_type;
0818     typedef broadcast_cache<output_type> broadcast_cache_type;
0819 
0820     function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
0821     function_output(const function_output& other) = delete;
0822 
0823     //! Adds a new successor to this node
0824     bool register_successor( successor_type &r ) override {
0825         successors().register_successor( r );
0826         return true;
0827     }
0828 
0829     //! Removes a successor from this node
0830     bool remove_successor( successor_type &r ) override {
0831         successors().remove_successor( r );
0832         return true;
0833     }
0834 
0835     broadcast_cache_type &successors() { return my_successors; }
0836 
0837     graph& graph_reference() const { return my_graph_ref; }
0838 protected:
0839     broadcast_cache_type my_successors;
0840     graph& my_graph_ref;
0841 };  // function_output
0842 
0843 template< typename Output >
0844 class multifunction_output : public function_output<Output> {
0845 public:
0846     typedef Output output_type;
0847     typedef function_output<output_type> base_type;
0848     using base_type::my_successors;
0849 
0850     multifunction_output(graph& g) : base_type(g) {}
0851     multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}
0852 
0853     bool try_put(const output_type &i) {
0854         graph_task *res = try_put_task(i);
0855         if( !res ) return false;
0856         if( res != SUCCESSFULLY_ENQUEUED ) {
0857             // wrapping in task_arena::execute() is not needed since the method is called from
0858             // inside task::execute()
0859             spawn_in_graph_arena(graph_reference(), *res);
0860         }
0861         return true;
0862     }
0863 
0864     using base_type::graph_reference;
0865 
0866 protected:
0867 
0868     graph_task* try_put_task(const output_type &i) {
0869         return my_successors.try_put_task(i);
0870     }
0871 
0872 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0873     graph_task* try_put_task(const output_type& i, const message_metainfo& metainfo) {
0874         return my_successors.try_put_task(i, metainfo);
0875     }
0876 #endif
0877 
0878     template <int N> friend struct emit_element;
0879 
0880 };  // multifunction_output
0881 
0882 //composite_node
//! Terminating overload of add_nodes_impl: no nodes are left, so nothing is done
template< typename CompositeType >
void add_nodes_impl( CompositeType* /*c_node*/, bool /*visible*/ ) {
}
0885 
//! Passes the address of each listed node, in order, to fgt_alias_port along with c_node and visible
/** The first node is handled here; the remaining ones are handled by recursion. */
template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl( CompositeType* c_node, bool visible, const NodeType1& n1, const NodeTypes&... n ) {
    // Constness is cast away before the address is handed over as an untyped pointer
    void* node_address = const_cast<NodeType1*>( &n1 );
    fgt_alias_port( c_node, node_address, visible );

    add_nodes_impl( c_node, visible, n... );
}
0893 
0894 #endif // __TBB__flow_graph_node_impl_H