//
// detail/impl/io_uring_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_IO_URING)

#include <cstddef>
#include <sys/eventfd.h>
#include <boost/asio/detail/io_uring_service.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

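// The constructor reads its locking and preallocation settings from the
// execution context's configuration, creates the io_uring instance, and,
// when io_uring is not the default backend, registers an eventfd with the
// reactor so that completions can be detected.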
io_uring_service::io_uring_service(boost::asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(config(ctx).get("reactor", "registration_locking", true),
        config(ctx).get("reactor", "registration_locking_spin_count", 0)),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    io_locking_(config(ctx).get("reactor", "io_locking", true)),
    io_locking_spin_count_(
        config(ctx).get("reactor", "io_locking_spin_count", 0)),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    registered_io_objects_(
        config(ctx).get("reactor", "preallocated_io_objects", 0U),
        io_locking_, io_locking_spin_count_),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1)
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}

io_uring_service::~io_uring_service()
{
  if (ring_.ring_fd != -1)
    ::io_uring_queue_exit(&ring_);
  if (event_fd_ != -1)
    ::close(event_fd_);
}

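// Shut down the service: request cancellation of all outstanding operations,
// wait for their completions to be reaped, and hand any remaining handlers
// to the scheduler to be abandoned.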
void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Cancel all outstanding operations.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }

  // Cancel the timeout operation.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();

  // Wait for all completions to come back.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

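// Handle process forking. Before the fork, all in-flight operations are
// cancelled and their completions drained. After the fork, the parent
// re-arms the timeout and eventfd operations, while the child creates a
// fresh io_uring instance.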
void io_uring_service::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case boost::asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted
      // after the fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }

      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }

      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);

      // Restart the eventfd operation.
      register_with_reactor();
    }
    break;

  case boost::asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;

  case boost::asio::execution_context::fork_child:
    {
      // The child process gets a new io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;
  default:
    break;
  }
}

void io_uring_service::init_task()
{
  scheduler_.init_task();
}

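// Allocate and initialise the per-I/O-object state that tracks the
// operation queues for a descriptor.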
void io_uring_service::register_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
}

void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }

  io_obj->queues_[op_type].op_queue_.push(op);
  io_object_lock.unlock();
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    boost::system::error_code ec(ENOBUFS,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}

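// Register a set of buffers with the kernel for use by fixed-buffer
// operations.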
void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
  int result = ::io_uring_register_buffers(&ring_, v, n);
  if (result < 0)
  {
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_buffers");
  }
}

void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}

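// Start an operation. If the queue for this operation type is idle, the
// operation is either completed immediately (when it can be performed
// speculatively) or a submission queue entry is prepared for it. Otherwise
// it waits behind the operation that is already in flight.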
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (io_obj->shutdown_)
  {
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}

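// Cancel the operations associated with the given I/O object. Queued
// operations complete immediately with operation_aborted, while kernel-side
// cancellation is requested for any operation that is already in flight.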
void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        op->ec_ = boost::asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  io_obj->queues_[op_type].op_queue_.push(other_ops);

  io_object_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object and let the last operation to complete free it.
      io_obj = 0;
    }
    else
    {
      // Leave io_obj set so that it will be freed by the subsequent call to
      // cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing
    // the I/O object and let the destructor free it instead.
    io_obj = 0;
  }
}

void io_uring_service::cleanup_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (io_obj)
  {
    free_io_object(io_obj);
    io_obj = 0;
  }
}

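// Reap completions from the ring. When a finite timeout is supplied, a
// private timeout entry bounds the wait. Each completion is dispatched
// according to the user data attached to it (interrupt, timer or I/O
// queue), up to complete_batch_size entries per call.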
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (local_ops > 0)
  {
    if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        ::io_uring_sqe_set_data(sqe, &ts);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0 || local_ops > 0)
  {
    if (result == 0)
    {
      if (void* ptr = ::io_uring_cqe_get_data(cqe))
      {
        if (ptr == this)
        {
          // The io_uring service was interrupted.
        }
        else if (ptr == &timer_queues_)
        {
          check_timers = true;
        }
        else if (ptr == &timeout_)
        {
          check_timers = true;
          timeout_.tv_sec = 0;
          timeout_.tv_nsec = 0;
        }
        else if (ptr == &ts)
        {
          --local_ops;
        }
        else
        {
          io_queue* io_q = static_cast<io_queue*>(ptr);
          io_q->set_result(cqe->res);
          ops.push(io_q);
        }
      }
      ::io_uring_cqe_seen(&ring_, cqe);
      ++count;
    }
    result = (count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}

void io_uring_service::interrupt()
{
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_nop(sqe);
    ::io_uring_sqe_set_data(sqe, this);
  }
  submit_sqes();
}

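// Create the io_uring instance. When io_uring is not the default backend,
// an eventfd is also created and registered with the ring so that the
// reactor can detect completions.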
void io_uring_service::init_ring()
{
  int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  if (result < 0)
  {
    ring_.ring_fd = -1;
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_queue_init");
  }

#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (event_fd_ < 0)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    ::io_uring_queue_exit(&ring_);
    boost::asio::detail::throw_error(ec, "eventfd");
  }

  result = ::io_uring_register_eventfd(&ring_, event_fd_);
  if (result < 0)
  {
    ::close(event_fd_);
    ::io_uring_queue_exit(&ring_);
    boost::system::error_code ec(-result,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_register_eventfd");
  }
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}

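// When io_uring is not the default backend, the reactor invokes the
// following operation whenever the eventfd becomes readable. It drains the
// eventfd, reaps any available completions, and remains registered.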
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(boost::system::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));

    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue;
      break;
    }

    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);

    return not_done;
  }

  static void do_complete(void* /*owner*/, operation* base,
      const boost::system::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)

void io_uring_service::register_with_reactor()
{
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}

io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(io_locking_, io_locking_spin_count_);
}

void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}

bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;

  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }

  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }

  return cancel_op;
}

void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}

__kernel_timespec io_uring_service::get_timeout() const
{
  __kernel_timespec ts;
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  return ts;
}

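// Obtain a new submission queue entry. If the submission queue is full, any
// pending entries are first submitted to the kernel to make room. Returns
// null only if an entry still cannot be obtained.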
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}

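// Submit all pending entries to the kernel and account for the newly
// outstanding work.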
void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}

void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}

void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}

io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}

void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
    const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}

io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}

struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }

    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;
  op_queue<operation> ops_;
  operation* first_op_;
};

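// Called when a completion arrives for this queue. The result is applied to
// the operation at the front of the queue, as many queued operations as
// possible are then performed, and a new submission queue entry is prepared
// for any operation that remains outstanding. The first completed operation
// is returned for immediate invocation; the rest are posted by the cleanup
// object's destructor.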
operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);

  if (result != -ECANCELED || cancel_requested_)
  {
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, boost::asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }

    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }

  cancel_requested_ = false;

  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = boost::asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }

  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}

io_uring_service::io_object::io_object(bool locking, int spin_count)
  : mutex_(locking, spin_count)
{
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_IO_URING)

#endif // BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP