File indexing completed on 2025-09-17 08:50:07
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
0008 #define BOOST_REDIS_HEALTH_CHECKER_HPP
0009
0010 #include <boost/redis/request.hpp>
0011 #include <boost/redis/response.hpp>
0012 #include <boost/redis/operation.hpp>
0013 #include <boost/redis/adapter/any_adapter.hpp>
0014 #include <boost/redis/config.hpp>
0015 #include <boost/redis/operation.hpp>
0016 #include <boost/asio/steady_timer.hpp>
0017 #include <boost/asio/compose.hpp>
0018 #include <boost/asio/consign.hpp>
0019 #include <boost/asio/coroutine.hpp>
0020 #include <boost/asio/post.hpp>
0021 #include <memory>
0022 #include <chrono>
0023
0024 namespace boost::redis::detail {
0025
// Composed-operation body that repeatedly sends the checker's PING request
// over the connection and then sleeps for the configured ping interval.
//
// Written as an Asio stackless coroutine: every BOOST_ASIO_CORO_YIELD hands
// `self` to an asynchronous call, and operator() is re-entered with that
// call's result when it completes.
//
// Template parameters:
//   HealthChecker - owner type; this op reads/writes its private members
//                   (ping_interval_, checker_has_exited_, req_, resp_,
//                   ping_timer_, wait_timer_).
//   Connection    - must provide async_exec(request, adapter, handler).
//   Logger        - must provide trace(message) and trace(message, ec).
0026 template <class HealthChecker, class Connection, class Logger>
0027 class ping_op {
0028 public:
// Non-owning pointers; nothing in this class manages their lifetime.
// NOTE(review): presumably both must outlive the operation - confirm with caller.
0029 HealthChecker* checker_ = nullptr;
0030 Connection* conn_ = nullptr;
0031 Logger logger_;
// Resume point of the stackless coroutine.
0032 asio::coroutine coro_{};
0033
// Coroutine step.
//   self - the composed operation; moved into each async call and completed
//          through self.complete(ec).
//   ec   - result of the async call that just finished (default-constructed
//          on the first invocation).
//   The unnamed std::size_t is ignored; presumably it only exists so this
//   callable matches async_exec's completion signature - TODO confirm.
0034 template <class Self>
0035 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0036 {
0037 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0038 {
// A zero interval disables pinging: finish with a success code, but only
// after going through asio::post first (presumably so the handler is not
// invoked from inside the initiating call - confirm).
0039 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0040 logger_.trace("ping_op (1): timeout disabled.");
0041 BOOST_ASIO_CORO_YIELD
0042 asio::post(std::move(self));
0043 self.complete({});
0044 return;
0045 }
0046
// check_timeout_op sets this flag when it reports a pong timeout; stop
// pinging and complete with a success code. Note: no post on this path.
0047 if (checker_->checker_has_exited_) {
0048 logger_.trace("ping_op (2): checker has exited.");
0049 self.complete({});
0050 return;
0051 }
0052
// Send the PING request; the reply is adapted into checker_->resp_, which
// check_timeout_op inspects.
0053 BOOST_ASIO_CORO_YIELD
0054 conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
0055 if (ec) {
// Executing the request failed: cancel the timer check_timeout_op waits on,
// then propagate the error.
0056 logger_.trace("ping_op (3)", ec);
0057 checker_->wait_timer_.cancel();
0058 self.complete(ec);
0059 return;
0060 }
0061
0062
// Sleep for one ping interval before the next iteration of the loop.
0063 checker_->ping_timer_.expires_after(checker_->ping_interval_);
0064
0065 BOOST_ASIO_CORO_YIELD
0066 checker_->ping_timer_.async_wait(std::move(self));
0067 if (ec) {
// The wait ended with an error (e.g. the timer was cancelled); propagate it.
0068 logger_.trace("ping_op (4)", ec);
0069 self.complete(ec);
0070 return;
0071 }
0072 }
0073 }
0074 };
0075
// Composed-operation body that periodically checks whether the checker's
// response buffer received anything during the last two ping intervals and,
// if it did not, cancels the connection's run operation and completes with
// error::pong_timeout.
//
// Written as an Asio stackless coroutine: every BOOST_ASIO_CORO_YIELD hands
// `self` to an asynchronous call, and operator() is re-entered with that
// call's result when it completes.
//
// Template parameters:
//   HealthChecker - owner type; this op reads/writes its private members
//                   (ping_interval_, checker_has_exited_, resp_,
//                   ping_timer_, wait_timer_).
//   Connection    - must provide cancel(operation).
//   Logger        - must provide trace(message) and trace(message, ec).
0076 template <class HealthChecker, class Connection, class Logger>
0077 class check_timeout_op {
0078 public:
// Non-owning pointers; nothing in this class manages their lifetime.
// NOTE(review): presumably both must outlive the operation - confirm with caller.
0079 HealthChecker* checker_ = nullptr;
0080 Connection* conn_ = nullptr;
0081 Logger logger_;
// Resume point of the stackless coroutine.
0082 asio::coroutine coro_{};
0083
// Coroutine step.
//   self - the composed operation; moved into each async call and completed
//          through self.complete(ec).
//   ec   - result of the async call that just finished (default-constructed
//          on the first invocation).
0084 template <class Self>
0085 void operator()(Self& self, system::error_code ec = {})
0086 {
0087 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0088 {
// A zero interval disables the check: finish with a success code, but only
// after going through asio::post first (presumably so the handler is not
// invoked from inside the initiating call - confirm).
0089 if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0090 logger_.trace("check_timeout_op (1): timeout disabled.");
0091 BOOST_ASIO_CORO_YIELD
0092 asio::post(std::move(self));
0093 self.complete({});
0094 return;
0095 }
0096
// Wait for twice the ping interval before inspecting the response buffer.
0097 checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
0098
0099 BOOST_ASIO_CORO_YIELD
0100 checker_->wait_timer_.async_wait(std::move(self));
0101 if (ec) {
// The wait ended with an error (ping_op and health_checker::cancel() both
// cancel this timer); propagate it.
0102 logger_.trace("check_timeout_op (2)", ec);
0103 self.complete(ec);
0104 return;
0105 }
0106
// The response object holds an error instead of a value: stop checking, but
// complete with a success code. checker_has_exited_ is NOT set on this path.
0107 if (checker_->resp_.has_error()) {
0108
0109 logger_.trace("check_timeout_op (3): Response error.");
0110 self.complete({});
0111 return;
0112 }
0113
// Nothing was stored in the response during the wait: treat it as a missed
// pong. Stop the ping timer, cancel the connection's run operation, flag
// the exit so ping_op stops too, and report error::pong_timeout.
0114 if (checker_->resp_.value().empty()) {
0115 logger_.trace("check_timeout_op (4): pong timeout.");
0116 checker_->ping_timer_.cancel();
0117 conn_->cancel(operation::run);
0118 checker_->checker_has_exited_ = true;
0119 self.complete(error::pong_timeout);
0120 return;
0121 }
0122
// Something was stored: empty the buffer so the next iteration can detect a
// fresh reply.
// NOTE(review): has_error() was already false above, so this has_value()
// test looks redundant - confirm against the response type's contract.
0123 if (checker_->resp_.has_value()) {
0124 checker_->resp_.value().clear();
0125 }
0126 }
0127 }
0128 };
0129
0130 template <class Executor>
0131 class health_checker {
0132 private:
0133 using timer_type =
0134 asio::basic_waitable_timer<
0135 std::chrono::steady_clock,
0136 asio::wait_traits<std::chrono::steady_clock>,
0137 Executor>;
0138
0139 public:
0140 health_checker(Executor ex)
0141 : ping_timer_{ex}
0142 , wait_timer_{ex}
0143 {
0144 req_.push("PING", "Boost.Redis");
0145 }
0146
0147 void set_config(config const& cfg)
0148 {
0149 req_.clear();
0150 req_.push("PING", cfg.health_check_id);
0151 ping_interval_ = cfg.health_check_interval;
0152 }
0153
0154 void cancel()
0155 {
0156 ping_timer_.cancel();
0157 wait_timer_.cancel();
0158 }
0159
0160 template <class Connection, class Logger, class CompletionToken>
0161 auto async_ping(Connection& conn, Logger l, CompletionToken token)
0162 {
0163 return asio::async_compose
0164 < CompletionToken
0165 , void(system::error_code)
0166 >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
0167 }
0168
0169 template <class Connection, class Logger, class CompletionToken>
0170 auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
0171 {
0172 checker_has_exited_ = false;
0173 return asio::async_compose
0174 < CompletionToken
0175 , void(system::error_code)
0176 >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
0177 }
0178
0179 private:
0180 template <class, class, class> friend class ping_op;
0181 template <class, class, class> friend class check_timeout_op;
0182
0183 timer_type ping_timer_;
0184 timer_type wait_timer_;
0185 redis::request req_;
0186 redis::generic_response resp_;
0187 std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
0188 bool checker_has_exited_ = false;
0189 };
0190
0191 }
0192
0193 #endif