Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:52

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_IO_SERVICE_HPP_INCLUDED
0006 #define CPPCORO_IO_SERVICE_HPP_INCLUDED
0007 
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/cancellation_token.hpp>
0010 #include <cppcoro/cancellation_registration.hpp>
0011 
0012 #if CPPCORO_OS_WINNT
0013 # include <cppcoro/detail/win32.hpp>
0014 #endif
0015 
0016 #include <optional>
0017 #include <chrono>
0018 #include <cstdint>
0019 #include <atomic>
0020 #include <utility>
0021 #include <mutex>
0022 #include <cppcoro/coroutine.hpp>
0023 
0024 namespace cppcoro
0025 {
0026     class io_service
0027     {
0028     public:
0029 
0030         class schedule_operation;
0031         class timed_schedule_operation;
0032 
0033         /// Initialises the io_service.
0034         ///
0035         /// Does not set a concurrency hint. All threads that enter the
0036         /// event loop will actively process events.
0037         io_service();
0038 
0039         /// Initialise the io_service with a concurrency hint.
0040         ///
0041         /// \param concurrencyHint
0042         /// Specifies the target maximum number of I/O threads to be
0043         /// actively processing events.
0044         /// Note that the number of active threads may temporarily go
0045         /// above this number.
0046         io_service(std::uint32_t concurrencyHint);
0047 
0048         ~io_service();
0049 
0050         io_service(io_service&& other) = delete;
0051         io_service(const io_service& other) = delete;
0052         io_service& operator=(io_service&& other) = delete;
0053         io_service& operator=(const io_service& other) = delete;
0054 
0055         /// Returns an operation that when awaited suspends the awaiting
0056         /// coroutine and reschedules it for resumption on an I/O thread
0057         /// associated with this io_service.
0058         [[nodiscard]]
0059         schedule_operation schedule() noexcept;
0060 
0061         /// Returns an operation that when awaited will suspend the
0062         /// awaiting coroutine for the specified delay. Once the delay
0063         /// has elapsed, the coroutine will resume execution on an
0064         /// I/O thread associated with this io_service.
0065         ///
0066         /// \param delay
0067         /// The amount of time to delay scheduling resumption of the coroutine
0068         /// on an I/O thread. There is no guarantee that the coroutine will
0069         /// be resumed exactly after this delay.
0070         ///
0071         /// \param cancellationToken [optional]
0072         /// A cancellation token that can be used to communicate a request to
0073         /// cancel the delayed schedule operation and schedule it for resumption
0074         /// immediately.
0075         /// The co_await operation will throw cppcoro::operation_cancelled if
0076         /// cancellation was requested before the coroutine could be resumed.
0077         template<typename REP, typename PERIOD>
0078         [[nodiscard]]
0079         timed_schedule_operation schedule_after(
0080             const std::chrono::duration<REP, PERIOD>& delay,
0081             cancellation_token cancellationToken = {}) noexcept;
0082 
0083         /// Process events until the io_service is stopped.
0084         ///
0085         /// \return
0086         /// The number of events processed during this call.
0087         std::uint64_t process_events();
0088 
0089         /// Process events until either the io_service is stopped or
0090         /// there are no more pending events in the queue.
0091         ///
0092         /// \return
0093         /// The number of events processed during this call.
0094         std::uint64_t process_pending_events();
0095 
0096         /// Block until either one event is processed or the io_service is stopped.
0097         ///
0098         /// \return
0099         /// The number of events processed during this call.
0100         /// This will either be 0 or 1.
0101         std::uint64_t process_one_event();
0102 
0103         /// Process one event if there are any events pending, otherwise if there
0104         /// are no events pending or the io_service is stopped then return immediately.
0105         ///
0106         /// \return
0107         /// The number of events processed during this call.
0108         /// This will either be 0 or 1.
0109         std::uint64_t process_one_pending_event();
0110 
0111         /// Shut down the io_service.
0112         ///
0113         /// This will cause any threads currently in a call to one of the process_xxx() methods
0114         /// to return from that call once they finish processing the current event.
0115         ///
0116         /// This call does not wait until all threads have exited the event loop so you
0117         /// must use other synchronisation mechanisms to wait for those threads.
0118         void stop() noexcept;
0119 
0120         /// Reset an io_service to prepare it for resuming processing of events.
0121         ///
0122         /// Call this after a call to stop() to allow calls to process_xxx() methods
0123         /// to process events.
0124         ///
0125         /// After calling stop() you should ensure that all threads have returned from
0126         /// calls to process_xxx() methods before calling reset().
0127         void reset();
0128 
0129         bool is_stop_requested() const noexcept;
0130 
0131         void notify_work_started() noexcept;
0132 
0133         void notify_work_finished() noexcept;
0134 
0135 #if CPPCORO_OS_WINNT
0136         detail::win32::handle_t native_iocp_handle() noexcept;
0137         void ensure_winsock_initialised();
0138 #endif
0139 
0140     private:
0141 
0142         class timer_thread_state;
0143         class timer_queue;
0144 
0145         friend class schedule_operation;
0146         friend class timed_schedule_operation;
0147 
0148         void schedule_impl(schedule_operation* operation) noexcept;
0149 
0150         void try_reschedule_overflow_operations() noexcept;
0151 
0152         bool try_enter_event_loop() noexcept;
0153         void exit_event_loop() noexcept;
0154 
0155         bool try_process_one_event(bool waitForEvent);
0156 
0157         void post_wake_up_event() noexcept;
0158 
0159         timer_thread_state* ensure_timer_thread_started();
0160 
0161         static constexpr std::uint32_t stop_requested_flag = 1;
0162         static constexpr std::uint32_t active_thread_count_increment = 2;
0163 
0164         // Bit 0: stop_requested_flag
0165         // Bit 1-31: count of active threads currently running the event loop
0166         std::atomic<std::uint32_t> m_threadState;
0167 
0168         std::atomic<std::uint32_t> m_workCount;
0169 
0170 #if CPPCORO_OS_WINNT
0171         detail::win32::safe_handle m_iocpHandle;
0172 
0173         std::atomic<bool> m_winsockInitialised;
0174         std::mutex m_winsockInitialisationMutex;
0175 #endif
0176 
0177         // Head of a linked-list of schedule operations that are
0178         // ready to run but that failed to be queued to the I/O
0179         // completion port (eg. due to low memory).
0180         std::atomic<schedule_operation*> m_scheduleOperations;
0181 
0182         std::atomic<timer_thread_state*> m_timerState;
0183 
0184     };
0185 
0186     class io_service::schedule_operation
0187     {
0188     public:
0189 
0190         schedule_operation(io_service& service) noexcept
0191             : m_service(service)
0192         {}
0193 
0194         bool await_ready() const noexcept { return false; }
0195         void await_suspend(cppcoro::coroutine_handle<> awaiter) noexcept;
0196         void await_resume() const noexcept {}
0197 
0198     private:
0199 
0200         friend class io_service;
0201         friend class io_service::timed_schedule_operation;
0202 
0203         io_service& m_service;
0204         cppcoro::coroutine_handle<> m_awaiter;
0205         schedule_operation* m_next;
0206 
0207     };
0208 
0209     class io_service::timed_schedule_operation
0210     {
0211     public:
0212 
0213         timed_schedule_operation(
0214             io_service& service,
0215             std::chrono::high_resolution_clock::time_point resumeTime,
0216             cppcoro::cancellation_token cancellationToken) noexcept;
0217 
0218         timed_schedule_operation(timed_schedule_operation&& other) noexcept;
0219 
0220         ~timed_schedule_operation();
0221 
0222         timed_schedule_operation& operator=(timed_schedule_operation&& other) = delete;
0223         timed_schedule_operation(const timed_schedule_operation& other) = delete;
0224         timed_schedule_operation& operator=(const timed_schedule_operation& other) = delete;
0225 
0226         bool await_ready() const noexcept;
0227         void await_suspend(cppcoro::coroutine_handle<> awaiter);
0228         void await_resume();
0229 
0230     private:
0231 
0232         friend class io_service::timer_queue;
0233         friend class io_service::timer_thread_state;
0234 
0235         io_service::schedule_operation m_scheduleOperation;
0236         std::chrono::high_resolution_clock::time_point m_resumeTime;
0237 
0238         cppcoro::cancellation_token m_cancellationToken;
0239         std::optional<cppcoro::cancellation_registration> m_cancellationRegistration;
0240 
0241         timed_schedule_operation* m_next;
0242 
0243         std::atomic<std::uint32_t> m_refCount;
0244 
0245     };
0246 
0247     class io_work_scope
0248     {
0249     public:
0250 
0251         explicit io_work_scope(io_service& service) noexcept
0252             : m_service(&service)
0253         {
0254             service.notify_work_started();
0255         }
0256 
0257         io_work_scope(const io_work_scope& other) noexcept
0258             : m_service(other.m_service)
0259         {
0260             if (m_service != nullptr)
0261             {
0262                 m_service->notify_work_started();
0263             }
0264         }
0265 
0266         io_work_scope(io_work_scope&& other) noexcept
0267             : m_service(other.m_service)
0268         {
0269             other.m_service = nullptr;
0270         }
0271 
0272         ~io_work_scope()
0273         {
0274             if (m_service != nullptr)
0275             {
0276                 m_service->notify_work_finished();
0277             }
0278         }
0279 
0280         void swap(io_work_scope& other) noexcept
0281         {
0282             std::swap(m_service, other.m_service);
0283         }
0284 
0285         io_work_scope& operator=(io_work_scope other) noexcept
0286         {
0287             swap(other);
0288             return *this;
0289         }
0290 
0291         io_service& service() noexcept
0292         {
0293             return *m_service;
0294         }
0295 
0296     private:
0297 
0298         io_service* m_service;
0299 
0300     };
0301 
0302     inline void swap(io_work_scope& a, io_work_scope& b)
0303     {
0304         a.swap(b);
0305     }
0306 }
0307 
0308 template<typename REP, typename RATIO>
0309 cppcoro::io_service::timed_schedule_operation
0310 cppcoro::io_service::schedule_after(
0311     const std::chrono::duration<REP, RATIO>& duration,
0312     cppcoro::cancellation_token cancellationToken) noexcept
0313 {
0314     return timed_schedule_operation{
0315         *this,
0316         std::chrono::high_resolution_clock::now() + duration,
0317         std::move(cancellationToken)
0318     };
0319 }
0320 
0321 #endif