Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:16

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_body_impl_H
0018 #define __TBB__flow_graph_body_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included in namespace tbb::detail::d2 (in flow_graph.h)
0025 
// 64-bit unsigned integer type used as the key of the tag_matching policy.
using tag_value = std::uint64_t;
0027 
0028 
// TODO revamp: find out if there is already helper for has_policy.

//! Type list that bundles several policy tags into one template argument.
template<typename ... Policies> struct Policy {};

//! Compile-time test of whether a list of policies contains ExpectedPolicy.
/** Usage: has_policy<ExpectedPolicy, P1, P2, ...>::value.
    The first argument is the policy being looked for; every following argument is
    either a plain policy tag or a Policy<...> bundle, which is searched recursively.
    The result is a std::integral_constant<bool, ...>. **/
template<typename ... Policies> struct has_policy;

// Empty list: there is nothing to find. Without this base case an empty bundle such as
// has_policy<P, Policy<> > would inherit from the undefined primary template and
// produce a hard compile error instead of evaluating to false.
template<typename ExpectedPolicy>
struct has_policy<ExpectedPolicy> : std::false_type {};

// Two or more candidates: true if either the first candidate or the rest matches.
template<typename ExpectedPolicy, typename FirstPolicy, typename ...Policies>
struct has_policy<ExpectedPolicy, FirstPolicy, Policies...> :
    std::integral_constant<bool, has_policy<ExpectedPolicy, FirstPolicy>::value ||
                                 has_policy<ExpectedPolicy, Policies...>::value> {};

// Exactly one plain candidate: a direct type comparison.
template<typename ExpectedPolicy, typename SinglePolicy>
struct has_policy<ExpectedPolicy, SinglePolicy> :
    std::integral_constant<bool, std::is_same<ExpectedPolicy, SinglePolicy>::value> {};

// Exactly one candidate that is a Policy<...> bundle: unpack it and search its contents.
template<typename ExpectedPolicy, typename ...Policies>
struct has_policy<ExpectedPolicy, Policy<Policies...> > : has_policy<ExpectedPolicy, Policies...> {};
0045 
0046 namespace graph_policy_namespace {
0047 
0048     struct rejecting { };
0049     struct reserving { };
0050     struct queueing  { };
0051     struct lightweight  { };
0052 
0053     // K == type of field used for key-matching.  Each tag-matching port will be provided
0054     // functor that, given an object accepted by the port, will return the
0055     /// field of type K being used for matching.
0056     template<typename K, typename KHash=d1::tbb_hash_compare<typename std::decay<K>::type > >
0057         __TBB_requires(tbb::detail::hash_compare<KHash, K>)
0058     struct key_matching {
0059         typedef K key_type;
0060         typedef typename std::decay<K>::type base_key_type;
0061         typedef KHash hash_compare_type;
0062     };
0063 
0064     // old tag_matching join's new specifier
0065     typedef key_matching<tag_value> tag_matching;
0066 
0067     // Aliases for Policy combinations
0068     typedef Policy<queueing, lightweight> queueing_lightweight;
0069     typedef Policy<rejecting, lightweight> rejecting_lightweight;
0070 
0071 } // namespace graph_policy_namespace
0072 
0073 // -------------- function_body containers ----------------------
0074 
0075 //! A functor that takes no input and generates a value of type Output
0076 template< typename Output >
0077 class input_body : no_assign {
0078 public:
0079     virtual ~input_body() {}
0080     virtual Output operator()(d1::flow_control& fc) = 0;
0081     virtual input_body* clone() = 0;
0082 };
0083 
0084 //! The leaf for input_body
0085 template< typename Output, typename Body>
0086 class input_body_leaf : public input_body<Output> {
0087 public:
0088     input_body_leaf( const Body &_body ) : body(_body) { }
0089     Output operator()(d1::flow_control& fc) override { return body(fc); }
0090     input_body_leaf* clone() override {
0091         return new input_body_leaf< Output, Body >(body);
0092     }
0093     Body get_body() { return body; }
0094 private:
0095     Body body;
0096 };
0097 
0098 //! A functor that takes an Input and generates an Output
0099 template< typename Input, typename Output >
0100 class function_body : no_assign {
0101 public:
0102     virtual ~function_body() {}
0103     virtual Output operator()(const Input &input) = 0;
0104     virtual function_body* clone() = 0;
0105 };
0106 
0107 //! the leaf for function_body
0108 template <typename Input, typename Output, typename B>
0109 class function_body_leaf : public function_body< Input, Output > {
0110 public:
0111     function_body_leaf( const B &_body ) : body(_body) { }
0112     Output operator()(const Input &i) override { return tbb::detail::invoke(body,i); }
0113     B get_body() { return body; }
0114     function_body_leaf* clone() override {
0115         return new function_body_leaf< Input, Output, B >(body);
0116     }
0117 private:
0118     B body;
0119 };
0120 
0121 //! the leaf for function_body specialized for Input and output of continue_msg
0122 template <typename B>
0123 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
0124 public:
0125     function_body_leaf( const B &_body ) : body(_body) { }
0126     continue_msg operator()( const continue_msg &i ) override {
0127         body(i);
0128         return i;
0129     }
0130     B get_body() { return body; }
0131     function_body_leaf* clone() override {
0132         return new function_body_leaf< continue_msg, continue_msg, B >(body);
0133     }
0134 private:
0135     B body;
0136 };
0137 
0138 //! the leaf for function_body specialized for Output of continue_msg
0139 template <typename Input, typename B>
0140 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
0141 public:
0142     function_body_leaf( const B &_body ) : body(_body) { }
0143     continue_msg operator()(const Input &i) override {
0144         body(i);
0145         return continue_msg();
0146     }
0147     B get_body() { return body; }
0148     function_body_leaf* clone() override {
0149         return new function_body_leaf< Input, continue_msg, B >(body);
0150     }
0151 private:
0152     B body;
0153 };
0154 
0155 //! the leaf for function_body specialized for Input of continue_msg
0156 template <typename Output, typename B>
0157 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
0158 public:
0159     function_body_leaf( const B &_body ) : body(_body) { }
0160     Output operator()(const continue_msg &i) override {
0161         return body(i);
0162     }
0163     B get_body() { return body; }
0164     function_body_leaf* clone() override {
0165         return new function_body_leaf< continue_msg, Output, B >(body);
0166     }
0167 private:
0168     B body;
0169 };
0170 
0171 //! function_body that takes an Input and a set of output ports
0172 template<typename Input, typename OutputSet>
0173 class multifunction_body : no_assign {
0174 public:
0175     virtual ~multifunction_body () {}
0176     virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
0177     virtual multifunction_body* clone() = 0;
0178     virtual void* get_body_ptr() = 0;
0179 };
0180 
0181 //! leaf for multifunction.  OutputSet can be a std::tuple or a vector.
0182 template<typename Input, typename OutputSet, typename B >
0183 class multifunction_body_leaf : public multifunction_body<Input, OutputSet> {
0184 public:
0185     multifunction_body_leaf(const B &_body) : body(_body) { }
0186     void operator()(const Input &input, OutputSet &oset) override {
0187         tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset.
0188     }
0189     void* get_body_ptr() override { return &body; }
0190     multifunction_body_leaf* clone() override {
0191         return new multifunction_body_leaf<Input, OutputSet,B>(body);
0192     }
0193 
0194 private:
0195     B body;
0196 };
0197 
0198 // ------ function bodies for hash_buffers and key-matching joins.
0199 
0200 template<typename Input, typename Output>
0201 class type_to_key_function_body : no_assign {
0202     public:
0203         virtual ~type_to_key_function_body() {}
0204         virtual Output operator()(const Input &input) = 0;  // returns an Output
0205         virtual type_to_key_function_body* clone() = 0;
0206 };
0207 
0208 // specialization for ref output
0209 template<typename Input, typename Output>
0210 class type_to_key_function_body<Input,Output&> : no_assign {
0211     public:
0212         virtual ~type_to_key_function_body() {}
0213         virtual const Output & operator()(const Input &input) = 0;  // returns a const Output&
0214         virtual type_to_key_function_body* clone() = 0;
0215 };
0216 
0217 template <typename Input, typename Output, typename B>
0218 class type_to_key_function_body_leaf : public type_to_key_function_body<Input, Output> {
0219 public:
0220     type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
0221     Output operator()(const Input &i) override { return tbb::detail::invoke(body, i); }
0222     type_to_key_function_body_leaf* clone() override {
0223         return new type_to_key_function_body_leaf< Input, Output, B>(body);
0224     }
0225 private:
0226     B body;
0227 };
0228 
0229 template <typename Input, typename Output, typename B>
0230 class type_to_key_function_body_leaf<Input,Output&,B> : public type_to_key_function_body< Input, Output&> {
0231 public:
0232     type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
0233     const Output& operator()(const Input &i) override {
0234         return tbb::detail::invoke(body, i);
0235     }
0236     type_to_key_function_body_leaf* clone() override {
0237         return new type_to_key_function_body_leaf< Input, Output&, B>(body);
0238     }
0239 private:
0240     B body;
0241 };
0242 
0243 // --------------------------- end of function_body containers ------------------------
0244 
0245 // --------------------------- node task bodies ---------------------------------------
0246 
0247 //! A task that calls a node's forward_task function
0248 template< typename NodeType >
0249 class forward_task_bypass : public graph_task {
0250     NodeType &my_node;
0251 public:
0252     forward_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n
0253                          , node_priority_t node_priority = no_priority
0254     ) : graph_task(g, allocator, node_priority),
0255     my_node(n) {}
0256 
0257     d1::task* execute(d1::execution_data& ed) override {
0258         graph_task* next_task = my_node.forward_task();
0259         if (SUCCESSFULLY_ENQUEUED == next_task)
0260             next_task = nullptr;
0261         else if (next_task)
0262             next_task = prioritize_task(my_node.graph_reference(), *next_task);
0263         finalize<forward_task_bypass>(ed);
0264         return next_task;
0265     }
0266 
0267     d1::task* cancel(d1::execution_data& ed) override {
0268         finalize<forward_task_bypass>(ed);
0269         return nullptr;
0270     }
0271 };
0272 
0273 //! A task that calls a node's apply_body_bypass function, passing in an input of type Input
0274 //  return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
0275 template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
0276 class apply_body_task_bypass
0277     : public BaseTaskType
0278 {
0279     NodeType &my_node;
0280     Input my_input;
0281 
0282     using check_metainfo = std::is_same<BaseTaskType, graph_task>;
0283     using without_metainfo = std::true_type;
0284     using with_metainfo = std::false_type;
0285 
0286     graph_task* call_apply_body_bypass_impl(without_metainfo) {
0287         return my_node.apply_body_bypass(my_input
0288                                          __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0289     }
0290 
0291 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0292     graph_task* call_apply_body_bypass_impl(with_metainfo) {
0293         return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
0294     }
0295 #endif
0296 
0297     graph_task* call_apply_body_bypass() {
0298         return call_apply_body_bypass_impl(check_metainfo{});
0299     }
0300 
0301 public:
0302 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0303     template <typename Metainfo>
0304     apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
0305                             node_priority_t node_priority, Metainfo&& metainfo )
0306         : BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
0307         , my_node(n), my_input(i) {}
0308 #endif
0309 
0310     apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
0311                             node_priority_t node_priority = no_priority )
0312         : BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}
0313 
0314     d1::task* execute(d1::execution_data& ed) override {
0315         graph_task* next_task = call_apply_body_bypass();
0316         if (SUCCESSFULLY_ENQUEUED == next_task)
0317             next_task = nullptr;
0318         else if (next_task)
0319             next_task = prioritize_task(my_node.graph_reference(), *next_task);
0320         BaseTaskType::template finalize<apply_body_task_bypass>(ed);
0321         return next_task;
0322     }
0323 
0324     d1::task* cancel(d1::execution_data& ed) override {
0325         BaseTaskType::template finalize<apply_body_task_bypass>(ed);
0326         return nullptr;
0327     }
0328 };
0329 
0330 //! A task that calls a node's apply_body_bypass function with no input
0331 template< typename NodeType >
0332 class input_node_task_bypass : public graph_task {
0333     NodeType &my_node;
0334 public:
0335     input_node_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n )
0336         : graph_task(g, allocator), my_node(n) {}
0337 
0338     d1::task* execute(d1::execution_data& ed) override {
0339         graph_task* next_task = my_node.apply_body_bypass( );
0340         if (SUCCESSFULLY_ENQUEUED == next_task)
0341             next_task = nullptr;
0342         else if (next_task)
0343             next_task = prioritize_task(my_node.graph_reference(), *next_task);
0344         finalize<input_node_task_bypass>(ed);
0345         return next_task;
0346     }
0347 
0348     d1::task* cancel(d1::execution_data& ed) override {
0349         finalize<input_node_task_bypass>(ed);
0350         return nullptr;
0351     }
0352 };
0353 
0354 // ------------------------ end of node task bodies -----------------------------------
0355 
//! Primary template (declaration only); the trailing parameter exists solely so that
//! specializations can be selected with std::enable_if.
template<typename T, typename DecrementType, typename Enable = void>
class threshold_regulator;
0358 
0359 template<typename T, typename DecrementType>
0360 class threshold_regulator<T, DecrementType,
0361                   typename std::enable_if<std::is_integral<DecrementType>::value>::type>
0362     : public receiver<DecrementType>, no_copy
0363 {
0364     T* my_node;
0365 protected:
0366 
0367     graph_task* try_put_task( const DecrementType& value ) override {
0368         graph_task* result = my_node->decrement_counter( value );
0369         if( !result )
0370             result = SUCCESSFULLY_ENQUEUED;
0371         return result;
0372     }
0373 
0374 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0375     // Intentionally ignore the metainformation
0376     // If there are more items associated with passed metainfo to be processed
0377     // They should be stored in the buffer before the limiter_node
0378     graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
0379         return try_put_task(value);
0380     }
0381 #endif
0382 
0383     graph& graph_reference() const override {
0384         return my_node->my_graph;
0385     }
0386 
0387     template<typename U, typename V> friend class limiter_node;
0388     void reset_receiver( reset_flags ) {}
0389 
0390 public:
0391     threshold_regulator(T* owner) : my_node(owner) {
0392         // Do not work with the passed pointer here as it may not be fully initialized yet
0393     }
0394 };
0395 
0396 template<typename T>
0397 class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_copy {
0398 
0399     T *my_node;
0400 
0401 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0402     // Intentionally ignore the metainformation
0403     // If there are more items associated with passed metainfo to be processed
0404     // They should be stored in the buffer before the limiter_node
0405     graph_task* execute(const message_metainfo&) override {
0406 #else
0407     graph_task* execute() override {
0408 #endif
0409         return my_node->decrement_counter( 1 );
0410     }
0411 
0412 protected:
0413 
0414     graph& graph_reference() const override {
0415         return my_node->my_graph;
0416     }
0417 
0418 public:
0419 
0420     typedef continue_msg input_type;
0421     typedef continue_msg output_type;
0422     threshold_regulator(T* owner)
0423         : continue_receiver( /*number_of_predecessors=*/0, no_priority ), my_node(owner)
0424     {
0425         // Do not work with the passed pointer here as it may not be fully initialized yet
0426     }
0427 };
0428 
0429 #endif // __TBB__flow_graph_body_impl_H