Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-14 08:47:51

0001 /* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009 
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/steady_timer.hpp>
#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <iterator>
#include <memory>
#include <string>
#include <utility>
0029 
0030 namespace boost::redis::detail
0031 {
0032 
// Declared here, defined elsewhere. runner::add_hello() calls it with the
// runner's configuration and its freshly cleared hello request.
// NOTE(review): presumably appends the HELLO handshake command(s) built
// from cfg to req -- confirm against the definition.
0033 void push_hello(config const& cfg, request& req);
0034 
0035 // TODO: Can we avoid this whole function whose only purpose is to
0036 // check for an error in the hello response and complete with an error
0037 // so that the parallel group that starts it can exit?
0038 template <class Runner, class Connection, class Logger>
0039 struct hello_op {
0040    Runner* runner_ = nullptr;
0041    Connection* conn_ = nullptr;
0042    Logger logger_;
0043    asio::coroutine coro_{};
0044 
0045    template <class Self>
0046    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0047    {
0048       BOOST_ASIO_CORO_REENTER (coro_)
0049       {
0050          runner_->add_hello();
0051 
0052          BOOST_ASIO_CORO_YIELD
0053          conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
0054          logger_.on_hello(ec, runner_->hello_resp_);
0055 
0056          if (ec) {
0057             conn_->cancel(operation::run);
0058             self.complete(ec);
0059             return;
0060          }
0061 
0062          if (runner_->has_error_in_response()) {
0063             conn_->cancel(operation::run);
0064             self.complete(error::resp3_hello);
0065             return;
0066          }
0067 
0068          self.complete({});
0069       }
0070    }
0071 };
0072 
0073 template <class Runner, class Connection, class Logger>
0074 class runner_op {
0075 private:
0076    Runner* runner_ = nullptr;
0077    Connection* conn_ = nullptr;
0078    Logger logger_;
0079    asio::coroutine coro_{};
0080 
0081    using order_t = std::array<std::size_t, 5>;
0082 
0083 public:
0084    runner_op(Runner* runner, Connection* conn, Logger l)
0085    : runner_{runner}
0086    , conn_{conn}
0087    , logger_{l}
0088    {}
0089 
0090    template <class Self>
0091    void operator()( Self& self
0092                   , order_t order = {}
0093                   , system::error_code ec0 = {}
0094                   , system::error_code ec1 = {}
0095                   , system::error_code ec2 = {}
0096                   , system::error_code ec3 = {}
0097                   , system::error_code ec4 = {})
0098    {
0099       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0100       {
0101          BOOST_ASIO_CORO_YIELD
0102          conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
0103 
0104          logger_.on_resolve(ec0, conn_->resv_.results());
0105 
0106          if (ec0) {
0107             self.complete(ec0);
0108             return;
0109          }
0110 
0111          BOOST_ASIO_CORO_YIELD
0112          conn_->ctor_.async_connect(
0113             conn_->next_layer().next_layer(),
0114             conn_->resv_.results(),
0115             asio::prepend(std::move(self), order_t {}));
0116 
0117          logger_.on_connect(ec0, conn_->ctor_.endpoint());
0118 
0119          if (ec0) {
0120             self.complete(ec0);
0121             return;
0122          }
0123 
0124          if (conn_->use_ssl()) {
0125             BOOST_ASIO_CORO_YIELD
0126             conn_->next_layer().async_handshake(
0127                asio::ssl::stream_base::client,
0128                asio::prepend(
0129                   asio::cancel_after(
0130                      runner_->cfg_.ssl_handshake_timeout,
0131                      std::move(self)
0132                   ),
0133                   order_t {}
0134                )
0135             );
0136 
0137             logger_.on_ssl_handshake(ec0);
0138 
0139             if (ec0) {
0140                self.complete(ec0);
0141                return;
0142             }
0143          }
0144 
0145          conn_->reset();
0146 
0147          // Note: Order is important here because the writer might
0148          // trigger an async_write before the async_hello thereby
0149          // causing an authentication problem.
0150          BOOST_ASIO_CORO_YIELD
0151          asio::experimental::make_parallel_group(
0152             [this](auto token) { return runner_->async_hello(*conn_, logger_, token); },
0153             [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
0154             [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
0155             [this](auto token) { return conn_->reader(logger_, token);},
0156             [this](auto token) { return conn_->writer(logger_, token);}
0157          ).async_wait(
0158             asio::experimental::wait_for_one_error(),
0159             std::move(self));
0160 
0161          if (order[0] == 0 && !!ec0) {
0162             self.complete(ec0);
0163             return;
0164          }
0165 
0166          if (order[0] == 2 && ec2 == error::pong_timeout) {
0167             self.complete(ec1);
0168             return;
0169          }
0170 
0171          // The receive operation must be cancelled because channel
0172          // subscription does not survive a reconnection but requires
0173          // re-subscription.
0174          conn_->cancel(operation::receive);
0175 
0176          if (!conn_->will_reconnect()) {
0177             conn_->cancel(operation::reconnection);
0178             self.complete(ec3);
0179             return;
0180          }
0181 
0182          // It is safe to use the writer timer here because we are not
0183          // connected.
0184          conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
0185 
0186          BOOST_ASIO_CORO_YIELD
0187          conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
0188          if (ec0) {
0189             self.complete(ec0);
0190             return;
0191          }
0192 
0193          if (!conn_->will_reconnect()) {
0194             self.complete(asio::error::operation_aborted);
0195             return;
0196          }
0197 
0198          conn_->reset_stream();
0199       }
0200    }
0201 };
0202 
0203 template <class Executor>
0204 class runner {
0205 public:
0206    runner(Executor ex, config cfg)
0207    : cfg_{cfg}
0208    { }
0209 
0210    void set_config(config const& cfg)
0211    {
0212       cfg_ = cfg;
0213    }
0214 
0215    template <class Connection, class Logger, class CompletionToken>
0216    auto async_run(Connection& conn, Logger l, CompletionToken token)
0217    {
0218       return asio::async_compose
0219          < CompletionToken
0220          , void(system::error_code)
0221          >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0222    }
0223 
0224 private:
0225 
0226    template <class, class, class> friend class runner_op;
0227    template <class, class, class> friend struct hello_op;
0228 
0229    template <class Connection, class Logger, class CompletionToken>
0230    auto async_hello(Connection& conn, Logger l, CompletionToken token)
0231    {
0232       return asio::async_compose
0233          < CompletionToken
0234          , void(system::error_code)
0235          >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0236    }
0237 
0238    void add_hello()
0239    {
0240       hello_req_.clear();
0241       if (hello_resp_.has_value())
0242          hello_resp_.value().clear();
0243       push_hello(cfg_, hello_req_);
0244    }
0245 
0246    bool has_error_in_response() const noexcept
0247    {
0248       if (!hello_resp_.has_value())
0249          return true;
0250 
0251       auto f = [](auto const& e)
0252       {
0253          switch (e.data_type) {
0254             case resp3::type::simple_error:
0255             case resp3::type::blob_error: return true;
0256             default: return false;
0257          }
0258       };
0259 
0260       return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0261    }
0262 
0263    request hello_req_;
0264    generic_response hello_resp_;
0265    config cfg_;
0266 };
0267 
0268 } // boost::redis::detail
0269 
0270 #endif // BOOST_REDIS_RUNNER_HPP