File indexing completed on 2025-09-16 08:28:42
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #include <boost/asio/config.hpp>
0021 #include <boost/asio/detail/event.hpp>
0022 #include <boost/asio/detail/limits.hpp>
0023 #include <boost/asio/detail/scheduler.hpp>
0024 #include <boost/asio/detail/scheduler_thread_info.hpp>
0025 #include <boost/asio/detail/signal_blocker.hpp>
0026
0027 #if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
0028 # include <boost/asio/detail/io_uring_service.hpp>
0029 #else
0030 # include <boost/asio/detail/reactor.hpp>
0031 #endif
0032
0033 #include <boost/asio/detail/push_options.hpp>
0034
0035 namespace boost {
0036 namespace asio {
0037 namespace detail {
0038
0039 class scheduler::thread_function
0040 {
0041 public:
0042 explicit thread_function(scheduler* s)
0043 : this_(s)
0044 {
0045 }
0046
0047 void operator()()
0048 {
0049 boost::system::error_code ec;
0050 this_->run(ec);
0051 }
0052
0053 private:
0054 scheduler* this_;
0055 };
0056
0057 struct scheduler::task_cleanup
0058 {
0059 ~task_cleanup()
0060 {
0061 if (this_thread_->private_outstanding_work > 0)
0062 {
0063 boost::asio::detail::increment(
0064 scheduler_->outstanding_work_,
0065 this_thread_->private_outstanding_work);
0066 }
0067 this_thread_->private_outstanding_work = 0;
0068
0069
0070
0071 lock_->lock();
0072 scheduler_->task_interrupted_ = true;
0073 scheduler_->op_queue_.push(this_thread_->private_op_queue);
0074 scheduler_->op_queue_.push(&scheduler_->task_operation_);
0075 }
0076
0077 scheduler* scheduler_;
0078 mutex::scoped_lock* lock_;
0079 thread_info* this_thread_;
0080 };
0081
0082 struct scheduler::work_cleanup
0083 {
0084 ~work_cleanup()
0085 {
0086 if (this_thread_->private_outstanding_work > 1)
0087 {
0088 boost::asio::detail::increment(
0089 scheduler_->outstanding_work_,
0090 this_thread_->private_outstanding_work - 1);
0091 }
0092 else if (this_thread_->private_outstanding_work < 1)
0093 {
0094 scheduler_->work_finished();
0095 }
0096 this_thread_->private_outstanding_work = 0;
0097
0098 #if defined(BOOST_ASIO_HAS_THREADS)
0099 if (!this_thread_->private_op_queue.empty())
0100 {
0101 lock_->lock();
0102 scheduler_->op_queue_.push(this_thread_->private_op_queue);
0103 }
0104 #endif
0105 }
0106
0107 scheduler* scheduler_;
0108 mutex::scoped_lock* lock_;
0109 thread_info* this_thread_;
0110 };
0111
0112 scheduler::scheduler(boost::asio::execution_context& ctx,
0113 bool own_thread, get_task_func_type get_task)
0114 : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
0115 one_thread_(config(ctx).get("scheduler", "concurrency_hint", 0) == 1),
0116 mutex_(config(ctx).get("scheduler", "locking", true),
0117 config(ctx).get("scheduler", "locking_spin_count", 0)),
0118 task_(0),
0119 get_task_(get_task),
0120 task_interrupted_(true),
0121 outstanding_work_(0),
0122 stopped_(false),
0123 shutdown_(false),
0124 concurrency_hint_(config(ctx).get("scheduler", "concurrency_hint", 0)),
0125 task_usec_(config(ctx).get("scheduler", "task_usec", -1L)),
0126 wait_usec_(config(ctx).get("scheduler", "wait_usec", -1L)),
0127 thread_(0)
0128 {
0129 BOOST_ASIO_HANDLER_TRACKING_INIT;
0130
0131 if (own_thread)
0132 {
0133 ++outstanding_work_;
0134 boost::asio::detail::signal_blocker sb;
0135 thread_ = new boost::asio::detail::thread(thread_function(this));
0136 }
0137 }
0138
0139 scheduler::~scheduler()
0140 {
0141 if (thread_)
0142 {
0143 mutex::scoped_lock lock(mutex_);
0144 shutdown_ = true;
0145 stop_all_threads(lock);
0146 lock.unlock();
0147 thread_->join();
0148 delete thread_;
0149 }
0150 }
0151
0152 void scheduler::shutdown()
0153 {
0154 mutex::scoped_lock lock(mutex_);
0155 shutdown_ = true;
0156 if (thread_)
0157 stop_all_threads(lock);
0158 lock.unlock();
0159
0160
0161 if (thread_)
0162 {
0163 thread_->join();
0164 delete thread_;
0165 thread_ = 0;
0166 }
0167
0168
0169 while (!op_queue_.empty())
0170 {
0171 operation* o = op_queue_.front();
0172 op_queue_.pop();
0173 if (o != &task_operation_)
0174 o->destroy();
0175 }
0176
0177
0178 task_ = 0;
0179 }
0180
0181 void scheduler::init_task()
0182 {
0183 mutex::scoped_lock lock(mutex_);
0184 if (!shutdown_ && !task_)
0185 {
0186 task_ = get_task_(this->context());
0187 op_queue_.push(&task_operation_);
0188 wake_one_thread_and_unlock(lock);
0189 }
0190 }
0191
0192 std::size_t scheduler::run(boost::system::error_code& ec)
0193 {
0194 ec = boost::system::error_code();
0195 if (outstanding_work_ == 0)
0196 {
0197 stop();
0198 return 0;
0199 }
0200
0201 thread_info this_thread;
0202 this_thread.private_outstanding_work = 0;
0203 thread_call_stack::context ctx(this, this_thread);
0204
0205 mutex::scoped_lock lock(mutex_);
0206
0207 std::size_t n = 0;
0208 for (; do_run_one(lock, this_thread, ec); lock.lock())
0209 if (n != (std::numeric_limits<std::size_t>::max)())
0210 ++n;
0211 return n;
0212 }
0213
0214 std::size_t scheduler::run_one(boost::system::error_code& ec)
0215 {
0216 ec = boost::system::error_code();
0217 if (outstanding_work_ == 0)
0218 {
0219 stop();
0220 return 0;
0221 }
0222
0223 thread_info this_thread;
0224 this_thread.private_outstanding_work = 0;
0225 thread_call_stack::context ctx(this, this_thread);
0226
0227 mutex::scoped_lock lock(mutex_);
0228
0229 return do_run_one(lock, this_thread, ec);
0230 }
0231
0232 std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
0233 {
0234 ec = boost::system::error_code();
0235 if (outstanding_work_ == 0)
0236 {
0237 stop();
0238 return 0;
0239 }
0240
0241 thread_info this_thread;
0242 this_thread.private_outstanding_work = 0;
0243 thread_call_stack::context ctx(this, this_thread);
0244
0245 mutex::scoped_lock lock(mutex_);
0246
0247 return do_wait_one(lock, this_thread, usec, ec);
0248 }
0249
0250 std::size_t scheduler::poll(boost::system::error_code& ec)
0251 {
0252 ec = boost::system::error_code();
0253 if (outstanding_work_ == 0)
0254 {
0255 stop();
0256 return 0;
0257 }
0258
0259 thread_info this_thread;
0260 this_thread.private_outstanding_work = 0;
0261 thread_call_stack::context ctx(this, this_thread);
0262
0263 mutex::scoped_lock lock(mutex_);
0264
0265 #if defined(BOOST_ASIO_HAS_THREADS)
0266
0267
0268
0269 if (one_thread_)
0270 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
0271 op_queue_.push(outer_info->private_op_queue);
0272 #endif
0273
0274 std::size_t n = 0;
0275 for (; do_poll_one(lock, this_thread, ec); lock.lock())
0276 if (n != (std::numeric_limits<std::size_t>::max)())
0277 ++n;
0278 return n;
0279 }
0280
0281 std::size_t scheduler::poll_one(boost::system::error_code& ec)
0282 {
0283 ec = boost::system::error_code();
0284 if (outstanding_work_ == 0)
0285 {
0286 stop();
0287 return 0;
0288 }
0289
0290 thread_info this_thread;
0291 this_thread.private_outstanding_work = 0;
0292 thread_call_stack::context ctx(this, this_thread);
0293
0294 mutex::scoped_lock lock(mutex_);
0295
0296 #if defined(BOOST_ASIO_HAS_THREADS)
0297
0298
0299
0300 if (one_thread_)
0301 if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
0302 op_queue_.push(outer_info->private_op_queue);
0303 #endif
0304
0305 return do_poll_one(lock, this_thread, ec);
0306 }
0307
0308 void scheduler::stop()
0309 {
0310 mutex::scoped_lock lock(mutex_);
0311 stop_all_threads(lock);
0312 }
0313
0314 bool scheduler::stopped() const
0315 {
0316 mutex::scoped_lock lock(mutex_);
0317 return stopped_;
0318 }
0319
0320 void scheduler::restart()
0321 {
0322 mutex::scoped_lock lock(mutex_);
0323 stopped_ = false;
0324 }
0325
0326 void scheduler::compensating_work_started()
0327 {
0328 thread_info_base* this_thread = thread_call_stack::contains(this);
0329 BOOST_ASIO_ASSUME(this_thread != 0);
0330 ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
0331 }
0332
0333 bool scheduler::can_dispatch()
0334 {
0335 return thread_call_stack::contains(this) != 0;
0336 }
0337
0338 void scheduler::capture_current_exception()
0339 {
0340 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0341 this_thread->capture_current_exception();
0342 }
0343
0344 void scheduler::post_immediate_completion(
0345 scheduler::operation* op, bool is_continuation)
0346 {
0347 #if defined(BOOST_ASIO_HAS_THREADS)
0348 if (one_thread_ || is_continuation)
0349 {
0350 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0351 {
0352 ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
0353 static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
0354 return;
0355 }
0356 }
0357 #else
0358 (void)is_continuation;
0359 #endif
0360
0361 work_started();
0362 mutex::scoped_lock lock(mutex_);
0363 op_queue_.push(op);
0364 wake_one_thread_and_unlock(lock);
0365 }
0366
0367 void scheduler::post_immediate_completions(std::size_t n,
0368 op_queue<scheduler::operation>& ops, bool is_continuation)
0369 {
0370 #if defined(BOOST_ASIO_HAS_THREADS)
0371 if (one_thread_ || is_continuation)
0372 {
0373 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0374 {
0375 static_cast<thread_info*>(this_thread)->private_outstanding_work
0376 += static_cast<long>(n);
0377 static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
0378 return;
0379 }
0380 }
0381 #else
0382 (void)is_continuation;
0383 #endif
0384
0385 increment(outstanding_work_, static_cast<long>(n));
0386 mutex::scoped_lock lock(mutex_);
0387 op_queue_.push(ops);
0388 wake_one_thread_and_unlock(lock);
0389 }
0390
0391 void scheduler::post_deferred_completion(scheduler::operation* op)
0392 {
0393 #if defined(BOOST_ASIO_HAS_THREADS)
0394 if (one_thread_)
0395 {
0396 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0397 {
0398 static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
0399 return;
0400 }
0401 }
0402 #endif
0403
0404 mutex::scoped_lock lock(mutex_);
0405 op_queue_.push(op);
0406 wake_one_thread_and_unlock(lock);
0407 }
0408
0409 void scheduler::post_deferred_completions(
0410 op_queue<scheduler::operation>& ops)
0411 {
0412 if (!ops.empty())
0413 {
0414 #if defined(BOOST_ASIO_HAS_THREADS)
0415 if (one_thread_)
0416 {
0417 if (thread_info_base* this_thread = thread_call_stack::contains(this))
0418 {
0419 static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
0420 return;
0421 }
0422 }
0423 #endif
0424
0425 mutex::scoped_lock lock(mutex_);
0426 op_queue_.push(ops);
0427 wake_one_thread_and_unlock(lock);
0428 }
0429 }
0430
0431 void scheduler::do_dispatch(
0432 scheduler::operation* op)
0433 {
0434 work_started();
0435 mutex::scoped_lock lock(mutex_);
0436 op_queue_.push(op);
0437 wake_one_thread_and_unlock(lock);
0438 }
0439
0440 void scheduler::abandon_operations(
0441 op_queue<scheduler::operation>& ops)
0442 {
0443 op_queue<scheduler::operation> ops2;
0444 ops2.push(ops);
0445 }
0446
0447 std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
0448 scheduler::thread_info& this_thread,
0449 const boost::system::error_code& ec)
0450 {
0451 while (!stopped_)
0452 {
0453 if (!op_queue_.empty())
0454 {
0455
0456 operation* o = op_queue_.front();
0457 op_queue_.pop();
0458 bool more_handlers = (!op_queue_.empty());
0459
0460 if (o == &task_operation_)
0461 {
0462 task_interrupted_ = more_handlers || task_usec_ == 0;
0463
0464 if (more_handlers && !one_thread_ && wait_usec_ != 0)
0465 wakeup_event_.unlock_and_signal_one(lock);
0466 else
0467 lock.unlock();
0468
0469 task_cleanup on_exit = { this, &lock, &this_thread };
0470 (void)on_exit;
0471
0472
0473
0474
0475 task_->run(more_handlers ? 0 : task_usec_,
0476 this_thread.private_op_queue);
0477 }
0478 else
0479 {
0480 std::size_t task_result = o->task_result_;
0481
0482 if (more_handlers && !one_thread_)
0483 wake_one_thread_and_unlock(lock);
0484 else
0485 lock.unlock();
0486
0487
0488 work_cleanup on_exit = { this, &lock, &this_thread };
0489 (void)on_exit;
0490
0491
0492 o->complete(this, ec, task_result);
0493 this_thread.rethrow_pending_exception();
0494
0495 return 1;
0496 }
0497 }
0498 else
0499 {
0500 if (wait_usec_ == 0)
0501 {
0502 lock.unlock();
0503 lock.lock();
0504 }
0505 else
0506 {
0507 wakeup_event_.clear(lock);
0508 if (wait_usec_ > 0)
0509 wakeup_event_.wait_for_usec(lock, wait_usec_);
0510 else
0511 wakeup_event_.wait(lock);
0512 }
0513 }
0514 }
0515
0516 return 0;
0517 }
0518
0519 std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
0520 scheduler::thread_info& this_thread, long usec,
0521 const boost::system::error_code& ec)
0522 {
0523 if (stopped_)
0524 return 0;
0525
0526 operation* o = op_queue_.front();
0527 if (o == 0)
0528 {
0529 wakeup_event_.clear(lock);
0530 usec = (wait_usec_ >= 0 && wait_usec_ < usec) ? wait_usec_ : usec;
0531 wakeup_event_.wait_for_usec(lock, usec);
0532 usec = 0;
0533 o = op_queue_.front();
0534 }
0535
0536 if (o == &task_operation_)
0537 {
0538 op_queue_.pop();
0539 bool more_handlers = (!op_queue_.empty());
0540
0541 usec = (task_usec_ >= 0 && task_usec_ < usec) ? task_usec_ : usec;
0542 task_interrupted_ = more_handlers || usec == 0;
0543
0544 if (more_handlers && !one_thread_ && wait_usec_ != 0)
0545 wakeup_event_.unlock_and_signal_one(lock);
0546 else
0547 lock.unlock();
0548
0549 {
0550 task_cleanup on_exit = { this, &lock, &this_thread };
0551 (void)on_exit;
0552
0553
0554
0555
0556 task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
0557 }
0558
0559 o = op_queue_.front();
0560 if (o == &task_operation_)
0561 {
0562 if (!one_thread_)
0563 wakeup_event_.maybe_unlock_and_signal_one(lock);
0564 return 0;
0565 }
0566 }
0567
0568 if (o == 0)
0569 return 0;
0570
0571 op_queue_.pop();
0572 bool more_handlers = (!op_queue_.empty());
0573
0574 std::size_t task_result = o->task_result_;
0575
0576 if (more_handlers && !one_thread_)
0577 wake_one_thread_and_unlock(lock);
0578 else
0579 lock.unlock();
0580
0581
0582 work_cleanup on_exit = { this, &lock, &this_thread };
0583 (void)on_exit;
0584
0585
0586 o->complete(this, ec, task_result);
0587 this_thread.rethrow_pending_exception();
0588
0589 return 1;
0590 }
0591
0592 std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
0593 scheduler::thread_info& this_thread,
0594 const boost::system::error_code& ec)
0595 {
0596 if (stopped_)
0597 return 0;
0598
0599 operation* o = op_queue_.front();
0600 if (o == &task_operation_)
0601 {
0602 op_queue_.pop();
0603 lock.unlock();
0604
0605 {
0606 task_cleanup c = { this, &lock, &this_thread };
0607 (void)c;
0608
0609
0610
0611
0612 task_->run(0, this_thread.private_op_queue);
0613 }
0614
0615 o = op_queue_.front();
0616 if (o == &task_operation_)
0617 {
0618 wakeup_event_.maybe_unlock_and_signal_one(lock);
0619 return 0;
0620 }
0621 }
0622
0623 if (o == 0)
0624 return 0;
0625
0626 op_queue_.pop();
0627 bool more_handlers = (!op_queue_.empty());
0628
0629 std::size_t task_result = o->task_result_;
0630
0631 if (more_handlers && !one_thread_)
0632 wake_one_thread_and_unlock(lock);
0633 else
0634 lock.unlock();
0635
0636
0637 work_cleanup on_exit = { this, &lock, &this_thread };
0638 (void)on_exit;
0639
0640
0641 o->complete(this, ec, task_result);
0642 this_thread.rethrow_pending_exception();
0643
0644 return 1;
0645 }
0646
0647 void scheduler::stop_all_threads(
0648 mutex::scoped_lock& lock)
0649 {
0650 stopped_ = true;
0651 wakeup_event_.signal_all(lock);
0652
0653 if (!task_interrupted_ && task_)
0654 {
0655 task_interrupted_ = true;
0656 task_->interrupt();
0657 }
0658 }
0659
0660 void scheduler::wake_one_thread_and_unlock(
0661 mutex::scoped_lock& lock)
0662 {
0663 if (wait_usec_ == 0 || !wakeup_event_.maybe_unlock_and_signal_one(lock))
0664 {
0665 if (!task_interrupted_ && task_)
0666 {
0667 task_interrupted_ = true;
0668 task_->interrupt();
0669 }
0670 lock.unlock();
0671 }
0672 }
0673
0674 scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
0675 {
0676 #if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
0677 return &use_service<io_uring_service>(ctx);
0678 #else
0679 return &use_service<reactor>(ctx);
0680 #endif
0681 }
0682
0683 }
0684 }
0685 }
0686
0687 #include <boost/asio/detail/pop_options.hpp>
0688
0689 #endif