Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:28:35

0001 //
0002 // detail/impl/win_iocp_io_context.ipp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/config.hpp>
0019 
0020 #if defined(BOOST_ASIO_HAS_IOCP)
0021 
0022 #include <boost/asio/error.hpp>
0023 #include <boost/asio/detail/cstdint.hpp>
0024 #include <boost/asio/detail/handler_alloc_helpers.hpp>
0025 #include <boost/asio/detail/limits.hpp>
0026 #include <boost/asio/detail/thread.hpp>
0027 #include <boost/asio/detail/throw_error.hpp>
0028 #include <boost/asio/detail/win_iocp_io_context.hpp>
0029 
0030 #include <boost/asio/detail/push_options.hpp>
0031 
0032 namespace boost {
0033 namespace asio {
0034 namespace detail {
0035 
0036 struct win_iocp_io_context::thread_function
0037 {
0038   explicit thread_function(win_iocp_io_context* s)
0039     : this_(s)
0040   {
0041   }
0042 
0043   void operator()()
0044   {
0045     boost::system::error_code ec;
0046     this_->run(ec);
0047   }
0048 
0049   win_iocp_io_context* this_;
0050 };
0051 
0052 struct win_iocp_io_context::work_finished_on_block_exit
0053 {
0054   ~work_finished_on_block_exit() noexcept(false)
0055   {
0056     io_context_->work_finished();
0057   }
0058 
0059   win_iocp_io_context* io_context_;
0060 };
0061 
0062 struct win_iocp_io_context::timer_thread_function
0063 {
0064   void operator()()
0065   {
0066     while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
0067     {
0068       if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
0069             INFINITE) == WAIT_OBJECT_0)
0070       {
0071         ::InterlockedExchange(&io_context_->dispatch_required_, 1);
0072         ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
0073             0, wake_for_dispatch, 0);
0074       }
0075     }
0076   }
0077 
0078   win_iocp_io_context* io_context_;
0079 };
0080 
0081 win_iocp_io_context::win_iocp_io_context(
0082     boost::asio::execution_context& ctx, int concurrency_hint, bool own_thread)
0083   : execution_context_service_base<win_iocp_io_context>(ctx),
0084     iocp_(),
0085     outstanding_work_(0),
0086     stopped_(0),
0087     stop_event_posted_(0),
0088     shutdown_(0),
0089     gqcs_timeout_(get_gqcs_timeout()),
0090     dispatch_required_(0),
0091     concurrency_hint_(concurrency_hint)
0092 {
0093   BOOST_ASIO_HANDLER_TRACKING_INIT;
0094 
0095   iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
0096       static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
0097   if (!iocp_.handle)
0098   {
0099     DWORD last_error = ::GetLastError();
0100     boost::system::error_code ec(last_error,
0101         boost::asio::error::get_system_category());
0102     boost::asio::detail::throw_error(ec, "iocp");
0103   }
0104 
0105   if (own_thread)
0106   {
0107     ::InterlockedIncrement(&outstanding_work_);
0108     thread_.reset(new boost::asio::detail::thread(thread_function(this)));
0109   }
0110 }
0111 
0112 win_iocp_io_context::~win_iocp_io_context()
0113 {
0114   if (thread_.get())
0115   {
0116     stop();
0117     thread_->join();
0118     thread_.reset();
0119   }
0120 }
0121 
0122 void win_iocp_io_context::shutdown()
0123 {
0124   ::InterlockedExchange(&shutdown_, 1);
0125 
0126   if (timer_thread_.get())
0127   {
0128     LARGE_INTEGER timeout;
0129     timeout.QuadPart = 1;
0130     ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
0131   }
0132 
0133   if (thread_.get())
0134   {
0135     stop();
0136     thread_->join();
0137     thread_.reset();
0138     ::InterlockedDecrement(&outstanding_work_);
0139   }
0140 
0141   while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
0142   {
0143     op_queue<win_iocp_operation> ops;
0144     timer_queues_.get_all_timers(ops);
0145     ops.push(completed_ops_);
0146     if (!ops.empty())
0147     {
0148       while (win_iocp_operation* op = ops.front())
0149       {
0150         ops.pop();
0151         ::InterlockedDecrement(&outstanding_work_);
0152         op->destroy();
0153       }
0154     }
0155     else
0156     {
0157       DWORD bytes_transferred = 0;
0158       dword_ptr_t completion_key = 0;
0159       LPOVERLAPPED overlapped = 0;
0160       ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
0161           &completion_key, &overlapped, gqcs_timeout_);
0162       if (overlapped)
0163       {
0164         ::InterlockedDecrement(&outstanding_work_);
0165         static_cast<win_iocp_operation*>(overlapped)->destroy();
0166       }
0167     }
0168   }
0169 
0170   if (timer_thread_.get())
0171   {
0172     timer_thread_->join();
0173     timer_thread_.reset();
0174   }
0175 }
0176 
0177 boost::system::error_code win_iocp_io_context::register_handle(
0178     HANDLE handle, boost::system::error_code& ec)
0179 {
0180   if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
0181   {
0182     DWORD last_error = ::GetLastError();
0183     ec = boost::system::error_code(last_error,
0184         boost::asio::error::get_system_category());
0185   }
0186   else
0187   {
0188     ec = boost::system::error_code();
0189   }
0190   return ec;
0191 }
0192 
0193 size_t win_iocp_io_context::run(boost::system::error_code& ec)
0194 {
0195   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0196   {
0197     stop();
0198     ec = boost::system::error_code();
0199     return 0;
0200   }
0201 
0202   win_iocp_thread_info this_thread;
0203   thread_call_stack::context ctx(this, this_thread);
0204 
0205   size_t n = 0;
0206   while (do_one(INFINITE, this_thread, ec))
0207     if (n != (std::numeric_limits<size_t>::max)())
0208       ++n;
0209   return n;
0210 }
0211 
0212 size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
0213 {
0214   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0215   {
0216     stop();
0217     ec = boost::system::error_code();
0218     return 0;
0219   }
0220 
0221   win_iocp_thread_info this_thread;
0222   thread_call_stack::context ctx(this, this_thread);
0223 
0224   return do_one(INFINITE, this_thread, ec);
0225 }
0226 
0227 size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
0228 {
0229   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0230   {
0231     stop();
0232     ec = boost::system::error_code();
0233     return 0;
0234   }
0235 
0236   win_iocp_thread_info this_thread;
0237   thread_call_stack::context ctx(this, this_thread);
0238 
0239   return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
0240 }
0241 
0242 size_t win_iocp_io_context::poll(boost::system::error_code& ec)
0243 {
0244   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0245   {
0246     stop();
0247     ec = boost::system::error_code();
0248     return 0;
0249   }
0250 
0251   win_iocp_thread_info this_thread;
0252   thread_call_stack::context ctx(this, this_thread);
0253 
0254   size_t n = 0;
0255   while (do_one(0, this_thread, ec))
0256     if (n != (std::numeric_limits<size_t>::max)())
0257       ++n;
0258   return n;
0259 }
0260 
0261 size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
0262 {
0263   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0264   {
0265     stop();
0266     ec = boost::system::error_code();
0267     return 0;
0268   }
0269 
0270   win_iocp_thread_info this_thread;
0271   thread_call_stack::context ctx(this, this_thread);
0272 
0273   return do_one(0, this_thread, ec);
0274 }
0275 
0276 void win_iocp_io_context::stop()
0277 {
0278   if (::InterlockedExchange(&stopped_, 1) == 0)
0279   {
0280     if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0281     {
0282       if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0283       {
0284         DWORD last_error = ::GetLastError();
0285         boost::system::error_code ec(last_error,
0286             boost::asio::error::get_system_category());
0287         boost::asio::detail::throw_error(ec, "pqcs");
0288       }
0289     }
0290   }
0291 }
0292 
0293 bool win_iocp_io_context::can_dispatch()
0294 {
0295   return thread_call_stack::contains(this) != 0;
0296 }
0297 
0298 void win_iocp_io_context::capture_current_exception()
0299 {
0300   if (thread_info_base* this_thread = thread_call_stack::contains(this))
0301     this_thread->capture_current_exception();
0302 }
0303 
0304 void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
0305 {
0306   // Flag the operation as ready.
0307   op->ready_ = 1;
0308 
0309   // Enqueue the operation on the I/O completion port.
0310   if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0311   {
0312     // Out of resources. Put on completed queue instead.
0313     mutex::scoped_lock lock(dispatch_mutex_);
0314     completed_ops_.push(op);
0315     ::InterlockedExchange(&dispatch_required_, 1);
0316   }
0317 }
0318 
0319 void win_iocp_io_context::post_deferred_completions(
0320     op_queue<win_iocp_operation>& ops)
0321 {
0322   while (win_iocp_operation* op = ops.front())
0323   {
0324     ops.pop();
0325 
0326     // Flag the operation as ready.
0327     op->ready_ = 1;
0328 
0329     // Enqueue the operation on the I/O completion port.
0330     if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0331     {
0332       // Out of resources. Put on completed queue instead.
0333       mutex::scoped_lock lock(dispatch_mutex_);
0334       completed_ops_.push(op);
0335       completed_ops_.push(ops);
0336       ::InterlockedExchange(&dispatch_required_, 1);
0337     }
0338   }
0339 }
0340 
0341 void win_iocp_io_context::abandon_operations(
0342     op_queue<win_iocp_operation>& ops)
0343 {
0344   while (win_iocp_operation* op = ops.front())
0345   {
0346     ops.pop();
0347     ::InterlockedDecrement(&outstanding_work_);
0348     op->destroy();
0349   }
0350 }
0351 
0352 void win_iocp_io_context::on_pending(win_iocp_operation* op)
0353 {
0354   if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0355   {
0356     // Enqueue the operation on the I/O completion port.
0357     if (!::PostQueuedCompletionStatus(iocp_.handle,
0358           0, overlapped_contains_result, op))
0359     {
0360       // Out of resources. Put on completed queue instead.
0361       mutex::scoped_lock lock(dispatch_mutex_);
0362       completed_ops_.push(op);
0363       ::InterlockedExchange(&dispatch_required_, 1);
0364     }
0365   }
0366 }
0367 
0368 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0369     DWORD last_error, DWORD bytes_transferred)
0370 {
0371   // Flag that the operation is ready for invocation.
0372   op->ready_ = 1;
0373 
0374   // Store results in the OVERLAPPED structure.
0375   op->Internal = reinterpret_cast<ulong_ptr_t>(
0376       &boost::asio::error::get_system_category());
0377   op->Offset = last_error;
0378   op->OffsetHigh = bytes_transferred;
0379 
0380   // Enqueue the operation on the I/O completion port.
0381   if (!::PostQueuedCompletionStatus(iocp_.handle,
0382         0, overlapped_contains_result, op))
0383   {
0384     // Out of resources. Put on completed queue instead.
0385     mutex::scoped_lock lock(dispatch_mutex_);
0386     completed_ops_.push(op);
0387     ::InterlockedExchange(&dispatch_required_, 1);
0388   }
0389 }
0390 
0391 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0392     const boost::system::error_code& ec, DWORD bytes_transferred)
0393 {
0394   // Flag that the operation is ready for invocation.
0395   op->ready_ = 1;
0396 
0397   // Store results in the OVERLAPPED structure.
0398   op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
0399   op->Offset = ec.value();
0400   op->OffsetHigh = bytes_transferred;
0401 
0402   // Enqueue the operation on the I/O completion port.
0403   if (!::PostQueuedCompletionStatus(iocp_.handle,
0404         0, overlapped_contains_result, op))
0405   {
0406     // Out of resources. Put on completed queue instead.
0407     mutex::scoped_lock lock(dispatch_mutex_);
0408     completed_ops_.push(op);
0409     ::InterlockedExchange(&dispatch_required_, 1);
0410   }
0411 }
0412 
0413 size_t win_iocp_io_context::do_one(DWORD msec,
0414     win_iocp_thread_info& this_thread, boost::system::error_code& ec)
0415 {
0416   for (;;)
0417   {
0418     // Try to acquire responsibility for dispatching timers and completed ops.
0419     if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
0420     {
0421       mutex::scoped_lock lock(dispatch_mutex_);
0422 
0423       // Dispatch pending timers and operations.
0424       op_queue<win_iocp_operation> ops;
0425       ops.push(completed_ops_);
0426       timer_queues_.get_ready_timers(ops);
0427       post_deferred_completions(ops);
0428       update_timeout();
0429     }
0430 
0431     // Get the next operation from the queue.
0432     DWORD bytes_transferred = 0;
0433     dword_ptr_t completion_key = 0;
0434     LPOVERLAPPED overlapped = 0;
0435     ::SetLastError(0);
0436     BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
0437         &bytes_transferred, &completion_key, &overlapped,
0438         msec < gqcs_timeout_ ? msec : gqcs_timeout_);
0439     DWORD last_error = ::GetLastError();
0440 
0441     if (overlapped)
0442     {
0443       win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
0444       boost::system::error_code result_ec(last_error,
0445           boost::asio::error::get_system_category());
0446 
0447       // We may have been passed the last_error and bytes_transferred in the
0448       // OVERLAPPED structure itself.
0449       if (completion_key == overlapped_contains_result)
0450       {
0451         result_ec = boost::system::error_code(static_cast<int>(op->Offset),
0452             *reinterpret_cast<boost::system::error_category*>(op->Internal));
0453         bytes_transferred = op->OffsetHigh;
0454       }
0455 
0456       // Otherwise ensure any result has been saved into the OVERLAPPED
0457       // structure.
0458       else
0459       {
0460         op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
0461         op->Offset = result_ec.value();
0462         op->OffsetHigh = bytes_transferred;
0463       }
0464 
0465       // Dispatch the operation only if ready. The operation may not be ready
0466       // if the initiating function (e.g. a call to WSARecv) has not yet
0467       // returned. This is because the initiating function still wants access
0468       // to the operation's OVERLAPPED structure.
0469       if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0470       {
0471         // Ensure the count of outstanding work is decremented on block exit.
0472         work_finished_on_block_exit on_exit = { this };
0473         (void)on_exit;
0474 
0475         op->complete(this, result_ec, bytes_transferred);
0476         this_thread.rethrow_pending_exception();
0477         ec = boost::system::error_code();
0478         return 1;
0479       }
0480     }
0481     else if (!ok)
0482     {
0483       if (last_error != WAIT_TIMEOUT)
0484       {
0485         ec = boost::system::error_code(last_error,
0486             boost::asio::error::get_system_category());
0487         return 0;
0488       }
0489 
0490       // If we're waiting indefinitely we need to keep going until we get a
0491       // real handler.
0492       if (msec == INFINITE)
0493         continue;
0494 
0495       ec = boost::system::error_code();
0496       return 0;
0497     }
0498     else if (completion_key == wake_for_dispatch)
0499     {
0500       // We have been woken up to try to acquire responsibility for dispatching
0501       // timers and completed operations.
0502     }
0503     else
0504     {
0505       // Indicate that there is no longer an in-flight stop event.
0506       ::InterlockedExchange(&stop_event_posted_, 0);
0507 
0508       // The stopped_ flag is always checked to ensure that any leftover
0509       // stop events from a previous run invocation are ignored.
0510       if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
0511       {
0512         // Wake up next thread that is blocked on GetQueuedCompletionStatus.
0513         if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0514         {
0515           if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0516           {
0517             last_error = ::GetLastError();
0518             ec = boost::system::error_code(last_error,
0519                 boost::asio::error::get_system_category());
0520             return 0;
0521           }
0522         }
0523 
0524         ec = boost::system::error_code();
0525         return 0;
0526       }
0527     }
0528   }
0529 }
0530 
0531 DWORD win_iocp_io_context::get_gqcs_timeout()
0532 {
0533 #if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0534   OSVERSIONINFOEX osvi;
0535   ZeroMemory(&osvi, sizeof(osvi));
0536   osvi.dwOSVersionInfoSize = sizeof(osvi);
0537   osvi.dwMajorVersion = 6ul;
0538 
0539   const uint64_t condition_mask = ::VerSetConditionMask(
0540       0, VER_MAJORVERSION, VER_GREATER_EQUAL);
0541 
0542   if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
0543     return INFINITE;
0544 
0545   return default_gqcs_timeout;
0546 #else // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0547   return INFINITE;
0548 #endif // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0549 }
0550 
0551 void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
0552 {
0553   mutex::scoped_lock lock(dispatch_mutex_);
0554 
0555   timer_queues_.insert(&queue);
0556 
0557   if (!waitable_timer_.handle)
0558   {
0559     waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
0560     if (waitable_timer_.handle == 0)
0561     {
0562       DWORD last_error = ::GetLastError();
0563       boost::system::error_code ec(last_error,
0564           boost::asio::error::get_system_category());
0565       boost::asio::detail::throw_error(ec, "timer");
0566     }
0567 
0568     LARGE_INTEGER timeout;
0569     timeout.QuadPart = -max_timeout_usec;
0570     timeout.QuadPart *= 10;
0571     ::SetWaitableTimer(waitable_timer_.handle,
0572         &timeout, max_timeout_msec, 0, 0, FALSE);
0573   }
0574 
0575   if (!timer_thread_.get())
0576   {
0577     timer_thread_function thread_function = { this };
0578     timer_thread_.reset(new thread(thread_function, 65536));
0579   }
0580 }
0581 
0582 void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
0583 {
0584   mutex::scoped_lock lock(dispatch_mutex_);
0585 
0586   timer_queues_.erase(&queue);
0587 }
0588 
0589 void win_iocp_io_context::update_timeout()
0590 {
0591   if (timer_thread_.get())
0592   {
0593     // There's no point updating the waitable timer if the new timeout period
0594     // exceeds the maximum timeout. In that case, we might as well wait for the
0595     // existing period of the timer to expire.
0596     long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
0597     if (timeout_usec < max_timeout_usec)
0598     {
0599       LARGE_INTEGER timeout;
0600       timeout.QuadPart = -timeout_usec;
0601       timeout.QuadPart *= 10;
0602       ::SetWaitableTimer(waitable_timer_.handle,
0603           &timeout, max_timeout_msec, 0, 0, FALSE);
0604     }
0605   }
0606 }
0607 
0608 } // namespace detail
0609 } // namespace asio
0610 } // namespace boost
0611 
0612 #include <boost/asio/detail/pop_options.hpp>
0613 
0614 #endif // defined(BOOST_ASIO_HAS_IOCP)
0615 
0616 #endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP