Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/boost/asio/detail/impl/win_iocp_io_context.ipp was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 //
0002 // detail/impl/win_iocp_io_context.ipp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/config.hpp>
0019 
0020 #if defined(BOOST_ASIO_HAS_IOCP)
0021 
0022 #include <boost/asio/config.hpp>
0023 #include <boost/asio/error.hpp>
0024 #include <boost/asio/detail/cstdint.hpp>
0025 #include <boost/asio/detail/handler_alloc_helpers.hpp>
0026 #include <boost/asio/detail/limits.hpp>
0027 #include <boost/asio/detail/thread.hpp>
0028 #include <boost/asio/detail/throw_error.hpp>
0029 #include <boost/asio/detail/win_iocp_io_context.hpp>
0030 
0031 #include <boost/asio/detail/push_options.hpp>
0032 
0033 namespace boost {
0034 namespace asio {
0035 namespace detail {
0036 
0037 struct win_iocp_io_context::thread_function
0038 {
0039   explicit thread_function(win_iocp_io_context* s)
0040     : this_(s)
0041   {
0042   }
0043 
0044   void operator()()
0045   {
0046     boost::system::error_code ec;
0047     this_->run(ec);
0048   }
0049 
0050   win_iocp_io_context* this_;
0051 };
0052 
0053 struct win_iocp_io_context::work_finished_on_block_exit
0054 {
0055   ~work_finished_on_block_exit() noexcept(false)
0056   {
0057     io_context_->work_finished();
0058   }
0059 
0060   win_iocp_io_context* io_context_;
0061 };
0062 
0063 struct win_iocp_io_context::timer_thread_function
0064 {
0065   void operator()()
0066   {
0067     while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
0068     {
0069       if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
0070             INFINITE) == WAIT_OBJECT_0)
0071       {
0072         ::InterlockedExchange(&io_context_->dispatch_required_, 1);
0073         ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
0074             0, wake_for_dispatch, 0);
0075       }
0076     }
0077   }
0078 
0079   win_iocp_io_context* io_context_;
0080 };
0081 
0082 win_iocp_io_context::win_iocp_io_context(
0083     boost::asio::execution_context& ctx, bool own_thread)
0084   : execution_context_service_base<win_iocp_io_context>(ctx),
0085     iocp_(),
0086     outstanding_work_(0),
0087     stopped_(0),
0088     stop_event_posted_(0),
0089     shutdown_(0),
0090     gqcs_timeout_(get_gqcs_timeout()),
0091     dispatch_required_(0),
0092     concurrency_hint_(config(ctx).get("scheduler", "concurrency_hint", -1))
0093 {
0094   BOOST_ASIO_HANDLER_TRACKING_INIT;
0095 
0096   iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
0097       static_cast<DWORD>(concurrency_hint_ >= 0
0098         ? concurrency_hint_ : DWORD(~0)));
0099   if (!iocp_.handle)
0100   {
0101     DWORD last_error = ::GetLastError();
0102     boost::system::error_code ec(last_error,
0103         boost::asio::error::get_system_category());
0104     boost::asio::detail::throw_error(ec, "iocp");
0105   }
0106 
0107   if (own_thread)
0108   {
0109     ::InterlockedIncrement(&outstanding_work_);
0110     thread_.reset(new boost::asio::detail::thread(thread_function(this)));
0111   }
0112 }
0113 
0114 win_iocp_io_context::~win_iocp_io_context()
0115 {
0116   if (thread_.get())
0117   {
0118     stop();
0119     thread_->join();
0120     thread_.reset();
0121   }
0122 }
0123 
0124 void win_iocp_io_context::shutdown()
0125 {
0126   ::InterlockedExchange(&shutdown_, 1);
0127 
0128   if (timer_thread_.get())
0129   {
0130     LARGE_INTEGER timeout;
0131     timeout.QuadPart = 1;
0132     ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
0133   }
0134 
0135   if (thread_.get())
0136   {
0137     stop();
0138     thread_->join();
0139     thread_.reset();
0140     ::InterlockedDecrement(&outstanding_work_);
0141   }
0142 
0143   while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
0144   {
0145     op_queue<win_iocp_operation> ops;
0146     timer_queues_.get_all_timers(ops);
0147     ops.push(completed_ops_);
0148     if (!ops.empty())
0149     {
0150       while (win_iocp_operation* op = ops.front())
0151       {
0152         ops.pop();
0153         ::InterlockedDecrement(&outstanding_work_);
0154         op->destroy();
0155       }
0156     }
0157     else
0158     {
0159       DWORD bytes_transferred = 0;
0160       dword_ptr_t completion_key = 0;
0161       LPOVERLAPPED overlapped = 0;
0162       ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
0163           &completion_key, &overlapped, gqcs_timeout_);
0164       if (overlapped)
0165       {
0166         ::InterlockedDecrement(&outstanding_work_);
0167         static_cast<win_iocp_operation*>(overlapped)->destroy();
0168       }
0169     }
0170   }
0171 
0172   if (timer_thread_.get())
0173   {
0174     timer_thread_->join();
0175     timer_thread_.reset();
0176   }
0177 }
0178 
0179 boost::system::error_code win_iocp_io_context::register_handle(
0180     HANDLE handle, boost::system::error_code& ec)
0181 {
0182   if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
0183   {
0184     DWORD last_error = ::GetLastError();
0185     ec = boost::system::error_code(last_error,
0186         boost::asio::error::get_system_category());
0187   }
0188   else
0189   {
0190     ec = boost::system::error_code();
0191   }
0192   return ec;
0193 }
0194 
0195 size_t win_iocp_io_context::run(boost::system::error_code& ec)
0196 {
0197   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0198   {
0199     stop();
0200     ec = boost::system::error_code();
0201     return 0;
0202   }
0203 
0204   win_iocp_thread_info this_thread;
0205   thread_call_stack::context ctx(this, this_thread);
0206 
0207   size_t n = 0;
0208   while (do_one(INFINITE, this_thread, ec))
0209     if (n != (std::numeric_limits<size_t>::max)())
0210       ++n;
0211   return n;
0212 }
0213 
0214 size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
0215 {
0216   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0217   {
0218     stop();
0219     ec = boost::system::error_code();
0220     return 0;
0221   }
0222 
0223   win_iocp_thread_info this_thread;
0224   thread_call_stack::context ctx(this, this_thread);
0225 
0226   return do_one(INFINITE, this_thread, ec);
0227 }
0228 
0229 size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
0230 {
0231   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0232   {
0233     stop();
0234     ec = boost::system::error_code();
0235     return 0;
0236   }
0237 
0238   win_iocp_thread_info this_thread;
0239   thread_call_stack::context ctx(this, this_thread);
0240 
0241   return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), this_thread, ec);
0242 }
0243 
0244 size_t win_iocp_io_context::poll(boost::system::error_code& ec)
0245 {
0246   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0247   {
0248     stop();
0249     ec = boost::system::error_code();
0250     return 0;
0251   }
0252 
0253   win_iocp_thread_info this_thread;
0254   thread_call_stack::context ctx(this, this_thread);
0255 
0256   size_t n = 0;
0257   while (do_one(0, this_thread, ec))
0258     if (n != (std::numeric_limits<size_t>::max)())
0259       ++n;
0260   return n;
0261 }
0262 
0263 size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
0264 {
0265   if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
0266   {
0267     stop();
0268     ec = boost::system::error_code();
0269     return 0;
0270   }
0271 
0272   win_iocp_thread_info this_thread;
0273   thread_call_stack::context ctx(this, this_thread);
0274 
0275   return do_one(0, this_thread, ec);
0276 }
0277 
0278 void win_iocp_io_context::stop()
0279 {
0280   if (::InterlockedExchange(&stopped_, 1) == 0)
0281   {
0282     if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0283     {
0284       if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0285       {
0286         DWORD last_error = ::GetLastError();
0287         boost::system::error_code ec(last_error,
0288             boost::asio::error::get_system_category());
0289         boost::asio::detail::throw_error(ec, "pqcs");
0290       }
0291     }
0292   }
0293 }
0294 
0295 bool win_iocp_io_context::can_dispatch()
0296 {
0297   return thread_call_stack::contains(this) != 0;
0298 }
0299 
0300 void win_iocp_io_context::capture_current_exception()
0301 {
0302   if (thread_info_base* this_thread = thread_call_stack::contains(this))
0303     this_thread->capture_current_exception();
0304 }
0305 
0306 void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
0307 {
0308   // Flag the operation as ready.
0309   op->ready_ = 1;
0310 
0311   // Enqueue the operation on the I/O completion port.
0312   if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0313   {
0314     // Out of resources. Put on completed queue instead.
0315     mutex::scoped_lock lock(dispatch_mutex_);
0316     completed_ops_.push(op);
0317     ::InterlockedExchange(&dispatch_required_, 1);
0318   }
0319 }
0320 
0321 void win_iocp_io_context::post_deferred_completions(
0322     op_queue<win_iocp_operation>& ops)
0323 {
0324   while (win_iocp_operation* op = ops.front())
0325   {
0326     ops.pop();
0327 
0328     // Flag the operation as ready.
0329     op->ready_ = 1;
0330 
0331     // Enqueue the operation on the I/O completion port.
0332     if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
0333     {
0334       // Out of resources. Put on completed queue instead.
0335       mutex::scoped_lock lock(dispatch_mutex_);
0336       completed_ops_.push(op);
0337       completed_ops_.push(ops);
0338       ::InterlockedExchange(&dispatch_required_, 1);
0339     }
0340   }
0341 }
0342 
0343 void win_iocp_io_context::abandon_operations(
0344     op_queue<win_iocp_operation>& ops)
0345 {
0346   while (win_iocp_operation* op = ops.front())
0347   {
0348     ops.pop();
0349     ::InterlockedDecrement(&outstanding_work_);
0350     op->destroy();
0351   }
0352 }
0353 
0354 void win_iocp_io_context::on_pending(win_iocp_operation* op)
0355 {
0356   if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0357   {
0358     // Enqueue the operation on the I/O completion port.
0359     if (!::PostQueuedCompletionStatus(iocp_.handle,
0360           0, overlapped_contains_result, op))
0361     {
0362       // Out of resources. Put on completed queue instead.
0363       mutex::scoped_lock lock(dispatch_mutex_);
0364       completed_ops_.push(op);
0365       ::InterlockedExchange(&dispatch_required_, 1);
0366     }
0367   }
0368 }
0369 
0370 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0371     DWORD last_error, DWORD bytes_transferred)
0372 {
0373   // Flag that the operation is ready for invocation.
0374   op->ready_ = 1;
0375 
0376   // Store results in the OVERLAPPED structure.
0377   op->Internal = reinterpret_cast<ulong_ptr_t>(
0378       &boost::asio::error::get_system_category());
0379   op->Offset = last_error;
0380   op->OffsetHigh = bytes_transferred;
0381 
0382   // Enqueue the operation on the I/O completion port.
0383   if (!::PostQueuedCompletionStatus(iocp_.handle,
0384         0, overlapped_contains_result, op))
0385   {
0386     // Out of resources. Put on completed queue instead.
0387     mutex::scoped_lock lock(dispatch_mutex_);
0388     completed_ops_.push(op);
0389     ::InterlockedExchange(&dispatch_required_, 1);
0390   }
0391 }
0392 
0393 void win_iocp_io_context::on_completion(win_iocp_operation* op,
0394     const boost::system::error_code& ec, DWORD bytes_transferred)
0395 {
0396   // Flag that the operation is ready for invocation.
0397   op->ready_ = 1;
0398 
0399   // Store results in the OVERLAPPED structure.
0400   op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
0401   op->Offset = ec.value();
0402   op->OffsetHigh = bytes_transferred;
0403 
0404   // Enqueue the operation on the I/O completion port.
0405   if (!::PostQueuedCompletionStatus(iocp_.handle,
0406         0, overlapped_contains_result, op))
0407   {
0408     // Out of resources. Put on completed queue instead.
0409     mutex::scoped_lock lock(dispatch_mutex_);
0410     completed_ops_.push(op);
0411     ::InterlockedExchange(&dispatch_required_, 1);
0412   }
0413 }
0414 
0415 size_t win_iocp_io_context::do_one(DWORD msec,
0416     win_iocp_thread_info& this_thread, boost::system::error_code& ec)
0417 {
0418   for (;;)
0419   {
0420     // Try to acquire responsibility for dispatching timers and completed ops.
0421     if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
0422     {
0423       mutex::scoped_lock lock(dispatch_mutex_);
0424 
0425       // Dispatch pending timers and operations.
0426       op_queue<win_iocp_operation> ops;
0427       ops.push(completed_ops_);
0428       timer_queues_.get_ready_timers(ops);
0429       post_deferred_completions(ops);
0430       update_timeout();
0431     }
0432 
0433     // Get the next operation from the queue.
0434     DWORD bytes_transferred = 0;
0435     dword_ptr_t completion_key = 0;
0436     LPOVERLAPPED overlapped = 0;
0437     ::SetLastError(0);
0438     BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
0439         &bytes_transferred, &completion_key, &overlapped,
0440         msec < gqcs_timeout_ ? msec : gqcs_timeout_);
0441     DWORD last_error = ::GetLastError();
0442 
0443     if (overlapped)
0444     {
0445       win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
0446       boost::system::error_code result_ec(last_error,
0447           boost::asio::error::get_system_category());
0448 
0449       // We may have been passed the last_error and bytes_transferred in the
0450       // OVERLAPPED structure itself.
0451       if (completion_key == overlapped_contains_result)
0452       {
0453         result_ec = boost::system::error_code(static_cast<int>(op->Offset),
0454             *reinterpret_cast<boost::system::error_category*>(op->Internal));
0455         bytes_transferred = op->OffsetHigh;
0456       }
0457 
0458       // Otherwise ensure any result has been saved into the OVERLAPPED
0459       // structure.
0460       else
0461       {
0462         op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
0463         op->Offset = result_ec.value();
0464         op->OffsetHigh = bytes_transferred;
0465       }
0466 
0467       // Dispatch the operation only if ready. The operation may not be ready
0468       // if the initiating function (e.g. a call to WSARecv) has not yet
0469       // returned. This is because the initiating function still wants access
0470       // to the operation's OVERLAPPED structure.
0471       if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
0472       {
0473         // Ensure the count of outstanding work is decremented on block exit.
0474         work_finished_on_block_exit on_exit = { this };
0475         (void)on_exit;
0476 
0477         op->complete(this, result_ec, bytes_transferred);
0478         this_thread.rethrow_pending_exception();
0479         ec = boost::system::error_code();
0480         return 1;
0481       }
0482     }
0483     else if (!ok)
0484     {
0485       if (last_error != WAIT_TIMEOUT)
0486       {
0487         ec = boost::system::error_code(last_error,
0488             boost::asio::error::get_system_category());
0489         return 0;
0490       }
0491 
0492       // If we're waiting indefinitely we need to keep going until we get a
0493       // real handler.
0494       if (msec == INFINITE)
0495         continue;
0496 
0497       ec = boost::system::error_code();
0498       return 0;
0499     }
0500     else if (completion_key == wake_for_dispatch)
0501     {
0502       // We have been woken up to try to acquire responsibility for dispatching
0503       // timers and completed operations.
0504     }
0505     else
0506     {
0507       // Indicate that there is no longer an in-flight stop event.
0508       ::InterlockedExchange(&stop_event_posted_, 0);
0509 
0510       // The stopped_ flag is always checked to ensure that any leftover
0511       // stop events from a previous run invocation are ignored.
0512       if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
0513       {
0514         // Wake up next thread that is blocked on GetQueuedCompletionStatus.
0515         if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
0516         {
0517           if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
0518           {
0519             last_error = ::GetLastError();
0520             ec = boost::system::error_code(last_error,
0521                 boost::asio::error::get_system_category());
0522             return 0;
0523           }
0524         }
0525 
0526         ec = boost::system::error_code();
0527         return 0;
0528       }
0529     }
0530   }
0531 }
0532 
0533 DWORD win_iocp_io_context::get_gqcs_timeout()
0534 {
0535 #if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0536   OSVERSIONINFOEX osvi;
0537   ZeroMemory(&osvi, sizeof(osvi));
0538   osvi.dwOSVersionInfoSize = sizeof(osvi);
0539   osvi.dwMajorVersion = 6ul;
0540 
0541   const uint64_t condition_mask = ::VerSetConditionMask(
0542       0, VER_MAJORVERSION, VER_GREATER_EQUAL);
0543 
0544   if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
0545     return INFINITE;
0546 
0547   return default_gqcs_timeout;
0548 #else // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0549   return INFINITE;
0550 #endif // !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
0551 }
0552 
0553 void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
0554 {
0555   mutex::scoped_lock lock(dispatch_mutex_);
0556 
0557   timer_queues_.insert(&queue);
0558 
0559   if (!waitable_timer_.handle)
0560   {
0561     waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
0562     if (waitable_timer_.handle == 0)
0563     {
0564       DWORD last_error = ::GetLastError();
0565       boost::system::error_code ec(last_error,
0566           boost::asio::error::get_system_category());
0567       boost::asio::detail::throw_error(ec, "timer");
0568     }
0569 
0570     LARGE_INTEGER timeout;
0571     timeout.QuadPart = -max_timeout_usec;
0572     timeout.QuadPart *= 10;
0573     ::SetWaitableTimer(waitable_timer_.handle,
0574         &timeout, max_timeout_msec, 0, 0, FALSE);
0575   }
0576 
0577   if (!timer_thread_.get())
0578   {
0579     timer_thread_function thread_function = { this };
0580     timer_thread_.reset(new thread(thread_function, 65536));
0581   }
0582 }
0583 
0584 void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
0585 {
0586   mutex::scoped_lock lock(dispatch_mutex_);
0587 
0588   timer_queues_.erase(&queue);
0589 }
0590 
0591 void win_iocp_io_context::update_timeout()
0592 {
0593   if (timer_thread_.get())
0594   {
0595     // There's no point updating the waitable timer if the new timeout period
0596     // exceeds the maximum timeout. In that case, we might as well wait for the
0597     // existing period of the timer to expire.
0598     long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
0599     if (timeout_usec < max_timeout_usec)
0600     {
0601       LARGE_INTEGER timeout;
0602       timeout.QuadPart = -timeout_usec;
0603       timeout.QuadPart *= 10;
0604       ::SetWaitableTimer(waitable_timer_.handle,
0605           &timeout, max_timeout_msec, 0, 0, FALSE);
0606     }
0607   }
0608 }
0609 
0610 } // namespace detail
0611 } // namespace asio
0612 } // namespace boost
0613 
0614 #include <boost/asio/detail/pop_options.hpp>
0615 
0616 #endif // defined(BOOST_ASIO_HAS_IOCP)
0617 
0618 #endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP