Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:20

0001 /*
0002     Copyright (c) 2021-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_collaborative_call_once_H
0018 #define __TBB_collaborative_call_once_H
0019 
0020 #include "task_arena.h"
0021 #include "task_group.h"
0022 
0023 #include <atomic>
0024 
0025 namespace tbb {
0026 namespace detail {
0027 namespace d1 {
0028 
0029 #if _MSC_VER && !defined(__INTEL_COMPILER)
0030     // Suppress warning: structure was padded due to alignment specifier
0031     #pragma warning (push)
0032     #pragma warning (disable: 4324)
0033 #endif
0034 
0035 template <typename F>
0036 class collaborative_call_stack_task : public task {
0037     const F& m_func;
0038     wait_context& m_wait_ctx;
0039 
0040     void finalize() {
0041         m_wait_ctx.release();
0042     }
0043     task* execute(d1::execution_data&) override {
0044         task* res = d2::task_ptr_or_nullptr(m_func);
0045         finalize();
0046         return res;
0047     }
0048     task* cancel(d1::execution_data&) override {
0049         finalize();
0050         return nullptr;
0051     }
0052 public:
0053     collaborative_call_stack_task(const F& f, wait_context& wctx) : m_func(f), m_wait_ctx(wctx) {}
0054 };
0055 
// Maximum number of concurrent references to a runner. It equals the runner's
// alignment (max_nfs_size) so that a reference count can be packed into the
// low-order bits of the runner pointer stored in collaborative_once_flag::m_state.
constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
// Mask selecting the packed reference-count bits of a state value.
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;
0058 
// Shared state for a single collaborative initialization attempt. The winner
// thread owns the runner (it is a local in do_collaborative_call_once) while
// moonlighting threads reach it via the pointer bits packed into
// collaborative_once_flag::m_state. Alignment to max_nfs_size keeps the low
// log2(max_nfs_size) address bits zero so they can hold a reference count.
class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    struct storage_t {
        // Arena attached to the winner's current arena; moonlighting threads
        // enter it via assist().
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    // Count of live lifetime_guard objects; the destructor spins until it is 0.
    std::atomic<std::int64_t> m_ref_count{0};
    // Set by run_once() once m_storage is constructed and the winner has
    // entered the arena; assist() spins on it before touching m_storage.
    std::atomic<bool> m_is_ready{false};

    // Storage with task_arena and wait_context must be initialized only by winner thread
    union {
        storage_t m_storage;
    };

    // Run f under isolation keyed by this runner's address, so work done while
    // assisting here is separated from the caller's outer isolation context.
    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
           // delegate_base requires bool returning functor while isolate_within_arena ignores the result
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    // RAII reference keeping the runner alive while a moonlighting thread uses it.
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    // NOTE: must stay user-provided (not =default): the anonymous union member
    // is deliberately left unconstructed here.
    collaborative_once_runner() {}

    ~collaborative_once_runner() {
        // Wait until no lifetime_guard references the runner before tearing down.
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            // m_storage was placement-constructed in run_once(); destroy it manually.
            m_storage.~storage_t();
        }
    }

    // This runner's address as an integer, suitable for packing into
    // collaborative_once_flag::m_state (low bits are zero by alignment).
    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    // Inverse of to_bits(); `bits` must have the reference-count bits cleared.
    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

    // Winner-thread path: construct the storage, then run f as a task inside
    // the attached arena and wait on the wait_context so other threads can help.
    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
        // Initialize internal state
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                task_group_context context{ task_group_context::bound,
                    task_group_context::default_traits | task_group_context::concurrent_wait };

                collaborative_call_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Set the ready flag after entering the execute body to prevent
                // moonlighting threads from occupying all slots inside the arena.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

    // Moonlighting-thread path: join the winner's arena and help execute work
    // until the initialization task completes. noexcept: exceptions from the
    // user functor are delivered on the winner thread only.
    void assist() noexcept {
        // Do not join the arena until the winner thread takes the slot
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from user functor on moonlighting threads.
                // The exception is handled with the winner thread
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};
0151 
// Once-flag for collaborative_call_once. m_state holds either a distinguished
// state value (uninitialized / done / dead) or a collaborative_once_runner
// pointer whose low bits (collaborative_once_references_mask) carry a count of
// moonlighting threads currently dereferencing it.
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    // Winner-side: replace the packed runner bits with `desired` once the
    // packed reference count drops to zero (m_state equals the bare runner_bits).
    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Possible inefficiency: when we start waiting,
            // some moonlighting threads might continue coming that will prolong our waiting.
            // Fortunately, there are limited number of threads on the system so wait time is limited.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    // Runs f exactly once across all callers: one thread wins the CAS and
    // executes f via a stack-local runner; the others assist through the
    // runner published in m_state, retrying until the state becomes done.
    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong runner lifetime.
                // However, the maximum number of references are limited with runner alignment.
                // So, we use CAS loop and spin_wait to guarantee that references never exceed "max_value".
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                // "expected > state::done" prevents storing values, when state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    // Guard keeps the runner alive; the packed m_state reference
                    // can then be dropped before calling assist().
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    // Debug-only: mark the flag dead so late callers trip the assertions above.
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};
0226 
0227 
0228 template <typename Fn, typename... Args>
0229 void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
0230     __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
0231                  "collaborative_once_flag has been prematurely destroyed");
0232     if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
0233     #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
0234         // Using stored_pack to suppress bug in GCC 4.8
0235         // with parameter pack expansion in lambda
0236         auto stored_pack = save_pack(std::forward<Args>(args)...);
0237         auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
0238     #else
0239         auto func = [&] { fn(std::forward<Args>(args)...); };
0240     #endif
0241         flag.do_collaborative_call_once(func);
0242     }
0243 }
0244 
0245 #if _MSC_VER && !defined(__INTEL_COMPILER)
0246     #pragma warning (pop) // 4324 warning
0247 #endif
0248 
0249 } // namespace d1
0250 } // namespace detail
0251 
0252 using detail::d1::collaborative_call_once;
0253 using detail::d1::collaborative_once_flag;
0254 } // namespace tbb
0255 
0256 #endif // __TBB_collaborative_call_once_H