/*
    Copyright (c) 2005-2024 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_flow_graph_impl_H
#define __TBB_flow_graph_impl_H

// #include "../config.h"
#include "_task.h"
#include "../task_group.h"
#include "../task_arena.h"
#include "../flow_graph_abstractions.h"

#include "../concurrent_priority_queue.h"

#include <list>

namespace tbb {
namespace detail {

namespace d2 {

class graph_task;
static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1;
typedef unsigned int node_priority_t;
static const node_priority_t no_priority = node_priority_t(0);

class graph;
class graph_node;

template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Copy constructor
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference
    reference operator*() const;

    //! Dereference
    pointer operator->() const;

    //! Equality
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
};  // class graph_iterator
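
// Usage sketch: graph defines begin()/end() (see the graph class below) in terms
// of this iterator, so the nodes registered with a graph `g` can be traversed
// with a plain forward loop:
//
//     for (graph::iterator it = g.begin(); it != g.end(); ++it) {
//         graph_node& n = *it;  // walks my_graph's my_nodes list in order
//         (void)n;              // inspect the node here
//     }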

// flags to modify the behavior of the graph reset().  Can be combined.
enum reset_flags {
    rf_reset_protocol = 0,
    rf_reset_bodies = 1 << 0,  // delete the current node body, reset to a copy of the initial node body.
    rf_clear_edges = 1 << 1   // delete edges
};
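
// The flags above are distinct bits, so a reset may combine them. No operator|
// is defined for the enum in this header, so a combined request for a graph `g`
// needs a cast back to reset_flags, e.g.:
//
//     g.reset(static_cast<reset_flags>(rf_reset_bodies | rf_clear_edges));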

void activate_graph(graph& g);
void deactivate_graph(graph& g);
bool is_graph_active(graph& g);
graph_task* prioritize_task(graph& g, graph_task& arena_task);
void spawn_in_graph_arena(graph& g, graph_task& arena_task);
void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

class graph;

//! Base class for tasks generated by graph nodes.
class graph_task : public d1::task {
public:
    graph_task(graph& g, d1::small_object_allocator& allocator,
               node_priority_t node_priority = no_priority);

    graph& my_graph; // graph instance the task belongs to
    // TODO revamp: rename to my_priority
    node_priority_t priority;
    template <typename DerivedType>
    void destruct_and_deallocate(const d1::execution_data& ed);
protected:
    template <typename DerivedType>
    void finalize(const d1::execution_data& ed);
private:
    // To organize task_list
    graph_task* my_next{ nullptr };
    d1::small_object_allocator my_allocator;
    d1::wait_tree_vertex_interface* my_reference_vertex;
    // TODO revamp: elaborate internal interfaces to avoid friend declarations
    friend class graph_task_list;
    friend graph_task* prioritize_task(graph& g, graph_task& gt);
};

inline bool is_this_thread_in_graph_arena(graph& g);

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
class trackable_messages_graph_task : public graph_task {
public:
    trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
                                  node_priority_t node_priority,
                                  const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(msg_waiters)
    {
        auto last_iterator = my_msg_reference_vertices.cbefore_begin();

        for (auto& msg_waiter : my_msg_wait_context_vertices) {
            // If the task is created by a thread outside the graph arena, the lifetime of the thread reference vertex
            // may be shorter than the lifetime of the task, so the thread reference vertex approach cannot be used
            // and the task should be associated with the msg wait context itself
            d1::wait_tree_vertex_interface* ref_vertex = is_this_thread_in_graph_arena(g) ?
                                                         r1::get_thread_reference_vertex(msg_waiter) :
                                                         msg_waiter;
            last_iterator = my_msg_reference_vertices.emplace_after(last_iterator,
                                                                    ref_vertex);
            ref_vertex->reserve(1);
        }
    }

    trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
                                  node_priority_t node_priority,
                                  std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(std::move(msg_waiters))
    {
    }

    const std::forward_list<d1::wait_context_vertex*>& get_msg_wait_context_vertices() const {
        return my_msg_wait_context_vertices;
    }

protected:
    template <typename DerivedType>
    void finalize(const d1::execution_data& ed) {
        auto wait_context_vertices = std::move(my_msg_wait_context_vertices);
        auto msg_reference_vertices = std::move(my_msg_reference_vertices);
        graph_task::finalize<DerivedType>(ed);

        // If there are no thread reference vertices associated with the task,
        // then this task was created by transferring ownership from another metainfo
        // instance (e.g. while taking from the buffer)
        if (msg_reference_vertices.empty()) {
            for (auto& msg_waiter : wait_context_vertices) {
                msg_waiter->release(1);
            }
        } else {
            for (auto& msg_waiter : msg_reference_vertices) {
                msg_waiter->release(1);
            }
        }
    }
private:
    // Each task that holds information about individual message wait_contexts should hold two lists.
    // The first one contains the wait_contexts associated with the messages themselves; they are needed
    // to broadcast the list of wait_contexts to the node successors while executing the task.
    // The second list holds a reference vertex for each wait_context_vertex in the first list,
    // to support the distributed reference counting scheme.
    std::forward_list<d1::wait_context_vertex*> my_msg_wait_context_vertices;
    std::forward_list<d1::wait_tree_vertex_interface*> my_msg_reference_vertices;
}; // class trackable_messages_graph_task
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

struct graph_task_comparator {
    bool operator()(const graph_task* left, const graph_task* right) {
        return left->priority < right->priority;
    }
};

typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
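
// With graph_task_comparator, concurrent_priority_queue behaves as a max-heap
// over graph_task::priority: try_pop() hands back a task with the highest
// priority currently queued. For example, after pushing tasks with priorities
// 1 and 7, the first try_pop() yields the priority-7 task.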

class priority_task_selector : public d1::task {
public:
    priority_task_selector(graph_task_priority_queue_t& priority_queue, d1::small_object_allocator& allocator)
        : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
    task* execute(d1::execution_data& ed) override {
        next_task();
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->execute(ed);
        my_allocator.delete_object(this, ed);
        return t_next;
    }
    task* cancel(d1::execution_data& ed) override {
        if (!my_task) {
            next_task();
        }
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->cancel(ed);
        my_allocator.delete_object(this, ed);
        return t_next;
    }
private:
    void next_task() {
        // TODO revamp: hold functors in priority queue instead of real tasks
        bool result = my_priority_queue.try_pop(my_task);
        __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
            " in graph's priority queue mismatched");
        __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
            "Incorrect task submitted to graph priority queue");
        __TBB_ASSERT(my_task->priority != no_priority,
            "Tasks from graph's priority queue must have priority");
    }

    graph_task_priority_queue_t& my_priority_queue;
    d1::small_object_allocator my_allocator;
    graph_task* my_task;
};
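
// Each priority_task_selector the scheduler runs stands in for exactly one task
// sitting in the graph's priority queue: execute()/cancel() pop the best
// available task, forward to it, and then self-destroy via the stored allocator.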

template <typename Receiver, typename Body> class run_and_put_task;
template <typename Body> class run_task;

//********************************************************************************
// graph tasks helpers
//********************************************************************************

//! The list of graph tasks
class graph_task_list : no_copy {
private:
    graph_task* my_first;
    graph_task** my_next_ptr;
public:
    //! Construct empty list
    graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}

    //! True if list is empty; false otherwise.
    bool empty() const { return !my_first; }

    //! Push task onto back of list.
    void push_back(graph_task& task) {
        task.my_next = nullptr;
        *my_next_ptr = &task;
        my_next_ptr = &task.my_next;
    }

    //! Pop the front task from the list.
    graph_task& pop_front() {
        __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
        graph_task* result = my_first;
        my_first = result->my_next;
        if (!my_first) {
            my_next_ptr = &my_first;
        }
        return *result;
    }
};
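
// Usage sketch: graph_task_list is a minimal FIFO chained through the intrusive
// graph_task::my_next pointer, so pushing allocates nothing. Assuming ta and tb
// are live graph_task objects:
//
//     graph_task_list tl;
//     tl.push_back(ta);                     // list: ta
//     tl.push_back(tb);                     // list: ta, tb
//     graph_task& first = tl.pop_front();   // returns ta; list: tb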

//! The graph class
/** This class serves as a handle to the graph */
class graph : no_copy, public graph_proxy {
    friend class graph_node;

    void prepare_task_arena(bool reinit = false) {
        if (reinit) {
            __TBB_ASSERT(my_task_arena, "task arena is nullptr");
            my_task_arena->terminate();
            my_task_arena->initialize(task_arena::attach());
        }
        else {
            __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
            my_task_arena = new task_arena(task_arena::attach());
        }
        if (!my_task_arena->is_active()) // failed to attach
            my_task_arena->initialize(); // create a new, default-initialized arena
        __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
    }

public:
    //! Constructs a graph with isolated task_group_context
    graph();

    //! Constructs a graph with use_this_context as context
    explicit graph(task_group_context& use_this_context);

    //! Destroys the graph.
    /** Calls wait_for_all, then destroys the root task and context. */
    ~graph();

    //! Used to register that an external entity may still interact with the graph.
    /** The graph will not return from wait_for_all until a matching number of release_wait calls is
    made. */
    void reserve_wait() override;

    //! Deregisters an external entity that may have interacted with the graph.
    /** The graph will not return from wait_for_all until the number of reserve_wait calls
    matches the number of release_wait calls. */
    void release_wait() override;

    //! Wait until the graph is idle and the number of release_wait calls equals the number of
    //! reserve_wait calls.
    /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
    void wait_for_all() {
        cancelled = false;
        caught_exception = false;
        try_call([this] {
            my_task_arena->execute([this] {
                wait(my_wait_context_vertex.get_context(), *my_context);
            });
            cancelled = my_context->is_group_execution_cancelled();
        }).on_exception([this] {
            my_context->reset();
            caught_exception = true;
            cancelled = true;
        });
        // TODO: the "if" condition below is just a work-around to support the concurrent wait
        // mode. The cancellation and exception mechanisms are still broken in this mode.
        // Consider using task_group to avoid re-implementing the same functionality.
        if (!(my_context->traits() & task_group_context::concurrent_wait)) {
            my_context->reset();  // consistent with behavior in catch()
        }
    }

    // TODO revamp: consider adding getter for task_group_context.

    // ITERATORS
    template<typename C, typename N>
    friend class graph_iterator;

    // Graph iterator typedefs
    typedef graph_iterator<graph, graph_node> iterator;
    typedef graph_iterator<const graph, const graph_node> const_iterator;

    // Graph iterator constructors
    //! start iterator
    iterator begin();
    //! end iterator
    iterator end();
    //! start const iterator
    const_iterator begin() const;
    //! end const iterator
    const_iterator end() const;
    //! start const iterator
    const_iterator cbegin() const;
    //! end const iterator
    const_iterator cend() const;

    // thread-unsafe state reset.
    void reset(reset_flags f = rf_reset_protocol);

    //! cancels execution of the associated task_group_context
    void cancel();

    //! return status of graph execution
    bool is_cancelled() { return cancelled; }
    bool exception_thrown() { return caught_exception; }

private:
    d1::wait_context_vertex my_wait_context_vertex;
    task_group_context *my_context;
    bool own_context;
    bool cancelled;
    bool caught_exception;
    bool my_is_active;

    graph_node *my_nodes, *my_nodes_last;

    tbb::spin_mutex nodelist_mutex;
    void register_node(graph_node *n);
    void remove_node(graph_node *n);

    task_arena* my_task_arena;

    graph_task_priority_queue_t my_priority_queue;

    d1::wait_context_vertex& get_wait_context_vertex() { return my_wait_context_vertex; }

    friend void activate_graph(graph& g);
    friend void deactivate_graph(graph& g);
    friend bool is_graph_active(graph& g);
    friend bool is_this_thread_in_graph_arena(graph& g);
    friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
    friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
    friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

    friend class d1::task_arena_base;
    friend class graph_task;

    template <typename T>
    friend class receiver;
};  // class graph
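
// Usage sketch for the wait protocol above (with a hypothetical external producer):
// reserve_wait()/release_wait() keep wait_for_all() from returning while some
// entity outside the graph, e.g. an async activity feeding an async_node, may
// still submit work:
//
//     g.reserve_wait();           // announce pending external activity
//     start_async_producer(g);    // hypothetical helper that later calls
//                                 // g.release_wait() when it is done
//     g.wait_for_all();           // blocks (stealing work) until the graph is
//                                 // idle and reserve/release calls are balanced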

template<typename DerivedType>
inline void graph_task::destruct_and_deallocate(const d1::execution_data& ed) {
    auto allocator = my_allocator;
    // TODO: investigate if direct call of derived destructor gives any benefits.
    this->~graph_task();
    allocator.deallocate(static_cast<DerivedType*>(this), ed);
}

template<typename DerivedType>
inline void graph_task::finalize(const d1::execution_data& ed) {
    d1::wait_tree_vertex_interface* reference_vertex = my_reference_vertex;
    destruct_and_deallocate<DerivedType>(ed);
    reference_vertex->release();
}

inline graph_task::graph_task(graph& g, d1::small_object_allocator& allocator,
                              node_priority_t node_priority)
    : my_graph(g)
    , priority(node_priority)
    , my_allocator(allocator)
{
    // If the task is created by a thread outside the graph arena, the lifetime of the thread reference vertex
    // may be shorter than the lifetime of the task, so the thread reference vertex approach cannot be used
    // and the task should be associated with the graph wait context itself.
    // TODO: consider how reference counting can be improved for such a use case. The most common example is the async_node.
    d1::wait_context_vertex* graph_wait_context_vertex = &my_graph.get_wait_context_vertex();
    my_reference_vertex = is_this_thread_in_graph_arena(g) ? r1::get_thread_reference_vertex(graph_wait_context_vertex)
                                                           : graph_wait_context_vertex;
    __TBB_ASSERT(my_reference_vertex, nullptr);
    my_reference_vertex->reserve();
}
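
// Note: the reserve() in the constructor above pairs with the release() in
// graph_task::finalize(), so every task holds exactly one reference on either a
// per-thread reference vertex or the graph's wait context for its entire
// lifetime. This is what keeps wait_for_all() blocked while tasks are in flight.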

//********************************************************************************
// end of graph tasks helpers
//********************************************************************************


#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
class get_graph_helper;
#endif

//! The base of all graph nodes.
class graph_node : no_copy {
    friend class graph;
    template<typename C, typename N>
    friend class graph_iterator;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    friend class get_graph_helper;
#endif

protected:
    graph& my_graph;
    graph& graph_reference() const {
        // TODO revamp: propagate graph_reference() method to all the reference places.
        return my_graph;
    }
    graph_node* next = nullptr;
    graph_node* prev = nullptr;
public:
    explicit graph_node(graph& g);

    virtual ~graph_node();

protected:
    // performs the reset on an individual node.
    virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
};  // class graph_node

inline void activate_graph(graph& g) {
    g.my_is_active = true;
}

inline void deactivate_graph(graph& g) {
    g.my_is_active = false;
}

inline bool is_graph_active(graph& g) {
    return g.my_is_active;
}

inline bool is_this_thread_in_graph_arena(graph& g) {
    __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
    return r1::execution_slot(*g.my_task_arena) != d1::slot_id(-1);
}
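
// The check above relies on r1::execution_slot() reporting the calling thread's
// slot in the given arena; a slot of d1::slot_id(-1) means the thread currently
// occupies no slot there, i.e. it is executing outside the graph's arena.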

inline graph_task* prioritize_task(graph& g, graph_task& gt) {
    if( no_priority == gt.priority )
        return &gt;

    //! Non-preemptive priority pattern. The original task is submitted as a work item to the
    //! priority queue, and a new critical task is created to take and execute a work item with
    //! the highest known priority. The reference counting responsibility is transferred to
    //! the new task.
    d1::task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
    __TBB_ASSERT( critical_task, "bad_alloc?" );
    g.my_priority_queue.push(&gt);
    using tbb::detail::d1::submit;
    submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true );
    return nullptr;
}

//! Spawns a task inside graph arena
inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
    if (is_graph_active(g)) {
        d1::task* gt = prioritize_task(g, arena_task);
        if( !gt )
            return;

        __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
        submit( *gt, *g.my_task_arena, *g.my_context
#if __TBB_PREVIEW_CRITICAL_TASKS
                , /*as_critical=*/false
#endif
        );
    }
}

// TODO revamp: unify *_in_graph_arena functions

//! Enqueues a task inside graph arena
inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
    if (is_graph_active(g)) {
        __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );

        // TODO revamp: decide on the approach that does not postpone critical task
        if( d1::task* gt = prioritize_task(g, arena_task) )
            submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
    }
}

} // namespace d2
} // namespace detail
} // namespace tbb

#endif // __TBB_flow_graph_impl_H