File indexing completed on 2025-12-18 10:24:20
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_collaborative_call_once_H
0018 #define __TBB_collaborative_call_once_H
0019
0020 #include "task_arena.h"
0021 #include "task_group.h"
0022
0023 #include <atomic>
0024
0025 namespace tbb {
0026 namespace detail {
0027 namespace d1 {
0028
0029 #if _MSC_VER && !defined(__INTEL_COMPILER)
0030
0031 #pragma warning (push)
0032 #pragma warning (disable: 4324)
0033 #endif
0034
0035 template <typename F>
0036 class collaborative_call_stack_task : public task {
0037 const F& m_func;
0038 wait_context& m_wait_ctx;
0039
0040 void finalize() {
0041 m_wait_ctx.release();
0042 }
0043 task* execute(d1::execution_data&) override {
0044 task* res = d2::task_ptr_or_nullptr(m_func);
0045 finalize();
0046 return res;
0047 }
0048 task* cancel(d1::execution_data&) override {
0049 finalize();
0050 return nullptr;
0051 }
0052 public:
0053 collaborative_call_stack_task(const F& f, wait_context& wctx) : m_func(f), m_wait_ctx(wctx) {}
0054 };
0055
// A collaborative_once_runner is aligned to max_nfs_size, so the low
// log2(max_nfs_size) bits of its address are always zero; those bits are
// reused as a small reference counter packed into collaborative_once_flag's
// atomic state word. The mask extracts that embedded counter.
constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;
0058
// Represents one execution attempt of the once-functor plus the machinery
// that lets losing threads block on (and help with) the winner's work.
// The alignas keeps the low log2(max_nfs_size) bits of `this` zero so the
// owning collaborative_once_flag can pack a reference count into them.
class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    // Heavyweight state that only the winning thread needs; constructed
    // lazily in run_once() (see the union below).
    struct storage_t {
        // Attach to the arena of the thread that calls run_once().
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    // Number of live lifetime_guard objects held by assisting threads.
    std::atomic<std::int64_t> m_ref_count{0};
    // Set (release) once m_storage is fully constructed and the wait has started.
    std::atomic<bool> m_is_ready{false};


    // Anonymous union suppresses automatic construction/destruction of
    // m_storage: it is placement-constructed in run_once() and destroyed in
    // the destructor only when m_is_ready was observed true.
    union {
        storage_t m_storage;
    };

    // Run f under work isolation, using this runner's address as the unique
    // isolation tag, so assisting threads only pick up tasks belonging to
    // this once-invocation.
    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
            // bool return appears to be required by delegated_function -- TODO confirm
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    // RAII reference on the runner: keeps the destructor's spin-wait from
    // completing while an assisting thread still uses this object.
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    collaborative_once_runner() {}

    ~collaborative_once_runner() {
        // Block until every assisting thread has dropped its lifetime_guard.
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            // m_storage was placement-constructed in run_once(); the union
            // member must be destroyed manually.
            m_storage.~storage_t();
        }
    }

    // The runner's address doubles as the state value published in
    // collaborative_once_flag::m_state.
    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

    // Winner path: construct the storage, run f as an isolated task in this
    // thread's arena, and publish m_is_ready so other threads may assist.
    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");

        // Placement-construct the union member (destroyed in ~collaborative_once_runner).
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // concurrent_wait allows several threads to wait on the same
                // wait_context simultaneously.
                task_group_context context{ task_group_context::bound,
                    task_group_context::default_traits | task_group_context::concurrent_wait };

                collaborative_call_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Release store pairs with the acquire-style spin in assist():
                // a thread that sees m_is_ready == true must also see the
                // fully constructed m_storage.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

    // Loser path: wait for the winner's storage to be published, then join
    // the wait inside the winner's arena/isolation so this thread can steal
    // and execute tasks belonging to the once-invocation instead of idling.
    void assist() noexcept {
        // m_storage must not be touched before the winner publishes it.
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // Fresh context for this waiter; presumably keeps the wait
                // independent of the winner's context -- TODO confirm
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};
0151
// State holder for collaborative_call_once(). m_state encodes one of:
//   state::uninitialized (0) - functor has not (successfully) run yet
//   state::done (1)          - functor completed
//   any other value          - address of the active collaborative_once_runner,
//                              with the low alignment bits holding a count of
//                              threads currently acquiring a reference to it
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    // Transition m_state from the plain runner address (no embedded
    // references) to `desired`, spinning while losing threads still have
    // in-flight reference increments encoded in the low bits.
    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Wait for the embedded counter to drain back to zero, i.e. for
            // m_state to equal the bare runner pointer again.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner: our runner's address is now published in m_state.
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Functor threw: roll back to uninitialized so a later
                        // call can retry; the exception then propagates to the
                        // caller (try_call rethrows after on_exception).
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });

                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Loser: try to take a reference on the winner's runner by
                // incrementing the counter packed into the low bits of m_state.
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    // Back off while the embedded counter is saturated.
                    expected = spin_wait_while_eq(m_state, max_value);
                    // Values <= state::done mean there is no runner to reference.
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                // Masking off the counter bits yields the runner address, or
                // nullptr when m_state held uninitialized/done.
                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    // The guard's m_ref_count now protects the runner, so the
                    // slot borrowed from m_state can be returned immediately.
                    m_state.fetch_sub(1);

                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                "collaborative_once_flag has been prematurely destroyed");
            // Loop again: the winner may have rolled back on exception, in
            // which case this thread competes to become the new winner.
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    ~collaborative_once_flag() {
        // Debug-only tombstone so late callers trip the assertions above.
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};
0226
0227
0228 template <typename Fn, typename... Args>
0229 void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
0230 __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
0231 "collaborative_once_flag has been prematurely destroyed");
0232 if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
0233 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
0234
0235
0236 auto stored_pack = save_pack(std::forward<Args>(args)...);
0237 auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
0238 #else
0239 auto func = [&] { fn(std::forward<Args>(args)...); };
0240 #endif
0241 flag.do_collaborative_call_once(func);
0242 }
0243 }
0244
0245 #if _MSC_VER && !defined(__INTEL_COMPILER)
0246 #pragma warning (pop)
0247 #endif
0248
0249 }
0250 }
0251
0252 using detail::d1::collaborative_call_once;
0253 using detail::d1::collaborative_once_flag;
0254 }
0255
0256 #endif