File indexing completed on 2025-04-09 08:28:00
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0009 #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0010
0011 #include <boost/mysql/error_code.hpp>
0012
0013 #include <boost/mysql/detail/any_resumable_ref.hpp>
0014 #include <boost/mysql/detail/engine.hpp>
0015 #include <boost/mysql/detail/next_action.hpp>
0016
0017 #include <boost/mysql/impl/internal/coroutine.hpp>
0018
0019 #include <boost/asio/any_io_executor.hpp>
0020 #include <boost/asio/buffer.hpp>
0021 #include <boost/asio/compose.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/assert.hpp>
0024
0025 #include <cstddef>
0026 #include <utility>
0027
0028 namespace boost {
0029 namespace mysql {
0030 namespace detail {
0031
0032 inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
0033 {
0034 return asio::mutable_buffer(buff.data(), buff.size());
0035 }
0036
0037 template <class EngineStream>
0038 struct run_algo_op
0039 {
0040 int resume_point_{0};
0041 EngineStream& stream_;
0042 any_resumable_ref resumable_;
0043 bool has_done_io_{false};
0044 error_code stored_ec_;
0045
0046 run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
0047
0048 template <class Self>
0049 void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
0050 {
0051 next_action act;
0052
0053 switch (resume_point_)
0054 {
0055 case 0:
0056
0057 while (true)
0058 {
0059
0060 act = resumable_.resume(io_ec, bytes_transferred);
0061 if (act.is_done())
0062 {
0063 stored_ec_ = act.error();
0064 if (!has_done_io_)
0065 {
0066 BOOST_MYSQL_YIELD(
0067 resume_point_,
0068 1,
0069 asio::post(stream_.get_executor(), std::move(self))
0070 )
0071 }
0072 self.complete(stored_ec_);
0073 return;
0074 }
0075 else if (act.type() == next_action_type::read)
0076 {
0077 BOOST_MYSQL_YIELD(
0078 resume_point_,
0079 2,
0080 stream_.async_read_some(
0081 to_buffer(act.read_args().buffer),
0082 act.read_args().use_ssl,
0083 std::move(self)
0084 )
0085 )
0086 has_done_io_ = true;
0087 }
0088 else if (act.type() == next_action_type::write)
0089 {
0090 BOOST_MYSQL_YIELD(
0091 resume_point_,
0092 3,
0093 stream_.async_write_some(
0094 asio::buffer(act.write_args().buffer),
0095 act.write_args().use_ssl,
0096 std::move(self)
0097 )
0098 )
0099 has_done_io_ = true;
0100 }
0101 else if (act.type() == next_action_type::ssl_handshake)
0102 {
0103 BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
0104 has_done_io_ = true;
0105 }
0106 else if (act.type() == next_action_type::ssl_shutdown)
0107 {
0108 BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
0109 has_done_io_ = true;
0110 }
0111 else if (act.type() == next_action_type::connect)
0112 {
0113 BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))
0114 has_done_io_ = true;
0115 }
0116 else
0117 {
0118 BOOST_ASSERT(act.type() == next_action_type::close);
0119 stream_.close(io_ec);
0120 }
0121 }
0122 }
0123 }
0124 };
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144 template <class EngineStream>
0145 class engine_impl final : public engine
0146 {
0147 EngineStream stream_;
0148
0149 public:
0150 template <class... Args>
0151 engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
0152 {
0153 }
0154
0155 EngineStream& stream() { return stream_; }
0156 const EngineStream& stream() const { return stream_; }
0157
0158 using executor_type = asio::any_io_executor;
0159 executor_type get_executor() override final { return stream_.get_executor(); }
0160
0161 bool supports_ssl() const override final { return stream_.supports_ssl(); }
0162
0163 void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }
0164
0165 void run(any_resumable_ref resumable, error_code& ec) override final
0166 {
0167 ec.clear();
0168 error_code io_ec;
0169 std::size_t bytes_transferred = 0;
0170
0171 while (true)
0172 {
0173
0174 auto act = resumable.resume(io_ec, bytes_transferred);
0175
0176
0177 bytes_transferred = 0;
0178 if (act.is_done())
0179 {
0180 ec = act.error();
0181 return;
0182 }
0183 else if (act.type() == next_action_type::read)
0184 {
0185 bytes_transferred = stream_.read_some(
0186 to_buffer(act.read_args().buffer),
0187 act.read_args().use_ssl,
0188 io_ec
0189 );
0190 }
0191 else if (act.type() == next_action_type::write)
0192 {
0193 bytes_transferred = stream_.write_some(
0194 asio::buffer(act.write_args().buffer),
0195 act.write_args().use_ssl,
0196 io_ec
0197 );
0198 }
0199 else if (act.type() == next_action_type::ssl_handshake)
0200 {
0201 stream_.ssl_handshake(io_ec);
0202 }
0203 else if (act.type() == next_action_type::ssl_shutdown)
0204 {
0205 stream_.ssl_shutdown(io_ec);
0206 }
0207 else if (act.type() == next_action_type::connect)
0208 {
0209 stream_.connect(io_ec);
0210 }
0211 else
0212 {
0213 BOOST_ASSERT(act.type() == next_action_type::close);
0214 stream_.close(io_ec);
0215 }
0216 }
0217 }
0218
0219 void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
0220 override final
0221 {
0222 return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
0223 run_algo_op<EngineStream>(stream_, resumable),
0224 h,
0225 stream_
0226 );
0227 }
0228 };
0229
0230 }
0231 }
0232 }
0233
0234 #endif