Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:47

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_flow_graph_impl_H
0018 #define __TBB_flow_graph_impl_H
0019 
0020 #include "../tbb_stddef.h"
0021 #include "../task.h"
0022 #include "../task_arena.h"
0023 #include "../flow_graph_abstractions.h"
0024 
0025 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0026 #include "../concurrent_priority_queue.h"
0027 #endif
0028 
0029 #include <list>
0030 
0031 #if TBB_DEPRECATED_FLOW_ENQUEUE
0032 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
0033 #else
0034 #define FLOW_SPAWN(a) tbb::task::spawn((a))
0035 #endif
0036 
0037 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0038 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
0039 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
0040 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
0041 #else
0042 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
0043 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
0044 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
0045 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0046 
0047 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
0048 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
0049 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
0050 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
0051 #else
0052 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
0053 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
0054 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
0055 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
0056 
0057 namespace tbb {
0058 namespace flow {
0059 
0060 namespace internal {
0061 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
0062 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0063 typedef unsigned int node_priority_t;
0064 static const node_priority_t no_priority = node_priority_t(0);
0065 #endif
0066 }
0067 
0068 namespace interface10 {
0069 class graph;
0070 }
0071 
0072 namespace interface11 {
0073 
0074 using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
0075 
0076 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0077 using tbb::flow::internal::node_priority_t;
0078 using tbb::flow::internal::no_priority;
0079 //! Base class for tasks generated by graph nodes.
0080 struct graph_task : public task {
0081     graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
0082     node_priority_t priority;
0083 };
0084 #else
0085 typedef task graph_task;
0086 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
0087 
0088 class graph_node;
0089 
0090 template <typename GraphContainerType, typename GraphNodeType>
0091 class graph_iterator {
0092     friend class tbb::flow::interface10::graph;
0093     friend class graph_node;
0094 public:
0095     typedef size_t size_type;
0096     typedef GraphNodeType value_type;
0097     typedef GraphNodeType* pointer;
0098     typedef GraphNodeType& reference;
0099     typedef const GraphNodeType& const_reference;
0100     typedef std::forward_iterator_tag iterator_category;
0101 
0102     //! Default constructor
0103     graph_iterator() : my_graph(NULL), current_node(NULL) {}
0104 
0105     //! Copy constructor
0106     graph_iterator(const graph_iterator& other) :
0107         my_graph(other.my_graph), current_node(other.current_node)
0108     {}
0109 
0110     //! Assignment
0111     graph_iterator& operator=(const graph_iterator& other) {
0112         if (this != &other) {
0113             my_graph = other.my_graph;
0114             current_node = other.current_node;
0115         }
0116         return *this;
0117     }
0118 
0119     //! Dereference
0120     reference operator*() const;
0121 
0122     //! Dereference
0123     pointer operator->() const;
0124 
0125     //! Equality
0126     bool operator==(const graph_iterator& other) const {
0127         return ((my_graph == other.my_graph) && (current_node == other.current_node));
0128     }
0129 
0130     //! Inequality
0131     bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
0132 
0133     //! Pre-increment
0134     graph_iterator& operator++() {
0135         internal_forward();
0136         return *this;
0137     }
0138 
0139     //! Post-increment
0140     graph_iterator operator++(int) {
0141         graph_iterator result = *this;
0142         operator++();
0143         return result;
0144     }
0145 
0146 private:
0147     // the graph over which we are iterating
0148     GraphContainerType *my_graph;
0149     // pointer into my_graph's my_nodes list
0150     pointer current_node;
0151 
0152     //! Private initializing constructor for begin() and end() iterators
0153     graph_iterator(GraphContainerType *g, bool begin);
0154     void internal_forward();
0155 };  // class graph_iterator
0156 
//! Flags to modify the behavior of the graph reset(). The non-zero values are
//! distinct bits and can be combined.
enum reset_flags {
    rf_reset_protocol = 0x0,  //!< no additional action; default argument of reset() and reset_node()
    rf_reset_bodies   = 0x1,  //!< delete the current node body, reset to a copy of the initial node body
    rf_clear_edges    = 0x2   //!< delete edges
};
0163 
0164 namespace internal {
0165 
0166 void activate_graph(tbb::flow::interface10::graph& g);
0167 void deactivate_graph(tbb::flow::interface10::graph& g);
0168 bool is_graph_active(tbb::flow::interface10::graph& g);
0169 tbb::task& prioritize_task(tbb::flow::interface10::graph& g, tbb::task& arena_task);
0170 void spawn_in_graph_arena(tbb::flow::interface10::graph& g, tbb::task& arena_task);
0171 void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task& arena_task);
0172 void add_task_to_graph_reset_list(tbb::flow::interface10::graph& g, tbb::task *tp);
0173 
0174 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0175 struct graph_task_comparator {
0176     bool operator()(const graph_task* left, const graph_task* right) {
0177         return left->priority < right->priority;
0178     }
0179 };
0180 
0181 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
0182 
0183 class priority_task_selector : public task {
0184 public:
0185     priority_task_selector(graph_task_priority_queue_t& priority_queue)
0186         : my_priority_queue(priority_queue) {}
0187     task* execute() __TBB_override {
0188         graph_task* t = NULL;
0189         bool result = my_priority_queue.try_pop(t);
0190         __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
0191                          " in graph's priority queue mismatched" );
0192         __TBB_ASSERT( t && t != SUCCESSFULLY_ENQUEUED,
0193                       "Incorrect task submitted to graph priority queue" );
0194         __TBB_ASSERT( t->priority != tbb::flow::internal::no_priority,
0195                       "Tasks from graph's priority queue must have priority" );
0196         task* t_next = t->execute();
0197         task::destroy(*t);
0198         return t_next;
0199     }
0200 private:
0201     graph_task_priority_queue_t& my_priority_queue;
0202 };
0203 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
0204 
0205 }
0206 
0207 } // namespace interfaceX
0208 namespace interface10 {
0209 //! The graph class
0210 /** This class serves as a handle to the graph */
0211 class graph : tbb::internal::no_copy, public tbb::flow::graph_proxy {
0212     friend class tbb::flow::interface11::graph_node;
0213 
0214     template< typename Body >
0215     class run_task : public tbb::flow::interface11::graph_task {
0216     public:
0217         run_task(Body& body
0218 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0219                  , tbb::flow::interface11::node_priority_t node_priority = tbb::flow::interface11::no_priority
0220         ) : tbb::flow::interface11::graph_task(node_priority),
0221 #else
0222         ) :
0223 #endif
0224         my_body(body) { }
0225         tbb::task *execute() __TBB_override {
0226             my_body();
0227             return NULL;
0228         }
0229     private:
0230         Body my_body;
0231     };
0232 
0233     template< typename Receiver, typename Body >
0234     class run_and_put_task : public tbb::flow::interface11::graph_task {
0235     public:
0236         run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
0237         tbb::task *execute() __TBB_override {
0238             tbb::task *res = my_receiver.try_put_task(my_body());
0239             if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
0240             return res;
0241         }
0242     private:
0243         Receiver &my_receiver;
0244         Body my_body;
0245     };
0246     typedef std::list<tbb::task *> task_list_type;
0247 
0248     class wait_functor {
0249         tbb::task* graph_root_task;
0250     public:
0251         wait_functor(tbb::task* t) : graph_root_task(t) {}
0252         void operator()() const { graph_root_task->wait_for_all(); }
0253     };
0254 
0255     //! A functor that spawns a task
0256     class spawn_functor : tbb::internal::no_assign {
0257         tbb::task& spawn_task;
0258     public:
0259         spawn_functor(tbb::task& t) : spawn_task(t) {}
0260         void operator()() const {
0261             FLOW_SPAWN(spawn_task);
0262         }
0263     };
0264 
0265     void prepare_task_arena(bool reinit = false) {
0266         if (reinit) {
0267             __TBB_ASSERT(my_task_arena, "task arena is NULL");
0268             my_task_arena->terminate();
0269             my_task_arena->initialize(tbb::task_arena::attach());
0270         }
0271         else {
0272             __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
0273             my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
0274         }
0275         if (!my_task_arena->is_active()) // failed to attach
0276             my_task_arena->initialize(); // create a new, default-initialized arena
0277         __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
0278     }
0279 
0280 public:
0281     //! Constructs a graph with isolated task_group_context
0282     graph();
0283 
0284     //! Constructs a graph with use_this_context as context
0285     explicit graph(tbb::task_group_context& use_this_context);
0286 
0287     //! Destroys the graph.
0288     /** Calls wait_for_all, then destroys the root task and context. */
0289     ~graph();
0290 
0291 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0292     void set_name(const char *name);
0293 #endif
0294 
0295     __TBB_DEPRECATED void increment_wait_count() {
0296         reserve_wait();
0297     }
0298 
0299     __TBB_DEPRECATED void decrement_wait_count() {
0300         release_wait();
0301     }
0302 
    //! Used to register that an external entity may still interact with the graph.
    /** The graph will not return from wait_for_all until a matching number of release_wait
    calls has been made. */
0306     void reserve_wait() __TBB_override;
0307 
    //! Deregisters an external entity that may have interacted with the graph.
    /** The graph will not return from wait_for_all until the number of release_wait calls
    matches the number of reserve_wait calls. */
0311     void release_wait() __TBB_override;
0312 
0313     //! Spawns a task that runs a body and puts its output to a specific receiver
0314     /** The task is spawned as a child of the graph. This is useful for running tasks
0315     that need to block a wait_for_all() on the graph.  For example a one-off source. */
0316     template< typename Receiver, typename Body >
0317     __TBB_DEPRECATED void run(Receiver &r, Body body) {
0318         if (tbb::flow::interface11::internal::is_graph_active(*this)) {
0319             task* rtask = new (task::allocate_additional_child_of(*root_task()))
0320                 run_and_put_task< Receiver, Body >(r, body);
0321             my_task_arena->execute(spawn_functor(*rtask));
0322         }
0323     }
0324 
0325     //! Spawns a task that runs a function object
0326     /** The task is spawned as a child of the graph. This is useful for running tasks
0327     that need to block a wait_for_all() on the graph. For example a one-off source. */
0328     template< typename Body >
0329     __TBB_DEPRECATED void run(Body body) {
0330         if (tbb::flow::interface11::internal::is_graph_active(*this)) {
0331             task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
0332             my_task_arena->execute(spawn_functor(*rtask));
0333         }
0334     }
0335 
0336     //! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
0337     /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
0338     void wait_for_all() {
0339         cancelled = false;
0340         caught_exception = false;
0341         if (my_root_task) {
0342 #if TBB_USE_EXCEPTIONS
0343             try {
0344 #endif
0345                 my_task_arena->execute(wait_functor(my_root_task));
0346 #if __TBB_TASK_GROUP_CONTEXT
0347                 cancelled = my_context->is_group_execution_cancelled();
0348 #endif
0349 #if TBB_USE_EXCEPTIONS
0350             }
0351             catch (...) {
0352                 my_root_task->set_ref_count(1);
0353                 my_context->reset();
0354                 caught_exception = true;
0355                 cancelled = true;
0356                 throw;
0357             }
0358 #endif
0359 #if __TBB_TASK_GROUP_CONTEXT
0360             // TODO: the "if" condition below is just a work-around to support the concurrent wait
0361             // mode. The cancellation and exception mechanisms are still broken in this mode.
0362             // Consider using task group not to re-implement the same functionality.
0363             if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
0364                 my_context->reset();  // consistent with behavior in catch()
0365 #endif
0366                 my_root_task->set_ref_count(1);
0367 #if __TBB_TASK_GROUP_CONTEXT
0368             }
0369 #endif
0370         }
0371     }
0372 
0373     //! Returns the root task of the graph
0374     __TBB_DEPRECATED tbb::task * root_task() {
0375         return my_root_task;
0376     }
0377 
0378     // ITERATORS
0379     template<typename C, typename N>
0380     friend class tbb::flow::interface11::graph_iterator;
0381 
0382     // Graph iterator typedefs
0383     typedef tbb::flow::interface11::graph_iterator<graph, tbb::flow::interface11::graph_node> iterator;
0384     typedef tbb::flow::interface11::graph_iterator<const graph, const tbb::flow::interface11::graph_node> const_iterator;
0385 
0386     // Graph iterator constructors
0387     //! start iterator
0388     iterator begin();
0389     //! end iterator
0390     iterator end();
0391     //! start const iterator
0392     const_iterator begin() const;
0393     //! end const iterator
0394     const_iterator end() const;
0395     //! start const iterator
0396     const_iterator cbegin() const;
0397     //! end const iterator
0398     const_iterator cend() const;
0399 
0400     //! return status of graph execution
0401     bool is_cancelled() { return cancelled; }
0402     bool exception_thrown() { return caught_exception; }
0403 
0404     // thread-unsafe state reset.
0405     void reset(tbb::flow::interface11::reset_flags f = tbb::flow::interface11::rf_reset_protocol);
0406 
0407 private:
0408     tbb::task *my_root_task;
0409 #if __TBB_TASK_GROUP_CONTEXT
0410     tbb::task_group_context *my_context;
0411 #endif
0412     bool own_context;
0413     bool cancelled;
0414     bool caught_exception;
0415     bool my_is_active;
0416     task_list_type my_reset_task_list;
0417 
0418     tbb::flow::interface11::graph_node *my_nodes, *my_nodes_last;
0419 
0420     tbb::spin_mutex nodelist_mutex;
0421     void register_node(tbb::flow::interface11::graph_node *n);
0422     void remove_node(tbb::flow::interface11::graph_node *n);
0423 
0424     tbb::task_arena* my_task_arena;
0425 
0426 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0427     tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue;
0428 #endif
0429 
0430     friend void tbb::flow::interface11::internal::activate_graph(graph& g);
0431     friend void tbb::flow::interface11::internal::deactivate_graph(graph& g);
0432     friend bool tbb::flow::interface11::internal::is_graph_active(graph& g);
0433     friend tbb::task& tbb::flow::interface11::internal::prioritize_task(graph& g, tbb::task& arena_task);
0434     friend void tbb::flow::interface11::internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
0435     friend void tbb::flow::interface11::internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
0436     friend void tbb::flow::interface11::internal::add_task_to_graph_reset_list(graph& g, tbb::task *tp);
0437 
0438     friend class tbb::interface7::internal::task_arena_base;
0439 
0440 };  // class graph
0441 } // namespace interface10
0442 
0443 namespace interface11 {
0444 
0445 using tbb::flow::interface10::graph;
0446 
0447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0448 namespace internal{
0449 class get_graph_helper;
0450 }
0451 #endif
0452 
0453 //! The base of all graph nodes.
0454 class graph_node : tbb::internal::no_copy {
0455     friend class graph;
0456     template<typename C, typename N>
0457     friend class graph_iterator;
0458 
0459 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
0460     friend class internal::get_graph_helper;
0461 #endif
0462 
0463 protected:
0464     graph& my_graph;
0465     graph_node *next, *prev;
0466 public:
0467     explicit graph_node(graph& g);
0468 
0469     virtual ~graph_node();
0470 
0471 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
0472     virtual void set_name(const char *name) = 0;
0473 #endif
0474 
0475 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0476     virtual void extract() = 0;
0477 #endif
0478 
0479 protected:
0480     // performs the reset on an individual node.
0481     virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
0482 };  // class graph_node
0483 
0484 namespace internal {
0485 
0486 inline void activate_graph(graph& g) {
0487     g.my_is_active = true;
0488 }
0489 
0490 inline void deactivate_graph(graph& g) {
0491     g.my_is_active = false;
0492 }
0493 
0494 inline bool is_graph_active(graph& g) {
0495     return g.my_is_active;
0496 }
0497 
0498 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
0499 inline tbb::task& prioritize_task(graph& g, tbb::task& t) {
0500     task* critical_task = &t;
0501     // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
0502     graph_task* gt = static_cast<graph_task*>(&t);
0503     if( gt->priority != no_priority ) {
0504         //! Non-preemptive priority pattern. The original task is submitted as a work item to the
0505         //! priority queue, and a new critical task is created to take and execute a work item with
0506         //! the highest known priority. The reference counting responsibility is transferred (via
0507         //! allocate_continuation) to the new task.
0508         critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
0509         tbb::internal::make_critical( *critical_task );
0510         g.my_priority_queue.push(gt);
0511     }
0512     return *critical_task;
0513 }
0514 #else
0515 inline tbb::task& prioritize_task(graph&, tbb::task& t) {
0516     return t;
0517 }
0518 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
0519 
0520 //! Spawns a task inside graph arena
0521 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
0522     if (is_graph_active(g)) {
0523         graph::spawn_functor s_fn(prioritize_task(g, arena_task));
0524         __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
0525         g.my_task_arena->execute(s_fn);
0526     }
0527 }
0528 
0529 //! Enqueues a task inside graph arena
0530 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
0531     if (is_graph_active(g)) {
0532         __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
0533         task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
0534     }
0535 }
0536 
0537 inline void add_task_to_graph_reset_list(graph& g, tbb::task *tp) {
0538     g.my_reset_task_list.push_back(tp);
0539 }
0540 
0541 } // namespace internal
0542 
0543 } // namespace interfaceX
0544 } // namespace flow
0545 } // namespace tbb
0546 
0547 #endif // __TBB_flow_graph_impl_H