Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:48

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_node_impl_H
0018 #define __TBB__flow_graph_node_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 #include "_flow_graph_item_buffer_impl.h"
0025 
0026 //! @cond INTERNAL
0027 namespace internal {
0028 
0029     using tbb::internal::aggregated_operation;
0030     using tbb::internal::aggregating_functor;
0031     using tbb::internal::aggregator;
0032 
0033      template< typename T, typename A >
0034      class function_input_queue : public item_buffer<T,A> {
0035      public:
0036          bool empty() const {
0037              return this->buffer_empty();
0038          }
0039 
0040          const T& front() const {
0041              return this->item_buffer<T, A>::front();
0042          }
0043 
0044          bool pop( T& t ) {
0045              return this->pop_front( t );
0046          }
0047 
0048          void pop() {
0049              this->destroy_front();
0050          }
0051 
0052          bool push( T& t ) {
0053              return this->push_back( t );
0054          }
0055      };
0056 
0057     //! Input and scheduling for a function node that takes a type Input as input
0058     //  The only up-ref is apply_body_impl, which should implement the function
0059     //  call and any handling of the result.
0060     template< typename Input, typename Policy, typename A, typename ImplType >
0061     class function_input_base : public receiver<Input>, tbb::internal::no_assign {
0062         enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
0063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0064             , add_blt_pred, del_blt_pred,
0065             blt_pred_cnt, blt_pred_cpy   // create vector copies of preds and succs
0066 #endif
0067         };
0068         typedef function_input_base<Input, Policy, A, ImplType> class_type;
0069 
0070     public:
0071 
0072         //! The input type of this receiver
0073         typedef Input input_type;
0074         typedef typename receiver<input_type>::predecessor_type predecessor_type;
0075         typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
0076         typedef function_input_queue<input_type, A> input_queue_type;
0077         typedef typename tbb::internal::allocator_rebind<A, input_queue_type>::type queue_allocator_type;
0078         __TBB_STATIC_ASSERT(!((internal::has_policy<queueing, Policy>::value) && (internal::has_policy<rejecting, Policy>::value)),
0079                               "queueing and rejecting policies can't be specified simultaneously");
0080 
0081 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0082         typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
0083         typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
0084 #endif
0085 
0086         //! Constructor for function_input_base
0087         function_input_base(
0088             graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority)
0089         ) : my_graph_ref(g), my_max_concurrency(max_concurrency)
0090           , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
0091           , my_queue(!internal::has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
0092           , forwarder_busy(false)
0093         {
0094             my_predecessors.set_owner(this);
0095             my_aggregator.initialize_handler(handler_type(this));
0096         }
0097 
0098         //! Copy constructor
0099         function_input_base( const function_input_base& src)
0100             : receiver<Input>(), tbb::internal::no_assign()
0101             , my_graph_ref(src.my_graph_ref), my_max_concurrency(src.my_max_concurrency)
0102             , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
0103             , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
0104         {
0105             my_predecessors.set_owner(this);
0106             my_aggregator.initialize_handler(handler_type(this));
0107         }
0108 
0109         //! Destructor
0110         // The queue is allocated by the constructor for {multi}function_node.
0111         // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
0112         // This would be an interface-breaking change.
0113         virtual ~function_input_base() {
0114             if ( my_queue ) delete my_queue;
0115         }
0116 
0117         task* try_put_task( const input_type& t) __TBB_override {
0118             return try_put_task_impl(t, internal::has_policy<lightweight, Policy>());
0119         }
0120 
0121         //! Adds src to the list of cached predecessors.
0122         bool register_predecessor( predecessor_type &src ) __TBB_override {
0123             operation_type op_data(reg_pred);
0124             op_data.r = &src;
0125             my_aggregator.execute(&op_data);
0126             return true;
0127         }
0128 
0129         //! Removes src from the list of cached predecessors.
0130         bool remove_predecessor( predecessor_type &src ) __TBB_override {
0131             operation_type op_data(rem_pred);
0132             op_data.r = &src;
0133             my_aggregator.execute(&op_data);
0134             return true;
0135         }
0136 
0137 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0138         //! Adds to list of predecessors added by make_edge
0139         void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
0140             operation_type op_data(add_blt_pred);
0141             op_data.r = &src;
0142             my_aggregator.execute(&op_data);
0143         }
0144 
0145         //! removes from to list of predecessors (used by remove_edge)
0146         void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
0147             operation_type op_data(del_blt_pred);
0148             op_data.r = &src;
0149             my_aggregator.execute(&op_data);
0150         }
0151 
0152         size_t predecessor_count() __TBB_override {
0153             operation_type op_data(blt_pred_cnt);
0154             my_aggregator.execute(&op_data);
0155             return op_data.cnt_val;
0156         }
0157 
0158         void copy_predecessors(predecessor_list_type &v) __TBB_override {
0159             operation_type op_data(blt_pred_cpy);
0160             op_data.predv = &v;
0161             my_aggregator.execute(&op_data);
0162         }
0163 
0164         built_predecessors_type &built_predecessors() __TBB_override {
0165             return my_predecessors.built_predecessors();
0166         }
0167 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0168 
0169     protected:
0170 
0171         void reset_function_input_base( reset_flags f) {
0172             my_concurrency = 0;
0173             if(my_queue) {
0174                 my_queue->reset();
0175             }
0176             reset_receiver(f);
0177             forwarder_busy = false;
0178         }
0179 
0180         graph& my_graph_ref;
0181         const size_t my_max_concurrency;
0182         size_t my_concurrency;
0183         __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
0184         input_queue_type *my_queue;
0185         predecessor_cache<input_type, null_mutex > my_predecessors;
0186 
0187         void reset_receiver( reset_flags f) __TBB_override {
0188             if( f & rf_clear_edges) my_predecessors.clear();
0189             else
0190                 my_predecessors.reset();
0191             __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
0192         }
0193 
0194         graph& graph_reference() const __TBB_override {
0195             return my_graph_ref;
0196         }
0197 
0198         task* try_get_postponed_task(const input_type& i) {
0199             operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
0200             my_aggregator.execute(&op_data);
0201             return op_data.bypass_t;
0202         }
0203 
0204     private:
0205 
0206         friend class apply_body_task_bypass< class_type, input_type >;
0207         friend class forward_task_bypass< class_type >;
0208 
0209         class operation_type : public aggregated_operation< operation_type > {
0210         public:
0211             char type;
0212             union {
0213                 input_type *elem;
0214                 predecessor_type *r;
0215 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0216                 size_t cnt_val;
0217                 predecessor_list_type *predv;
0218 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0219             };
0220             tbb::task *bypass_t;
0221             operation_type(const input_type& e, op_type t) :
0222                 type(char(t)), elem(const_cast<input_type*>(&e)) {}
0223             operation_type(op_type t) : type(char(t)), r(NULL) {}
0224         };
0225 
0226         bool forwarder_busy;
0227         typedef internal::aggregating_functor<class_type, operation_type> handler_type;
0228         friend class internal::aggregating_functor<class_type, operation_type>;
0229         aggregator< handler_type, operation_type > my_aggregator;
0230 
0231         task* perform_queued_requests() {
0232             task* new_task = NULL;
0233             if(my_queue) {
0234                 if(!my_queue->empty()) {
0235                     ++my_concurrency;
0236                     new_task = create_body_task(my_queue->front());
0237 
0238                     my_queue->pop();
0239                 }
0240             }
0241             else {
0242                 input_type i;
0243                 if(my_predecessors.get_item(i)) {
0244                     ++my_concurrency;
0245                     new_task = create_body_task(i);
0246                 }
0247             }
0248             return new_task;
0249         }
0250         void handle_operations(operation_type *op_list) {
0251             operation_type *tmp;
0252             while (op_list) {
0253                 tmp = op_list;
0254                 op_list = op_list->next;
0255                 switch (tmp->type) {
0256                 case reg_pred:
0257                     my_predecessors.add(*(tmp->r));
0258                     __TBB_store_with_release(tmp->status, SUCCEEDED);
0259                     if (!forwarder_busy) {
0260                         forwarder_busy = true;
0261                         spawn_forward_task();
0262                     }
0263                     break;
0264                 case rem_pred:
0265                     my_predecessors.remove(*(tmp->r));
0266                     __TBB_store_with_release(tmp->status, SUCCEEDED);
0267                     break;
0268                 case app_body_bypass: {
0269                         tmp->bypass_t = NULL;
0270                         __TBB_ASSERT(my_max_concurrency != 0, NULL);
0271                         --my_concurrency;
0272                         if(my_concurrency<my_max_concurrency)
0273                             tmp->bypass_t = perform_queued_requests();
0274 
0275                         __TBB_store_with_release(tmp->status, SUCCEEDED);
0276                     }
0277                     break;
0278                 case tryput_bypass: internal_try_put_task(tmp);  break;
0279                 case try_fwd: internal_forward(tmp);  break;
0280                 case occupy_concurrency:
0281                     if (my_concurrency < my_max_concurrency) {
0282                         ++my_concurrency;
0283                         __TBB_store_with_release(tmp->status, SUCCEEDED);
0284                     } else {
0285                         __TBB_store_with_release(tmp->status, FAILED);
0286                     }
0287                     break;
0288 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0289                 case add_blt_pred: {
0290                          my_predecessors.internal_add_built_predecessor(*(tmp->r));
0291                         __TBB_store_with_release(tmp->status, SUCCEEDED);
0292                     }
0293                     break;
0294                 case del_blt_pred:
0295                     my_predecessors.internal_delete_built_predecessor(*(tmp->r));
0296                     __TBB_store_with_release(tmp->status, SUCCEEDED);
0297                     break;
0298                 case blt_pred_cnt:
0299                     tmp->cnt_val = my_predecessors.predecessor_count();
0300                     __TBB_store_with_release(tmp->status, SUCCEEDED);
0301                     break;
0302                 case blt_pred_cpy:
0303                     my_predecessors.copy_predecessors( *(tmp->predv) );
0304                     __TBB_store_with_release(tmp->status, SUCCEEDED);
0305                     break;
0306 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0307                 }
0308             }
0309         }
0310 
0311         //! Put to the node, but return the task instead of enqueueing it
0312         void internal_try_put_task(operation_type *op) {
0313             __TBB_ASSERT(my_max_concurrency != 0, NULL);
0314             if (my_concurrency < my_max_concurrency) {
0315                ++my_concurrency;
0316                task * new_task = create_body_task(*(op->elem));
0317                op->bypass_t = new_task;
0318                __TBB_store_with_release(op->status, SUCCEEDED);
0319            } else if ( my_queue && my_queue->push(*(op->elem)) ) {
0320                op->bypass_t = SUCCESSFULLY_ENQUEUED;
0321                __TBB_store_with_release(op->status, SUCCEEDED);
0322            } else {
0323                op->bypass_t = NULL;
0324                __TBB_store_with_release(op->status, FAILED);
0325            }
0326         }
0327 
0328         //! Creates tasks for postponed messages if available and if concurrency allows
0329         void internal_forward(operation_type *op) {
0330             op->bypass_t = NULL;
0331             if (my_concurrency < my_max_concurrency || !my_max_concurrency)
0332                 op->bypass_t = perform_queued_requests();
0333             if(op->bypass_t)
0334                 __TBB_store_with_release(op->status, SUCCEEDED);
0335             else {
0336                 forwarder_busy = false;
0337                 __TBB_store_with_release(op->status, FAILED);
0338             }
0339         }
0340 
0341         task* internal_try_put_bypass( const input_type& t ) {
0342             operation_type op_data(t, tryput_bypass);
0343             my_aggregator.execute(&op_data);
0344             if( op_data.status == internal::SUCCEEDED ) {
0345                 return op_data.bypass_t;
0346             }
0347             return NULL;
0348         }
0349 
0350         task* try_put_task_impl( const input_type& t, /*lightweight=*/tbb::internal::true_type ) {
0351             if( my_max_concurrency == 0 ) {
0352                 return apply_body_bypass(t);
0353             } else {
0354                 operation_type check_op(t, occupy_concurrency);
0355                 my_aggregator.execute(&check_op);
0356                 if( check_op.status == internal::SUCCEEDED ) {
0357                     return apply_body_bypass(t);
0358                 }
0359                 return internal_try_put_bypass(t);
0360             }
0361         }
0362 
0363         task* try_put_task_impl( const input_type& t, /*lightweight=*/tbb::internal::false_type ) {
0364             if( my_max_concurrency == 0 ) {
0365                 return create_body_task(t);
0366             } else {
0367                 return internal_try_put_bypass(t);
0368             }
0369         }
0370 
0371         //! Applies the body to the provided input
0372         //  then decides if more work is available
0373         task * apply_body_bypass( const input_type &i ) {
0374             return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
0375         }
0376 
0377         //! allocates a task to apply a body
0378         inline task * create_body_task( const input_type &input ) {
0379             return (internal::is_graph_active(my_graph_ref)) ?
0380                 new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
0381                 apply_body_task_bypass < class_type, input_type >(
0382                     *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
0383                 : NULL;
0384         }
0385 
0386        //! This is executed by an enqueued task, the "forwarder"
0387        task* forward_task() {
0388            operation_type op_data(try_fwd);
0389            task* rval = NULL;
0390            do {
0391                op_data.status = WAIT;
0392                my_aggregator.execute(&op_data);
0393                if(op_data.status == SUCCEEDED) {
0394                    task* ttask = op_data.bypass_t;
0395                    __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
0396                    rval = combine_tasks(my_graph_ref, rval, ttask);
0397                }
0398            } while (op_data.status == SUCCEEDED);
0399            return rval;
0400        }
0401 
0402        inline task *create_forward_task() {
0403            return (internal::is_graph_active(my_graph_ref)) ?
0404                new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
0405                forward_task_bypass< class_type >( __TBB_FLOW_GRAPH_PRIORITY_ARG1(*this, my_priority) )
0406                : NULL;
0407        }
0408 
0409        //! Spawns a task that calls forward()
0410        inline void spawn_forward_task() {
0411            task* tp = create_forward_task();
0412            if(tp) {
0413                internal::spawn_in_graph_arena(graph_reference(), *tp);
0414            }
0415        }
0416     };  // function_input_base
0417 
0418     //! Implements methods for a function node that takes a type Input as input and sends
0419     //  a type Output to its successors.
0420     template< typename Input, typename Output, typename Policy, typename A>
0421     class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
0422     public:
0423         typedef Input input_type;
0424         typedef Output output_type;
0425         typedef function_body<input_type, output_type> function_body_type;
0426         typedef function_input<Input, Output, Policy,A> my_class;
0427         typedef function_input_base<Input, Policy, A, my_class> base_type;
0428         typedef function_input_queue<input_type, A> input_queue_type;
0429 
0430         // constructor
0431         template<typename Body>
0432         function_input(
0433             graph &g, size_t max_concurrency,
0434             __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
0435         ) : base_type(g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(max_concurrency, priority))
0436           , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
0437           , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) {
0438         }
0439 
0440         //! Copy constructor
0441         function_input( const function_input& src ) :
0442                 base_type(src),
0443                 my_body( src.my_init_body->clone() ),
0444                 my_init_body(src.my_init_body->clone() ) {
0445         }
0446 
0447         ~function_input() {
0448             delete my_body;
0449             delete my_init_body;
0450         }
0451 
0452         template< typename Body >
0453         Body copy_function_object() {
0454             function_body_type &body_ref = *this->my_body;
0455             return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0456         }
0457 
0458         output_type apply_body_impl( const input_type& i) {
0459             // There is an extra copied needed to capture the
0460             // body execution without the try_put
0461             tbb::internal::fgt_begin_body( my_body );
0462             output_type v = (*my_body)(i);
0463             tbb::internal::fgt_end_body( my_body );
0464             return v;
0465         }
0466 
0467         //TODO: consider moving into the base class
0468         task * apply_body_impl_bypass( const input_type &i) {
0469             output_type v = apply_body_impl(i);
0470 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
0471             task* successor_task = successors().try_put_task(v);
0472 #endif
0473             task* postponed_task = NULL;
0474             if( base_type::my_max_concurrency != 0 ) {
0475                 postponed_task = base_type::try_get_postponed_task(i);
0476                 __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
0477             }
0478 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
0479             graph& g = base_type::my_graph_ref;
0480             return combine_tasks(g, successor_task, postponed_task);
0481 #else
0482             if( postponed_task ) {
0483                 // make the task available for other workers since we do not know successors'
0484                 // execution policy
0485                 internal::spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
0486             }
0487             task* successor_task = successors().try_put_task(v);
0488 #if _MSC_VER && !__INTEL_COMPILER
0489 #pragma warning (push)
0490 #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
0491 #endif
0492             if(internal::has_policy<lightweight, Policy>::value) {
0493 #if _MSC_VER && !__INTEL_COMPILER
0494 #pragma warning (pop)
0495 #endif
0496                 if(!successor_task) {
0497                     // Return confirmative status since current
0498                     // node's body has been executed anyway
0499                     successor_task = SUCCESSFULLY_ENQUEUED;
0500                 }
0501             }
0502             return successor_task;
0503 #endif /* TBB_DEPRECATED_MESSAGE_FLOW_ORDER */
0504         }
0505 
0506     protected:
0507 
0508         void reset_function_input(reset_flags f) {
0509             base_type::reset_function_input_base(f);
0510             if(f & rf_reset_bodies) {
0511                 function_body_type *tmp = my_init_body->clone();
0512                 delete my_body;
0513                 my_body = tmp;
0514             }
0515         }
0516 
0517         function_body_type *my_body;
0518         function_body_type *my_init_body;
0519         virtual broadcast_cache<output_type > &successors() = 0;
0520 
0521     };  // function_input
0522 
0523 
0524     // helper templates to clear the successor edges of the output ports of an multifunction_node
0525     template<int N> struct clear_element {
0526         template<typename P> static void clear_this(P &p) {
0527             (void)tbb::flow::get<N-1>(p).successors().clear();
0528             clear_element<N-1>::clear_this(p);
0529         }
0530         template<typename P> static bool this_empty(P &p) {
0531             if(tbb::flow::get<N-1>(p).successors().empty())
0532                 return clear_element<N-1>::this_empty(p);
0533             return false;
0534         }
0535     };
0536 
0537     template<> struct clear_element<1> {
0538         template<typename P> static void clear_this(P &p) {
0539             (void)tbb::flow::get<0>(p).successors().clear();
0540         }
0541         template<typename P> static bool this_empty(P &p) {
0542             return tbb::flow::get<0>(p).successors().empty();
0543         }
0544     };
0545 
0546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0547     // helper templates to extract the output ports of an multifunction_node from graph
0548     template<int N> struct extract_element {
0549         template<typename P> static void extract_this(P &p) {
0550             (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
0551             extract_element<N-1>::extract_this(p);
0552         }
0553     };
0554 
0555     template<> struct extract_element<1> {
0556         template<typename P> static void extract_this(P &p) {
0557             (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
0558         }
0559     };
0560 #endif
0561 
0562     template <typename OutputTuple>
0563     struct init_output_ports {
0564 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
0565         template <typename... Args>
0566         static OutputTuple call(graph& g, const tbb::flow::tuple<Args...>&) {
0567             return OutputTuple(Args(g)...);
0568         }
0569 #else // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
0570         template <typename T1>
0571         static OutputTuple call(graph& g, const tbb::flow::tuple<T1>&) {
0572             return OutputTuple(T1(g));
0573         }
0574 
0575         template <typename T1, typename T2>
0576         static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2>&) {
0577             return OutputTuple(T1(g), T2(g));
0578         }
0579 
0580         template <typename T1, typename T2, typename T3>
0581         static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3>&) {
0582             return OutputTuple(T1(g), T2(g), T3(g));
0583         }
0584 
0585         template <typename T1, typename T2, typename T3, typename T4>
0586         static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4>&) {
0587             return OutputTuple(T1(g), T2(g), T3(g), T4(g));
0588         }
0589 
0590         template <typename T1, typename T2, typename T3, typename T4, typename T5>
0591         static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5>&) {
0592             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g));
0593         }
0594 #if __TBB_VARIADIC_MAX >= 6
0595         template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
0596         static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5, T6>&) {
0597             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g));
0598         }
0599 #endif
0600 #if __TBB_VARIADIC_MAX >= 7
0601         template <typename T1, typename T2, typename T3, typename T4,
0602                   typename T5, typename T6, typename T7>
0603         static OutputTuple call(graph& g,
0604                                 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7>&) {
0605             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g));
0606         }
0607 #endif
0608 #if __TBB_VARIADIC_MAX >= 8
0609         template <typename T1, typename T2, typename T3, typename T4,
0610                   typename T5, typename T6, typename T7, typename T8>
0611         static OutputTuple call(graph& g,
0612                                 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8>&) {
0613             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g));
0614         }
0615 #endif
0616 #if __TBB_VARIADIC_MAX >= 9
0617         template <typename T1, typename T2, typename T3, typename T4,
0618                   typename T5, typename T6, typename T7, typename T8, typename T9>
0619         static OutputTuple call(graph& g,
0620                                 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9>&) {
0621             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g));
0622         }
0623 #endif
0624 #if __TBB_VARIADIC_MAX >= 9
0625         template <typename T1, typename T2, typename T3, typename T4, typename T5,
0626                   typename T6, typename T7, typename T8, typename T9, typename T10>
0627         static OutputTuple call(graph& g,
0628                                 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>&) {
0629             return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g), T10(g));
0630         }
0631 #endif
0632 #endif // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
0633     }; // struct init_output_ports
0634 
0635     //! Implements methods for a function node that takes a type Input as input
0636     //  and has a tuple of output ports specified.
0637     template< typename Input, typename OutputPortSet, typename Policy, typename A>
0638     class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
0639     public:
0640         static const int N = tbb::flow::tuple_size<OutputPortSet>::value;
0641         typedef Input input_type;
0642         typedef OutputPortSet output_ports_type;
0643         typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
0644         typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
0645         typedef function_input_base<Input, Policy, A, my_class> base_type;
0646         typedef function_input_queue<input_type, A> input_queue_type;
0647 
0648         // constructor
0649         template<typename Body>
0650         multifunction_input(graph &g, size_t max_concurrency,
0651                             __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
0652         ) : base_type(g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(max_concurrency, priority))
0653           , my_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0654           , my_init_body( new internal::multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0655           , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
0656         }
0657 
0658         //! Copy constructor
0659         multifunction_input( const multifunction_input& src ) :
0660                 base_type(src),
0661                 my_body( src.my_init_body->clone() ),
0662                 my_init_body(src.my_init_body->clone() ),
0663                 my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
0664         }
0665 
0666         ~multifunction_input() {
0667             delete my_body;
0668             delete my_init_body;
0669         }
0670 
0671         template< typename Body >
0672         Body copy_function_object() {
0673             multifunction_body_type &body_ref = *this->my_body;
0674             return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
0675         }
0676 
0677         // for multifunction nodes we do not have a single successor as such.  So we just tell
0678         // the task we were successful.
0679         //TODO: consider moving common parts with implementation in function_input into separate function
0680         task * apply_body_impl_bypass( const input_type &i) {
0681             tbb::internal::fgt_begin_body( my_body );
0682             (*my_body)(i, my_output_ports);
0683             tbb::internal::fgt_end_body( my_body );
0684             task* ttask = NULL;
0685             if(base_type::my_max_concurrency != 0) {
0686                 ttask = base_type::try_get_postponed_task(i);
0687             }
0688             return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
0689         }
0690 
0691         output_ports_type &output_ports(){ return my_output_ports; }
0692 
0693     protected:
0694 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0695         void extract() {
0696             extract_element<N>::extract_this(my_output_ports);
0697         }
0698 #endif
0699 
0700         void reset(reset_flags f) {
0701             base_type::reset_function_input_base(f);
0702             if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
0703             if(f & rf_reset_bodies) {
0704                 multifunction_body_type *tmp = my_init_body->clone();
0705                 delete my_body;
0706                 my_body = tmp;
0707             }
0708             __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
0709         }
0710 
0711         multifunction_body_type *my_body;
0712         multifunction_body_type *my_init_body;
0713         output_ports_type my_output_ports;
0714 
0715     };  // multifunction_input
0716 
0717     // template to refer to an output port of a multifunction_node
0718     template<size_t N, typename MOP>
0719     typename tbb::flow::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
0720         return tbb::flow::get<N>(op.output_ports());
0721     }
0722 
0723     inline void check_task_and_spawn(graph& g, task* t) {
0724         if (t && t != SUCCESSFULLY_ENQUEUED) {
0725             internal::spawn_in_graph_arena(g, *t);
0726         }
0727     }
0728 
0729     // helper structs for split_node
0730     template<int N>
0731     struct emit_element {
0732         template<typename T, typename P>
0733         static task* emit_this(graph& g, const T &t, P &p) {
0734             // TODO: consider to collect all the tasks in task_list and spawn them all at once
0735             task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
0736             check_task_and_spawn(g, last_task);
0737             return emit_element<N-1>::emit_this(g,t,p);
0738         }
0739     };
0740 
0741     template<>
0742     struct emit_element<1> {
0743         template<typename T, typename P>
0744         static task* emit_this(graph& g, const T &t, P &p) {
0745             task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
0746             check_task_and_spawn(g, last_task);
0747             return SUCCESSFULLY_ENQUEUED;
0748         }
0749     };
0750 
0751     //! Implements methods for an executable node that takes continue_msg as input
0752     template< typename Output, typename Policy>
0753     class continue_input : public continue_receiver {
0754     public:
0755 
0756         //! The input type of this receiver
0757         typedef continue_msg input_type;
0758 
0759         //! The output type of this receiver
0760         typedef Output output_type;
0761         typedef function_body<input_type, output_type> function_body_type;
0762         typedef continue_input<output_type, Policy> class_type;
0763 
0764         template< typename Body >
0765         continue_input( graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority) )
0766             : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(/*number_of_predecessors=*/0, priority))
0767             , my_graph_ref(g)
0768             , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
0769             , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
0770             { }
0771 
0772         template< typename Body >
0773         continue_input( graph &g, int number_of_predecessors,
0774                         __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body& body, node_priority_t priority)
0775         ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
0776           , my_graph_ref(g)
0777           , my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
0778           , my_init_body( new internal::function_body_leaf< input_type, output_type, Body>(body) )
0779         { }
0780 
0781         continue_input( const continue_input& src ) : continue_receiver(src),
0782             my_graph_ref(src.my_graph_ref),
0783             my_body( src.my_init_body->clone() ),
0784             my_init_body( src.my_init_body->clone() ) {}
0785 
0786         ~continue_input() {
0787             delete my_body;
0788             delete my_init_body;
0789         }
0790 
0791         template< typename Body >
0792         Body copy_function_object() {
0793             function_body_type &body_ref = *my_body;
0794             return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0795         }
0796 
0797         void reset_receiver( reset_flags f) __TBB_override {
0798             continue_receiver::reset_receiver(f);
0799             if(f & rf_reset_bodies) {
0800                 function_body_type *tmp = my_init_body->clone();
0801                 delete my_body;
0802                 my_body = tmp;
0803             }
0804         }
0805 
0806     protected:
0807 
0808         graph& my_graph_ref;
0809         function_body_type *my_body;
0810         function_body_type *my_init_body;
0811 
0812         virtual broadcast_cache<output_type > &successors() = 0;
0813 
0814         friend class apply_body_task_bypass< class_type, continue_msg >;
0815 
0816         //! Applies the body to the provided input
0817         task *apply_body_bypass( input_type ) {
0818             // There is an extra copied needed to capture the
0819             // body execution without the try_put
0820             tbb::internal::fgt_begin_body( my_body );
0821             output_type v = (*my_body)( continue_msg() );
0822             tbb::internal::fgt_end_body( my_body );
0823             return successors().try_put_task( v );
0824         }
0825 
0826         task* execute() __TBB_override {
0827             if(!internal::is_graph_active(my_graph_ref)) {
0828                 return NULL;
0829             }
0830 #if _MSC_VER && !__INTEL_COMPILER
0831 #pragma warning (push)
0832 #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
0833 #endif
0834             if(internal::has_policy<lightweight, Policy>::value) {
0835 #if _MSC_VER && !__INTEL_COMPILER
0836 #pragma warning (pop)
0837 #endif
0838                 return apply_body_bypass( continue_msg() );
0839             }
0840             else {
0841                 return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
0842                        apply_body_task_bypass< class_type, continue_msg >(
0843                            *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
0844             }
0845         }
0846 
0847         graph& graph_reference() const __TBB_override {
0848             return my_graph_ref;
0849         }
0850     };  // continue_input
0851 
0852     //! Implements methods for both executable and function nodes that puts Output to its successors
0853     template< typename Output >
0854     class function_output : public sender<Output> {
0855     public:
0856 
0857         template<int N> friend struct clear_element;
0858         typedef Output output_type;
0859         typedef typename sender<output_type>::successor_type successor_type;
0860         typedef broadcast_cache<output_type> broadcast_cache_type;
0861 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0862         typedef typename sender<output_type>::built_successors_type built_successors_type;
0863         typedef typename sender<output_type>::successor_list_type successor_list_type;
0864 #endif
0865 
0866         function_output( graph& g) : my_graph_ref(g) { my_successors.set_owner(this); }
0867         function_output(const function_output & other) : sender<output_type>(), my_graph_ref(other.my_graph_ref) {
0868             my_successors.set_owner(this);
0869         }
0870 
0871         //! Adds a new successor to this node
0872         bool register_successor( successor_type &r ) __TBB_override {
0873             successors().register_successor( r );
0874             return true;
0875         }
0876 
0877         //! Removes a successor from this node
0878         bool remove_successor( successor_type &r ) __TBB_override {
0879             successors().remove_successor( r );
0880             return true;
0881         }
0882 
0883 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0884         built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }
0885 
0886 
0887         void internal_add_built_successor( successor_type &r) __TBB_override {
0888             successors().internal_add_built_successor( r );
0889         }
0890 
0891         void internal_delete_built_successor( successor_type &r) __TBB_override {
0892             successors().internal_delete_built_successor( r );
0893         }
0894 
0895         size_t successor_count() __TBB_override {
0896             return successors().successor_count();
0897         }
0898 
0899         void  copy_successors( successor_list_type &v) __TBB_override {
0900             successors().copy_successors(v);
0901         }
0902 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0903 
0904         // for multifunction_node.  The function_body that implements
0905         // the node will have an input and an output tuple of ports.  To put
0906         // an item to a successor, the body should
0907         //
0908         //    get<I>(output_ports).try_put(output_value);
0909         //
0910         // if task pointer is returned will always spawn and return true, else
0911         // return value will be bool returned from successors.try_put.
0912         task *try_put_task(const output_type &i) { // not a virtual method in this class
0913             return my_successors.try_put_task(i);
0914         }
0915 
0916         broadcast_cache_type &successors() { return my_successors; }
0917 
0918         graph& graph_reference() const { return my_graph_ref; }
0919     protected:
0920         broadcast_cache_type my_successors;
0921         graph& my_graph_ref;
0922     };  // function_output
0923 
0924     template< typename Output >
0925     class multifunction_output : public function_output<Output> {
0926     public:
0927         typedef Output output_type;
0928         typedef function_output<output_type> base_type;
0929         using base_type::my_successors;
0930 
0931         multifunction_output(graph& g) : base_type(g) {my_successors.set_owner(this);}
0932         multifunction_output( const multifunction_output& other) : base_type(other.my_graph_ref) { my_successors.set_owner(this); }
0933 
0934         bool try_put(const output_type &i) {
0935             task *res = try_put_task(i);
0936             if(!res) return false;
0937             if(res != SUCCESSFULLY_ENQUEUED) {
0938                 FLOW_SPAWN(*res); // TODO: Spawn task inside arena
0939             }
0940             return true;
0941         }
0942 
0943         using base_type::graph_reference;
0944 
0945     protected:
0946 
0947         task* try_put_task(const output_type &i) {
0948             return my_successors.try_put_task(i);
0949         }
0950 
0951         template <int N> friend struct emit_element;
0952 
0953     };  // multifunction_output
0954 
0955 //composite_node
0956 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
0957     template<typename CompositeType>
0958     void add_nodes_impl(CompositeType*, bool) {}
0959 
0960     template< typename CompositeType, typename NodeType1, typename... NodeTypes >
0961     void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
0962         void *addr = const_cast<NodeType1 *>(&n1);
0963 
0964         fgt_alias_port(c_node, addr, visible);
0965         add_nodes_impl(c_node, visible, n...);
0966     }
0967 #endif
0968 
0969 }  // internal
0970 
0971 #endif // __TBB__flow_graph_node_impl_H