/include/oneapi/tbb/collaborative_call_once.h

/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_collaborative_call_once_H
#define __TBB_collaborative_call_once_H

#include "task_arena.h"
#include "task_group.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d1 {

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress warning: structure was padded due to alignment specifier
    #pragma warning (push)
    #pragma warning (disable: 4324)
#endif

constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;

class alignas(max_nfs_size) collaborative_once_runner : no_copy {

    struct storage_t {
        task_arena m_arena{ task_arena::attach{} };
        wait_context m_wait_context{1};
    };

    std::atomic<std::int64_t> m_ref_count{0};
    std::atomic<bool> m_is_ready{false};

    // Storage with task_arena and wait_context must be initialized only by the winner thread
    union {
        storage_t m_storage;
    };

    template<typename Fn>
    void isolated_execute(Fn f) {
        auto func = [f] {
            f();
            // delegate_base requires a bool-returning functor, while isolate_within_arena ignores the result
            return true;
        };

        delegated_function<decltype(func)> delegate(func);

        r1::isolate_within_arena(delegate, reinterpret_cast<std::intptr_t>(this));
    }

public:
    class lifetime_guard : no_copy {
        collaborative_once_runner& m_runner;
    public:
        lifetime_guard(collaborative_once_runner& r) : m_runner(r) {
            m_runner.m_ref_count++;
        }
        ~lifetime_guard() {
            m_runner.m_ref_count--;
        }
    };

    collaborative_once_runner() {}

    ~collaborative_once_runner() {
        spin_wait_until_eq(m_ref_count, 0, std::memory_order_acquire);
        if (m_is_ready.load(std::memory_order_relaxed)) {
            m_storage.~storage_t();
        }
    }

    std::uintptr_t to_bits() {
        return reinterpret_cast<std::uintptr_t>(this);
    }

    static collaborative_once_runner* from_bits(std::uintptr_t bits) {
        __TBB_ASSERT( (bits & collaborative_once_references_mask) == 0, "invalid pointer, last log2(max_nfs_size) bits must be zero" );
        return reinterpret_cast<collaborative_once_runner*>(bits);
    }

    template <typename F>
    void run_once(F&& f) {
        __TBB_ASSERT(!m_is_ready.load(std::memory_order_relaxed), "storage with task_arena and wait_context is already initialized");
        // Initialize internal state
        new(&m_storage) storage_t();
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                task_group_context context{ task_group_context::bound,
                    task_group_context::default_traits | task_group_context::concurrent_wait };

                function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

                // Set the ready flag after entering the execute body to prevent
                // moonlighting threads from occupying all slots inside the arena.
                m_is_ready.store(true, std::memory_order_release);
                execute_and_wait(t, context, m_storage.m_wait_context, context);
            });
        });
    }

    void assist() noexcept {
        // Do not join the arena until the winner thread takes the slot
        spin_wait_while_eq(m_is_ready, false);
        m_storage.m_arena.execute([&] {
            isolated_execute([&] {
                // We do not want to get an exception from the user functor on moonlighting threads.
                // The exception is handled by the winner thread.
                task_group_context stub_context;
                wait(m_storage.m_wait_context, stub_context);
            });
        });
    }

};

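// Note on the encoding used below: m_state of collaborative_once_flag holds either a small
// sentinel (state::uninitialized, state::done, or, with TBB_USE_ASSERT, state::dead) or the
// address of the winner's collaborative_once_runner combined with a count of moonlighting
// threads that currently reference it. Because the runner is alignas(max_nfs_size), the low
// log2(max_nfs_size) bits of its address are always zero and can carry that count, which is
// why the number of simultaneous references is bounded by collaborative_once_max_references.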
class collaborative_once_flag : no_copy {
    enum state : std::uintptr_t {
        uninitialized,
        done,
#if TBB_USE_ASSERT
        dead
#endif
    };
    std::atomic<std::uintptr_t> m_state{ state::uninitialized };

    template <typename Fn, typename... Args>
    friend void collaborative_call_once(collaborative_once_flag& flag, Fn&& f, Args&&... args);

    void set_completion_state(std::uintptr_t runner_bits, std::uintptr_t desired) {
        std::uintptr_t expected = runner_bits;
        do {
            expected = runner_bits;
            // Possible inefficiency: when we start waiting,
            // some moonlighting threads might keep arriving, which prolongs our waiting.
            // Fortunately, there is a limited number of threads on the system, so the wait time is bounded.
            spin_wait_until_eq(m_state, expected);
        } while (!m_state.compare_exchange_strong(expected, desired));
    }

    template <typename Fn>
    void do_collaborative_call_once(Fn&& f) {
        std::uintptr_t expected = m_state.load(std::memory_order_acquire);
        collaborative_once_runner runner;

        do {
            if (expected == state::uninitialized && m_state.compare_exchange_strong(expected, runner.to_bits())) {
                // Winner thread
                runner.run_once([&] {
                    try_call([&] {
                        std::forward<Fn>(f)();
                    }).on_exception([&] {
                        // Reset the state to uninitialized to allow other threads to try initialization again
                        set_completion_state(runner.to_bits(), state::uninitialized);
                    });
                    // We successfully executed the functor
                    set_completion_state(runner.to_bits(), state::done);
                });
                break;
            } else {
                // Moonlighting thread: we need to add a reference to the state to prolong the runner's lifetime.
                // However, the maximum number of references is limited by the runner alignment.
                // So, we use a CAS loop and spin_wait to guarantee that references never exceed "max_value".
                do {
                    auto max_value = expected | collaborative_once_references_mask;
                    expected = spin_wait_while_eq(m_state, max_value);
                // "expected > state::done" prevents storing values when the state is uninitialized or done
                } while (expected > state::done && !m_state.compare_exchange_strong(expected, expected + 1));

                if (auto shared_runner = collaborative_once_runner::from_bits(expected & ~collaborative_once_references_mask)) {
                    collaborative_once_runner::lifetime_guard guard{*shared_runner};
                    m_state.fetch_sub(1);

                    // The moonlighting threads are not expected to handle exceptions from the user functor.
                    // Therefore, no exception is expected from assist().
                    shared_runner->assist();
                }
            }
            __TBB_ASSERT(m_state.load(std::memory_order_relaxed) != state::dead,
                         "collaborative_once_flag has been prematurely destroyed");
        } while (expected != state::done);
    }

#if TBB_USE_ASSERT
public:
    ~collaborative_once_flag() {
        m_state.store(state::dead, std::memory_order_relaxed);
    }
#endif
};


template <typename Fn, typename... Args>
void collaborative_call_once(collaborative_once_flag& flag, Fn&& fn, Args&&... args) {
    __TBB_ASSERT(flag.m_state.load(std::memory_order_relaxed) != collaborative_once_flag::dead,
                 "collaborative_once_flag has been prematurely destroyed");
    if (flag.m_state.load(std::memory_order_acquire) != collaborative_once_flag::done) {
    #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        // Use stored_pack to work around a GCC 4.8 bug
        // with parameter pack expansion in lambdas
        auto stored_pack = save_pack(std::forward<Args>(args)...);
        auto func = [&] { call(std::forward<Fn>(fn), std::move(stored_pack)); };
    #else
        auto func = [&] { fn(std::forward<Args>(args)...); };
    #endif
        flag.do_collaborative_call_once(func);
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning (pop) // 4324 warning
#endif

} // namespace d1
} // namespace detail

using detail::d1::collaborative_call_once;
using detail::d1::collaborative_once_flag;
} // namespace tbb

#endif // __TBB_collaborative_call_once_H
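
For reference, here is a minimal usage sketch of the API declared above. Only collaborative_once_flag and collaborative_call_once come from this header; expensive_init, cached_value, and get_value are illustrative names invented for the example. The first thread to call get_value() runs the initializer exactly once; threads that call it while the initializer is still running join the winner's arena and assist with its work instead of blocking idly, and all callers observe the initialized value afterwards.

#include "oneapi/tbb/collaborative_call_once.h"
#include <cmath>

// Illustrative one-time initialization guarded by a collaborative_once_flag.
static tbb::collaborative_once_flag init_flag;
static double cached_value;

double expensive_init() {
    // Placeholder for an expensive (possibly parallel) computation.
    double result = 0.0;
    for (int i = 0; i < 1000000; ++i) {
        result += std::sqrt(static_cast<double>(i));
    }
    return result;
}

double get_value() {
    // Exactly one caller executes the lambda; concurrent callers assist rather than idle.
    tbb::collaborative_call_once(init_flag, [] {
        cached_value = expensive_init();
    });
    return cached_value;
}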