File indexing completed on 2025-01-18 09:54:52
0001
0002
0003
0004
0005 #ifndef CPPCORO_IO_SERVICE_HPP_INCLUDED
0006 #define CPPCORO_IO_SERVICE_HPP_INCLUDED
0007
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/cancellation_token.hpp>
0010 #include <cppcoro/cancellation_registration.hpp>
0011
0012 #if CPPCORO_OS_WINNT
0013 # include <cppcoro/detail/win32.hpp>
0014 #endif
0015
0016 #include <optional>
0017 #include <chrono>
0018 #include <cstdint>
0019 #include <atomic>
0020 #include <utility>
0021 #include <mutex>
0022 #include <cppcoro/coroutine.hpp>
0023
0024 namespace cppcoro
0025 {
0026 class io_service
0027 {
0028 public:
0029 // NOTE(review): only declarations are visible in this header. Remarks marked "presumably" are inferred from names/signatures and must be confirmed against the implementation.
0030 class schedule_operation;
0031 class timed_schedule_operation;
0032
0033 // Default constructor; defined out of line.
0034 // NOTE(review): presumably equivalent to constructing without a concurrency
0035 // hint, so every thread that enters the event loop processes events -
0036 // confirm against the implementation.
0037 io_service();
0038
0039 // Constructs the service from a caller-supplied concurrency hint.
0040 //
0041 // NOTE(review): presumably the hint caps how many threads actively process
0042 // events at once (on Windows it is likely forwarded to the I/O completion
0043 // port) - confirm. The constructor is not marked explicit, so a
0044 // std::uint32_t converts implicitly to io_service wherever a temporary can
0045 // bind (e.g. a const io_service& parameter).
0046 io_service(std::uint32_t concurrencyHint);
0047 // Destructor; defined out of line.
0048 ~io_service();
0049 // The service is neither copyable nor movable: all four operations are deleted.
0050 io_service(io_service&& other) = delete;
0051 io_service(const io_service& other) = delete;
0052 io_service& operator=(io_service&& other) = delete;
0053 io_service& operator=(const io_service& other) = delete;
0054
0055 // Returns an awaitable bound to this service. schedule_operation::await_ready() always returns false,
0056 // so co_await-ing the result always suspends and hands the coroutine to schedule_operation::await_suspend().
0057 // NOTE(review): presumably the coroutine is then resumed from a thread running one of the process_*() functions - confirm.
0058 [[nodiscard]]
0059 schedule_operation schedule() noexcept;
0060
0061 // Returns an awaitable that represents a timed schedule on this service.
0062 //
0063 // The definition at the end of this header computes the resume time as
0064 // std::chrono::high_resolution_clock::now() plus the requested delay and
0065 // passes it, together with the moved cancellation token, to the
0066 // timed_schedule_operation constructor.
0067 //
0068 // delay             - how far from now the resume time should be.
0069 // cancellationToken - forwarded to the operation; defaults to `{}`
0070 //                     (an empty-brace-initialised cancellation_token).
0071 //
0072 // NOTE(review): presumably co_await-ing the result resumes the coroutine on
0073 // a thread running the event loop once the delay has elapsed, and reports
0074 // cancellation (likely by throwing from await_resume(), which is not
0075 // declared noexcept) if the token is signalled first - confirm.
0076
0077 template<typename REP, typename PERIOD>
0078 [[nodiscard]]
0079 timed_schedule_operation schedule_after(
0080 const std::chrono::duration<REP, PERIOD>& delay,
0081 cancellation_token cancellationToken = {}) noexcept;
0082
0083 // Returns a std::uint64_t; not declared noexcept.
0084 // NOTE(review): presumably runs the event loop, blocking until stop() is
0085 // requested or no outstanding work remains, and returns the number of
0086 // events processed - confirm.
0087 std::uint64_t process_events();
0088
0089
0090 // Returns a std::uint64_t; not declared noexcept.
0091 // NOTE(review): presumably the non-blocking counterpart of process_events():
0092 // handles events that are already pending and returns how many were
0093 // processed, without waiting for new ones - confirm.
0094 std::uint64_t process_pending_events();
0095
0096
0097 // Returns a std::uint64_t; not declared noexcept.
0098 // NOTE(review): presumably blocks until at most one event has been
0099 // processed (or the service is stopped / out of work) and returns the
0100 // number processed, i.e. 0 or 1 - confirm.
0101 std::uint64_t process_one_event();
0102
0103
0104
0105 // Returns a std::uint64_t; not declared noexcept.
0106 // NOTE(review): presumably processes at most one event that is already
0107 // pending, without blocking, and returns the number processed
0108 // (0 or 1) - confirm.
0109 std::uint64_t process_one_pending_event();
0110
0111
0112
0113
0114 // Declared noexcept.
0115 // NOTE(review): presumably requests that threads inside the process_*()
0116 // functions leave the event loop, and makes later calls return
0117 // immediately until reset() is called - confirm.
0118 void stop() noexcept;
0119
0120
0121
0122
0123 // Not declared noexcept.
0124 // NOTE(review): presumably clears a previous stop() request so the event
0125 // loop can be entered again; likely only valid once every thread has left
0126 // the process_*() functions - confirm.
0127 void reset();
0128 // Const, noexcept query. NOTE(review): presumably tests stop_requested_flag in m_threadState - confirm.
0129 bool is_stop_requested() const noexcept;
0130 // Called by io_work_scope's io_service& and copy constructors (see io_work_scope below).
0131 void notify_work_started() noexcept;
0132 // Called by io_work_scope's destructor while it still refers to a service.
0133 void notify_work_finished() noexcept;
0134 // The two members below are declared only when CPPCORO_OS_WINNT is non-zero.
0135 #if CPPCORO_OS_WINNT
0136 detail::win32::handle_t native_iocp_handle() noexcept;
0137 void ensure_winsock_initialised();
0138 #endif
0139
0140 private:
0141 // Nested helper types; only forward-declared here.
0142 class timer_thread_state;
0143 class timer_queue;
0144 // Both awaitable types are granted access to the private members.
0145 friend class schedule_operation;
0146 friend class timed_schedule_operation;
0147 // NOTE(review): presumably queues `operation` so the event loop resumes its awaiter - confirm.
0148 void schedule_impl(schedule_operation* operation) noexcept;
0149 // NOTE(review): presumably retries operations parked on m_scheduleOperations - confirm.
0150 void try_reschedule_overflow_operations() noexcept;
0151 // NOTE(review): presumably paired: a successful try_enter_event_loop() must be matched by exit_event_loop() - confirm.
0152 bool try_enter_event_loop() noexcept;
0153 void exit_event_loop() noexcept;
0154 // NOTE(review): waitForEvent presumably selects blocking (true) versus polling (false) - confirm.
0155 bool try_process_one_event(bool waitForEvent);
0156
0157 void post_wake_up_event() noexcept;
0158 // Returns a pointer to the timer thread state; not declared noexcept.
0159 timer_thread_state* ensure_timer_thread_started();
0160 // Constants with values 1 and 2.
0161 static constexpr std::uint32_t stop_requested_flag = 1;
0162 static constexpr std::uint32_t active_thread_count_increment = 2;
0163
0164 // NOTE(review): the constants above suggest this word packs the stop flag in bit 0
0165 // and the count of threads inside the event loop in the remaining bits (steps of 2) - confirm.
0166 std::atomic<std::uint32_t> m_threadState;
0167 // NOTE(review): presumably adjusted by notify_work_started()/notify_work_finished() - confirm.
0168 std::atomic<std::uint32_t> m_workCount;
0169
0170 #if CPPCORO_OS_WINNT
0171 detail::win32::safe_handle m_iocpHandle;
0172
0173 std::atomic<bool> m_winsockInitialised;
0174 std::mutex m_winsockInitialisationMutex;
0175 #endif
0176
0177 // Atomic pointer to a schedule_operation.
0178 // NOTE(review): presumably the head of an intrusive list (linked through schedule_operation::m_next)
0179 // of operations that could not be queued immediately - confirm.
0180 std::atomic<schedule_operation*> m_scheduleOperations;
0181 // NOTE(review): presumably created lazily by ensure_timer_thread_started() - confirm.
0182 std::atomic<timer_thread_state*> m_timerState;
0183
0184 };
0185 // Awaitable returned by io_service::schedule(); holds a reference to the service it was created from.
0186 class io_service::schedule_operation
0187 {
0188 public:
0189 // Stores a reference to `service`. m_awaiter is default-constructed and m_next is left uninitialised here.
0190 schedule_operation(io_service& service) noexcept
0191 : m_service(service)
0192 {}
0193 // Awaiter interface: never ready, so co_await always suspends; await_resume() yields nothing.
0194 bool await_ready() const noexcept { return false; }
0195 void await_suspend(cppcoro::coroutine_handle<> awaiter) noexcept;
0196 void await_resume() const noexcept {}
0197
0198 private:
0199 // io_service and timed_schedule_operation may access the private members below.
0200 friend class io_service;
0201 friend class io_service::timed_schedule_operation;
0202
0203 io_service& m_service;
0204 cppcoro::coroutine_handle<> m_awaiter;
0205 schedule_operation* m_next;
0206 // NOTE(review): m_awaiter presumably receives the handle passed to await_suspend(), and m_next presumably links operations in io_service::m_scheduleOperations - confirm.
0207 };
0208 // Awaitable returned by io_service::schedule_after(). Move-constructible only: copy construction and both assignments are deleted.
0209 class io_service::timed_schedule_operation
0210 {
0211 public:
0212 // service: the owning io_service; resumeTime: absolute high_resolution_clock time point; cancellationToken: taken by value.
0213 timed_schedule_operation(
0214 io_service& service,
0215 std::chrono::high_resolution_clock::time_point resumeTime,
0216 cppcoro::cancellation_token cancellationToken) noexcept;
0217
0218 timed_schedule_operation(timed_schedule_operation&& other) noexcept;
0219
0220 ~timed_schedule_operation();
0221
0222 timed_schedule_operation& operator=(timed_schedule_operation&& other) = delete;
0223 timed_schedule_operation(const timed_schedule_operation& other) = delete;
0224 timed_schedule_operation& operator=(const timed_schedule_operation& other) = delete;
0225 // Awaiter interface. Unlike schedule_operation, await_suspend() and await_resume() are not declared noexcept.
0226 bool await_ready() const noexcept;
0227 void await_suspend(cppcoro::coroutine_handle<> awaiter);
0228 void await_resume();
0229 // NOTE(review): await_resume() presumably throws when cancellation was requested before the resume time - confirm.
0230 private:
0231 // The timer machinery nested in io_service may access the private members below.
0232 friend class io_service::timer_queue;
0233 friend class io_service::timer_thread_state;
0234 // Embedded plain schedule operation. NOTE(review): presumably used to hand the coroutine back to the io_service once the timer fires - confirm.
0235 io_service::schedule_operation m_scheduleOperation;
0236 std::chrono::high_resolution_clock::time_point m_resumeTime;
0237 // Token supplied by the caller, plus an optional registration that starts out disengaged unless a constructor sets it.
0238 cppcoro::cancellation_token m_cancellationToken;
0239 std::optional<cppcoro::cancellation_registration> m_cancellationRegistration;
0240 // NOTE(review): presumably an intrusive link used by timer_queue / timer_thread_state - confirm.
0241 timed_schedule_operation* m_next;
0242 // NOTE(review): presumably arbitrates between the timer thread and cancellation racing to resume the awaiter - confirm.
0243 std::atomic<std::uint32_t> m_refCount;
0244
0245 };
0246 // Scope object that reports one unit of outstanding work to an io_service for as long as it refers to that service.
0247 class io_work_scope
0248 {
0249 public:
0250
0251     // Binds to `service` and tells it that work has started.
0252     explicit io_work_scope(io_service& service) noexcept
0253         : m_service(&service)
0254     {
0255         m_service->notify_work_started();
0256     }
0257
0258     // A copy refers to the same service (if any) and reports its own unit of work.
0259     io_work_scope(const io_work_scope& other) noexcept
0260         : m_service(other.m_service)
0261     {
0262         if (m_service == nullptr) return;
0263         m_service->notify_work_started();
0264     }
0265
0266     // Moving transfers the existing registration; `other` is left referring to no service.
0267     io_work_scope(io_work_scope&& other) noexcept
0268         : m_service(std::exchange(other.m_service, nullptr))
0269     {}
0270
0271     // Reports the work as finished unless this scope no longer refers to a service.
0272     ~io_work_scope()
0273     {
0274         if (m_service == nullptr) return;
0275         m_service->notify_work_finished();
0276     }
0277
0278     // Exchanges the referenced services; no notifications are sent.
0279     void swap(io_work_scope& other) noexcept
0280     {
0281         io_service* const previous = m_service;
0282         m_service = other.m_service;
0283         other.m_service = previous;
0284     }
0285
0286     // Copy/move assignment: the by-value argument takes over the old registration and releases it on destruction.
0287     io_work_scope& operator=(io_work_scope other) noexcept
0288     {
0289         other.swap(*this);
0290         return *this;
0291     }
0292
0293     // Returns the referenced service. Dereferences m_service unchecked, so it must not be called on a moved-from scope.
0294     io_service& service() noexcept { return *m_service; }
0295
0296 private:
0297
0298     io_service* m_service; // nullptr once this scope has been moved from
0299
0300 };
0301 // Free swap for io_work_scope, found by ADL; noexcept because it only forwards to the noexcept member swap().
0302 inline void swap(io_work_scope& a, io_work_scope& b) noexcept
0303 {
0304     a.swap(b);
0305 }
0306 }
0307 // Builds a timed_schedule_operation that becomes due `delay` after the current high_resolution_clock time.
0308 template<typename REP, typename PERIOD>
0309 cppcoro::io_service::timed_schedule_operation cppcoro::io_service::schedule_after(
0310     const std::chrono::duration<REP, PERIOD>& delay,
0311     cppcoro::cancellation_token cancellationToken) noexcept
0312 {
0313     // Convert the delay to the clock's own tick type first, so floating-point or
0314     // finer-than-tick delays (truncated toward zero) still yield a clock::time_point.
0315     using clock = std::chrono::high_resolution_clock;
0316     const clock::time_point resumeTime =
0317         clock::now() + std::chrono::duration_cast<clock::duration>(delay);
0318     return timed_schedule_operation{ *this, resumeTime, std::move(cancellationToken) };
0319 }
0320
0321 #endif