File indexing completed on 2025-09-14 08:47:51
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_RUNNER_HPP
0008 #define BOOST_REDIS_RUNNER_HPP
0009
0010 #include <boost/redis/adapter/any_adapter.hpp>
0011 #include <boost/redis/config.hpp>
0012 #include <boost/redis/request.hpp>
0013 #include <boost/redis/response.hpp>
0014 #include <boost/redis/detail/helper.hpp>
0015 #include <boost/redis/error.hpp>
0016 #include <boost/redis/logger.hpp>
0017 #include <boost/redis/operation.hpp>
0018 #include <boost/asio/compose.hpp>
0019 #include <boost/asio/coroutine.hpp>
0020 #include <boost/asio/experimental/parallel_group.hpp>
0021 #include <boost/asio/ip/tcp.hpp>
0022 #include <boost/asio/steady_timer.hpp>
0023 #include <boost/asio/prepend.hpp>
0024 #include <boost/asio/ssl.hpp>
0025 #include <boost/asio/cancel_after.hpp>
0026 #include <string>
0027 #include <memory>
0028 #include <chrono>
0029
0030 namespace boost::redis::detail
0031 {
0032
0033 void push_hello(config const& cfg, request& req);
0034
0035
0036
0037
0038 template <class Runner, class Connection, class Logger>
0039 struct hello_op {
0040 Runner* runner_ = nullptr;
0041 Connection* conn_ = nullptr;
0042 Logger logger_;
0043 asio::coroutine coro_{};
0044
0045 template <class Self>
0046 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0047 {
0048 BOOST_ASIO_CORO_REENTER (coro_)
0049 {
0050 runner_->add_hello();
0051
0052 BOOST_ASIO_CORO_YIELD
0053 conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
0054 logger_.on_hello(ec, runner_->hello_resp_);
0055
0056 if (ec) {
0057 conn_->cancel(operation::run);
0058 self.complete(ec);
0059 return;
0060 }
0061
0062 if (runner_->has_error_in_response()) {
0063 conn_->cancel(operation::run);
0064 self.complete(error::resp3_hello);
0065 return;
0066 }
0067
0068 self.complete({});
0069 }
0070 }
0071 };
0072
0073 template <class Runner, class Connection, class Logger>
0074 class runner_op {
0075 private:
0076 Runner* runner_ = nullptr;
0077 Connection* conn_ = nullptr;
0078 Logger logger_;
0079 asio::coroutine coro_{};
0080
0081 using order_t = std::array<std::size_t, 5>;
0082
0083 public:
0084 runner_op(Runner* runner, Connection* conn, Logger l)
0085 : runner_{runner}
0086 , conn_{conn}
0087 , logger_{l}
0088 {}
0089
0090 template <class Self>
0091 void operator()( Self& self
0092 , order_t order = {}
0093 , system::error_code ec0 = {}
0094 , system::error_code ec1 = {}
0095 , system::error_code ec2 = {}
0096 , system::error_code ec3 = {}
0097 , system::error_code ec4 = {})
0098 {
0099 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0100 {
0101 BOOST_ASIO_CORO_YIELD
0102 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
0103
0104 logger_.on_resolve(ec0, conn_->resv_.results());
0105
0106 if (ec0) {
0107 self.complete(ec0);
0108 return;
0109 }
0110
0111 BOOST_ASIO_CORO_YIELD
0112 conn_->ctor_.async_connect(
0113 conn_->next_layer().next_layer(),
0114 conn_->resv_.results(),
0115 asio::prepend(std::move(self), order_t {}));
0116
0117 logger_.on_connect(ec0, conn_->ctor_.endpoint());
0118
0119 if (ec0) {
0120 self.complete(ec0);
0121 return;
0122 }
0123
0124 if (conn_->use_ssl()) {
0125 BOOST_ASIO_CORO_YIELD
0126 conn_->next_layer().async_handshake(
0127 asio::ssl::stream_base::client,
0128 asio::prepend(
0129 asio::cancel_after(
0130 runner_->cfg_.ssl_handshake_timeout,
0131 std::move(self)
0132 ),
0133 order_t {}
0134 )
0135 );
0136
0137 logger_.on_ssl_handshake(ec0);
0138
0139 if (ec0) {
0140 self.complete(ec0);
0141 return;
0142 }
0143 }
0144
0145 conn_->reset();
0146
0147
0148
0149
0150 BOOST_ASIO_CORO_YIELD
0151 asio::experimental::make_parallel_group(
0152 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); },
0153 [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
0154 [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
0155 [this](auto token) { return conn_->reader(logger_, token);},
0156 [this](auto token) { return conn_->writer(logger_, token);}
0157 ).async_wait(
0158 asio::experimental::wait_for_one_error(),
0159 std::move(self));
0160
0161 if (order[0] == 0 && !!ec0) {
0162 self.complete(ec0);
0163 return;
0164 }
0165
0166 if (order[0] == 2 && ec2 == error::pong_timeout) {
0167 self.complete(ec1);
0168 return;
0169 }
0170
0171
0172
0173
0174 conn_->cancel(operation::receive);
0175
0176 if (!conn_->will_reconnect()) {
0177 conn_->cancel(operation::reconnection);
0178 self.complete(ec3);
0179 return;
0180 }
0181
0182
0183
0184 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
0185
0186 BOOST_ASIO_CORO_YIELD
0187 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
0188 if (ec0) {
0189 self.complete(ec0);
0190 return;
0191 }
0192
0193 if (!conn_->will_reconnect()) {
0194 self.complete(asio::error::operation_aborted);
0195 return;
0196 }
0197
0198 conn_->reset_stream();
0199 }
0200 }
0201 };
0202
0203 template <class Executor>
0204 class runner {
0205 public:
0206 runner(Executor ex, config cfg)
0207 : cfg_{cfg}
0208 { }
0209
0210 void set_config(config const& cfg)
0211 {
0212 cfg_ = cfg;
0213 }
0214
0215 template <class Connection, class Logger, class CompletionToken>
0216 auto async_run(Connection& conn, Logger l, CompletionToken token)
0217 {
0218 return asio::async_compose
0219 < CompletionToken
0220 , void(system::error_code)
0221 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0222 }
0223
0224 private:
0225
0226 template <class, class, class> friend class runner_op;
0227 template <class, class, class> friend struct hello_op;
0228
0229 template <class Connection, class Logger, class CompletionToken>
0230 auto async_hello(Connection& conn, Logger l, CompletionToken token)
0231 {
0232 return asio::async_compose
0233 < CompletionToken
0234 , void(system::error_code)
0235 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
0236 }
0237
0238 void add_hello()
0239 {
0240 hello_req_.clear();
0241 if (hello_resp_.has_value())
0242 hello_resp_.value().clear();
0243 push_hello(cfg_, hello_req_);
0244 }
0245
0246 bool has_error_in_response() const noexcept
0247 {
0248 if (!hello_resp_.has_value())
0249 return true;
0250
0251 auto f = [](auto const& e)
0252 {
0253 switch (e.data_type) {
0254 case resp3::type::simple_error:
0255 case resp3::type::blob_error: return true;
0256 default: return false;
0257 }
0258 };
0259
0260 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
0261 }
0262
0263 request hello_req_;
0264 generic_response hello_resp_;
0265 config cfg_;
0266 };
0267
0268 }
0269
0270 #endif