File indexing completed on 2025-01-18 09:28:40
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IO_URING_SOCKET_RECV_OP_HPP
0012 #define BOOST_ASIO_DETAIL_IO_URING_SOCKET_RECV_OP_HPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_IO_URING)
0021
0022 #include <boost/asio/detail/bind_handler.hpp>
0023 #include <boost/asio/detail/buffer_sequence_adapter.hpp>
0024 #include <boost/asio/detail/socket_ops.hpp>
0025 #include <boost/asio/detail/fenced_block.hpp>
0026 #include <boost/asio/detail/handler_work.hpp>
0027 #include <boost/asio/detail/io_uring_operation.hpp>
0028 #include <boost/asio/detail/memory.hpp>
0029
0030 #include <boost/asio/detail/push_options.hpp>
0031
0032 namespace boost {
0033 namespace asio {
0034 namespace detail {
0035
0036 template <typename MutableBufferSequence>
0037 class io_uring_socket_recv_op_base : public io_uring_operation
0038 {
0039 public:
0040 io_uring_socket_recv_op_base(const boost::system::error_code& success_ec,
0041 socket_type socket, socket_ops::state_type state,
0042 const MutableBufferSequence& buffers,
0043 socket_base::message_flags flags, func_type complete_func)
0044 : io_uring_operation(success_ec,
0045 &io_uring_socket_recv_op_base::do_prepare,
0046 &io_uring_socket_recv_op_base::do_perform, complete_func),
0047 socket_(socket),
0048 state_(state),
0049 buffers_(buffers),
0050 flags_(flags),
0051 bufs_(buffers),
0052 msghdr_()
0053 {
0054 msghdr_.msg_iov = bufs_.buffers();
0055 msghdr_.msg_iovlen = static_cast<int>(bufs_.count());
0056 }
0057
0058 static void do_prepare(io_uring_operation* base, ::io_uring_sqe* sqe)
0059 {
0060 BOOST_ASIO_ASSUME(base != 0);
0061 io_uring_socket_recv_op_base* o(
0062 static_cast<io_uring_socket_recv_op_base*>(base));
0063
0064 if ((o->state_ & socket_ops::internal_non_blocking) != 0)
0065 {
0066 bool except_op = (o->flags_ & socket_base::message_out_of_band) != 0;
0067 ::io_uring_prep_poll_add(sqe, o->socket_, except_op ? POLLPRI : POLLIN);
0068 }
0069 else if (o->bufs_.is_single_buffer
0070 && o->bufs_.is_registered_buffer && o->flags_ == 0)
0071 {
0072 ::io_uring_prep_read_fixed(sqe, o->socket_,
0073 o->bufs_.buffers()->iov_base, o->bufs_.buffers()->iov_len,
0074 0, o->bufs_.registered_id().native_handle());
0075 }
0076 else
0077 {
0078 ::io_uring_prep_recvmsg(sqe, o->socket_, &o->msghdr_, o->flags_);
0079 }
0080 }
0081
0082 static bool do_perform(io_uring_operation* base, bool after_completion)
0083 {
0084 BOOST_ASIO_ASSUME(base != 0);
0085 io_uring_socket_recv_op_base* o(
0086 static_cast<io_uring_socket_recv_op_base*>(base));
0087
0088 if ((o->state_ & socket_ops::internal_non_blocking) != 0)
0089 {
0090 bool except_op = (o->flags_ & socket_base::message_out_of_band) != 0;
0091 if (after_completion || !except_op)
0092 {
0093 if (o->bufs_.is_single_buffer)
0094 {
0095 return socket_ops::non_blocking_recv1(o->socket_,
0096 o->bufs_.first(o->buffers_).data(),
0097 o->bufs_.first(o->buffers_).size(), o->flags_,
0098 (o->state_ & socket_ops::stream_oriented) != 0,
0099 o->ec_, o->bytes_transferred_);
0100 }
0101 else
0102 {
0103 return socket_ops::non_blocking_recv(o->socket_,
0104 o->bufs_.buffers(), o->bufs_.count(), o->flags_,
0105 (o->state_ & socket_ops::stream_oriented) != 0,
0106 o->ec_, o->bytes_transferred_);
0107 }
0108 }
0109 }
0110 else if (after_completion)
0111 {
0112 if (!o->ec_ && o->bytes_transferred_ == 0)
0113 if ((o->state_ & socket_ops::stream_oriented) != 0)
0114 o->ec_ = boost::asio::error::eof;
0115 }
0116
0117 if (o->ec_ && o->ec_ == boost::asio::error::would_block)
0118 {
0119 o->state_ |= socket_ops::internal_non_blocking;
0120 return false;
0121 }
0122
0123 return after_completion;
0124 }
0125
0126 private:
0127 socket_type socket_;
0128 socket_ops::state_type state_;
0129 MutableBufferSequence buffers_;
0130 socket_base::message_flags flags_;
0131 buffer_sequence_adapter<boost::asio::mutable_buffer,
0132 MutableBufferSequence> bufs_;
0133 msghdr msghdr_;
0134 };
0135
0136 template <typename MutableBufferSequence, typename Handler, typename IoExecutor>
0137 class io_uring_socket_recv_op
0138 : public io_uring_socket_recv_op_base<MutableBufferSequence>
0139 {
0140 public:
0141 BOOST_ASIO_DEFINE_HANDLER_PTR(io_uring_socket_recv_op);
0142
0143 io_uring_socket_recv_op(const boost::system::error_code& success_ec,
0144 int socket, socket_ops::state_type state,
0145 const MutableBufferSequence& buffers, socket_base::message_flags flags,
0146 Handler& handler, const IoExecutor& io_ex)
0147 : io_uring_socket_recv_op_base<MutableBufferSequence>(success_ec,
0148 socket, state, buffers, flags, &io_uring_socket_recv_op::do_complete),
0149 handler_(static_cast<Handler&&>(handler)),
0150 work_(handler_, io_ex)
0151 {
0152 }
0153
0154 static void do_complete(void* owner, operation* base,
0155 const boost::system::error_code& ,
0156 std::size_t )
0157 {
0158
0159 BOOST_ASIO_ASSUME(base != 0);
0160 io_uring_socket_recv_op* o
0161 (static_cast<io_uring_socket_recv_op*>(base));
0162 ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
0163
0164 BOOST_ASIO_HANDLER_COMPLETION((*o));
0165
0166
0167 handler_work<Handler, IoExecutor> w(
0168 static_cast<handler_work<Handler, IoExecutor>&&>(
0169 o->work_));
0170
0171 BOOST_ASIO_ERROR_LOCATION(o->ec_);
0172
0173
0174
0175
0176
0177
0178
0179 detail::binder2<Handler, boost::system::error_code, std::size_t>
0180 handler(o->handler_, o->ec_, o->bytes_transferred_);
0181 p.h = boost::asio::detail::addressof(handler.handler_);
0182 p.reset();
0183
0184
0185 if (owner)
0186 {
0187 fenced_block b(fenced_block::half);
0188 BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
0189 w.complete(handler, handler.handler_);
0190 BOOST_ASIO_HANDLER_INVOCATION_END;
0191 }
0192 }
0193
0194 private:
0195 Handler handler_;
0196 handler_work<Handler, IoExecutor> work_;
0197 };
0198
0199 }
0200 }
0201 }
0202
0203 #include <boost/asio/detail/pop_options.hpp>
0204
0205 #endif
0206
0207 #endif