Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:53

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_STATIC_THREAD_POOL_HPP_INCLUDED
0006 #define CPPCORO_STATIC_THREAD_POOL_HPP_INCLUDED
0007 
0008 #include <atomic>
0009 #include <cstdint>
0010 #include <memory>
0011 #include <thread>
0012 #include <vector>
0013 #include <mutex>
0014 #include <cppcoro/coroutine.hpp>
0015 
0016 namespace cppcoro
0017 {
0018     class static_thread_pool
0019     {
         // A pool that owns a set of std::thread workers (m_threads) and hands out
         // awaitable schedule_operation objects through schedule().
         //
         // Only declarations are visible in this header; every member function that
         // is not defined inline below is implemented in another translation unit.
         //
         // The class holds a std::mutex and several std::atomic members, so it is
         // neither copyable nor movable.
0020     public:
0021 
0022         /// Initialise to a number of threads equal to the number of cores
0023         /// on the current machine.
0024         static_thread_pool();
0025 
0026         /// Construct a thread pool with the specified number of threads.
0027         ///
0028         /// \param threadCount
0029         /// The number of threads in the pool that will be used to execute work.
0030         explicit static_thread_pool(std::uint32_t threadCount);
0031 
             /// Destroys the pool.
             ///
             /// NOTE(review): the definition is not visible here; presumably it calls
             /// shutdown() and joins the worker threads - confirm in the
             /// implementation file.
0032         ~static_thread_pool();
0033 
             /// Awaiter type returned by schedule().
             ///
             /// It provides the three awaiter member functions (await_ready,
             /// await_suspend, await_resume) so it can be the operand of co_await.
0034         class schedule_operation
0035         {
0036         public:
0037 
                 /// Bind the operation to the pool \p tp. Only m_threadPool is set
                 /// here; m_awaitingCoroutine is default-constructed and m_next is
                 /// left uninitialised.
                 ///
                 /// NOTE(review): this constructor is not marked explicit, so a
                 /// static_thread_pool* converts implicitly to a schedule_operation.
0038             schedule_operation(static_thread_pool* tp) noexcept : m_threadPool(tp) {}
0039 
                 /// Always returns false, so a co_await on this operation always
                 /// suspends and calls await_suspend().
0040             bool await_ready() noexcept { return false; }

                 /// Called with the handle of the suspended coroutine.
                 ///
                 /// NOTE(review): defined elsewhere; presumably stores the handle in
                 /// m_awaitingCoroutine and passes this operation to
                 /// static_thread_pool::schedule_impl() - confirm.
0041             void await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept;

                 /// Produces no value; the co_await expression has type void.
0042             void await_resume() noexcept {}
0043 
0044         private:
0045 
                 // Gives static_thread_pool access to the private members below.
0046             friend class static_thread_pool;
0047 
                 // Pool this operation was created for (set by the constructor).
0048             static_thread_pool* m_threadPool;

                 // Handle of the coroutine awaiting this operation.
                 // NOTE(review): presumably assigned in await_suspend() - confirm.
0049             cppcoro::coroutine_handle<> m_awaitingCoroutine;

                 // Pointer to another schedule_operation.
                 // NOTE(review): presumably the intrusive "next" link used by the
                 // pool's queues; it is not initialised by the constructor, so it
                 // must be written before it is read - confirm in the
                 // implementation file.
0050             schedule_operation* m_next;
0051 
0052         };
0053 
             /// \return
             /// The value of m_threadCount, which is const and therefore fixed
             /// at construction.
0054         std::uint32_t thread_count() const noexcept { return m_threadCount; }
0055 
             /// Create a schedule_operation bound to this pool.
             ///
             /// The result is marked [[nodiscard]]: constructing the operation only
             /// stores a pointer to the pool, so discarding it without awaiting it
             /// does nothing.
             ///
             /// \return
             /// An awaiter whose await_ready() always returns false.
0056         [[nodiscard]]
0057         schedule_operation schedule() noexcept { return schedule_operation{ this }; }
0058 
0059     private:
0060 
             // Gives schedule_operation access to the private members below
             // (e.g. schedule_impl()).
0061         friend class schedule_operation;
0062 
             // The functions below are declared here and defined elsewhere; the
             // short descriptions are inferred from their names and signatures only.
             // NOTE(review): verify each against the implementation file.

             // Presumably the entry point executed by the worker thread with the
             // given index.
0063         void run_worker_thread(std::uint32_t threadIndex) noexcept;
0064 
             // Not noexcept, unlike the other private member functions.
0065         void shutdown();
0066 
             // Takes a pointer to an operation; presumably queues it for execution.
0067         void schedule_impl(schedule_operation* operation) noexcept;
0068 
             // Presumably enqueues onto the shared (global) queue instead of a
             // per-thread one.
0069         void remote_enqueue(schedule_operation* operation) noexcept;
0070 
             // Non-const query for work available to the given thread.
0071         bool has_any_queued_work_for(std::uint32_t threadIndex) noexcept;
0072 
             // Const variant of the query above; the "approx" prefix suggests the
             // answer may be stale or imprecise.
0073         bool approx_has_any_queued_work_for(std::uint32_t threadIndex) const noexcept;
0074 
             // Presumably reports the state of m_stopRequested.
0075         bool is_shutdown_requested() const noexcept;
0076 
             // Presumably the pair of calls a worker uses around going to sleep;
             // m_sleepingThreadCount is the likely counter involved.
0077         void notify_intent_to_sleep(std::uint32_t threadIndex) noexcept;
0078         void try_clear_intent_to_sleep(std::uint32_t threadIndex) noexcept;
0079 
             // Returns a pointer to an operation; presumably nullptr when the
             // global queue is empty.
0080         schedule_operation* try_global_dequeue() noexcept;
0081 
0082         /// Try to steal a task from another thread.
0083         ///
0084         /// \return
0085         /// A pointer to the operation that was stolen if one could be stolen
0086         /// from another thread. Otherwise returns nullptr if none of the other
0087         /// threads had any tasks that could be stolen.
0088         schedule_operation* try_steal_from_other_thread(std::uint32_t thisThreadIndex) noexcept;
0089 
             // Presumably wakes a single sleeping worker, if there is one.
0090         void wake_one_thread() noexcept;
0091 
             // Per-thread state type; only forward-declared in this header.
0092         class thread_state;
0093 
             // Thread-local pointers, one copy per thread.
             // NOTE(review): presumably set by worker threads to identify their own
             // state and owning pool - confirm.
0094         static thread_local thread_state* s_currentState;
0095         static thread_local static_thread_pool* s_currentThreadPool;
0096 
             // Both members are const, so they can only be set in the constructors'
             // initialiser lists. m_threadStates is an owning array of thread_state.
             // NOTE(review): presumably holds m_threadCount elements - confirm.
0097         const std::uint32_t m_threadCount;
0098         const std::unique_ptr<thread_state[]> m_threadStates;
0099 
             // The std::thread objects owned by the pool.
0100         std::vector<std::thread> m_threads;
0101 
             // Atomic flag; presumably what is_shutdown_requested() reads.
0102         std::atomic<bool> m_stopRequested;
0103 
             // Mutex and atomic head/tail pointers of the global queue of operations.
             // NOTE(review): which accesses the mutex guards is not visible in this
             // header - confirm.
0104         std::mutex m_globalQueueMutex;
0105         std::atomic<schedule_operation*> m_globalQueueHead;
0106 
             // The alignas specifiers below are commented out, so no explicit
             // alignment is requested for these two atomics.
0107         //alignas(std::hardware_destructive_interference_size)
0108         std::atomic<schedule_operation*> m_globalQueueTail;
0109 
0110         //alignas(std::hardware_destructive_interference_size)
0111         std::atomic<std::uint32_t> m_sleepingThreadCount;
0112 
0113     };
0114 }
0115 
0116 #endif