File indexing completed on 2025-01-18 09:51:19
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
0008 #define BOOST_REDIS_HEALTH_CHECKER_HPP
0009
0010
0011 #include <boost/redis/request.hpp>
0012 #include <boost/redis/response.hpp>
0013 #include <boost/redis/operation.hpp>
0014 #include <boost/redis/detail/helper.hpp>
0015 #include <boost/redis/config.hpp>
0016 #include <boost/asio/steady_timer.hpp>
0017 #include <boost/asio/compose.hpp>
0018 #include <boost/asio/consign.hpp>
0019 #include <boost/asio/coroutine.hpp>
0020 #include <boost/asio/post.hpp>
0021 #include <boost/asio/experimental/parallel_group.hpp>
0022 #include <memory>
0023 #include <chrono>
0024
0025 namespace boost::redis::detail {
0026
// ping_op: implementation object of the composed operation started by
// health_checker::async_ping. Written as an asio stackless coroutine, it
// loops forever over two steps: execute the checker's request on the
// connection, then wait ping_interval_ on ping_timer_ before the next round.
//
// Completion:
//   - success ({}) when checker_has_exited_ is found set at the top of a round;
//   - the failing error code, or asio::error::operation_aborted when the
//     step reported no error but is_cancelled(self) is true.
0027 template <class HealthChecker, class Connection, class Logger>
0028 class ping_op {
0029 public:
// Raw pointers filled in from `this` and `&conn` by async_ping; nothing in
// this class deletes them.
0030 HealthChecker* checker_ = nullptr;
0031 Connection* conn_ = nullptr;
// Logger copy used for the trace messages below.
0032 Logger logger_;
// Stackless-coroutine state: remembers which YIELD to resume from.
0033 asio::coroutine coro_{};
0034
// Entry/resume point. Called first with only `self`, then once per finished
// asynchronous step with that step's error code. The size_t argument is
// accepted but never read.
0035 template <class Self>
0036 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0037 {
0038 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0039 {
// check_timeout_op sets this flag when it reports a pong timeout; stop
// quietly instead of issuing another request.
0040 if (checker_->checker_has_exited_) {
0041 logger_.trace("ping_op: checker has exited. Exiting ...");
0042 self.complete({});
0043 return;
0044 }
0045
// Step 1: run the checker's request, storing the reply in checker_->resp_.
// `self` is moved into the async call; the next statement executes only
// when the operation resumes this coroutine.
0046 BOOST_ASIO_CORO_YIELD
0047 conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
0048 if (ec || is_cancelled(self)) {
0049 logger_.trace("ping_op: error/cancelled (1).");
// Also cancel the timer check_timeout_op is waiting on.
0050 checker_->wait_timer_.cancel();
// Report the real error if there is one, otherwise "aborted".
0051 self.complete(!!ec ? ec : asio::error::operation_aborted);
0052 return;
0053 }
0054
0055
// Step 2: sleep for one ping interval before looping again.
0056 checker_->ping_timer_.expires_after(checker_->ping_interval_);
0057 BOOST_ASIO_CORO_YIELD
0058 checker_->ping_timer_.async_wait(std::move(self));
0059 if (ec || is_cancelled(self)) {
0060 logger_.trace("ping_op: error/cancelled (2).");
0061 self.complete(!!ec ? ec : asio::error::operation_aborted);
0062 return;
0063 }
0064 }
0065 }
0066 };
0067
// check_timeout_op: implementation object of the composed operation started
// by health_checker::async_check_timeout. Written as an asio stackless
// coroutine, it repeatedly waits 2 * ping_interval_ on wait_timer_ and then
// inspects checker_->resp_ (the response object ping_op passes to
// async_exec).
//
// Completion:
//   - the wait's error code, or asio::error::operation_aborted when the wait
//     reported no error but is_cancelled(self) is true;
//   - success ({}) when resp_ holds an error;
//   - error::pong_timeout when resp_ holds an empty value after the wait.
0068 template <class HealthChecker, class Connection, class Logger>
0069 class check_timeout_op {
0070 public:
// Raw pointers filled in from `this` and `&conn` by async_check_timeout;
// nothing in this class deletes them.
0071 HealthChecker* checker_ = nullptr;
0072 Connection* conn_ = nullptr;
// Logger copy used for the trace messages below.
0073 Logger logger_;
// Stackless-coroutine state: remembers which YIELD to resume from.
0074 asio::coroutine coro_{};
0075
// Entry/resume point. Called first with only `self`, then once per finished
// timer wait with the wait's error code.
0076 template <class Self>
0077 void operator()(Self& self, system::error_code ec = {})
0078 {
0079 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0080 {
// The observation window is twice the ping interval.
0081 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
0082 BOOST_ASIO_CORO_YIELD
0083 checker_->wait_timer_.async_wait(std::move(self));
// ping_op and health_checker::cancel both cancel wait_timer_, which lands
// here.
0084 if (ec || is_cancelled(self)) {
0085 logger_.trace("check-timeout-op: error/canceled. Exiting ...");
0086 self.complete(!!ec ? ec : asio::error::operation_aborted);
0087 return;
0088 }
0089
// An error stored in the response ends the check without reporting a
// failure from this operation.
0090 if (checker_->resp_.has_error()) {
0091 logger_.trace("check-timeout-op: Response error. Exiting ...");
0092 self.complete({});
0093 return;
0094 }
0095
// Nothing was stored in the response during the window: treat it as a
// missed pong. Stop the ping side, cancel the connection's run operation
// and flag the exit so ping_op does not start another round.
0096 if (checker_->resp_.value().empty()) {
0097 logger_.trace("check-timeout-op: Response has no value. Exiting ...");
0098 checker_->ping_timer_.cancel();
0099 conn_->cancel(operation::run);
0100 checker_->checker_has_exited_ = true;
0101 self.complete(error::pong_timeout);
0102 return;
0103 }
0104
// Healthy: empty the stored value so the next window starts from scratch.
0105 if (checker_->resp_.has_value()) {
0106 checker_->resp_.value().clear();
0107 }
0108 }
0109 }
0110 };
0111
// check_health_op: implementation object of the composed operation started
// by health_checker::async_check_health. It launches async_ping and
// async_check_timeout together in a parallel group with the wait_for_one()
// condition and completes with the result of whichever is listed first in
// the group's completion order.
//
// Completion:
//   - success ({}) when ping_interval_ is zero (health check disabled);
//   - asio::error::operation_aborted when is_cancelled(self) is true after
//     the group finishes;
//   - otherwise ec1 (ping) or ec2 (timeout check), selected by order[0].
0112 template <class HealthChecker, class Connection, class Logger>
0113 class check_health_op {
0114 public:
// Raw pointers filled in from `this` and `&conn` by async_check_health;
// nothing in this class deletes them.
0115 HealthChecker* checker_ = nullptr;
0116 Connection* conn_ = nullptr;
// Logger; a copy is handed to each of the two sub-operations.
0117 Logger logger_;
// Stackless-coroutine state: remembers which YIELD to resume from.
0118 asio::coroutine coro_{};
0119
// Entry/resume point. Called first with only `self`. After the parallel
// group finishes it is called with the completion order and one error code
// per sub-operation (ec1: async_ping, ec2: async_check_timeout).
0120 template <class Self>
0121 void
0122 operator()(
0123 Self& self,
0124 std::array<std::size_t, 2> order = {},
0125 system::error_code ec1 = {},
0126 system::error_code ec2 = {})
0127 {
0128 BOOST_ASIO_CORO_REENTER (coro_)
0129 {
// A zero interval disables the health check. The post() bounces through
// the executor before completing -- presumably so the handler is not
// invoked from inside the initiating call; TODO confirm.
0130 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0131 logger_.trace("check-health-op: timeout disabled.");
0132 BOOST_ASIO_CORO_YIELD
0133 asio::post(std::move(self));
0134 self.complete({});
0135 return;
0136 }
0137
// Run the ping loop and the timeout watcher side by side.
// NOTE(review): the lambdas capture `this` while `self` is moved in the
// same expression; confirm the members they read (notably logger_) are
// still in the intended state when the group launches them.
0138 BOOST_ASIO_CORO_YIELD
0139 asio::experimental::make_parallel_group(
0140 [this](auto token) { return checker_->async_ping(*conn_, logger_, token); },
0141 [this](auto token) { return checker_->async_check_timeout(*conn_, logger_, token);}
0142 ).async_wait(
0143 asio::experimental::wait_for_one(),
0144 std::move(self));
0145
// Both results are logged before deciding what to report.
0146 logger_.on_check_health(ec1, ec2);
0147
0148 if (is_cancelled(self)) {
0149 logger_.trace("check-health-op: canceled. Exiting ...");
0150 self.complete(asio::error::operation_aborted);
0151 return;
0152 }
0153
// Report the error code of the first entry in the completion order.
// Only two operations are in the group, so any other index is a logic
// error. NOTE(review): if BOOST_ASSERT compiles to nothing, the default
// branch falls through without calling self.complete().
0154 switch (order[0]) {
0155 case 0: self.complete(ec1); return;
0156 case 1: self.complete(ec2); return;
0157 default: BOOST_ASSERT(false);
0158 }
0159 }
0160 }
0161 };
0162
0163 template <class Executor>
0164 class health_checker {
0165 private:
0166 using timer_type =
0167 asio::basic_waitable_timer<
0168 std::chrono::steady_clock,
0169 asio::wait_traits<std::chrono::steady_clock>,
0170 Executor>;
0171
0172 public:
0173 health_checker(Executor ex)
0174 : ping_timer_{ex}
0175 , wait_timer_{ex}
0176 {
0177 req_.push("PING", "Boost.Redis");
0178 }
0179
0180 void set_config(config const& cfg)
0181 {
0182 req_.clear();
0183 req_.push("PING", cfg.health_check_id);
0184 ping_interval_ = cfg.health_check_interval;
0185 }
0186
0187 template <
0188 class Connection,
0189 class Logger,
0190 class CompletionToken = asio::default_completion_token_t<Executor>
0191 >
0192 auto
0193 async_check_health(
0194 Connection& conn,
0195 Logger l,
0196 CompletionToken token = CompletionToken{})
0197 {
0198 checker_has_exited_ = false;
0199 return asio::async_compose
0200 < CompletionToken
0201 , void(system::error_code)
0202 >(check_health_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn);
0203 }
0204
0205 std::size_t cancel(operation op)
0206 {
0207 switch (op) {
0208 case operation::health_check:
0209 case operation::all:
0210 ping_timer_.cancel();
0211 wait_timer_.cancel();
0212 break;
0213 default: ;
0214 }
0215
0216 return 0;
0217 }
0218
0219 private:
0220 template <class Connection, class Logger, class CompletionToken>
0221 auto async_ping(Connection& conn, Logger l, CompletionToken token)
0222 {
0223 return asio::async_compose
0224 < CompletionToken
0225 , void(system::error_code)
0226 >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
0227 }
0228
0229 template <class Connection, class Logger, class CompletionToken>
0230 auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
0231 {
0232 return asio::async_compose
0233 < CompletionToken
0234 , void(system::error_code)
0235 >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
0236 }
0237
0238 template <class, class, class> friend class ping_op;
0239 template <class, class, class> friend class check_timeout_op;
0240 template <class, class, class> friend class check_health_op;
0241
0242 timer_type ping_timer_;
0243 timer_type wait_timer_;
0244 redis::request req_;
0245 redis::generic_response resp_;
0246 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
0247 bool checker_has_exited_ = false;
0248 };
0249
0250 }
0251
0252 #endif