Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:50:07

0001 /* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
0008 #define BOOST_REDIS_HEALTH_CHECKER_HPP
0009 
0010 #include <boost/redis/request.hpp>
0011 #include <boost/redis/response.hpp>
0012 #include <boost/redis/operation.hpp>
0013 #include <boost/redis/adapter/any_adapter.hpp>
0014 #include <boost/redis/config.hpp>
0015 #include <boost/redis/operation.hpp>
0016 #include <boost/asio/steady_timer.hpp>
0017 #include <boost/asio/compose.hpp>
0018 #include <boost/asio/consign.hpp>
0019 #include <boost/asio/coroutine.hpp>
0020 #include <boost/asio/post.hpp>
0021 #include <memory>
0022 #include <chrono>
0023 
0024 namespace boost::redis::detail {
0025 
0026 template <class HealthChecker, class Connection, class Logger>
0027 class ping_op {
0028 public:
0029    HealthChecker* checker_ = nullptr;
0030    Connection* conn_ = nullptr;
0031    Logger logger_;
0032    asio::coroutine coro_{};
0033 
0034    template <class Self>
0035    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0036    {
0037       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0038       {
0039          if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0040             logger_.trace("ping_op (1): timeout disabled.");
0041             BOOST_ASIO_CORO_YIELD
0042             asio::post(std::move(self));
0043             self.complete({});
0044             return;
0045          }
0046 
0047          if (checker_->checker_has_exited_) {
0048             logger_.trace("ping_op (2): checker has exited.");
0049             self.complete({});
0050             return;
0051          }
0052 
0053          BOOST_ASIO_CORO_YIELD
0054          conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
0055          if (ec) {
0056             logger_.trace("ping_op (3)", ec);
0057             checker_->wait_timer_.cancel();
0058             self.complete(ec);
0059             return;
0060          }
0061 
0062          // Wait before pinging again.
0063          checker_->ping_timer_.expires_after(checker_->ping_interval_);
0064 
0065          BOOST_ASIO_CORO_YIELD
0066          checker_->ping_timer_.async_wait(std::move(self));
0067          if (ec) {
0068             logger_.trace("ping_op (4)", ec);
0069             self.complete(ec);
0070             return;
0071          }
0072       }
0073    }
0074 };
0075 
0076 template <class HealthChecker, class Connection, class Logger>
0077 class check_timeout_op {
0078 public:
0079    HealthChecker* checker_ = nullptr;
0080    Connection* conn_ = nullptr;
0081    Logger logger_;
0082    asio::coroutine coro_{};
0083 
0084    template <class Self>
0085    void operator()(Self& self, system::error_code ec = {})
0086    {
0087       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0088       {
0089          if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0090             logger_.trace("check_timeout_op (1): timeout disabled.");
0091             BOOST_ASIO_CORO_YIELD
0092             asio::post(std::move(self));
0093             self.complete({});
0094             return;
0095          }
0096 
0097          checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
0098 
0099          BOOST_ASIO_CORO_YIELD
0100          checker_->wait_timer_.async_wait(std::move(self));
0101          if (ec) {
0102             logger_.trace("check_timeout_op (2)", ec);
0103             self.complete(ec);
0104             return;
0105          }
0106 
0107          if (checker_->resp_.has_error()) {
0108             // TODO: Log the error.
0109             logger_.trace("check_timeout_op (3): Response error.");
0110             self.complete({});
0111             return;
0112          }
0113 
0114          if (checker_->resp_.value().empty()) {
0115             logger_.trace("check_timeout_op (4): pong timeout.");
0116             checker_->ping_timer_.cancel();
0117             conn_->cancel(operation::run);
0118             checker_->checker_has_exited_ = true;
0119             self.complete(error::pong_timeout);
0120             return;
0121          }
0122 
0123          if (checker_->resp_.has_value()) {
0124             checker_->resp_.value().clear();
0125          }
0126       }
0127    }
0128 };
0129 
0130 template <class Executor>
0131 class health_checker {
0132 private:
0133    using timer_type =
0134       asio::basic_waitable_timer<
0135          std::chrono::steady_clock,
0136          asio::wait_traits<std::chrono::steady_clock>,
0137          Executor>;
0138 
0139 public:
0140    health_checker(Executor ex)
0141    : ping_timer_{ex}
0142    , wait_timer_{ex}
0143    {
0144       req_.push("PING", "Boost.Redis");
0145    }
0146 
0147    void set_config(config const& cfg)
0148    {
0149       req_.clear();
0150       req_.push("PING", cfg.health_check_id);
0151       ping_interval_ = cfg.health_check_interval;
0152    }
0153 
0154    void cancel()
0155    {
0156       ping_timer_.cancel();
0157       wait_timer_.cancel();
0158    }
0159 
0160    template <class Connection, class Logger, class CompletionToken>
0161    auto async_ping(Connection& conn, Logger l, CompletionToken token)
0162    {
0163       return asio::async_compose
0164          < CompletionToken
0165          , void(system::error_code)
0166          >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
0167    }
0168 
0169    template <class Connection, class Logger, class CompletionToken>
0170    auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
0171    {
0172       checker_has_exited_ = false;
0173       return asio::async_compose
0174          < CompletionToken
0175          , void(system::error_code)
0176          >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
0177    }
0178 
0179 private:
0180    template <class, class, class> friend class ping_op;
0181    template <class, class, class> friend class check_timeout_op;
0182 
0183    timer_type ping_timer_;
0184    timer_type wait_timer_;
0185    redis::request req_;
0186    redis::generic_response resp_;
0187    std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
0188    bool checker_has_exited_ = false;
0189 };
0190 
0191 } // boost::redis::detail
0192 
0193 #endif // BOOST_REDIS_HEALTH_CHECKER_HPP