Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-30 08:46:14

0001 /*
0002     Copyright (c) 2005-2022 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 
0018 #ifndef __TBB_detail__aggregator_H
0019 #define __TBB_detail__aggregator_H
0020 
0021 #include "_assert.h"
0022 #include "_utils.h"
0023 #include <atomic>
0024 #if !__TBBMALLOC_BUILD // TODO: check this macro with TBB Malloc
0025 #include "../profiling.h"
0026 #endif
0027 
0028 namespace tbb {
0029 namespace detail {
0030 namespace d1 {
0031 
0032 // Base class for aggregated operation
0033 template <typename Derived>
0034 class aggregated_operation {
0035 public:
0036     // Zero value means "wait" status, all other values are "user" specified values and
0037     // are defined into the scope of a class which uses "status"
0038     std::atomic<uintptr_t> status;
0039 
0040     std::atomic<Derived*> next;
0041     aggregated_operation() : status{}, next(nullptr) {}
0042 }; // class aggregated_operation
0043 
0044 // Aggregator base class
0045 /* An aggregator for collecting operations coming from multiple sources and executing
0046    them serially on a single thread.  OperationType must be derived from
0047    aggregated_operation. The parameter HandlerType is a functor that will be passed the
0048    list of operations and is expected to handle each operation appropriately, setting the
0049    status of each operation to non-zero. */
0050 template <typename OperationType>
0051 class aggregator_generic {
0052 public:
0053     aggregator_generic() : pending_operations(nullptr), handler_busy(false) {}
0054 
0055     // Execute an operation
0056     /* Places an operation into the waitlist (pending_operations), and either handles the list,
0057        or waits for the operation to complete, or returns.
0058        The long_life_time parameter specifies the life time of the given operation object.
0059        Operations with long_life_time == true may be accessed after execution.
0060        A "short" life time operation (long_life_time == false) can be destroyed
0061        during execution, and so any access to it after it was put into the waitlist,
0062        including status check, is invalid. As a consequence, waiting for completion
0063        of such operation causes undefined behavior. */
0064     template <typename HandlerType>
0065     void execute( OperationType* op, HandlerType& handle_operations, bool long_life_time = true ) {
0066         // op->status should be read before inserting the operation into the
0067         // aggregator waitlist since it can become invalid after executing a
0068         // handler (if the operation has 'short' life time.)
0069         const uintptr_t status = op->status.load(std::memory_order_relaxed);
0070 
0071         // ITT note: &(op->status) tag is used to cover accesses to this op node. This
0072         // thread has created the operation, and now releases it so that the handler
0073         // thread may handle the associated operation w/o triggering a race condition;
0074         // thus this tag will be acquired just before the operation is handled in the
0075         // handle_operations functor.
0076         call_itt_notify(releasing, &(op->status));
0077         // insert the operation in the queue.
0078         OperationType* res = pending_operations.load(std::memory_order_relaxed);
0079         do {
0080             op->next.store(res, std::memory_order_relaxed);
0081         } while (!pending_operations.compare_exchange_strong(res, op));
0082         if (!res) { // first in the list; handle the operations
0083             // ITT note: &pending_operations tag covers access to the handler_busy flag,
0084             // which this waiting handler thread will try to set before entering
0085             // handle_operations.
0086             call_itt_notify(acquired, &pending_operations);
0087             start_handle_operations(handle_operations);
0088             // The operation with 'short' life time can already be destroyed
0089             if (long_life_time)
0090                 __TBB_ASSERT(op->status.load(std::memory_order_relaxed), nullptr);
0091         }
0092         // Not first; wait for op to be ready
0093         else if (!status) { // operation is blocking here.
0094             __TBB_ASSERT(long_life_time, "Waiting for an operation object that might be destroyed during processing");
0095             call_itt_notify(prepare, &(op->status));
0096             spin_wait_while_eq(op->status, uintptr_t(0));
0097         }
0098    }
0099 
0100 private:
0101     // Trigger the handling of operations when the handler is free
0102     template <typename HandlerType>
0103     void start_handle_operations( HandlerType& handle_operations ) {
0104         OperationType* op_list;
0105 
0106         // ITT note: &handler_busy tag covers access to pending_operations as it is passed
0107         // between active and waiting handlers.  Below, the waiting handler waits until
0108         // the active handler releases, and the waiting handler acquires &handler_busy as
0109         // it becomes the active_handler. The release point is at the end of this
0110         // function, when all operations in pending_operations have been handled by the
0111         // owner of this aggregator.
0112         call_itt_notify(prepare, &handler_busy);
0113         // get the handler_busy:
0114         // only one thread can possibly spin here at a time
0115         spin_wait_until_eq(handler_busy, uintptr_t(0));
0116         call_itt_notify(acquired, &handler_busy);
0117         // acquire fence not necessary here due to causality rule and surrounding atomics
0118         handler_busy.store(1, std::memory_order_relaxed);
0119 
0120         // ITT note: &pending_operations tag covers access to the handler_busy flag
0121         // itself. Capturing the state of the pending_operations signifies that
0122         // handler_busy has been set and a new active handler will now process that list's
0123         // operations.
0124         call_itt_notify(releasing, &pending_operations);
0125         // grab pending_operations
0126         op_list = pending_operations.exchange(nullptr);
0127 
0128         // handle all the operations
0129         handle_operations(op_list);
0130 
0131         // release the handler
0132         handler_busy.store(0, std::memory_order_release);
0133     }
0134 
0135     // An atomically updated list (aka mailbox) of pending operations
0136     std::atomic<OperationType*> pending_operations;
0137     // Controls threads access to handle_operations
0138     std::atomic<uintptr_t> handler_busy;
0139 }; // class aggregator_generic
0140 
0141 template <typename HandlerType, typename OperationType>
0142 class aggregator : public aggregator_generic<OperationType> {
0143     HandlerType handle_operations;
0144 public:
0145     aggregator() = default;
0146 
0147     void initialize_handler( HandlerType h ) { handle_operations = h; }
0148 
0149     void execute(OperationType* op) {
0150         aggregator_generic<OperationType>::execute(op, handle_operations);
0151     }
0152 }; // class aggregator
0153 
0154 // the most-compatible friend declaration (vs, gcc, icc) is
0155 // template<class U, class V> friend class aggregating_functor;
0156 template <typename AggregatingClass, typename OperationList>
0157 class aggregating_functor {
0158     AggregatingClass* my_object{nullptr};
0159 public:
0160     aggregating_functor() = default;
0161     aggregating_functor( AggregatingClass* object ) : my_object(object) {
0162         __TBB_ASSERT(my_object, nullptr);
0163     }
0164 
0165     void operator()( OperationList* op_list ) {
0166         __TBB_ASSERT(my_object, nullptr);
0167         my_object->handle_operations(op_list);
0168     }
0169 }; // class aggregating_functor
0170 
0171 
0172 } // namespace d1
0173 } // namespace detail
0174 } // namespace tbb
0175 
0176 #endif // __TBB_detail__aggregator_H