File indexing completed on 2025-07-15 08:49:04
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/detail/connector.hpp>
#include <boost/redis/detail/resolver.hpp>
#include <boost/redis/detail/handshaker.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <iterator>
#include <memory>
#include <string>
#include <utility>
0029
0030 namespace boost::redis::detail
0031 {
0032
// Declared here, defined in another translation unit.
// runner::add_hello() calls it with the runner's stored config and its
// (freshly cleared) hello request.
// NOTE(review): presumably appends the HELLO command (and any related
// setup commands derived from cfg) to req -- confirm against the definition.
0033 void push_hello(config const& cfg, request& req);
0034
0035 template <class Runner, class Connection, class Logger>
0036 struct hello_op {
0037 Runner* runner_ = nullptr;
0038 Connection* conn_ = nullptr;
0039 Logger logger_;
0040 asio::coroutine coro_{};
0041
0042 template <class Self>
0043 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0044 {
0045 BOOST_ASIO_CORO_REENTER (coro_)
0046 {
0047 runner_->add_hello();
0048
0049 BOOST_ASIO_CORO_YIELD
0050 conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
0051 logger_.on_hello(ec, runner_->hello_resp_);
0052
0053 if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
0054 logger_.trace("hello-op: error/canceled. Exiting ...");
0055 conn_->cancel(operation::run);
0056 self.complete(!!ec ? ec : asio::error::operation_aborted);
0057 return;
0058 }
0059
0060 self.complete({});
0061 }
0062 }
0063 };
0064
0065 template <class Runner, class Connection, class Logger>
0066 class runner_op {
0067 private:
0068 Runner* runner_ = nullptr;
0069 Connection* conn_ = nullptr;
0070 Logger logger_;
0071 asio::coroutine coro_{};
0072
0073 public:
0074 runner_op(Runner* runner, Connection* conn, Logger l)
0075 : runner_{runner}
0076 , conn_{conn}
0077 , logger_{l}
0078 {}
0079
0080 template <class Self>
0081 void operator()( Self& self
0082 , std::array<std::size_t, 3> order = {}
0083 , system::error_code ec0 = {}
0084 , system::error_code ec1 = {}
0085 , system::error_code ec2 = {}
0086 , std::size_t = 0)
0087 {
0088 BOOST_ASIO_CORO_REENTER (coro_)
0089 {
0090 BOOST_ASIO_CORO_YIELD
0091 asio::experimental::make_parallel_group(
0092 [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
0093 [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
0094 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
0095 ).async_wait(
0096 asio::experimental::wait_for_all(),
0097 std::move(self));
0098
0099 logger_.on_runner(ec0, ec1, ec2);
0100
0101 if (is_cancelled(self)) {
0102 self.complete(asio::error::operation_aborted);
0103 return;
0104 }
0105
0106 if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
0107 self.complete(ec0);
0108 return;
0109 }
0110
0111 if (order[0] == 2 && !!ec2) {
0112 self.complete(ec2);
0113 return;
0114 }
0115
0116 if (order[0] == 1 && ec1 == error::pong_timeout) {
0117 self.complete(ec1);
0118 return;
0119 }
0120
0121 self.complete(ec0);
0122 }
0123 }
0124 };
0125
// Composed-operation body behind runner::async_run_all.
//
// Runs, in this order: name resolution, connect, an SSL handshake (only
// when conn_->use_ssl() is true) and finally conn_->async_run_lean().
// After each step the matching logger hook is called with the step's
// error code.
//
// NOTE(review): BOOST_REDIS_CHECK_OP0 is defined outside this file
// (presumably in detail/helper.hpp). It appears to execute its argument
// and complete the operation early when `ec` is set or the operation was
// cancelled -- confirm against the macro definition.
//
// This is an aggregate initialised as `run_all_op{runner, conn, logger}`,
// so the member order matters.
0126 template <class Runner, class Connection, class Logger>
0127 struct run_all_op {
0128 Runner* runner_ = nullptr;
0129 Connection* conn_ = nullptr;
0130 Logger logger_;
0131 asio::coroutine coro_{};
0132
     // Coroutine entry point; `ec` carries the result of the step that
     // just finished (empty on first entry). The size_t argument is unused.
0133 template <class Self>
0134 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0135 {
0136 BOOST_ASIO_CORO_REENTER (coro_)
0137 {
     // Step 1: resolve using the runner's resolver.
0138 BOOST_ASIO_CORO_YIELD
0139 runner_->resv_.async_resolve(std::move(self));
0140 logger_.on_resolve(ec, runner_->resv_.results());
0141 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0142
     // Step 2: connect the innermost layer of the connection's stream to
     // the resolved endpoints.
0143 BOOST_ASIO_CORO_YIELD
0144 runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
0145 logger_.on_connect(ec, runner_->ctor_.endpoint());
0146 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0147
     // Step 3 (optional): SSL handshake on the connection's next layer.
0148 if (conn_->use_ssl()) {
0149 BOOST_ASIO_CORO_YIELD
0150 runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
0151 logger_.on_ssl_handshake(ec);
0152 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0153 }
0154
     // Step 4: hand over to the connection's run loop with the runner's
     // stored config. The check macro is given an empty statement here, so
     // no extra cancel is issued at this point.
0155 BOOST_ASIO_CORO_YIELD
0156 conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
0157 BOOST_REDIS_CHECK_OP0(;)
0158 self.complete(ec);
0159 }
0160 }
0161 };
0162
0163 template <class Executor>
0164 class runner {
0165 public:
0166 runner(Executor ex, config cfg)
0167 : resv_{ex}
0168 , ctor_{ex}
0169 , hsher_{ex}
0170 , health_checker_{ex}
0171 , cfg_{cfg}
0172 { }
0173
0174 std::size_t cancel(operation op)
0175 {
0176 resv_.cancel(op);
0177 ctor_.cancel(op);
0178 hsher_.cancel(op);
0179 health_checker_.cancel(op);
0180 return 0U;
0181 }
0182
0183 void set_config(config const& cfg)
0184 {
0185 cfg_ = cfg;
0186 resv_.set_config(cfg);
0187 ctor_.set_config(cfg);
0188 hsher_.set_config(cfg);
0189 health_checker_.set_config(cfg);
0190 }
0191
0192 template <class Connection, class Logger, class CompletionToken>
0193 auto async_run(Connection& conn, Logger l, CompletionToken token)
0194 {
0195 return asio::async_compose
0196 < CompletionToken
0197 , void(system::error_code)
0198 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0199 }
0200
0201 config const& get_config() const noexcept {return cfg_;}
0202
0203 private:
0204 using resolver_type = resolver<Executor>;
0205 using connector_type = connector<Executor>;
0206 using handshaker_type = detail::handshaker<Executor>;
0207 using health_checker_type = health_checker<Executor>;
0208 using timer_type = typename connector_type::timer_type;
0209
0210 template <class, class, class> friend struct run_all_op;
0211 template <class, class, class> friend class runner_op;
0212 template <class, class, class> friend struct hello_op;
0213
0214 template <class Connection, class Logger, class CompletionToken>
0215 auto async_run_all(Connection& conn, Logger l, CompletionToken token)
0216 {
0217 return asio::async_compose
0218 < CompletionToken
0219 , void(system::error_code)
0220 >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0221 }
0222
0223 template <class Connection, class Logger, class CompletionToken>
0224 auto async_hello(Connection& conn, Logger l, CompletionToken token)
0225 {
0226 return asio::async_compose
0227 < CompletionToken
0228 , void(system::error_code)
0229 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0230 }
0231
0232 void add_hello()
0233 {
0234 hello_req_.clear();
0235 if (hello_resp_.has_value())
0236 hello_resp_.value().clear();
0237 push_hello(cfg_, hello_req_);
0238 }
0239
0240 bool has_error_in_response() const noexcept
0241 {
0242 if (!hello_resp_.has_value())
0243 return true;
0244
0245 auto f = [](auto const& e)
0246 {
0247 switch (e.data_type) {
0248 case resp3::type::simple_error:
0249 case resp3::type::blob_error: return true;
0250 default: return false;
0251 }
0252 };
0253
0254 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0255 }
0256
0257 resolver_type resv_;
0258 connector_type ctor_;
0259 handshaker_type hsher_;
0260 health_checker_type health_checker_;
0261 request hello_req_;
0262 generic_response hello_resp_;
0263 config cfg_;
0264 };
0265
0266 }
0267
0268 #endif