Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:51

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__aggregator_H
0018 #define __TBB__aggregator_H
0019 
0020 #define __TBB_aggregator_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022 
0023 #if !TBB_PREVIEW_AGGREGATOR
0024 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
0025 #endif
0026 
0027 #include "atomic.h"
0028 #include "tbb_profiling.h"
0029 
0030 namespace tbb {
0031 namespace interface6 {
0032 
0033 using namespace tbb::internal;
0034 
//! Base class for a single operation handed to an aggregator.
/** Holds a completion word (status) and an intrusive link (my_next) to another
    operation. aggregator_ext is a friend, so it reads and writes both members directly. */
0035 class aggregator_operation {
0036     template<typename handler_type> friend class aggregator_ext;
0037     uintptr_t status;  // agg_waiting after construction; finish() stores agg_finished
0038     aggregator_operation* my_next;  // next operation in the list; NULL after construction
0039 public:
0040     enum aggregator_operation_status { agg_waiting=0, agg_finished };
0041     aggregator_operation() : status(agg_waiting), my_next(NULL) {}
0042     /// Call start before handling this operation
         /** Reports the &status tag to ITT as "acquired" through call_itt_notify. */
0043     void start() { call_itt_notify(acquired, &status); }
0044     /// Call finish when done handling this operation
0045     /** Stores agg_finished into status. The operation will be released to its originating thread, and possibly deleted. */
0046     void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
         /// Returns the operation linked after this one (my_next, read through itt_hide_load_word).
0047     aggregator_operation* next() { return itt_hide_load_word(my_next);}
         /// Links operation n after this one (my_next, written through itt_hide_store_word).
0048     void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
0049 };
0050 
0051 namespace internal {
0052 
//! Abstract base of the operations used by the basic aggregator interface.
/** basic_handler is declared a friend; it is the caller of the private apply_body(). */
0053 class basic_operation_base : public aggregator_operation {
0054     friend class basic_handler;
         //! Performs the work carried by the operation; overridden in basic_operation<Body>.
0055     virtual void apply_body() = 0;
0056 public:
0057     basic_operation_base() : aggregator_operation() {}
0058     virtual ~basic_operation_base() {}
0059 };
0060 
//! Operation that invokes a caller-supplied function object.
/** Only a reference to the body is stored, not a copy, so the body must stay alive
    until apply_body() has run. aggregator::execute() builds the operation on its own
    stack around its argument and does not return before execute_impl() does. */
0061 template<typename Body>
0062 class basic_operation : public basic_operation_base, no_assign {
0063     const Body& my_body;  // referenced, not owned
         //! Calls the stored body with no arguments.
0064     void apply_body() __TBB_override { my_body(); }
0065 public:
         //! Binds the operation to b; b is not copied.
0066     basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
0067 };
0068 
//! Handler used by the basic aggregator interface.
/** Walks a list of operations and, for each one, calls start(), apply_body() and
    finish() in that order. Every element is static_cast to basic_operation_base, so
    the list must contain only objects of that hierarchy. */
0069 class basic_handler {
0070 public:
0071     basic_handler() {}
         //! Handles every operation reachable from op_list; a NULL list is a no-op.
0072     void operator()(aggregator_operation* op_list) const {
0073         while (op_list) {
0074             // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
0075             // The executing thread "acquires" the tag (see start()) and then performs
0076             // the associated operation w/o triggering a race condition diagnostics.
0077             // A thread that created the operation is waiting for its status (see execute_impl()),
0078             // so when this thread is done with the operation, it will "release" the tag
0079             // and update the status (see finish()) to give control back to the waiting thread.
0080             basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
0081             // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
                 // (finish() lets the thread waiting in execute_impl() continue, and
                 // aggregator::execute() keeps the operation on that thread's stack, so the
                 // operation may no longer exist once finish() has been called).
0082             op_list = op_list->next();
0083             request.start();
0084             request.apply_body();
0085             request.finish();
0086         }
0087     }
0088 };
0089 
0090 } // namespace internal
0091 
0092 //! Aggregator base class and expert interface
0093 /** An aggregator for collecting operations coming from multiple sources and executing
0094     them serially on a single thread.

         Each submitting thread pushes its operation onto the head of the mailbox list.
         The thread that finds the mailbox empty when it pushes becomes responsible for
         running the handler; every other thread spins until the status of its own
         operation leaves agg_waiting.

         handler_type is stored by copy and is invoked with an aggregator_operation* that
         points to the head of the detached list (most recently pushed operation first).
         The handler is expected to call finish() on every operation it receives, because
         that is the only thing the waiting threads are watching for. */
0095 template <typename handler_type>
0096 class aggregator_ext : tbb::internal::no_copy {
0097 public:
         //! Stores a copy of h; the mailbox starts empty (NULL) and handler_busy starts at 0.
0098     aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
0099 
0100     //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
0101     /** Details of user-made operations must be handled by user-provided handler.
             op is dereferenced unconditionally, so it must not be NULL. */
0102     void process(aggregator_operation *op) { execute_impl(*op); }
0103 
0104 protected:
0105     /** Place operation in mailbox, then either handle mailbox or wait for the operation
0106         to be completed by a different thread. */
0107     void execute_impl(aggregator_operation& op) {
             // Head of the mailbox list as observed just before this thread's push.
0108         aggregator_operation* res;
0109 
0110         // ITT note: &(op.status) tag is used to cover accesses to this operation. This
0111         // thread has created the operation, and now releases it so that the handler
0112         // thread may handle the associated operation w/o triggering a race condition;
0113         // thus this tag will be acquired just before the operation is handled in the
0114         // handle_operations functor.
0115         call_itt_notify(releasing, &(op.status));
0116         // insert the operation into the list
             // (link op in front of the current head, then try to swing mailbox from that
             // head to &op; the loop retries whenever compare_and_swap does not report the
             // head that was read, i.e. another thread changed mailbox in between)
0117         do {
0118             // ITT may flag the following line as a race; it is a false positive:
0119             // This is an atomic read; we don't provide itt_hide_load_word for atomics
0120             op.my_next = res = mailbox; // NOT A RACE
0121         } while (mailbox.compare_and_swap(&op, res) != res);
0122         if (!res) { // first in the list; handle the operations
0123             // ITT note: &mailbox tag covers access to the handler_busy flag, which this
0124             // waiting handler thread will try to set before entering handle_operations.
0125             call_itt_notify(acquired, &mailbox);
0126             start_handle_operations();
                 // The handler is expected to have called finish() on op by now, which
                 // makes op.status non-zero (agg_finished).
0127             __TBB_ASSERT(op.status, NULL);
0128         }
0129         else { // not first; wait for op to be ready
0130             call_itt_notify(prepare, &(op.status));
0131             spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
                 // The loaded value is discarded.
                 // NOTE(review): presumably issued only for its acquire ordering / ITT
                 // annotation, pairing with the store in finish() - confirm in tbb_profiling.h.
0132             itt_load_word_with_acquire(op.status);
0133         }
0134     }
0135 
0136 
0137 private:
0138     //! An atomically updated list (aka mailbox) of aggregator_operations
0139     atomic<aggregator_operation *> mailbox;
0140 
0141     //! Controls thread access to handle_operations
0142     /** Behaves as boolean flag where 0=false, 1=true */
0143     uintptr_t handler_busy;
0144 
         //! Copy of the handler passed to the constructor; called with the detached list.
0145     handler_type handle_operations;
0146 
0147     //! Trigger the handling of operations when the handler is free
         /** Waits for handler_busy to become 0, sets it to 1, detaches the whole mailbox
             list, passes it to handle_operations, and finally resets handler_busy to 0. */
0148     void start_handle_operations() {
0149         aggregator_operation *pending_operations;
0150 
0151         // ITT note: &handler_busy tag covers access to mailbox as it is passed
0152         // between active and waiting handlers.  Below, the waiting handler waits until
0153         // the active handler releases, and the waiting handler acquires &handler_busy as
0154         // it becomes the active_handler. The release point is at the end of this
0155         // function, when all operations in mailbox have been handled by the
0156         // owner of this aggregator.
0157         call_itt_notify(prepare, &handler_busy);
0158         // get handler_busy: only one thread can possibly spin here at a time
0159         spin_wait_until_eq(handler_busy, uintptr_t(0));
0160         call_itt_notify(acquired, &handler_busy);
0161         // acquire fence not necessary here due to causality rule and surrounding atomics
0162         __TBB_store_with_release(handler_busy, uintptr_t(1));
0163 
0164         // ITT note: &mailbox tag covers access to the handler_busy flag itself.
0165         // Capturing the state of the mailbox signifies that handler_busy has been
0166         // set and a new active handler will now process that list's operations.
0167         call_itt_notify(releasing, &mailbox);
0168         // grab pending_operations
             // (the mailbox is left NULL, so the next thread to push sees an empty list
             // and takes the "first in the list" branch of execute_impl())
0169         pending_operations = mailbox.fetch_and_store(NULL);
0170 
0171         // handle all the operations
0172         handle_operations(pending_operations);
0173 
0174         // release the handler
0175         itt_store_word_with_release(handler_busy, uintptr_t(0));
0176     }
0177 };
0178 
0179 //! Basic aggregator interface
     /** An aggregator whose operations are function objects run by internal::basic_handler.
         The aggregator_ext base is inherited privately, so process() is not part of this
         class's public interface; execute() is the only entry point. */
0180 class aggregator : private aggregator_ext<internal::basic_handler> {
0181 public:
         //! Creates an aggregator with a default-constructed internal::basic_handler.
0182     aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
0183     //! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
0184     /** The calling thread stores the function object in a basic_operation and
0185         places the operation in the aggregator's mailbox.
             The operation lives on this call's stack and refers to b without copying it.
             The call returns when execute_impl() returns; b() is invoked by whichever
             thread runs the handler, which is not necessarily the calling thread. */
0186     template<typename Body>
0187     void execute(const Body& b) {
0188         internal::basic_operation<Body> op(b);
0189         this->execute_impl(op);
0190     }
0191 };
0192 
0193 } // namespace interface6
0194 
0195 using interface6::aggregator;
0196 using interface6::aggregator_ext;
0197 using interface6::aggregator_operation;
0198 
0199 } // namespace tbb
0200 
0201 #include "internal/_warning_suppress_disable_notice.h"
0202 #undef __TBB_aggregator_H_include_area
0203 
0204 #endif  // __TBB__aggregator_H