/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_flow_graph_impl_H
#define __TBB_flow_graph_impl_H

// #include "../config.h"
#include "_task.h"
#include "../task_group.h"
#include "../task_arena.h"
#include "../flow_graph_abstractions.h"

#include "../concurrent_priority_queue.h"

#include <list>

namespace tbb {
namespace detail {

namespace d1 {

class graph_task;
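// Note (added): SUCCESSFULLY_ENQUEUED appears to be a sentinel pointer value
// (cast from -1, never dereferenced) signaling that work was already enqueued
// and no task needs to be returned to the caller; see the assertion in
// priority_task_selector::next_task() below.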
static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1;
typedef unsigned int node_priority_t;
static const node_priority_t no_priority = node_priority_t(0);

class graph;
class graph_node;

template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Copy constructor
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference
    reference operator*() const;

    //! Dereference
    pointer operator->() const;

    //! Equality
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
};  // class graph_iterator

// flags to modify the behavior of the graph reset().  Can be combined.
enum reset_flags {
    rf_reset_protocol = 0,
    rf_reset_bodies = 1 << 0,  // delete the current node body, reset to a copy of the initial node body.
    rf_clear_edges = 1 << 1   // delete edges
};
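
// Usage sketch (added, hypothetical): combining flags needs a cast back to
// reset_flags, because operator| on the enumerators yields int; graph::reset
// itself is declared later in this file:
//
//   g.reset(static_cast<reset_flags>(rf_reset_bodies | rf_clear_edges));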

void activate_graph(graph& g);
void deactivate_graph(graph& g);
bool is_graph_active(graph& g);
graph_task* prioritize_task(graph& g, graph_task& arena_task);
void spawn_in_graph_arena(graph& g, graph_task& arena_task);
void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

class graph;

//! Base class for tasks generated by graph nodes.
class graph_task : public task {
public:
    graph_task(graph& g, small_object_allocator& allocator
               , node_priority_t node_priority = no_priority
    )
        : my_graph(g)
        , priority(node_priority)
        , my_allocator(allocator)
    {}
    graph& my_graph; // graph instance the task belongs to
    // TODO revamp: rename to my_priority
    node_priority_t priority;
    template <typename DerivedType>
    void destruct_and_deallocate(const execution_data& ed);
protected:
    template <typename DerivedType>
    void finalize(const execution_data& ed);
private:
    // To organize task_list
    graph_task* my_next{ nullptr };
    small_object_allocator my_allocator;
    // TODO revamp: elaborate internal interfaces to avoid friends declarations
    friend class graph_task_list;
    friend graph_task* prioritize_task(graph& g, graph_task& gt);
};
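
// Lifecycle note (added): graph tasks are allocated with a
// small_object_allocator; derived task types call finalize<Derived>() when
// done (defined below), which frees the task and then releases the owning
// graph's wait reservation.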

struct graph_task_comparator {
    bool operator()(const graph_task* left, const graph_task* right) {
        return left->priority < right->priority;
    }
};

typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
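
// Note (added): tbb::concurrent_priority_queue, like std::priority_queue,
// pops the largest element under the given comparator, so try_pop() yields
// the task with the numerically highest priority currently queued.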

class priority_task_selector : public task {
public:
    priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
        : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
    task* execute(execution_data& ed) override {
        next_task();
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->execute(ed);
        my_allocator.delete_object(this, ed);
        return t_next;
    }
    task* cancel(execution_data& ed) override {
        if (!my_task) {
            next_task();
        }
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->cancel(ed);
        my_allocator.delete_object(this, ed);
        return t_next;
    }
private:
    void next_task() {
        // TODO revamp: hold functors in priority queue instead of real tasks
        bool result = my_priority_queue.try_pop(my_task);
        __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
            " in graph's priority queue mismatched");
        __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
            "Incorrect task submitted to graph priority queue");
        __TBB_ASSERT(my_task->priority != no_priority,
            "Tasks from graph's priority queue must have priority");
    }

    graph_task_priority_queue_t& my_priority_queue;
    small_object_allocator my_allocator;
    graph_task* my_task;
};
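
// Note (added): a selector is not bound to a particular task. Each run pops
// whichever task currently has the highest priority, so the critical task
// created for one submission may execute a different, higher-priority work
// item. See prioritize_task() below for where selectors are created.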

template <typename Receiver, typename Body> class run_and_put_task;
template <typename Body> class run_task;

//********************************************************************************
// graph tasks helpers
//********************************************************************************

//! The list of graph tasks
class graph_task_list : no_copy {
private:
    graph_task* my_first;
    graph_task** my_next_ptr;
public:
    //! Construct empty list
    graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}

    //! True if list is empty; false otherwise.
    bool empty() const { return !my_first; }

    //! Push task onto back of list.
    void push_back(graph_task& task) {
        task.my_next = nullptr;
        *my_next_ptr = &task;
        my_next_ptr = &task.my_next;
    }

    //! Pop the front task from the list.
    graph_task& pop_front() {
        __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
        graph_task* result = my_first;
        my_first = result->my_next;
        if (!my_first) {
            my_next_ptr = &my_first;
        }
        return *result;
    }
};
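
// Implementation note (added): graph_task_list is an intrusive singly linked
// FIFO. It threads tasks through their my_next field and keeps my_next_ptr
// aimed at the tail's next-link, which makes push_back O(1) without a
// dedicated tail node or any allocation.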

//! The graph class
/** This class serves as a handle to the graph */
class graph : no_copy, public graph_proxy {
    friend class graph_node;

    void prepare_task_arena(bool reinit = false) {
        if (reinit) {
            __TBB_ASSERT(my_task_arena, "task arena is nullptr");
            my_task_arena->terminate();
            my_task_arena->initialize(task_arena::attach());
        }
        else {
            __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
            my_task_arena = new task_arena(task_arena::attach());
        }
        if (!my_task_arena->is_active()) // failed to attach
            my_task_arena->initialize(); // create a new, default-initialized arena
        __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
    }

public:
    //! Constructs a graph with an isolated task_group_context
    graph();

    //! Constructs a graph with use_this_context as its context
    explicit graph(task_group_context& use_this_context);

    //! Destroys the graph.
    /** Calls wait_for_all, then destroys the root task and context. */
    ~graph();

    //! Used to register that an external entity may still interact with the graph.
    /** The graph will not return from wait_for_all until a matching number of release_wait
    calls is made. */
    void reserve_wait() override;

    //! Deregisters an external entity that may have interacted with the graph.
    /** The graph will not return from wait_for_all until the number of reserve_wait calls
    matches the number of release_wait calls. */
    void release_wait() override;

    //! Waits until the graph is idle and the number of release_wait calls equals the
    //! number of reserve_wait calls.
    /** The waiting thread will go off and steal work while it is blocked in wait_for_all. */
    void wait_for_all() {
        cancelled = false;
        caught_exception = false;
        try_call([this] {
            my_task_arena->execute([this] {
                wait(my_wait_context, *my_context);
            });
            cancelled = my_context->is_group_execution_cancelled();
        }).on_exception([this] {
            my_context->reset();
            caught_exception = true;
            cancelled = true;
        });
        // TODO: the "if" condition below is just a work-around to support the concurrent wait
        // mode. The cancellation and exception mechanisms are still broken in this mode.
        // Consider using a task group so as not to re-implement the same functionality.
        if (!(my_context->traits() & task_group_context::concurrent_wait)) {
            my_context->reset();  // consistent with behavior in catch()
        }
    }

    // TODO revamp: consider adding a getter for task_group_context.

    // ITERATORS
    template<typename C, typename N>
    friend class graph_iterator;

    // Graph iterator typedefs
    typedef graph_iterator<graph, graph_node> iterator;
    typedef graph_iterator<const graph, const graph_node> const_iterator;

    // Graph iterator constructors
    //! start iterator
    iterator begin();
    //! end iterator
    iterator end();
    //! start const iterator
    const_iterator begin() const;
    //! end const iterator
    const_iterator end() const;
    //! start const iterator
    const_iterator cbegin() const;
    //! end const iterator
    const_iterator cend() const;

    // thread-unsafe state reset.
    void reset(reset_flags f = rf_reset_protocol);

    //! cancels execution of the associated task_group_context
    void cancel();

    //! return status of graph execution
    bool is_cancelled() { return cancelled; }
    bool exception_thrown() { return caught_exception; }

private:
    wait_context my_wait_context;
    task_group_context *my_context;
    bool own_context;
    bool cancelled;
    bool caught_exception;
    bool my_is_active;

    graph_node *my_nodes, *my_nodes_last;

    tbb::spin_mutex nodelist_mutex;
    void register_node(graph_node *n);
    void remove_node(graph_node *n);

    task_arena* my_task_arena;

    graph_task_priority_queue_t my_priority_queue;

    friend void activate_graph(graph& g);
    friend void deactivate_graph(graph& g);
    friend bool is_graph_active(graph& g);
    friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
    friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
    friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

    friend class task_arena_base;

};  // class graph
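
// Usage sketch (added, for orientation only): the public flow graph API in
// <oneapi/tbb/flow_graph.h> (tbb::flow::graph, tbb::flow::function_node, etc.)
// is built on this class. A typical user-level pattern looks like:
//
//   tbb::flow::graph g;
//   tbb::flow::function_node<int, int> doubler(
//       g, tbb::flow::unlimited, [](int v) { return 2 * v; });
//   doubler.try_put(21);
//   g.wait_for_all();  // the calling thread steals work while it waits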

template<typename DerivedType>
inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
    auto allocator = my_allocator;
    // TODO: investigate if direct call of derived destructor gives any benefits.
    this->~graph_task();
    allocator.deallocate(static_cast<DerivedType*>(this), ed);
}

template<typename DerivedType>
inline void graph_task::finalize(const execution_data& ed) {
    graph& g = my_graph; // copy the reference first: the next line frees this task's memory
    destruct_and_deallocate<DerivedType>(ed);
    g.release_wait();
}

//********************************************************************************
// end of graph tasks helpers
//********************************************************************************


#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
class get_graph_helper;
#endif

//! The base of all graph nodes.
class graph_node : no_copy {
    friend class graph;
    template<typename C, typename N>
    friend class graph_iterator;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    friend class get_graph_helper;
#endif

protected:
    graph& my_graph;
    graph& graph_reference() const {
        // TODO revamp: propagate graph_reference() method to all the reference places.
        return my_graph;
    }
    graph_node* next = nullptr;
    graph_node* prev = nullptr;
public:
    explicit graph_node(graph& g);

    virtual ~graph_node();

protected:
    // performs the reset on an individual node.
    virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
};  // class graph_node

inline void activate_graph(graph& g) {
    g.my_is_active = true;
}

inline void deactivate_graph(graph& g) {
    g.my_is_active = false;
}

inline bool is_graph_active(graph& g) {
    return g.my_is_active;
}

inline graph_task* prioritize_task(graph& g, graph_task& gt) {
    if( no_priority == gt.priority )
        return &gt;

    //! Non-preemptive priority pattern. The original task is submitted as a work item to the
    //! priority queue, and a new critical task is created to take and execute a work item with
    //! the highest known priority. The reference counting responsibility is transferred (via
    //! allocate_continuation) to the new task.
    task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
    __TBB_ASSERT( critical_task, "bad_alloc?" );
    g.my_priority_queue.push(&gt);
    using tbb::detail::d1::submit;
    submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true );
    return nullptr;
}
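
// Contract note (added): prioritize_task() returns the original task when it
// carries no priority and should be submitted normally; it returns nullptr
// when the task has been rerouted through the priority queue (a critical
// selector task was submitted instead), so callers must skip their own submit
// in that case, as spawn_in_graph_arena() below does.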

//! Spawns a task inside graph arena
inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
    if (is_graph_active(g)) {
        task* gt = prioritize_task(g, arena_task);
        if( !gt )
            return;

        __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
        submit( *gt, *g.my_task_arena, *g.my_context
#if __TBB_PREVIEW_CRITICAL_TASKS
                , /*as_critical=*/false
#endif
        );
    }
}

// TODO revamp: unify *_in_graph_arena functions

//! Enqueues a task inside graph arena
inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
    if (is_graph_active(g)) {
        __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );

        // TODO revamp: decide on the approach that does not postpone critical task
        if( task* gt = prioritize_task(g, arena_task) )
            submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
    }
}

} // namespace d1
} // namespace detail
} // namespace tbb

#endif // __TBB_flow_graph_impl_H