Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-09 08:28:00

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0009 #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
0010 
0011 #include <boost/mysql/error_code.hpp>
0012 
0013 #include <boost/mysql/detail/any_resumable_ref.hpp>
0014 #include <boost/mysql/detail/engine.hpp>
0015 #include <boost/mysql/detail/next_action.hpp>
0016 
0017 #include <boost/mysql/impl/internal/coroutine.hpp>
0018 
0019 #include <boost/asio/any_io_executor.hpp>
0020 #include <boost/asio/buffer.hpp>
0021 #include <boost/asio/compose.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/assert.hpp>
0024 
0025 #include <cstddef>
0026 #include <utility>
0027 
0028 namespace boost {
0029 namespace mysql {
0030 namespace detail {
0031 
0032 inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
0033 {
0034     return asio::mutable_buffer(buff.data(), buff.size());
0035 }
0036 
0037 template <class EngineStream>
0038 struct run_algo_op
0039 {
0040     int resume_point_{0};
0041     EngineStream& stream_;
0042     any_resumable_ref resumable_;
0043     bool has_done_io_{false};
0044     error_code stored_ec_;
0045 
0046     run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
0047 
0048     template <class Self>
0049     void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
0050     {
0051         next_action act;
0052 
0053         switch (resume_point_)
0054         {
0055         case 0:
0056 
0057             while (true)
0058             {
0059                 // Run the op
0060                 act = resumable_.resume(io_ec, bytes_transferred);
0061                 if (act.is_done())
0062                 {
0063                     stored_ec_ = act.error();
0064                     if (!has_done_io_)
0065                     {
0066                         BOOST_MYSQL_YIELD(
0067                             resume_point_,
0068                             1,
0069                             asio::post(stream_.get_executor(), std::move(self))
0070                         )
0071                     }
0072                     self.complete(stored_ec_);
0073                     return;
0074                 }
0075                 else if (act.type() == next_action_type::read)
0076                 {
0077                     BOOST_MYSQL_YIELD(
0078                         resume_point_,
0079                         2,
0080                         stream_.async_read_some(
0081                             to_buffer(act.read_args().buffer),
0082                             act.read_args().use_ssl,
0083                             std::move(self)
0084                         )
0085                     )
0086                     has_done_io_ = true;
0087                 }
0088                 else if (act.type() == next_action_type::write)
0089                 {
0090                     BOOST_MYSQL_YIELD(
0091                         resume_point_,
0092                         3,
0093                         stream_.async_write_some(
0094                             asio::buffer(act.write_args().buffer),
0095                             act.write_args().use_ssl,
0096                             std::move(self)
0097                         )
0098                     )
0099                     has_done_io_ = true;
0100                 }
0101                 else if (act.type() == next_action_type::ssl_handshake)
0102                 {
0103                     BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
0104                     has_done_io_ = true;
0105                 }
0106                 else if (act.type() == next_action_type::ssl_shutdown)
0107                 {
0108                     BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
0109                     has_done_io_ = true;
0110                 }
0111                 else if (act.type() == next_action_type::connect)
0112                 {
0113                     BOOST_MYSQL_YIELD(resume_point_, 6, stream_.async_connect(std::move(self)))
0114                     has_done_io_ = true;
0115                 }
0116                 else
0117                 {
0118                     BOOST_ASSERT(act.type() == next_action_type::close);
0119                     stream_.close(io_ec);
0120                 }
0121             }
0122         }
0123     }
0124 };
0125 
0126 // EngineStream is an "extended" stream concept, with the following operations:
0127 //    using executor_type = asio::any_io_executor;
0128 //    executor_type get_executor();
0129 //    bool supports_ssl() const;
0130 //    void set_endpoint(const void* endpoint);
0131 //    std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
0132 //    void async_read_some(asio::mutable_buffer, bool use_ssl, CompletinToken&&);
0133 //    std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
0134 //    void async_write_some(asio::const_buffer, bool use_ssl, CompletinToken&&);
0135 //    void ssl_handshake(error_code&);
0136 //    void async_ssl_handshake(CompletionToken&&);
0137 //    void ssl_shutdown(error_code&);
0138 //    void async_ssl_shutdown(CompletionToken&&);
0139 //    void connect(error_code&);
0140 //    void async_connect(CompletionToken&&);
0141 //    void close(error_code&);
0142 // Async operations are only required to support callback types
0143 // See stream_adaptor for an implementation
0144 template <class EngineStream>
0145 class engine_impl final : public engine
0146 {
0147     EngineStream stream_;
0148 
0149 public:
0150     template <class... Args>
0151     engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
0152     {
0153     }
0154 
0155     EngineStream& stream() { return stream_; }
0156     const EngineStream& stream() const { return stream_; }
0157 
0158     using executor_type = asio::any_io_executor;
0159     executor_type get_executor() override final { return stream_.get_executor(); }
0160 
0161     bool supports_ssl() const override final { return stream_.supports_ssl(); }
0162 
0163     void set_endpoint(const void* endpoint) override final { stream_.set_endpoint(endpoint); }
0164 
0165     void run(any_resumable_ref resumable, error_code& ec) override final
0166     {
0167         ec.clear();
0168         error_code io_ec;
0169         std::size_t bytes_transferred = 0;
0170 
0171         while (true)
0172         {
0173             // Run the op
0174             auto act = resumable.resume(io_ec, bytes_transferred);
0175 
0176             // Apply the next action
0177             bytes_transferred = 0;
0178             if (act.is_done())
0179             {
0180                 ec = act.error();
0181                 return;
0182             }
0183             else if (act.type() == next_action_type::read)
0184             {
0185                 bytes_transferred = stream_.read_some(
0186                     to_buffer(act.read_args().buffer),
0187                     act.read_args().use_ssl,
0188                     io_ec
0189                 );
0190             }
0191             else if (act.type() == next_action_type::write)
0192             {
0193                 bytes_transferred = stream_.write_some(
0194                     asio::buffer(act.write_args().buffer),
0195                     act.write_args().use_ssl,
0196                     io_ec
0197                 );
0198             }
0199             else if (act.type() == next_action_type::ssl_handshake)
0200             {
0201                 stream_.ssl_handshake(io_ec);
0202             }
0203             else if (act.type() == next_action_type::ssl_shutdown)
0204             {
0205                 stream_.ssl_shutdown(io_ec);
0206             }
0207             else if (act.type() == next_action_type::connect)
0208             {
0209                 stream_.connect(io_ec);
0210             }
0211             else
0212             {
0213                 BOOST_ASSERT(act.type() == next_action_type::close);
0214                 stream_.close(io_ec);
0215             }
0216         }
0217     }
0218 
0219     void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
0220         override final
0221     {
0222         return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
0223             run_algo_op<EngineStream>(stream_, resumable),
0224             h,
0225             stream_
0226         );
0227     }
0228 };
0229 
0230 }  // namespace detail
0231 }  // namespace mysql
0232 }  // namespace boost
0233 
0234 #endif