File indexing completed on 2025-01-18 09:28:35
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_IOCP)
0021
0022 #include <boost/asio/error.hpp>
0023 #include <boost/asio/detail/cstdint.hpp>
0024 #include <boost/asio/detail/handler_alloc_helpers.hpp>
0025 #include <boost/asio/detail/limits.hpp>
0026 #include <boost/asio/detail/thread.hpp>
0027 #include <boost/asio/detail/throw_error.hpp>
0028 #include <boost/asio/detail/win_iocp_io_context.hpp>
0029
0030 #include <boost/asio/detail/push_options.hpp>
0031
0032 namespace boost {
0033 namespace asio {
0034 namespace detail {
0035
0036 struct win_iocp_io_context::thread_function
0037 {
0038 explicit thread_function(win_iocp_io_context* s)
0039 : this_(s)
0040 {
0041 }
0042
0043 void operator()()
0044 {
0045 boost::system::error_code ec;
0046 this_->run(ec);
0047 }
0048
0049 win_iocp_io_context* this_;
0050 };
0051
0052 struct win_iocp_io_context::work_finished_on_block_exit
0053 {
0054 ~work_finished_on_block_exit() noexcept(false)
0055 {
0056 io_context_->work_finished();
0057 }
0058
0059 win_iocp_io_context* io_context_;
0060 };
0061
0062 struct win_iocp_io_context::timer_thread_function
0063 {
0064 void operator()()
0065 {
0066 while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
0067 {
0068 if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
0069 INFINITE) == WAIT_OBJECT_0)
0070 {
0071 ::InterlockedExchange(&io_context_->dispatch_required_, 1);
0072 ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
0073 0, wake_for_dispatch, 0);
0074 }
0075 }
0076 }
0077
0078 win_iocp_io_context* io_context_;
0079 };
0080
0081 win_iocp_io_context::win_iocp_io_context(
0082 boost::asio::execution_context& ctx, int concurrency_hint, bool own_thread)
0083 : execution_context_service_base<win_iocp_io_context>(ctx),
0084 iocp_(),
0085 outstanding_work_(0),
0086 stopped_(0),
0087 stop_event_posted_(0),
0088 shutdown_(0),
0089 gqcs_timeout_(get_gqcs_timeout()),
0090 dispatch_required_(0),
0091 concurrency_hint_(concurrency_hint)
0092 {
0093 BOOST_ASIO_HANDLER_TRACKING_INIT;
0094
0095 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
0096 static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
0097 if (!iocp_.handle)
0098 {
0099 DWORD last_error = ::GetLastError();
0100 boost::system::error_code ec(last_error,
0101 boost::asio::error::get_system_category());
0102 boost::asio::detail::throw_error(ec, "iocp");
0103 }
0104
0105 if (own_thread)
0106 {
0107 ::InterlockedIncrement(&outstanding_work_);
0108 thread_.reset(new boost::asio::detail::thread(thread_function(this)));
0109 }
0110 }
0111
0112 win_iocp_io_context::~win_iocp_io_context()
0113 {
0114 if (thread_.get())
0115 {
0116 stop();
0117 thread_->join();
0118 thread_.reset();
0119 }
0120 }
0121
// Shuts the service down: sets shutdown_, wakes the timer thread, stops and
// joins the internal thread (if any), destroys every operation that is still
// counted in outstanding_work_, and finally joins the timer thread.
0122 void win_iocp_io_context::shutdown()
0123 {
     // Publish the shutdown flag; the timer thread polls it in its loop.
0124 ::InterlockedExchange(&shutdown_, 1);
0125
0126 if (timer_thread_.get())
0127 {
     // Re-arm the waitable timer so the timer thread's wait is satisfied and
     // it gets to re-check shutdown_. Per the SetWaitableTimer documentation
     // a positive due time is absolute, so a value of 1 is already in the
     // past; the period argument is 1 ms.
0128 LARGE_INTEGER timeout;
0129 timeout.QuadPart = 1;
0130 ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
0131 }
0132
0133 if (thread_.get())
0134 {
     // Stop and join the internal thread. The decrement appears to balance
     // the InterlockedIncrement performed by the constructor when it
     // launched that thread.
0135 stop();
0136 thread_->join();
0137 thread_.reset();
0138 ::InterlockedDecrement(&outstanding_work_);
0139 }
0140
     // Keep draining until the outstanding work count is no longer positive
     // (adding zero is an atomic read of the counter).
0141 while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
0142 {
     // Gather every timer operation plus everything parked in completed_ops_.
0143 op_queue<win_iocp_operation> ops;
0144 timer_queues_.get_all_timers(ops);
0145 ops.push(completed_ops_);
0146 if (!ops.empty())
0147 {
     // Destroy each gathered operation, releasing one unit of work apiece.
0148 while (win_iocp_operation* op = ops.front())
0149 {
0150 ops.pop();
0151 ::InterlockedDecrement(&outstanding_work_);
0152 op->destroy();
0153 }
0154 }
0155 else
0156 {
     // Nothing was gathered locally: pull one packet from the completion
     // port, waiting at most gqcs_timeout_, and destroy the operation it
     // carries if there is one. The call's return value is not inspected;
     // only a non-null OVERLAPPED pointer matters here.
0157 DWORD bytes_transferred = 0;
0158 dword_ptr_t completion_key = 0;
0159 LPOVERLAPPED overlapped = 0;
0160 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
0161 &completion_key, &overlapped, gqcs_timeout_);
0162 if (overlapped)
0163 {
0164 ::InterlockedDecrement(&outstanding_work_);
0165 static_cast<win_iocp_operation*>(overlapped)->destroy();
0166 }
0167 }
0168 }
0169
     // The timer thread was woken above; wait for it to exit and release it.
0170 if (timer_thread_.get())
0171 {
0172 timer_thread_->join();
0173 timer_thread_.reset();
0174 }
0175 }
0176
0177 boost::system::error_code win_iocp_io_context::register_handle(
0178 HANDLE handle, boost::system::error_code& ec)
0179 {
0180 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
0181 {
0182 DWORD last_error = ::GetLastError();
0183 ec = boost::system::error_code(last_error,
0184 boost::asio::error::get_system_category());
0185 }
0186 else
0187 {
0188 ec = boost::system::error_code();
0189 }
0190 return ec;
0191 }
0192
0193 size_t win_iocp_io_context::run(boost::system::error_code& ec)
0194 {
0195 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0196 {
0197 stop();
0198 ec = boost::system::error_code();
0199 return 0;
0200 }
0201
0202 win_iocp_thread_info this_thread;
0203 thread_call_stack::context ctx(this, this_thread);
0204
0205 size_t n = 0;
0206 while (do_one(INFINITE, this_thread, ec))
0207 if (n != (std::numeric_limits<size_t>::max)())
0208 ++n;
0209 return n;
0210 }
0211
0212 size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
0213 {
0214 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0215 {
0216 stop();
0217 ec = boost::system::error_code();
0218 return 0;
0219 }
0220
0221 win_iocp_thread_info this_thread;
0222 thread_call_stack::context ctx(this, this_thread);
0223
0224 return do_one(INFINITE, this_thread, ec);
0225 }
0226
0227 size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
0228 {
0229 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0230 {
0231 stop();
0232 ec = boost::system::error_code();
0233 return 0;
0234 }
0235
0236 win_iocp_thread_info this_thread;
0237 thread_call_stack::context ctx(this, this_thread);
0238
0239 return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
0240 }
0241
0242 size_t win_iocp_io_context::poll(boost::system::error_code& ec)
0243 {
0244 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0245 {
0246 stop();
0247 ec = boost::system::error_code();
0248 return 0;
0249 }
0250
0251 win_iocp_thread_info this_thread;
0252 thread_call_stack::context ctx(this, this_thread);
0253
0254 size_t n = 0;
0255 while (do_one(0, this_thread, ec))
0256 if (n != (std::numeric_limits<size_t>::max)())
0257 ++n;
0258 return n;
0259 }
0260
0261 size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
0262 {
0263 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0264 {
0265 stop();
0266 ec = boost::system::error_code();
0267 return 0;
0268 }
0269
0270 win_iocp_thread_info this_thread;
0271 thread_call_stack::context ctx(this, this_thread);
0272
0273 return do_one(0, this_thread, ec);
0274 }
0275
0276 void win_iocp_io_context::stop()
0277 {
0278 if (::InterlockedExchange(&stopped_, 1) == 0)
0279 {
0280 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0281 {
0282 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0283 {
0284 DWORD last_error = ::GetLastError();
0285 boost::system::error_code ec(last_error,
0286 boost::asio::error::get_system_category());
0287 boost::asio::detail::throw_error(ec, "pqcs");
0288 }
0289 }
0290 }
0291 }
0292
0293 bool win_iocp_io_context::can_dispatch()
0294 {
0295 return thread_call_stack::contains(this) != 0;
0296 }
0297
0298 void win_iocp_io_context::capture_current_exception()
0299 {
0300 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0301 this_thread->capture_current_exception();
0302 }
0303
0304 void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
0305 {
0306
0307 op->ready_ = 1;
0308
0309
0310 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0311 {
0312
0313 mutex::scoped_lock lock(dispatch_mutex_);
0314 completed_ops_.push(op);
0315 ::InterlockedExchange(&dispatch_required_, 1);
0316 }
0317 }
0318
0319 void win_iocp_io_context::post_deferred_completions(
0320 op_queue<win_iocp_operation>& ops)
0321 {
0322 while (win_iocp_operation* op = ops.front())
0323 {
0324 ops.pop();
0325
0326
0327 op->ready_ = 1;
0328
0329
0330 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0331 {
0332
0333 mutex::scoped_lock lock(dispatch_mutex_);
0334 completed_ops_.push(op);
0335 completed_ops_.push(ops);
0336 ::InterlockedExchange(&dispatch_required_, 1);
0337 }
0338 }
0339 }
0340
0341 void win_iocp_io_context::abandon_operations(
0342 op_queue<win_iocp_operation>& ops)
0343 {
0344 while (win_iocp_operation* op = ops.front())
0345 {
0346 ops.pop();
0347 ::InterlockedDecrement(&outstanding_work_);
0348 op->destroy();
0349 }
0350 }
0351
0352 void win_iocp_io_context::on_pending(win_iocp_operation* op)
0353 {
0354 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0355 {
0356
0357 if (!::PostQueuedCompletionStatus(iocp_.handle,
0358 0, overlapped_contains_result, op))
0359 {
0360
0361 mutex::scoped_lock lock(dispatch_mutex_);
0362 completed_ops_.push(op);
0363 ::InterlockedExchange(&dispatch_required_, 1);
0364 }
0365 }
0366 }
0367
0368 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0369 DWORD last_error, DWORD bytes_transferred)
0370 {
0371
0372 op->ready_ = 1;
0373
0374
0375 op->Internal = reinterpret_cast<ulong_ptr_t>(
0376 &boost::asio::error::get_system_category());
0377 op->Offset = last_error;
0378 op->OffsetHigh = bytes_transferred;
0379
0380
0381 if (!::PostQueuedCompletionStatus(iocp_.handle,
0382 0, overlapped_contains_result, op))
0383 {
0384
0385 mutex::scoped_lock lock(dispatch_mutex_);
0386 completed_ops_.push(op);
0387 ::InterlockedExchange(&dispatch_required_, 1);
0388 }
0389 }
0390
0391 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0392 const boost::system::error_code& ec, DWORD bytes_transferred)
0393 {
0394
0395 op->ready_ = 1;
0396
0397
0398 op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
0399 op->Offset = ec.value();
0400 op->OffsetHigh = bytes_transferred;
0401
0402
0403 if (!::PostQueuedCompletionStatus(iocp_.handle,
0404 0, overlapped_contains_result, op))
0405 {
0406
0407 mutex::scoped_lock lock(dispatch_mutex_);
0408 completed_ops_.push(op);
0409 ::InterlockedExchange(&dispatch_required_, 1);
0410 }
0411 }
0412
// Dequeues completion packets until one handler has been run or there is a
// reason to give up.
//
// msec        - requested wait in milliseconds; each individual wait on the
//               port uses the smaller of msec and gqcs_timeout_.
// this_thread - per-thread info, used to rethrow an exception captured while
//               the handler ran.
// ec          - cleared on normal returns; set to the system error when the
//               dequeue or the reposting of the stop packet fails.
//
// Returns 1 after completing one operation. Returns 0 on a timeout (unless
// msec is INFINITE, in which case the loop keeps waiting), on an error, or
// when a stop packet is seen while stopped_ is set.
0413 size_t win_iocp_io_context::do_one(DWORD msec,
0414 win_iocp_thread_info& this_thread, boost::system::error_code& ec)
0415 {
0416 for (;;)
0417 {
0418
     // If dispatch_required_ is set, clear it atomically; the thread that
     // clears it takes on the dispatching work below.
0419 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
0420 {
0421 mutex::scoped_lock lock(dispatch_mutex_);
0422
0423
     // Repost the parked completed operations and any ready timers to the
     // completion port, then recompute the waitable timer's due time.
0424 op_queue<win_iocp_operation> ops;
0425 ops.push(completed_ops_);
0426 timer_queues_.get_ready_timers(ops);
0427 post_deferred_completions(ops);
0428 update_timeout();
0429 }
0430
0431
     // Dequeue the next packet. The thread's last-error value is reset first
     // and read back immediately afterwards.
0432 DWORD bytes_transferred = 0;
0433 dword_ptr_t completion_key = 0;
0434 LPOVERLAPPED overlapped = 0;
0435 ::SetLastError(0);
0436 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
0437 &bytes_transferred, &completion_key, &overlapped,
0438 msec < gqcs_timeout_ ? msec : gqcs_timeout_);
0439 DWORD last_error = ::GetLastError();
0440
0441 if (overlapped)
0442 {
     // The packet carries an operation. Start from the result reported by
     // the dequeue call itself.
0443 win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
0444 boost::system::error_code result_ec(last_error,
0445 boost::asio::error::get_system_category());
0446
0447
0448
     // With this key the result was stored in the operation beforehand
     // (Internal = category address, Offset = error value, OffsetHigh = byte
     // count), so read it back from there.
0449 if (completion_key == overlapped_contains_result)
0450 {
0451 result_ec = boost::system::error_code(static_cast<int>(op->Offset),
0452 *reinterpret_cast<boost::system::error_category*>(op->Internal));
0453 bytes_transferred = op->OffsetHigh;
0454 }
0455
0456
0457
     // Otherwise save the dequeued result into the operation using the same
     // field layout.
0458 else
0459 {
0460 op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
0461 op->Offset = result_ec.value();
0462 op->OffsetHigh = bytes_transferred;
0463 }
0464
0465
0466
0467
0468
     // Complete the operation only if ready_ was already 1. If it was 0 this
     // exchange sets it to 1 and the operation is left alone; on_pending()
     // will then observe the 1 and repost the operation with the
     // overlapped_contains_result key, picking up the result saved above.
0469 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0470 {
0471
     // Guard whose destructor calls work_finished() when this scope is left.
0472 work_finished_on_block_exit on_exit = { this };
0473 (void)on_exit;
0474
0475 op->complete(this, result_ec, bytes_transferred);
0476 this_thread.rethrow_pending_exception();
0477 ec = boost::system::error_code();
0478 return 1;
0479 }
0480 }
0481 else if (!ok)
0482 {
     // No packet was dequeued. Anything other than a timeout is reported to
     // the caller as an error.
0483 if (last_error != WAIT_TIMEOUT)
0484 {
0485 ec = boost::system::error_code(last_error,
0486 boost::asio::error::get_system_category());
0487 return 0;
0488 }
0489
0490
0491
     // Timed out. For an infinite wait keep looping, since the wait may have
     // been cut short by gqcs_timeout_.
0492 if (msec == INFINITE)
0493 continue;
0494
0495 ec = boost::system::error_code();
0496 return 0;
0497 }
0498 else if (completion_key == wake_for_dispatch)
0499 {
0500
0501
     // Wake-up packet posted by the timer thread: nothing to do here, the
     // next loop iteration checks dispatch_required_.
0502 }
0503 else
0504 {
0505
     // Remaining case: a packet without an operation and with a key other
     // than wake_for_dispatch, which is what stop() posts. It has now been
     // consumed, so clear the posted flag.
0506 ::InterlockedExchange(&stop_event_posted_, 0);
0507
0508
0509
     // Act on the packet only if the context is still marked as stopped
     // (adding zero is an atomic read); otherwise loop again.
0510 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
0511 {
0512
     // Put a stop packet back on the port unless one is already posted --
     // presumably so other threads waiting on the port are woken as well.
0513 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0514 {
0515 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0516 {
0517 last_error = ::GetLastError();
0518 ec = boost::system::error_code(last_error,
0519 boost::asio::error::get_system_category());
0520 return 0;
0521 }
0522 }
0523
0524 ec = boost::system::error_code();
0525 return 0;
0526 }
0527 }
0528 }
0529 }
0530
0531 DWORD win_iocp_io_context::get_gqcs_timeout()
0532 {
0533 #if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0534 OSVERSIONINFOEX osvi;
0535 ZeroMemory(&osvi, sizeof(osvi));
0536 osvi.dwOSVersionInfoSize = sizeof(osvi);
0537 osvi.dwMajorVersion = 6ul;
0538
0539 const uint64_t condition_mask = ::VerSetConditionMask(
0540 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
0541
0542 if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
0543 return INFINITE;
0544
0545 return default_gqcs_timeout;
0546 #else
0547 return INFINITE;
0548 #endif
0549 }
0550
0551 void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
0552 {
0553 mutex::scoped_lock lock(dispatch_mutex_);
0554
0555 timer_queues_.insert(&queue);
0556
0557 if (!waitable_timer_.handle)
0558 {
0559 waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
0560 if (waitable_timer_.handle == 0)
0561 {
0562 DWORD last_error = ::GetLastError();
0563 boost::system::error_code ec(last_error,
0564 boost::asio::error::get_system_category());
0565 boost::asio::detail::throw_error(ec, "timer");
0566 }
0567
0568 LARGE_INTEGER timeout;
0569 timeout.QuadPart = -max_timeout_usec;
0570 timeout.QuadPart *= 10;
0571 ::SetWaitableTimer(waitable_timer_.handle,
0572 &timeout, max_timeout_msec, 0, 0, FALSE);
0573 }
0574
0575 if (!timer_thread_.get())
0576 {
0577 timer_thread_function thread_function = { this };
0578 timer_thread_.reset(new thread(thread_function, 65536));
0579 }
0580 }
0581
0582 void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
0583 {
0584 mutex::scoped_lock lock(dispatch_mutex_);
0585
0586 timer_queues_.erase(&queue);
0587 }
0588
0589 void win_iocp_io_context::update_timeout()
0590 {
0591 if (timer_thread_.get())
0592 {
0593
0594
0595
0596 long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
0597 if (timeout_usec < max_timeout_usec)
0598 {
0599 LARGE_INTEGER timeout;
0600 timeout.QuadPart = -timeout_usec;
0601 timeout.QuadPart *= 10;
0602 ::SetWaitableTimer(waitable_timer_.handle,
0603 &timeout, max_timeout_msec, 0, 0, FALSE);
0604 }
0605 }
0606 }
0607
0608 }
0609 }
0610 }
0611
0612 #include <boost/asio/detail/pop_options.hpp>
0613
0614 #endif
0615
0616 #endif