File indexing completed on 2025-01-18 09:54:53
0001
0002
0003
0004
0005 #ifndef CPPCORO_STATIC_THREAD_POOL_HPP_INCLUDED
0006 #define CPPCORO_STATIC_THREAD_POOL_HPP_INCLUDED
0007
0008 #include <atomic>
0009 #include <cstdint>
0010 #include <memory>
0011 #include <thread>
0012 #include <vector>
0013 #include <mutex>
0014 #include <cppcoro/coroutine.hpp>
0015
0016 namespace cppcoro
0017 {
0018 class static_thread_pool
0019 {
0020 public:
0021
0022
0023
0024 static_thread_pool();
0025
0026
0027
0028
0029
0030 explicit static_thread_pool(std::uint32_t threadCount);
0031
0032 ~static_thread_pool();
0033
0034 class schedule_operation
0035 {
0036 public:
0037
0038 schedule_operation(static_thread_pool* tp) noexcept : m_threadPool(tp) {}
0039
0040 bool await_ready() noexcept { return false; }
0041 void await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept;
0042 void await_resume() noexcept {}
0043
0044 private:
0045
0046 friend class static_thread_pool;
0047
0048 static_thread_pool* m_threadPool;
0049 cppcoro::coroutine_handle<> m_awaitingCoroutine;
0050 schedule_operation* m_next;
0051
0052 };
0053
0054 std::uint32_t thread_count() const noexcept { return m_threadCount; }
0055
0056 [[nodiscard]]
0057 schedule_operation schedule() noexcept { return schedule_operation{ this }; }
0058
0059 private:
0060
0061 friend class schedule_operation;
0062
0063 void run_worker_thread(std::uint32_t threadIndex) noexcept;
0064
0065 void shutdown();
0066
0067 void schedule_impl(schedule_operation* operation) noexcept;
0068
0069 void remote_enqueue(schedule_operation* operation) noexcept;
0070
0071 bool has_any_queued_work_for(std::uint32_t threadIndex) noexcept;
0072
0073 bool approx_has_any_queued_work_for(std::uint32_t threadIndex) const noexcept;
0074
0075 bool is_shutdown_requested() const noexcept;
0076
0077 void notify_intent_to_sleep(std::uint32_t threadIndex) noexcept;
0078 void try_clear_intent_to_sleep(std::uint32_t threadIndex) noexcept;
0079
0080 schedule_operation* try_global_dequeue() noexcept;
0081
0082
0083
0084
0085
0086
0087
0088 schedule_operation* try_steal_from_other_thread(std::uint32_t thisThreadIndex) noexcept;
0089
0090 void wake_one_thread() noexcept;
0091
0092 class thread_state;
0093
0094 static thread_local thread_state* s_currentState;
0095 static thread_local static_thread_pool* s_currentThreadPool;
0096
0097 const std::uint32_t m_threadCount;
0098 const std::unique_ptr<thread_state[]> m_threadStates;
0099
0100 std::vector<std::thread> m_threads;
0101
0102 std::atomic<bool> m_stopRequested;
0103
0104 std::mutex m_globalQueueMutex;
0105 std::atomic<schedule_operation*> m_globalQueueHead;
0106
0107
0108 std::atomic<schedule_operation*> m_globalQueueTail;
0109
0110
0111 std::atomic<std::uint32_t> m_sleepingThreadCount;
0112
0113 };
0114 }
0115
0116 #endif