Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:51:19

0001 /* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009 
0010 #include <boost/redis/detail/health_checker.hpp>
0011 #include <boost/redis/config.hpp>
0012 #include <boost/redis/response.hpp>
0013 #include <boost/redis/detail/helper.hpp>
0014 #include <boost/redis/error.hpp>
0015 #include <boost/redis/logger.hpp>
0016 #include <boost/redis/operation.hpp>
0017 #include <boost/redis/detail/connector.hpp>
0018 #include <boost/redis/detail/resolver.hpp>
0019 #include <boost/redis/detail/handshaker.hpp>
0020 #include <boost/asio/compose.hpp>
0021 #include <boost/asio/connect.hpp>
0022 #include <boost/asio/coroutine.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <string>
0027 #include <memory>
0028 #include <chrono>
0029 
0030 namespace boost::redis::detail
0031 {
0032 
0033 template <class Runner, class Connection, class Logger>
0034 struct hello_op {
0035    Runner* runner_ = nullptr;
0036    Connection* conn_ = nullptr;
0037    Logger logger_;
0038    asio::coroutine coro_{};
0039 
0040    template <class Self>
0041    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0042    {
0043       BOOST_ASIO_CORO_REENTER (coro_)
0044       {
0045          runner_->hello_req_.clear();
0046          if (runner_->hello_resp_.has_value())
0047             runner_->hello_resp_.value().clear();
0048          runner_->add_hello();
0049 
0050          BOOST_ASIO_CORO_YIELD
0051          conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
0052          logger_.on_hello(ec, runner_->hello_resp_);
0053 
0054          if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
0055             logger_.trace("hello-op: error/canceled. Exiting ...");
0056             conn_->cancel(operation::run);
0057             self.complete(!!ec ? ec : asio::error::operation_aborted);
0058             return;
0059          }
0060 
0061          self.complete({});
0062       }
0063    }
0064 };
0065 
/** Top-level composed operation behind runner::async_run.
 *
 *  Launches three operations in one parallel group and waits for all of
 *  them to finish:
 *    index 0 - runner::async_run_all (error delivered as ec0)
 *    index 1 - health_checker_.async_check_health (error delivered as ec1)
 *    index 2 - runner::async_hello (error delivered as ec2)
 *  It then selects which of the three error codes the caller receives.
 */
template <class Runner, class Connection, class Logger>
class runner_op {
private:
   Runner* runner_ = nullptr;
   Connection* conn_ = nullptr;
   Logger logger_;
   asio::coroutine coro_{};

public:
   /// Stores the (non-owning) runner and connection pointers and a copy of the logger.
   runner_op(Runner* runner, Connection* conn, Logger l)
   : runner_{runner}
   , conn_{conn}
   , logger_{l}
   {}

   /** Coroutine body; re-entered once by the parallel group's completion.
    *
    *  self  - the async_compose handle used to resume and complete the op.
    *  order - completion order reported by the parallel group (per the Asio
    *          parallel_group documentation: operation indices in the order
    *          in which they finished).
    *  ec0, ec1, ec2 - results of operations 0, 1 and 2 listed above.
    *  The trailing std::size_t argument is ignored.
    */
   template <class Self>
   void operator()( Self& self
                  , std::array<std::size_t, 3> order = {}
                  , system::error_code ec0 = {}
                  , system::error_code ec1 = {}
                  , system::error_code ec2 = {}
                  , std::size_t = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro_)
      {
         // The lambdas capture `this` and are used to initiate the three
         // operations from within this call.
         BOOST_ASIO_CORO_YIELD
         asio::experimental::make_parallel_group(
            [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
            [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
            [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
         ).async_wait(
            asio::experimental::wait_for_all(),
            std::move(self));

         logger_.on_runner(ec0, ec1, ec2);

         // A cancellation takes precedence over any individual result.
         if (is_cancelled(self)) {
            self.complete(asio::error::operation_aborted);
            return;
         }

         // Resolve/connect timeouts from the run-all operation are reported as-is.
         if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
            self.complete(ec0);
            return;
         }

         // The hello operation finished first and failed: report its error.
         if (order[0] == 2 && !!ec2) {
            self.complete(ec2);
            return;
         }

         // The health check finished first with a pong timeout: report it.
         if (order[0] == 1 && ec1 == error::pong_timeout) {
            self.complete(ec1);
            return;
         }

         // Otherwise the result of the run-all operation decides.
         self.complete(ec0);
      }
   }
};
0126 
/** Composed operation behind runner::async_run_all.
 *
 *  Runs the connection stages strictly one after another:
 *    1. resolve      (runner_->resv_.async_resolve)
 *    2. connect      (runner_->ctor_.async_connect)
 *    3. SSL handshake, only when conn_->use_ssl() is true
 *    4. conn_->async_run_lean with the runner's configuration
 *
 *  After each stage BOOST_REDIS_CHECK_OP0 is invoked. The macro is defined
 *  outside this file; NOTE(review): it presumably runs the statement passed
 *  to it and completes the operation early when `ec` is set or the
 *  operation was cancelled - confirm against detail/helper.hpp.
 */
template <class Runner, class Connection, class Logger>
struct run_all_op {
   Runner* runner_ = nullptr;
   Connection* conn_ = nullptr;
   Logger logger_;
   asio::coroutine coro_{};

   /** Coroutine body; re-entered after each asynchronous stage.
    *
    *  self - the async_compose handle used to resume and complete the op.
    *  ec   - result of the stage that just finished.
    *  The trailing std::size_t argument is ignored.
    */
   template <class Self>
   void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro_)
      {
         // Stage 1: name resolution.
         BOOST_ASIO_CORO_YIELD
         runner_->resv_.async_resolve(std::move(self));
         logger_.on_resolve(ec, runner_->resv_.results());
         BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)

         // Stage 2: connect the lowest layer to one of the resolved endpoints.
         BOOST_ASIO_CORO_YIELD
         runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
         logger_.on_connect(ec, runner_->ctor_.endpoint());
         BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)

         // Stage 3 (optional): handshake on the layer returned by next_layer().
         if (conn_->use_ssl()) {
            BOOST_ASIO_CORO_YIELD
            runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
            logger_.on_ssl_handshake(ec);
            BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
         }

         // Stage 4: hand over to the connection's own run operation.
         BOOST_ASIO_CORO_YIELD
         conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
         BOOST_REDIS_CHECK_OP0(;)
         self.complete(ec);
      }
   }
};
0163 
0164 template <class Executor>
0165 class runner {
0166 public:
0167    runner(Executor ex, config cfg)
0168    : resv_{ex}
0169    , ctor_{ex}
0170    , hsher_{ex}
0171    , health_checker_{ex}
0172    , cfg_{cfg}
0173    { }
0174 
0175    std::size_t cancel(operation op)
0176    {
0177       resv_.cancel(op);
0178       ctor_.cancel(op);
0179       hsher_.cancel(op);
0180       health_checker_.cancel(op);
0181       return 0U;
0182    }
0183 
0184    void set_config(config const& cfg)
0185    {
0186       cfg_ = cfg;
0187       resv_.set_config(cfg);
0188       ctor_.set_config(cfg);
0189       hsher_.set_config(cfg);
0190       health_checker_.set_config(cfg);
0191    }
0192 
0193    template <class Connection, class Logger, class CompletionToken>
0194    auto async_run(Connection& conn, Logger l, CompletionToken token)
0195    {
0196       return asio::async_compose
0197          < CompletionToken
0198          , void(system::error_code)
0199          >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0200    }
0201 
0202    config const& get_config() const noexcept {return cfg_;}
0203 
0204 private:
0205    using resolver_type = resolver<Executor>;
0206    using connector_type = connector<Executor>;
0207    using handshaker_type = detail::handshaker<Executor>;
0208    using health_checker_type = health_checker<Executor>;
0209    using timer_type = typename connector_type::timer_type;
0210 
0211    template <class, class, class> friend struct run_all_op;
0212    template <class, class, class> friend class runner_op;
0213    template <class, class, class> friend struct hello_op;
0214 
0215    template <class Connection, class Logger, class CompletionToken>
0216    auto async_run_all(Connection& conn, Logger l, CompletionToken token)
0217    {
0218       return asio::async_compose
0219          < CompletionToken
0220          , void(system::error_code)
0221          >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0222    }
0223 
0224    template <class Connection, class Logger, class CompletionToken>
0225    auto async_hello(Connection& conn, Logger l, CompletionToken token)
0226    {
0227       return asio::async_compose
0228          < CompletionToken
0229          , void(system::error_code)
0230          >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0231    }
0232 
0233    void add_hello()
0234    {
0235       if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
0236          hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password, "SETNAME", cfg_.clientname);
0237       else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
0238          hello_req_.push("HELLO", "3");
0239       else if (cfg_.clientname.empty())
0240          hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password);
0241       else
0242          hello_req_.push("HELLO", "3", "SETNAME", cfg_.clientname);
0243 
0244       if (cfg_.database_index && cfg_.database_index.value() != 0)
0245          hello_req_.push("SELECT", cfg_.database_index.value());
0246    }
0247 
0248    bool has_error_in_response() const noexcept
0249    {
0250       if (!hello_resp_.has_value())
0251          return true;
0252 
0253       auto f = [](auto const& e)
0254       {
0255          switch (e.data_type) {
0256             case resp3::type::simple_error:
0257             case resp3::type::blob_error: return true;
0258             default: return false;
0259          }
0260       };
0261 
0262       return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0263    }
0264 
0265    resolver_type resv_;
0266    connector_type ctor_;
0267    handshaker_type hsher_;
0268    health_checker_type health_checker_;
0269    request hello_req_;
0270    generic_response hello_resp_;
0271    config cfg_;
0272 };
0273 
0274 } // boost::redis::detail
0275 
0276 #endif // BOOST_REDIS_RUNNER_HPP