#ifndef BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_IO_URING)

#include <cstddef>
#include <sys/eventfd.h>
#include <boost/asio/detail/io_uring_service.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

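// Construct the service. Locking behaviour, spin counts and the number of
// preallocated I/O objects are read from the execution context's "reactor"
// configuration section.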
io_uring_service::io_uring_service(boost::asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(config(ctx).get("reactor", "registration_locking", true),
        config(ctx).get("reactor", "registration_locking_spin_count", 0)),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    io_locking_(config(ctx).get("reactor", "io_locking", true)),
    io_locking_spin_count_(
        config(ctx).get("reactor", "io_locking_spin_count", 0)),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    registered_io_objects_(
        config(ctx).get("reactor", "preallocated_io_objects", 0U),
        io_locking_, io_locking_spin_count_),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1)
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}

io_uring_service::~io_uring_service()
{
  if (ring_.ring_fd != -1)
    ::io_uring_queue_exit(&ring_);
  if (event_fd_ != -1)
    ::close(event_fd_);
}

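// Shut down the service: cancel every outstanding operation, drain the
// completion queue, and hand unfinished operations to the scheduler to be
// abandoned.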
void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Cancel all outstanding operations.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }

  // Cancel the timeout operation.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();

  // Wait for all completion events to be seen.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

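// Handle fork notifications. Before a fork all in-flight operations are
// cancelled and their completions drained; afterwards the parent restarts
// its timeout and eventfd operations, while the child rebuilds the ring
// from scratch.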
void io_uring_service::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case boost::asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted once the
      // fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }

      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }

      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);

      // Re-register the eventfd with the reactor.
      register_with_reactor();
    }
    break;

  case boost::asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;

  case boost::asio::execution_context::fork_child:
    {
      // The child process starts with a fresh io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;
  default:
    break;
  }
}

void io_uring_service::init_task()
{
  scheduler_.init_task();
}

void io_uring_service::register_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
}

void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }

  io_obj->queues_[op_type].op_queue_.push(op);
  io_object_lock.unlock();
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    boost::system::error_code ec(ENOBUFS,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}

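// Register a set of fixed buffers with the kernel via
// io_uring_register_buffers. A sketch of a hypothetical caller (names are
// illustrative only):
//
//   ::iovec v = { buf, len };
//   service.register_buffers(&v, 1); // throws on failure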
void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
  int result = ::io_uring_register_buffers(&ring_, v, n);
  if (result < 0)
  {
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_buffers");
  }
}

void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}

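// Start an operation on an I/O object. If the queue for this operation type
// is empty the operation is first tried for immediate completion; if it
// cannot complete straight away, an SQE is prepared and the operation waits
// for the kernel's completion.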
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (io_obj->shutdown_)
  {
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}

void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}

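// Cancel only the operations that carry the given cancellation key. The
// first queued operation may be in flight in the kernel, so it is cancelled
// asynchronously via io_uring_prep_cancel; later matching operations are
// completed here with operation_aborted.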
void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        op->ec_ = boost::asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  io_obj->queues_[op_type].op_queue_.push(other_ops);

  io_object_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object; the last operation to complete will free it.
      io_obj = 0;
    }
    else
    {
      // There are no pending operations. The I/O object will be freed by
      // the subsequent call to cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing the
    // I/O object. It will be freed during the service's shutdown.
    io_obj = 0;
  }
}

void io_uring_service::cleanup_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (io_obj)
  {
    free_io_object(io_obj);
    io_obj = 0;
  }
}

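// Run the completion loop for up to usec microseconds (usec == 0 polls
// without blocking; a negative value waits indefinitely). Completed I/O
// queues are pushed onto ops for the scheduler to invoke, and any ready
// timers are collected before returning.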
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (local_ops > 0)
  {
    if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
    {
      // The local timeout SQE has not yet completed, so remove it.
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        ::io_uring_sqe_set_data(sqe, &ts);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0 || local_ops > 0)
  {
    if (result == 0)
    {
      if (void* ptr = ::io_uring_cqe_get_data(cqe))
      {
        if (ptr == this)
        {
          // The io_uring service was interrupted.
        }
        else if (ptr == &timer_queues_)
        {
          check_timers = true;
        }
        else if (ptr == &timeout_)
        {
          check_timers = true;
          timeout_.tv_sec = 0;
          timeout_.tv_nsec = 0;
        }
        else if (ptr == &ts)
        {
          --local_ops;
        }
        else
        {
          io_queue* io_q = static_cast<io_queue*>(ptr);
          io_q->set_result(cqe->res);
          ops.push(io_q);
        }
      }
      ::io_uring_cqe_seen(&ring_, cqe);
      ++count;
    }
    result = (count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}

void io_uring_service::interrupt()
{
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_nop(sqe);
    ::io_uring_sqe_set_data(sqe, this);
  }
  submit_sqes();
}

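// Create the io_uring instance. When io_uring is not the default backend,
// an eventfd is also created and registered with the ring so that the
// reactor can be woken when completions are available.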
void io_uring_service::init_ring()
{
  int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  if (result < 0)
  {
    ring_.ring_fd = -1;
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_queue_init");
  }

#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (event_fd_ < 0)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    ::io_uring_queue_exit(&ring_);
    boost::asio::detail::throw_error(ec, "eventfd");
  }

  result = ::io_uring_register_eventfd(&ring_, event_fd_);
  if (result < 0)
  {
    ::close(event_fd_);
    ::io_uring_queue_exit(&ring_);
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_eventfd");
  }
#endif
}

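// A persistent reactor operation that drains the eventfd whenever the ring
// signals activity, then runs the completion loop and posts any finished
// operations to the scheduler.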
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(boost::system::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));

    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue;
      break;
    }

    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);

    return not_done;
  }

  static void do_complete(void* /*owner*/, operation* base,
      const boost::system::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif

void io_uring_service::register_with_reactor()
{
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif
}

io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(io_locking_, io_locking_spin_count_);
}

void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}

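// Cancel the queued operations on an I/O object. The head operation of each
// queue may already be in flight in the kernel, so it is left in place and
// an asynchronous cancellation SQE is submitted for it; all later
// operations are completed immediately with operation_aborted.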
bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;

  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }

  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }

  return cancel_op;
}

void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}

__kernel_timespec io_uring_service::get_timeout() const
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  __kernel_timespec ts;
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  return ts;
}

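// Obtain a new submission queue entry, flushing pending SQEs once if the
// submission queue is full. Returns null only if the queue is still full
// after the flush. The entry's user data is cleared before use.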
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}

void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}

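// Submission batching: once enough SQEs have accumulated they are submitted
// immediately; otherwise a deferred submit_sqes_op is scheduled (or pushed
// onto an op queue) so that SQEs from several operations can be flushed
// with a single io_uring_submit call.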
void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}

void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}

io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}

void io_uring_service::submit_sqes_op::do_complete(void* owner,
    operation* base, const boost::system::error_code& /*ec*/,
    std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}

io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}

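// Scope guard for perform_io: on exit it frees the I/O object when the last
// operation on a shut-down object has completed, posts any additional
// completed operations, and keeps the scheduler's work count balanced when
// nothing completed.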
struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }

    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage
      // of the fact that the scheduler will call work_finished() once we
      // return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to
      // compensate for the work_finished() call that the scheduler will
      // make once this operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;
  op_queue<operation> ops_;
  operation* first_op_;
};

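// Called when a completion arrives for this queue. Applies the kernel's
// result to the head operation, collects all operations that can now
// complete, resubmits the next queued operation (if any), and returns the
// first completed operation for the caller to invoke directly; the
// remainder are posted by the cleanup guard.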
operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);

  if (result != -ECANCELED || cancel_requested_)
  {
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, boost::asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }

    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }

  cancel_requested_ = false;

  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = boost::asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }

  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }

  // The first operation will be returned for completion now. The others
  // will be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}

io_uring_service::io_object::io_object(bool locking, int spin_count)
  : mutex_(locking, spin_count)
{
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_IO_URING)

#endif // BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP