Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:51:19

0001 /* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
0008 #define BOOST_REDIS_HEALTH_CHECKER_HPP
0009 
0010 // Has to be included before promise.hpp to build on msvc.
0011 #include <boost/redis/request.hpp>
0012 #include <boost/redis/response.hpp>
0013 #include <boost/redis/operation.hpp>
0014 #include <boost/redis/detail/helper.hpp>
0015 #include <boost/redis/config.hpp>
0016 #include <boost/asio/steady_timer.hpp>
0017 #include <boost/asio/compose.hpp>
0018 #include <boost/asio/consign.hpp>
0019 #include <boost/asio/coroutine.hpp>
0020 #include <boost/asio/post.hpp>
0021 #include <boost/asio/experimental/parallel_group.hpp>
0022 #include <memory>
0023 #include <chrono>
0024 
0025 namespace boost::redis::detail {
0026 
0027 template <class HealthChecker, class Connection, class Logger>
0028 class ping_op {
0029 public:
0030    HealthChecker* checker_ = nullptr;
0031    Connection* conn_ = nullptr;
0032    Logger logger_;
0033    asio::coroutine coro_{};
0034 
0035    template <class Self>
0036    void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
0037    {
0038       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0039       {
0040          if (checker_->checker_has_exited_) {
0041             logger_.trace("ping_op: checker has exited. Exiting ...");
0042             self.complete({});
0043             return;
0044          }
0045 
0046          BOOST_ASIO_CORO_YIELD
0047          conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
0048          if (ec || is_cancelled(self)) {
0049             logger_.trace("ping_op: error/cancelled (1).");
0050             checker_->wait_timer_.cancel();
0051             self.complete(!!ec ? ec : asio::error::operation_aborted);
0052             return;
0053          }
0054 
0055          // Wait before pinging again.
0056          checker_->ping_timer_.expires_after(checker_->ping_interval_);
0057          BOOST_ASIO_CORO_YIELD
0058          checker_->ping_timer_.async_wait(std::move(self));
0059          if (ec || is_cancelled(self)) {
0060             logger_.trace("ping_op: error/cancelled (2).");
0061             self.complete(!!ec ? ec : asio::error::operation_aborted);
0062             return;
0063          }
0064       }
0065    }
0066 };
0067 
0068 template <class HealthChecker, class Connection, class Logger>
0069 class check_timeout_op {
0070 public:
0071    HealthChecker* checker_ = nullptr;
0072    Connection* conn_ = nullptr;
0073    Logger logger_;
0074    asio::coroutine coro_{};
0075 
0076    template <class Self>
0077    void operator()(Self& self, system::error_code ec = {})
0078    {
0079       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0080       {
0081          checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
0082          BOOST_ASIO_CORO_YIELD
0083          checker_->wait_timer_.async_wait(std::move(self));
0084          if (ec || is_cancelled(self)) {
0085             logger_.trace("check-timeout-op: error/canceled. Exiting ...");
0086             self.complete(!!ec ? ec : asio::error::operation_aborted);
0087             return;
0088          }
0089 
0090          if (checker_->resp_.has_error()) {
0091             logger_.trace("check-timeout-op: Response error. Exiting ...");
0092             self.complete({});
0093             return;
0094          }
0095 
0096          if (checker_->resp_.value().empty()) {
0097             logger_.trace("check-timeout-op: Response has no value. Exiting ...");
0098             checker_->ping_timer_.cancel();
0099             conn_->cancel(operation::run);
0100             checker_->checker_has_exited_ = true;
0101             self.complete(error::pong_timeout);
0102             return;
0103          }
0104 
0105          if (checker_->resp_.has_value()) {
0106             checker_->resp_.value().clear();
0107          }
0108       }
0109    }
0110 };
0111 
0112 template <class HealthChecker, class Connection, class Logger>
0113 class check_health_op {
0114 public:
0115    HealthChecker* checker_ = nullptr;
0116    Connection* conn_ = nullptr;
0117    Logger logger_;
0118    asio::coroutine coro_{};
0119 
0120    template <class Self>
0121    void
0122    operator()(
0123          Self& self,
0124          std::array<std::size_t, 2> order = {},
0125          system::error_code ec1 = {},
0126          system::error_code ec2 = {})
0127    {
0128       BOOST_ASIO_CORO_REENTER (coro_)
0129       {
0130          if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
0131             logger_.trace("check-health-op: timeout disabled.");
0132             BOOST_ASIO_CORO_YIELD
0133             asio::post(std::move(self));
0134             self.complete({});
0135             return;
0136          }
0137 
0138          BOOST_ASIO_CORO_YIELD
0139          asio::experimental::make_parallel_group(
0140             [this](auto token) { return checker_->async_ping(*conn_, logger_, token); },
0141             [this](auto token) { return checker_->async_check_timeout(*conn_, logger_, token);}
0142          ).async_wait(
0143             asio::experimental::wait_for_one(),
0144             std::move(self));
0145 
0146          logger_.on_check_health(ec1, ec2);
0147 
0148          if (is_cancelled(self)) {
0149             logger_.trace("check-health-op: canceled. Exiting ...");
0150             self.complete(asio::error::operation_aborted);
0151             return;
0152          }
0153 
0154          switch (order[0]) {
0155             case 0: self.complete(ec1); return;
0156             case 1: self.complete(ec2); return;
0157             default: BOOST_ASSERT(false);
0158          }
0159       }
0160    }
0161 };
0162 
0163 template <class Executor>
0164 class health_checker {
0165 private:
0166    using timer_type =
0167       asio::basic_waitable_timer<
0168          std::chrono::steady_clock,
0169          asio::wait_traits<std::chrono::steady_clock>,
0170          Executor>;
0171 
0172 public:
0173    health_checker(Executor ex)
0174    : ping_timer_{ex}
0175    , wait_timer_{ex}
0176    {
0177       req_.push("PING", "Boost.Redis");
0178    }
0179 
0180    void set_config(config const& cfg)
0181    {
0182       req_.clear();
0183       req_.push("PING", cfg.health_check_id);
0184       ping_interval_ = cfg.health_check_interval;
0185    }
0186 
0187    template <
0188       class Connection,
0189       class Logger,
0190       class CompletionToken = asio::default_completion_token_t<Executor>
0191    >
0192    auto
0193    async_check_health(
0194       Connection& conn,
0195       Logger l,
0196       CompletionToken token = CompletionToken{})
0197    {
0198       checker_has_exited_ = false;
0199       return asio::async_compose
0200          < CompletionToken
0201          , void(system::error_code)
0202          >(check_health_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn);
0203    }
0204 
0205    std::size_t cancel(operation op)
0206    {
0207       switch (op) {
0208          case operation::health_check:
0209          case operation::all:
0210             ping_timer_.cancel();
0211             wait_timer_.cancel();
0212             break;
0213          default: /* ignore */;
0214       }
0215 
0216       return 0;
0217    }
0218 
0219 private:
0220    template <class Connection, class Logger, class CompletionToken>
0221    auto async_ping(Connection& conn, Logger l, CompletionToken token)
0222    {
0223       return asio::async_compose
0224          < CompletionToken
0225          , void(system::error_code)
0226          >(ping_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, ping_timer_);
0227    }
0228 
0229    template <class Connection, class Logger, class CompletionToken>
0230    auto async_check_timeout(Connection& conn, Logger l, CompletionToken token)
0231    {
0232       return asio::async_compose
0233          < CompletionToken
0234          , void(system::error_code)
0235          >(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
0236    }
0237 
0238    template <class, class, class> friend class ping_op;
0239    template <class, class, class> friend class check_timeout_op;
0240    template <class, class, class> friend class check_health_op;
0241 
0242    timer_type ping_timer_;
0243    timer_type wait_timer_;
0244    redis::request req_;
0245    redis::generic_response resp_;
0246    std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
0247    bool checker_has_exited_ = false;
0248 };
0249 
0250 } // boost::redis::detail
0251 
0252 #endif // BOOST_REDIS_HEALTH_CHECKER_HPP