Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:39:14

0001 //
0002 // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0009 #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0010 
0011 #include <boost/mysql/error_code.hpp>
0012 
0013 #include <boost/mysql/detail/any_resumable_ref.hpp>
0014 #include <boost/mysql/detail/engine.hpp>
0015 #include <boost/mysql/detail/next_action.hpp>
0016 
0017 #include <boost/mysql/impl/internal/coroutine.hpp>
0018 
0019 #include <boost/asio/any_io_executor.hpp>
0020 #include <boost/asio/buffer.hpp>
0021 #include <boost/asio/cancellation_type.hpp>
0022 #include <boost/asio/compose.hpp>
0023 #include <boost/asio/error.hpp>
0024 #include <boost/asio/immediate.hpp>
0025 #include <boost/asio/post.hpp>
0026 #include <boost/assert.hpp>
0027 
0028 #include <cstddef>
0029 #include <utility>
0030 
0031 namespace boost {
0032 namespace mysql {
0033 namespace detail {
0034 
0035 inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
0036 {
0037     return asio::mutable_buffer(buff.data(), buff.size());
0038 }
0039 
0040 inline bool has_terminal_cancellation(asio::cancellation_type_t cancel_type)
0041 {
0042     return static_cast<bool>(cancel_type & asio::cancellation_type_t::terminal);
0043 }
0044 
0045 template <class EngineStream>
0046 struct run_algo_op
0047 {
0048     int resume_point_{0};
0049     EngineStream& stream_;
0050     any_resumable_ref resumable_;
0051     bool has_done_io_{false};
0052     error_code stored_ec_;
0053 
0054     run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
0055 
0056     template <class Self>
0057     void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
0058     {
0059         next_action act;
0060 
0061         switch (resume_point_)
0062         {
0063         case 0:
0064 
0065             while (true)
0066             {
0067                 // If we were cancelled, but the last operation completed successfully,
0068                 // set a cancelled error code so the algorithm exits. This might happen
0069                 // if a cancellation signal is emitted after an intermediate operation succeeded
0070                 // but before the handler was called.
0071                 if (!io_ec && has_terminal_cancellation(self.cancelled()))
0072                     io_ec = asio::error::operation_aborted;
0073 
0074                 // Run the op
0075                 act = resumable_.resume(io_ec, bytes_transferred);
0076                 if (act.is_done())
0077                 {
0078                     stored_ec_ = act.error();
0079                     if (!has_done_io_)
0080                     {
0081                         BOOST_MYSQL_YIELD(
0082                             resume_point_,
0083                             1,
0084                             asio::async_immediate(stream_.get_executor(), std::move(self))
0085                         )
0086                     }
0087                     self.complete(stored_ec_);
0088                     return;
0089                 }
0090                 else if (act.type() == next_action_type::read)
0091                 {
0092                     BOOST_MYSQL_YIELD(
0093                         resume_point_,
0094                         2,
0095                         stream_.async_read_some(
0096                             to_buffer(act.read_args().buffer),
0097                             act.read_args().use_ssl,
0098                             std::move(self)
0099                         )
0100                     )
0101                     has_done_io_ = true;
0102                 }
0103                 else if (act.type() == next_action_type::write)
0104                 {
0105                     BOOST_MYSQL_YIELD(
0106                         resume_point_,
0107                         3,
0108                         stream_.async_write_some(
0109                             asio::buffer(act.write_args().buffer),
0110                             act.write_args().use_ssl,
0111                             std::move(self)
0112                         )
0113                     )
0114                     has_done_io_ = true;
0115                 }
0116                 else if (act.type() == next_action_type::ssl_handshake)
0117                 {
0118                     BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
0119                     has_done_io_ = true;
0120                 }
0121                 else if (act.type() == next_action_type::ssl_shutdown)
0122                 {
0123                     BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
0124                     has_done_io_ = true;
0125                 }
0126                 else if (act.type() == next_action_type::connect)
0127                 {
0128                     BOOST_MYSQL_YIELD(
0129                         resume_point_,
0130                         6,
0131                         stream_.async_connect(act.connect_endpoint(), std::move(self))
0132                     )
0133                     has_done_io_ = true;
0134                 }
0135                 else
0136                 {
0137                     BOOST_ASSERT(act.type() == next_action_type::close);
0138                     stream_.close(io_ec);
0139                 }
0140             }
0141         }
0142     }
0143 };
0144 
0145 // EngineStream is an "extended" stream concept, with the following operations:
0146 //    using executor_type = asio::any_io_executor;
0147 //    executor_type get_executor();
0148 //    bool supports_ssl() const;
0149 //    std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
0150 //    void async_read_some(asio::mutable_buffer, bool use_ssl, CompletinToken&&);
0151 //    std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
0152 //    void async_write_some(asio::const_buffer, bool use_ssl, CompletinToken&&);
0153 //    void ssl_handshake(error_code&);
0154 //    void async_ssl_handshake(CompletionToken&&);
0155 //    void ssl_shutdown(error_code&);
0156 //    void async_ssl_shutdown(CompletionToken&&);
0157 //    void connect(const void* server_address, error_code&);
0158 //    void async_connect(const void* server_address, CompletionToken&&);
0159 //    void close(error_code&);
0160 // Async operations are only required to support callback types
0161 // See stream_adaptor for an implementation
0162 template <class EngineStream>
0163 class engine_impl final : public engine
0164 {
0165     EngineStream stream_;
0166 
0167 public:
0168     template <class... Args>
0169     engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
0170     {
0171     }
0172 
0173     EngineStream& stream() { return stream_; }
0174     const EngineStream& stream() const { return stream_; }
0175 
0176     using executor_type = asio::any_io_executor;
0177     executor_type get_executor() override final { return stream_.get_executor(); }
0178 
0179     bool supports_ssl() const override final { return stream_.supports_ssl(); }
0180 
0181     void run(any_resumable_ref resumable, error_code& ec) override final
0182     {
0183         ec.clear();
0184         error_code io_ec;
0185         std::size_t bytes_transferred = 0;
0186 
0187         while (true)
0188         {
0189             // Run the op
0190             auto act = resumable.resume(io_ec, bytes_transferred);
0191 
0192             // Apply the next action
0193             bytes_transferred = 0;
0194             if (act.is_done())
0195             {
0196                 ec = act.error();
0197                 return;
0198             }
0199             else if (act.type() == next_action_type::read)
0200             {
0201                 bytes_transferred = stream_.read_some(
0202                     to_buffer(act.read_args().buffer),
0203                     act.read_args().use_ssl,
0204                     io_ec
0205                 );
0206             }
0207             else if (act.type() == next_action_type::write)
0208             {
0209                 bytes_transferred = stream_.write_some(
0210                     asio::buffer(act.write_args().buffer),
0211                     act.write_args().use_ssl,
0212                     io_ec
0213                 );
0214             }
0215             else if (act.type() == next_action_type::ssl_handshake)
0216             {
0217                 stream_.ssl_handshake(io_ec);
0218             }
0219             else if (act.type() == next_action_type::ssl_shutdown)
0220             {
0221                 stream_.ssl_shutdown(io_ec);
0222             }
0223             else if (act.type() == next_action_type::connect)
0224             {
0225                 stream_.connect(act.connect_endpoint(), io_ec);
0226             }
0227             else
0228             {
0229                 BOOST_ASSERT(act.type() == next_action_type::close);
0230                 stream_.close(io_ec);
0231             }
0232         }
0233     }
0234 
0235     void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
0236         override final
0237     {
0238         return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
0239             run_algo_op<EngineStream>(stream_, resumable),
0240             h,
0241             stream_
0242         );
0243     }
0244 };
0245 
0246 }  // namespace detail
0247 }  // namespace mysql
0248 }  // namespace boost
0249 
0250 #endif