![]() |
|
|||
File indexing completed on 2025-07-30 08:46:14
0001 /* 0002 Copyright (c) 2005-2022 Intel Corporation 0003 0004 Licensed under the Apache License, Version 2.0 (the "License"); 0005 you may not use this file except in compliance with the License. 0006 You may obtain a copy of the License at 0007 0008 http://www.apache.org/licenses/LICENSE-2.0 0009 0010 Unless required by applicable law or agreed to in writing, software 0011 distributed under the License is distributed on an "AS IS" BASIS, 0012 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 0013 See the License for the specific language governing permissions and 0014 limitations under the License. 0015 */ 0016 0017 0018 #ifndef __TBB_detail__aggregator_H 0019 #define __TBB_detail__aggregator_H 0020 0021 #include "_assert.h" 0022 #include "_utils.h" 0023 #include <atomic> 0024 #if !__TBBMALLOC_BUILD // TODO: check this macro with TBB Malloc 0025 #include "../profiling.h" 0026 #endif 0027 0028 namespace tbb { 0029 namespace detail { 0030 namespace d1 { 0031 0032 // Base class for aggregated operation 0033 template <typename Derived> 0034 class aggregated_operation { 0035 public: 0036 // Zero value means "wait" status, all other values are "user" specified values and 0037 // are defined into the scope of a class which uses "status" 0038 std::atomic<uintptr_t> status; 0039 0040 std::atomic<Derived*> next; 0041 aggregated_operation() : status{}, next(nullptr) {} 0042 }; // class aggregated_operation 0043 0044 // Aggregator base class 0045 /* An aggregator for collecting operations coming from multiple sources and executing 0046 them serially on a single thread. OperationType must be derived from 0047 aggregated_operation. The parameter HandlerType is a functor that will be passed the 0048 list of operations and is expected to handle each operation appropriately, setting the 0049 status of each operation to non-zero. */ 0050 template <typename OperationType> 0051 class aggregator_generic { 0052 public: 0053 aggregator_generic() : pending_operations(nullptr), handler_busy(false) {} 0054 0055 // Execute an operation 0056 /* Places an operation into the waitlist (pending_operations), and either handles the list, 0057 or waits for the operation to complete, or returns. 0058 The long_life_time parameter specifies the life time of the given operation object. 0059 Operations with long_life_time == true may be accessed after execution. 0060 A "short" life time operation (long_life_time == false) can be destroyed 0061 during execution, and so any access to it after it was put into the waitlist, 0062 including status check, is invalid. As a consequence, waiting for completion 0063 of such operation causes undefined behavior. */ 0064 template <typename HandlerType> 0065 void execute( OperationType* op, HandlerType& handle_operations, bool long_life_time = true ) { 0066 // op->status should be read before inserting the operation into the 0067 // aggregator waitlist since it can become invalid after executing a 0068 // handler (if the operation has 'short' life time.) 0069 const uintptr_t status = op->status.load(std::memory_order_relaxed); 0070 0071 // ITT note: &(op->status) tag is used to cover accesses to this op node. This 0072 // thread has created the operation, and now releases it so that the handler 0073 // thread may handle the associated operation w/o triggering a race condition; 0074 // thus this tag will be acquired just before the operation is handled in the 0075 // handle_operations functor. 0076 call_itt_notify(releasing, &(op->status)); 0077 // insert the operation in the queue. 0078 OperationType* res = pending_operations.load(std::memory_order_relaxed); 0079 do { 0080 op->next.store(res, std::memory_order_relaxed); 0081 } while (!pending_operations.compare_exchange_strong(res, op)); 0082 if (!res) { // first in the list; handle the operations 0083 // ITT note: &pending_operations tag covers access to the handler_busy flag, 0084 // which this waiting handler thread will try to set before entering 0085 // handle_operations. 0086 call_itt_notify(acquired, &pending_operations); 0087 start_handle_operations(handle_operations); 0088 // The operation with 'short' life time can already be destroyed 0089 if (long_life_time) 0090 __TBB_ASSERT(op->status.load(std::memory_order_relaxed), nullptr); 0091 } 0092 // Not first; wait for op to be ready 0093 else if (!status) { // operation is blocking here. 0094 __TBB_ASSERT(long_life_time, "Waiting for an operation object that might be destroyed during processing"); 0095 call_itt_notify(prepare, &(op->status)); 0096 spin_wait_while_eq(op->status, uintptr_t(0)); 0097 } 0098 } 0099 0100 private: 0101 // Trigger the handling of operations when the handler is free 0102 template <typename HandlerType> 0103 void start_handle_operations( HandlerType& handle_operations ) { 0104 OperationType* op_list; 0105 0106 // ITT note: &handler_busy tag covers access to pending_operations as it is passed 0107 // between active and waiting handlers. Below, the waiting handler waits until 0108 // the active handler releases, and the waiting handler acquires &handler_busy as 0109 // it becomes the active_handler. The release point is at the end of this 0110 // function, when all operations in pending_operations have been handled by the 0111 // owner of this aggregator. 0112 call_itt_notify(prepare, &handler_busy); 0113 // get the handler_busy: 0114 // only one thread can possibly spin here at a time 0115 spin_wait_until_eq(handler_busy, uintptr_t(0)); 0116 call_itt_notify(acquired, &handler_busy); 0117 // acquire fence not necessary here due to causality rule and surrounding atomics 0118 handler_busy.store(1, std::memory_order_relaxed); 0119 0120 // ITT note: &pending_operations tag covers access to the handler_busy flag 0121 // itself. Capturing the state of the pending_operations signifies that 0122 // handler_busy has been set and a new active handler will now process that list's 0123 // operations. 0124 call_itt_notify(releasing, &pending_operations); 0125 // grab pending_operations 0126 op_list = pending_operations.exchange(nullptr); 0127 0128 // handle all the operations 0129 handle_operations(op_list); 0130 0131 // release the handler 0132 handler_busy.store(0, std::memory_order_release); 0133 } 0134 0135 // An atomically updated list (aka mailbox) of pending operations 0136 std::atomic<OperationType*> pending_operations; 0137 // Controls threads access to handle_operations 0138 std::atomic<uintptr_t> handler_busy; 0139 }; // class aggregator_generic 0140 0141 template <typename HandlerType, typename OperationType> 0142 class aggregator : public aggregator_generic<OperationType> { 0143 HandlerType handle_operations; 0144 public: 0145 aggregator() = default; 0146 0147 void initialize_handler( HandlerType h ) { handle_operations = h; } 0148 0149 void execute(OperationType* op) { 0150 aggregator_generic<OperationType>::execute(op, handle_operations); 0151 } 0152 }; // class aggregator 0153 0154 // the most-compatible friend declaration (vs, gcc, icc) is 0155 // template<class U, class V> friend class aggregating_functor; 0156 template <typename AggregatingClass, typename OperationList> 0157 class aggregating_functor { 0158 AggregatingClass* my_object{nullptr}; 0159 public: 0160 aggregating_functor() = default; 0161 aggregating_functor( AggregatingClass* object ) : my_object(object) { 0162 __TBB_ASSERT(my_object, nullptr); 0163 } 0164 0165 void operator()( OperationList* op_list ) { 0166 __TBB_ASSERT(my_object, nullptr); 0167 my_object->handle_operations(op_list); 0168 } 0169 }; // class aggregating_functor 0170 0171 0172 } // namespace d1 0173 } // namespace detail 0174 } // namespace tbb 0175 0176 #endif // __TBB_detail__aggregator_H
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |