Warning, file /include/boost/asio/detail/impl/win_iocp_io_context.ipp was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_IOCP)
0021
0022 #include <boost/asio/config.hpp>
0023 #include <boost/asio/error.hpp>
0024 #include <boost/asio/detail/cstdint.hpp>
0025 #include <boost/asio/detail/handler_alloc_helpers.hpp>
0026 #include <boost/asio/detail/limits.hpp>
0027 #include <boost/asio/detail/thread.hpp>
0028 #include <boost/asio/detail/throw_error.hpp>
0029 #include <boost/asio/detail/win_iocp_io_context.hpp>
0030
0031 #include <boost/asio/detail/push_options.hpp>
0032
0033 namespace boost {
0034 namespace asio {
0035 namespace detail {
0036
0037 struct win_iocp_io_context::thread_function
0038 {
0039 explicit thread_function(win_iocp_io_context* s)
0040 : this_(s)
0041 {
0042 }
0043
0044 void operator()()
0045 {
0046 boost::system::error_code ec;
0047 this_->run(ec);
0048 }
0049
0050 win_iocp_io_context* this_;
0051 };
0052
0053 struct win_iocp_io_context::work_finished_on_block_exit
0054 {
0055 ~work_finished_on_block_exit() noexcept(false)
0056 {
0057 io_context_->work_finished();
0058 }
0059
0060 win_iocp_io_context* io_context_;
0061 };
0062
0063 struct win_iocp_io_context::timer_thread_function
0064 {
0065 void operator()()
0066 {
0067 while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
0068 {
0069 if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
0070 INFINITE) == WAIT_OBJECT_0)
0071 {
0072 ::InterlockedExchange(&io_context_->dispatch_required_, 1);
0073 ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
0074 0, wake_for_dispatch, 0);
0075 }
0076 }
0077 }
0078
0079 win_iocp_io_context* io_context_;
0080 };
0081
0082 win_iocp_io_context::win_iocp_io_context(
0083 boost::asio::execution_context& ctx, bool own_thread)
0084 : execution_context_service_base<win_iocp_io_context>(ctx),
0085 iocp_(),
0086 outstanding_work_(0),
0087 stopped_(0),
0088 stop_event_posted_(0),
0089 shutdown_(0),
0090 gqcs_timeout_(get_gqcs_timeout()),
0091 dispatch_required_(0),
0092 concurrency_hint_(config(ctx).get("scheduler", "concurrency_hint", -1))
0093 {
0094 BOOST_ASIO_HANDLER_TRACKING_INIT;
0095
0096 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
0097 static_cast<DWORD>(concurrency_hint_ >= 0
0098 ? concurrency_hint_ : DWORD(~0)));
0099 if (!iocp_.handle)
0100 {
0101 DWORD last_error = ::GetLastError();
0102 boost::system::error_code ec(last_error,
0103 boost::asio::error::get_system_category());
0104 boost::asio::detail::throw_error(ec, "iocp");
0105 }
0106
0107 if (own_thread)
0108 {
0109 ::InterlockedIncrement(&outstanding_work_);
0110 thread_.reset(new boost::asio::detail::thread(thread_function(this)));
0111 }
0112 }
0113
0114 win_iocp_io_context::~win_iocp_io_context()
0115 {
0116 if (thread_.get())
0117 {
0118 stop();
0119 thread_->join();
0120 thread_.reset();
0121 }
0122 }
0123
0124 void win_iocp_io_context::shutdown()
0125 {
0126 ::InterlockedExchange(&shutdown_, 1);
0127
0128 if (timer_thread_.get())
0129 {
0130 LARGE_INTEGER timeout;
0131 timeout.QuadPart = 1;
0132 ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
0133 }
0134
0135 if (thread_.get())
0136 {
0137 stop();
0138 thread_->join();
0139 thread_.reset();
0140 ::InterlockedDecrement(&outstanding_work_);
0141 }
0142
0143 while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
0144 {
0145 op_queue<win_iocp_operation> ops;
0146 timer_queues_.get_all_timers(ops);
0147 ops.push(completed_ops_);
0148 if (!ops.empty())
0149 {
0150 while (win_iocp_operation* op = ops.front())
0151 {
0152 ops.pop();
0153 ::InterlockedDecrement(&outstanding_work_);
0154 op->destroy();
0155 }
0156 }
0157 else
0158 {
0159 DWORD bytes_transferred = 0;
0160 dword_ptr_t completion_key = 0;
0161 LPOVERLAPPED overlapped = 0;
0162 ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
0163 &completion_key, &overlapped, gqcs_timeout_);
0164 if (overlapped)
0165 {
0166 ::InterlockedDecrement(&outstanding_work_);
0167 static_cast<win_iocp_operation*>(overlapped)->destroy();
0168 }
0169 }
0170 }
0171
0172 if (timer_thread_.get())
0173 {
0174 timer_thread_->join();
0175 timer_thread_.reset();
0176 }
0177 }
0178
0179 boost::system::error_code win_iocp_io_context::register_handle(
0180 HANDLE handle, boost::system::error_code& ec)
0181 {
0182 if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
0183 {
0184 DWORD last_error = ::GetLastError();
0185 ec = boost::system::error_code(last_error,
0186 boost::asio::error::get_system_category());
0187 }
0188 else
0189 {
0190 ec = boost::system::error_code();
0191 }
0192 return ec;
0193 }
0194
0195 size_t win_iocp_io_context::run(boost::system::error_code& ec)
0196 {
0197 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0198 {
0199 stop();
0200 ec = boost::system::error_code();
0201 return 0;
0202 }
0203
0204 win_iocp_thread_info this_thread;
0205 thread_call_stack::context ctx(this, this_thread);
0206
0207 size_t n = 0;
0208 while (do_one(INFINITE, this_thread, ec))
0209 if (n != (std::numeric_limits<size_t>::max)())
0210 ++n;
0211 return n;
0212 }
0213
0214 size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
0215 {
0216 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0217 {
0218 stop();
0219 ec = boost::system::error_code();
0220 return 0;
0221 }
0222
0223 win_iocp_thread_info this_thread;
0224 thread_call_stack::context ctx(this, this_thread);
0225
0226 return do_one(INFINITE, this_thread, ec);
0227 }
0228
0229 size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
0230 {
0231 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0232 {
0233 stop();
0234 ec = boost::system::error_code();
0235 return 0;
0236 }
0237
0238 win_iocp_thread_info this_thread;
0239 thread_call_stack::context ctx(this, this_thread);
0240
0241 return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
0242 }
0243
0244 size_t win_iocp_io_context::poll(boost::system::error_code& ec)
0245 {
0246 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0247 {
0248 stop();
0249 ec = boost::system::error_code();
0250 return 0;
0251 }
0252
0253 win_iocp_thread_info this_thread;
0254 thread_call_stack::context ctx(this, this_thread);
0255
0256 size_t n = 0;
0257 while (do_one(0, this_thread, ec))
0258 if (n != (std::numeric_limits<size_t>::max)())
0259 ++n;
0260 return n;
0261 }
0262
0263 size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
0264 {
0265 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0266 {
0267 stop();
0268 ec = boost::system::error_code();
0269 return 0;
0270 }
0271
0272 win_iocp_thread_info this_thread;
0273 thread_call_stack::context ctx(this, this_thread);
0274
0275 return do_one(0, this_thread, ec);
0276 }
0277
0278 void win_iocp_io_context::stop()
0279 {
0280 if (::InterlockedExchange(&stopped_, 1) == 0)
0281 {
0282 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0283 {
0284 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0285 {
0286 DWORD last_error = ::GetLastError();
0287 boost::system::error_code ec(last_error,
0288 boost::asio::error::get_system_category());
0289 boost::asio::detail::throw_error(ec, "pqcs");
0290 }
0291 }
0292 }
0293 }
0294
0295 bool win_iocp_io_context::can_dispatch()
0296 {
0297 return thread_call_stack::contains(this) != 0;
0298 }
0299
0300 void win_iocp_io_context::capture_current_exception()
0301 {
0302 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0303 this_thread->capture_current_exception();
0304 }
0305
0306 void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
0307 {
0308
0309 op->ready_ = 1;
0310
0311
0312 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0313 {
0314
0315 mutex::scoped_lock lock(dispatch_mutex_);
0316 completed_ops_.push(op);
0317 ::InterlockedExchange(&dispatch_required_, 1);
0318 }
0319 }
0320
0321 void win_iocp_io_context::post_deferred_completions(
0322 op_queue<win_iocp_operation>& ops)
0323 {
0324 while (win_iocp_operation* op = ops.front())
0325 {
0326 ops.pop();
0327
0328
0329 op->ready_ = 1;
0330
0331
0332 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0333 {
0334
0335 mutex::scoped_lock lock(dispatch_mutex_);
0336 completed_ops_.push(op);
0337 completed_ops_.push(ops);
0338 ::InterlockedExchange(&dispatch_required_, 1);
0339 }
0340 }
0341 }
0342
0343 void win_iocp_io_context::abandon_operations(
0344 op_queue<win_iocp_operation>& ops)
0345 {
0346 while (win_iocp_operation* op = ops.front())
0347 {
0348 ops.pop();
0349 ::InterlockedDecrement(&outstanding_work_);
0350 op->destroy();
0351 }
0352 }
0353
0354 void win_iocp_io_context::on_pending(win_iocp_operation* op)
0355 {
0356 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0357 {
0358
0359 if (!::PostQueuedCompletionStatus(iocp_.handle,
0360 0, overlapped_contains_result, op))
0361 {
0362
0363 mutex::scoped_lock lock(dispatch_mutex_);
0364 completed_ops_.push(op);
0365 ::InterlockedExchange(&dispatch_required_, 1);
0366 }
0367 }
0368 }
0369
0370 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0371 DWORD last_error, DWORD bytes_transferred)
0372 {
0373
0374 op->ready_ = 1;
0375
0376
0377 op->Internal = reinterpret_cast<ulong_ptr_t>(
0378 &boost::asio::error::get_system_category());
0379 op->Offset = last_error;
0380 op->OffsetHigh = bytes_transferred;
0381
0382
0383 if (!::PostQueuedCompletionStatus(iocp_.handle,
0384 0, overlapped_contains_result, op))
0385 {
0386
0387 mutex::scoped_lock lock(dispatch_mutex_);
0388 completed_ops_.push(op);
0389 ::InterlockedExchange(&dispatch_required_, 1);
0390 }
0391 }
0392
0393 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0394 const boost::system::error_code& ec, DWORD bytes_transferred)
0395 {
0396
0397 op->ready_ = 1;
0398
0399
0400 op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
0401 op->Offset = ec.value();
0402 op->OffsetHigh = bytes_transferred;
0403
0404
0405 if (!::PostQueuedCompletionStatus(iocp_.handle,
0406 0, overlapped_contains_result, op))
0407 {
0408
0409 mutex::scoped_lock lock(dispatch_mutex_);
0410 completed_ops_.push(op);
0411 ::InterlockedExchange(&dispatch_required_, 1);
0412 }
0413 }
0414
0415 size_t win_iocp_io_context::do_one(DWORD msec,
0416 win_iocp_thread_info& this_thread, boost::system::error_code& ec)
0417 {
0418 for (;;)
0419 {
0420
0421 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
0422 {
0423 mutex::scoped_lock lock(dispatch_mutex_);
0424
0425
0426 op_queue<win_iocp_operation> ops;
0427 ops.push(completed_ops_);
0428 timer_queues_.get_ready_timers(ops);
0429 post_deferred_completions(ops);
0430 update_timeout();
0431 }
0432
0433
0434 DWORD bytes_transferred = 0;
0435 dword_ptr_t completion_key = 0;
0436 LPOVERLAPPED overlapped = 0;
0437 ::SetLastError(0);
0438 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
0439 &bytes_transferred, &completion_key, &overlapped,
0440 msec < gqcs_timeout_ ? msec : gqcs_timeout_);
0441 DWORD last_error = ::GetLastError();
0442
0443 if (overlapped)
0444 {
0445 win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
0446 boost::system::error_code result_ec(last_error,
0447 boost::asio::error::get_system_category());
0448
0449
0450
0451 if (completion_key == overlapped_contains_result)
0452 {
0453 result_ec = boost::system::error_code(static_cast<int>(op->Offset),
0454 *reinterpret_cast<boost::system::error_category*>(op->Internal));
0455 bytes_transferred = op->OffsetHigh;
0456 }
0457
0458
0459
0460 else
0461 {
0462 op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
0463 op->Offset = result_ec.value();
0464 op->OffsetHigh = bytes_transferred;
0465 }
0466
0467
0468
0469
0470
0471 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0472 {
0473
0474 work_finished_on_block_exit on_exit = { this };
0475 (void)on_exit;
0476
0477 op->complete(this, result_ec, bytes_transferred);
0478 this_thread.rethrow_pending_exception();
0479 ec = boost::system::error_code();
0480 return 1;
0481 }
0482 }
0483 else if (!ok)
0484 {
0485 if (last_error != WAIT_TIMEOUT)
0486 {
0487 ec = boost::system::error_code(last_error,
0488 boost::asio::error::get_system_category());
0489 return 0;
0490 }
0491
0492
0493
0494 if (msec == INFINITE)
0495 continue;
0496
0497 ec = boost::system::error_code();
0498 return 0;
0499 }
0500 else if (completion_key == wake_for_dispatch)
0501 {
0502
0503
0504 }
0505 else
0506 {
0507
0508 ::InterlockedExchange(&stop_event_posted_, 0);
0509
0510
0511
0512 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
0513 {
0514
0515 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0516 {
0517 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0518 {
0519 last_error = ::GetLastError();
0520 ec = boost::system::error_code(last_error,
0521 boost::asio::error::get_system_category());
0522 return 0;
0523 }
0524 }
0525
0526 ec = boost::system::error_code();
0527 return 0;
0528 }
0529 }
0530 }
0531 }
0532
0533 DWORD win_iocp_io_context::get_gqcs_timeout()
0534 {
0535 #if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0536 OSVERSIONINFOEX osvi;
0537 ZeroMemory(&osvi, sizeof(osvi));
0538 osvi.dwOSVersionInfoSize = sizeof(osvi);
0539 osvi.dwMajorVersion = 6ul;
0540
0541 const uint64_t condition_mask = ::VerSetConditionMask(
0542 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
0543
0544 if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
0545 return INFINITE;
0546
0547 return default_gqcs_timeout;
0548 #else
0549 return INFINITE;
0550 #endif
0551 }
0552
0553 void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
0554 {
0555 mutex::scoped_lock lock(dispatch_mutex_);
0556
0557 timer_queues_.insert(&queue);
0558
0559 if (!waitable_timer_.handle)
0560 {
0561 waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
0562 if (waitable_timer_.handle == 0)
0563 {
0564 DWORD last_error = ::GetLastError();
0565 boost::system::error_code ec(last_error,
0566 boost::asio::error::get_system_category());
0567 boost::asio::detail::throw_error(ec, "timer");
0568 }
0569
0570 LARGE_INTEGER timeout;
0571 timeout.QuadPart = -max_timeout_usec;
0572 timeout.QuadPart *= 10;
0573 ::SetWaitableTimer(waitable_timer_.handle,
0574 &timeout, max_timeout_msec, 0, 0, FALSE);
0575 }
0576
0577 if (!timer_thread_.get())
0578 {
0579 timer_thread_function thread_function = { this };
0580 timer_thread_.reset(new thread(thread_function, 65536));
0581 }
0582 }
0583
0584 void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
0585 {
0586 mutex::scoped_lock lock(dispatch_mutex_);
0587
0588 timer_queues_.erase(&queue);
0589 }
0590
0591 void win_iocp_io_context::update_timeout()
0592 {
0593 if (timer_thread_.get())
0594 {
0595
0596
0597
0598 long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
0599 if (timeout_usec < max_timeout_usec)
0600 {
0601 LARGE_INTEGER timeout;
0602 timeout.QuadPart = -timeout_usec;
0603 timeout.QuadPart *= 10;
0604 ::SetWaitableTimer(waitable_timer_.handle,
0605 &timeout, max_timeout_msec, 0, 0, FALSE);
0606 }
0607 }
0608 }
0609
0610 }
0611 }
0612 }
0613
0614 #include <boost/asio/detail/pop_options.hpp>
0615
0616 #endif
0617
0618 #endif