Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-15 08:49:04

0001 /* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009 
0010 #include <boost/redis/detail/health_checker.hpp>
0011 #include <boost/redis/config.hpp>
0012 #include <boost/redis/response.hpp>
0013 #include <boost/redis/detail/helper.hpp>
0014 #include <boost/redis/error.hpp>
0015 #include <boost/redis/logger.hpp>
0016 #include <boost/redis/operation.hpp>
0017 #include <boost/redis/detail/connector.hpp>
0018 #include <boost/redis/detail/resolver.hpp>
0019 #include <boost/redis/detail/handshaker.hpp>
0020 #include <boost/asio/compose.hpp>
0021 #include <boost/asio/connect.hpp>
0022 #include <boost/asio/coroutine.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <string>
0027 #include <memory>
0028 #include <chrono>
0029 
0030 namespace boost::redis::detail
0031 {
0032 
0033 void push_hello(config const& cfg, request& req);
0034 
0035 template <class Runner, class Connection, class Logger>
0036 struct hello_op {
0037    Runner* runner_ = nullptr;
0038    Connection* conn_ = nullptr;
0039    Logger logger_;
0040    asio::coroutine coro_{};
0041 
0042    template <class Self>
0043    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0044    {
0045       BOOST_ASIO_CORO_REENTER (coro_)
0046       {
0047          runner_->add_hello();
0048 
0049          BOOST_ASIO_CORO_YIELD
0050          conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
0051          logger_.on_hello(ec, runner_->hello_resp_);
0052 
0053          if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
0054             logger_.trace("hello-op: error/canceled. Exiting ...");
0055             conn_->cancel(operation::run);
0056             self.complete(!!ec ? ec : asio::error::operation_aborted);
0057             return;
0058          }
0059 
0060          self.complete({});
0061       }
0062    }
0063 };
0064 
0065 template <class Runner, class Connection, class Logger>
0066 class runner_op {
0067 private:
0068    Runner* runner_ = nullptr;
0069    Connection* conn_ = nullptr;
0070    Logger logger_;
0071    asio::coroutine coro_{};
0072 
0073 public:
0074    runner_op(Runner* runner, Connection* conn, Logger l)
0075    : runner_{runner}
0076    , conn_{conn}
0077    , logger_{l}
0078    {}
0079 
0080    template <class Self>
0081    void operator()( Self& self
0082                   , std::array<std::size_t, 3> order = {}
0083                   , system::error_code ec0 = {}
0084                   , system::error_code ec1 = {}
0085                   , system::error_code ec2 = {}
0086                   , std::size_t = 0)
0087    {
0088       BOOST_ASIO_CORO_REENTER (coro_)
0089       {
0090          BOOST_ASIO_CORO_YIELD
0091          asio::experimental::make_parallel_group(
0092             [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
0093             [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
0094             [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
0095          ).async_wait(
0096             asio::experimental::wait_for_all(),
0097             std::move(self));
0098 
0099          logger_.on_runner(ec0, ec1, ec2);
0100 
0101          if (is_cancelled(self)) {
0102             self.complete(asio::error::operation_aborted);
0103             return;
0104          }
0105 
0106          if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
0107             self.complete(ec0);
0108             return;
0109          }
0110 
0111          if (order[0] == 2 && !!ec2) {
0112             self.complete(ec2);
0113             return;
0114          }
0115 
0116          if (order[0] == 1 && ec1 == error::pong_timeout) {
0117             self.complete(ec1);
0118             return;
0119          }
0120 
0121          self.complete(ec0);
0122       }
0123    }
0124 };
0125 
0126 template <class Runner, class Connection, class Logger>
0127 struct run_all_op {
0128    Runner* runner_ = nullptr;
0129    Connection* conn_ = nullptr;
0130    Logger logger_;
0131    asio::coroutine coro_{};
0132 
0133    template <class Self>
0134    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0135    {
0136       BOOST_ASIO_CORO_REENTER (coro_)
0137       {
0138          BOOST_ASIO_CORO_YIELD
0139          runner_->resv_.async_resolve(std::move(self));
0140          logger_.on_resolve(ec, runner_->resv_.results());
0141          BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0142 
0143          BOOST_ASIO_CORO_YIELD
0144          runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
0145          logger_.on_connect(ec, runner_->ctor_.endpoint());
0146          BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0147 
0148          if (conn_->use_ssl()) {
0149             BOOST_ASIO_CORO_YIELD
0150             runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
0151             logger_.on_ssl_handshake(ec);
0152             BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0153          }
0154 
0155          BOOST_ASIO_CORO_YIELD
0156          conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
0157          BOOST_REDIS_CHECK_OP0(;)
0158          self.complete(ec);
0159       }
0160    }
0161 };
0162 
0163 template <class Executor>
0164 class runner {
0165 public:
0166    runner(Executor ex, config cfg)
0167    : resv_{ex}
0168    , ctor_{ex}
0169    , hsher_{ex}
0170    , health_checker_{ex}
0171    , cfg_{cfg}
0172    { }
0173 
0174    std::size_t cancel(operation op)
0175    {
0176       resv_.cancel(op);
0177       ctor_.cancel(op);
0178       hsher_.cancel(op);
0179       health_checker_.cancel(op);
0180       return 0U;
0181    }
0182 
0183    void set_config(config const& cfg)
0184    {
0185       cfg_ = cfg;
0186       resv_.set_config(cfg);
0187       ctor_.set_config(cfg);
0188       hsher_.set_config(cfg);
0189       health_checker_.set_config(cfg);
0190    }
0191 
0192    template <class Connection, class Logger, class CompletionToken>
0193    auto async_run(Connection& conn, Logger l, CompletionToken token)
0194    {
0195       return asio::async_compose
0196          < CompletionToken
0197          , void(system::error_code)
0198          >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0199    }
0200 
0201    config const& get_config() const noexcept {return cfg_;}
0202 
0203 private:
0204    using resolver_type = resolver<Executor>;
0205    using connector_type = connector<Executor>;
0206    using handshaker_type = detail::handshaker<Executor>;
0207    using health_checker_type = health_checker<Executor>;
0208    using timer_type = typename connector_type::timer_type;
0209 
0210    template <class, class, class> friend struct run_all_op;
0211    template <class, class, class> friend class runner_op;
0212    template <class, class, class> friend struct hello_op;
0213 
0214    template <class Connection, class Logger, class CompletionToken>
0215    auto async_run_all(Connection& conn, Logger l, CompletionToken token)
0216    {
0217       return asio::async_compose
0218          < CompletionToken
0219          , void(system::error_code)
0220          >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0221    }
0222 
0223    template <class Connection, class Logger, class CompletionToken>
0224    auto async_hello(Connection& conn, Logger l, CompletionToken token)
0225    {
0226       return asio::async_compose
0227          < CompletionToken
0228          , void(system::error_code)
0229          >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0230    }
0231 
0232    void add_hello()
0233    {
0234       hello_req_.clear();
0235       if (hello_resp_.has_value())
0236          hello_resp_.value().clear();
0237       push_hello(cfg_, hello_req_);
0238    }
0239 
0240    bool has_error_in_response() const noexcept
0241    {
0242       if (!hello_resp_.has_value())
0243          return true;
0244 
0245       auto f = [](auto const& e)
0246       {
0247          switch (e.data_type) {
0248             case resp3::type::simple_error:
0249             case resp3::type::blob_error: return true;
0250             default: return false;
0251          }
0252       };
0253 
0254       return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0255    }
0256 
0257    resolver_type resv_;
0258    connector_type ctor_;
0259    handshaker_type hsher_;
0260    health_checker_type health_checker_;
0261    request hello_req_;
0262    generic_response hello_resp_;
0263    config cfg_;
0264 };
0265 
0266 } // boost::redis::detail
0267 
0268 #endif // BOOST_REDIS_RUNNER_HPP