//
// detail/impl/scheduler.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
#define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#include <boost/asio/config.hpp>
#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/scheduler_thread_info.hpp>
#include <boost/asio/detail/signal_blocker.hpp>

#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
# include <boost/asio/detail/io_uring_service.hpp>
#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
# include <boost/asio/detail/reactor.hpp>
#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

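// Entry point for the scheduler's internally owned thread (created when
// own_thread is true in the constructor below). It simply runs the
// scheduler's event loop.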
class scheduler::thread_function
{
public:
  explicit thread_function(scheduler* s)
    : this_(s)
  {
  }

  void operator()()
  {
    boost::system::error_code ec;
    this_->run(ec);
  }

private:
  scheduler* this_;
};

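// RAII helper used around invocations of the task (the reactor or io_uring
// service). On destruction it folds the thread-private work count back into
// the shared count, re-acquires the lock, and re-enqueues both the operations
// produced by the task and the task_operation_ sentinel so that the task will
// be run again.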
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

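// RAII helper used around the completion of a single handler. On destruction
// it reconciles the thread-private work count with the shared count (the
// completed handler itself accounts for one unit of work) and, when threads
// are enabled, flushes any privately queued operations back to the shared
// queue.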
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

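// The scheduler's tunables (concurrency hint, locking behavior, and the
// task_usec / wait_usec polling intervals) are read from the "scheduler"
// section of the execution context's configuration. When own_thread is true
// the scheduler also spawns a thread of its own to run the event loop.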
scheduler::scheduler(boost::asio::execution_context& ctx,
    bool own_thread, get_task_func_type get_task)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(config(ctx).get("scheduler", "concurrency_hint", 0) == 1),
    mutex_(config(ctx).get("scheduler", "locking", true),
        config(ctx).get("scheduler", "locking_spin_count", 0)),
    task_(0),
    get_task_(get_task),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(config(ctx).get("scheduler", "concurrency_hint", 0)),
    task_usec_(config(ctx).get("scheduler", "task_usec", -1L)),
    wait_usec_(config(ctx).get("scheduler", "wait_usec", -1L)),
    thread_(0)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;

  if (own_thread)
  {
    ++outstanding_work_;
    boost::asio::detail::signal_blocker sb;
    thread_ = new boost::asio::detail::thread(thread_function(this));
  }
}

scheduler::~scheduler()
{
  if (thread_)
  {
    mutex::scoped_lock lock(mutex_);
    shutdown_ = true;
    stop_all_threads(lock);
    lock.unlock();
    thread_->join();
    delete thread_;
  }
}

void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  if (thread_)
    stop_all_threads(lock);
  lock.unlock();

  // Join thread to ensure task operation is returned to queue.
  if (thread_)
  {
    thread_->join();
    delete thread_;
    thread_ = 0;
  }

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}

void scheduler::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = get_task_(this->context());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}

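// run(), run_one(), wait_one(), poll() and poll_one() all follow the same
// pattern: bail out early if there is no outstanding work, record this thread
// (and its private operation queue) on the call stack, then delegate to
// do_run_one(), do_wait_one() or do_poll_one() under the scheduler's mutex.
// The returned handler count saturates rather than wrapping around.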
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t scheduler::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}

std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_wait_one(lock, this_thread, usec, ec);
}

std::size_t scheduler::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t scheduler::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}

void scheduler::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}

bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}

void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}

void scheduler::compensating_work_started()
{
  thread_info_base* this_thread = thread_call_stack::contains(this);
  BOOST_ASIO_ASSUME(this_thread != 0); // Only called from inside scheduler.
  ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
}

bool scheduler::can_dispatch()
{
  return thread_call_stack::contains(this) != 0;
}

void scheduler::capture_current_exception()
{
  if (thread_info_base* this_thread = thread_call_stack::contains(this))
    this_thread->capture_current_exception();
}

void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::post_immediate_completions(std::size_t n,
    op_queue<scheduler::operation>& ops, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_outstanding_work
        += static_cast<long>(n);
      static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  increment(outstanding_work_, static_cast<long>(n));
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(ops);
  wake_one_thread_and_unlock(lock);
}

void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}

void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void scheduler::abandon_operations(
    op_queue<scheduler::operation>& ops)
{
  op_queue<scheduler::operation> ops2;
  ops2.push(ops);
}

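// Core of the event loop. Pops one operation from the queue: if it is the
// task_operation_ sentinel, the task (reactor or io_uring service) is run and
// its completions are gathered into the thread-private queue; otherwise the
// operation is a handler and is completed, which counts as one unit of work.
// If the queue is empty the thread either spins or waits on wakeup_event_,
// depending on the configured wait_usec interval.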
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers || task_usec_ == 0;

        if (more_handlers && !one_thread_ && wait_usec_ != 0)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : task_usec_,
            this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);
        this_thread.rethrow_pending_exception();

        return 1;
      }
    }
    else
    {
      if (wait_usec_ == 0)
      {
        lock.unlock();
        lock.lock();
      }
      else
      {
        wakeup_event_.clear(lock);
        if (wait_usec_ > 0)
          wakeup_event_.wait_for_usec(lock, wait_usec_);
        else
          wakeup_event_.wait(lock);
      }
    }
  }

  return 0;
}

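// Variant of do_run_one() used by wait_one(): waits for at most usec
// microseconds for an operation to become available, clamped by the
// configured wait_usec and task_usec intervals, and runs at most one handler.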
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    wakeup_event_.clear(lock);
    usec = (wait_usec_ >= 0 && wait_usec_ < usec) ? wait_usec_ : usec;
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    usec = (task_usec_ >= 0 && task_usec_ < usec) ? task_usec_ : usec;
    task_interrupted_ = more_handlers || usec == 0;

    if (more_handlers && !one_thread_ && wait_usec_ != 0)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}

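// Variant of do_run_one() used by poll() and poll_one(): the task is run in
// non-blocking mode (a timeout of zero) and the function never waits on
// wakeup_event_, so it returns immediately when nothing is ready to run.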
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}

void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}

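// Unless event waits are disabled (wait_usec of zero), wakes a single idle
// thread if one is waiting on wakeup_event_; otherwise interrupts the task so
// that the thread running it returns to the operation queue and picks up the
// newly posted work. In either case the lock is released before returning.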
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (wait_usec_ == 0 || !wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}

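// The default task is the platform's I/O demultiplexer: the io_uring service
// when BOOST_ASIO_HAS_IO_URING_AS_DEFAULT is defined, and otherwise the
// reactor (e.g. epoll, kqueue or select, depending on the platform).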
scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
{
#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<io_uring_service>(ctx);
#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<reactor>(ctx);
#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP