File indexing completed on 2025-01-18 09:51:19
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009
0010 #include <boost/redis/detail/health_checker.hpp>
0011 #include <boost/redis/config.hpp>
0012 #include <boost/redis/response.hpp>
0013 #include <boost/redis/detail/helper.hpp>
0014 #include <boost/redis/error.hpp>
0015 #include <boost/redis/logger.hpp>
0016 #include <boost/redis/operation.hpp>
0017 #include <boost/redis/detail/connector.hpp>
0018 #include <boost/redis/detail/resolver.hpp>
0019 #include <boost/redis/detail/handshaker.hpp>
0020 #include <boost/asio/compose.hpp>
0021 #include <boost/asio/connect.hpp>
0022 #include <boost/asio/coroutine.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <string>
0027 #include <memory>
0028 #include <chrono>
0029
0030 namespace boost::redis::detail
0031 {
0032
0033 template <class Runner, class Connection, class Logger>
0034 struct hello_op {
0035 Runner* runner_ = nullptr;
0036 Connection* conn_ = nullptr;
0037 Logger logger_;
0038 asio::coroutine coro_{};
0039
0040 template <class Self>
0041 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0042 {
0043 BOOST_ASIO_CORO_REENTER (coro_)
0044 {
0045 runner_->hello_req_.clear();
0046 if (runner_->hello_resp_.has_value())
0047 runner_->hello_resp_.value().clear();
0048 runner_->add_hello();
0049
0050 BOOST_ASIO_CORO_YIELD
0051 conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
0052 logger_.on_hello(ec, runner_->hello_resp_);
0053
0054 if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
0055 logger_.trace("hello-op: error/canceled. Exiting ...");
0056 conn_->cancel(operation::run);
0057 self.complete(!!ec ? ec : asio::error::operation_aborted);
0058 return;
0059 }
0060
0061 self.complete({});
0062 }
0063 }
0064 };
0065
0066 template <class Runner, class Connection, class Logger>
0067 class runner_op {
0068 private:
0069 Runner* runner_ = nullptr;
0070 Connection* conn_ = nullptr;
0071 Logger logger_;
0072 asio::coroutine coro_{};
0073
0074 public:
0075 runner_op(Runner* runner, Connection* conn, Logger l)
0076 : runner_{runner}
0077 , conn_{conn}
0078 , logger_{l}
0079 {}
0080
0081 template <class Self>
0082 void operator()( Self& self
0083 , std::array<std::size_t, 3> order = {}
0084 , system::error_code ec0 = {}
0085 , system::error_code ec1 = {}
0086 , system::error_code ec2 = {}
0087 , std::size_t = 0)
0088 {
0089 BOOST_ASIO_CORO_REENTER (coro_)
0090 {
0091 BOOST_ASIO_CORO_YIELD
0092 asio::experimental::make_parallel_group(
0093 [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
0094 [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
0095 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
0096 ).async_wait(
0097 asio::experimental::wait_for_all(),
0098 std::move(self));
0099
0100 logger_.on_runner(ec0, ec1, ec2);
0101
0102 if (is_cancelled(self)) {
0103 self.complete(asio::error::operation_aborted);
0104 return;
0105 }
0106
0107 if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
0108 self.complete(ec0);
0109 return;
0110 }
0111
0112 if (order[0] == 2 && !!ec2) {
0113 self.complete(ec2);
0114 return;
0115 }
0116
0117 if (order[0] == 1 && ec1 == error::pong_timeout) {
0118 self.complete(ec1);
0119 return;
0120 }
0121
0122 self.complete(ec0);
0123 }
0124 }
0125 };
0126
0127 template <class Runner, class Connection, class Logger>
0128 struct run_all_op {
0129 Runner* runner_ = nullptr;
0130 Connection* conn_ = nullptr;
0131 Logger logger_;
0132 asio::coroutine coro_{};
0133
0134 template <class Self>
0135 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0136 {
0137 BOOST_ASIO_CORO_REENTER (coro_)
0138 {
0139 BOOST_ASIO_CORO_YIELD
0140 runner_->resv_.async_resolve(std::move(self));
0141 logger_.on_resolve(ec, runner_->resv_.results());
0142 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0143
0144 BOOST_ASIO_CORO_YIELD
0145 runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
0146 logger_.on_connect(ec, runner_->ctor_.endpoint());
0147 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0148
0149 if (conn_->use_ssl()) {
0150 BOOST_ASIO_CORO_YIELD
0151 runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
0152 logger_.on_ssl_handshake(ec);
0153 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
0154 }
0155
0156 BOOST_ASIO_CORO_YIELD
0157 conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
0158 BOOST_REDIS_CHECK_OP0(;)
0159 self.complete(ec);
0160 }
0161 }
0162 };
0163
0164 template <class Executor>
0165 class runner {
0166 public:
0167 runner(Executor ex, config cfg)
0168 : resv_{ex}
0169 , ctor_{ex}
0170 , hsher_{ex}
0171 , health_checker_{ex}
0172 , cfg_{cfg}
0173 { }
0174
0175 std::size_t cancel(operation op)
0176 {
0177 resv_.cancel(op);
0178 ctor_.cancel(op);
0179 hsher_.cancel(op);
0180 health_checker_.cancel(op);
0181 return 0U;
0182 }
0183
0184 void set_config(config const& cfg)
0185 {
0186 cfg_ = cfg;
0187 resv_.set_config(cfg);
0188 ctor_.set_config(cfg);
0189 hsher_.set_config(cfg);
0190 health_checker_.set_config(cfg);
0191 }
0192
0193 template <class Connection, class Logger, class CompletionToken>
0194 auto async_run(Connection& conn, Logger l, CompletionToken token)
0195 {
0196 return asio::async_compose
0197 < CompletionToken
0198 , void(system::error_code)
0199 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0200 }
0201
0202 config const& get_config() const noexcept {return cfg_;}
0203
0204 private:
0205 using resolver_type = resolver<Executor>;
0206 using connector_type = connector<Executor>;
0207 using handshaker_type = detail::handshaker<Executor>;
0208 using health_checker_type = health_checker<Executor>;
0209 using timer_type = typename connector_type::timer_type;
0210
0211 template <class, class, class> friend struct run_all_op;
0212 template <class, class, class> friend class runner_op;
0213 template <class, class, class> friend struct hello_op;
0214
0215 template <class Connection, class Logger, class CompletionToken>
0216 auto async_run_all(Connection& conn, Logger l, CompletionToken token)
0217 {
0218 return asio::async_compose
0219 < CompletionToken
0220 , void(system::error_code)
0221 >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0222 }
0223
0224 template <class Connection, class Logger, class CompletionToken>
0225 auto async_hello(Connection& conn, Logger l, CompletionToken token)
0226 {
0227 return asio::async_compose
0228 < CompletionToken
0229 , void(system::error_code)
0230 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0231 }
0232
0233 void add_hello()
0234 {
0235 if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
0236 hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password, "SETNAME", cfg_.clientname);
0237 else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
0238 hello_req_.push("HELLO", "3");
0239 else if (cfg_.clientname.empty())
0240 hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password);
0241 else
0242 hello_req_.push("HELLO", "3", "SETNAME", cfg_.clientname);
0243
0244 if (cfg_.database_index && cfg_.database_index.value() != 0)
0245 hello_req_.push("SELECT", cfg_.database_index.value());
0246 }
0247
0248 bool has_error_in_response() const noexcept
0249 {
0250 if (!hello_resp_.has_value())
0251 return true;
0252
0253 auto f = [](auto const& e)
0254 {
0255 switch (e.data_type) {
0256 case resp3::type::simple_error:
0257 case resp3::type::blob_error: return true;
0258 default: return false;
0259 }
0260 };
0261
0262 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0263 }
0264
0265 resolver_type resv_;
0266 connector_type ctor_;
0267 handshaker_type hsher_;
0268 health_checker_type health_checker_;
0269 request hello_req_;
0270 generic_response hello_resp_;
0271 config cfg_;
0272 };
0273
0274 }
0275
0276 #endif