File indexing completed on 2025-01-18 09:51:19
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_CONNECTOR_HPP
0008 #define BOOST_REDIS_CONNECTOR_HPP
0009
0010 #include <boost/redis/detail/helper.hpp>
0011 #include <boost/redis/error.hpp>
0012 #include <boost/asio/compose.hpp>
0013 #include <boost/asio/connect.hpp>
0014 #include <boost/asio/coroutine.hpp>
0015 #include <boost/asio/experimental/parallel_group.hpp>
0016 #include <boost/asio/ip/tcp.hpp>
0017 #include <boost/asio/steady_timer.hpp>
0018 #include <string>
0019 #include <chrono>
0020
0021 namespace boost::redis::detail
0022 {
0023
0024 template <class Connector, class Stream>
0025 struct connect_op {
0026 Connector* ctor_ = nullptr;
0027 Stream* stream = nullptr;
0028 asio::ip::tcp::resolver::results_type const* res_ = nullptr;
0029 asio::coroutine coro{};
0030
0031 template <class Self>
0032 void operator()( Self& self
0033 , std::array<std::size_t, 2> const& order = {}
0034 , system::error_code const& ec1 = {}
0035 , asio::ip::tcp::endpoint const& ep= {}
0036 , system::error_code const& ec2 = {})
0037 {
0038 BOOST_ASIO_CORO_REENTER (coro)
0039 {
0040 ctor_->timer_.expires_after(ctor_->timeout_);
0041
0042 BOOST_ASIO_CORO_YIELD
0043 asio::experimental::make_parallel_group(
0044 [this](auto token)
0045 {
0046 auto f = [](system::error_code const&, auto const&) { return true; };
0047 return asio::async_connect(*stream, *res_, f, token);
0048 },
0049 [this](auto token) { return ctor_->timer_.async_wait(token);}
0050 ).async_wait(
0051 asio::experimental::wait_for_one(),
0052 std::move(self));
0053
0054 if (is_cancelled(self)) {
0055 self.complete(asio::error::operation_aborted);
0056 return;
0057 }
0058
0059 switch (order[0]) {
0060 case 0: {
0061 ctor_->endpoint_ = ep;
0062 self.complete(ec1);
0063 } break;
0064 case 1:
0065 {
0066 if (ec2) {
0067 self.complete(ec2);
0068 } else {
0069 self.complete(error::connect_timeout);
0070 }
0071 } break;
0072
0073 default: BOOST_ASSERT(false);
0074 }
0075 }
0076 }
0077 };
0078
0079 template <class Executor>
0080 class connector {
0081 public:
0082 using timer_type =
0083 asio::basic_waitable_timer<
0084 std::chrono::steady_clock,
0085 asio::wait_traits<std::chrono::steady_clock>,
0086 Executor>;
0087
0088 connector(Executor ex)
0089 : timer_{ex}
0090 {}
0091
0092 void set_config(config const& cfg)
0093 { timeout_ = cfg.connect_timeout; }
0094
0095 template <class Stream, class CompletionToken>
0096 auto
0097 async_connect(
0098 Stream& stream,
0099 asio::ip::tcp::resolver::results_type const& res,
0100 CompletionToken&& token)
0101 {
0102 return asio::async_compose
0103 < CompletionToken
0104 , void(system::error_code)
0105 >(connect_op<connector, Stream>{this, &stream, &res}, token, timer_);
0106 }
0107
0108 std::size_t cancel(operation op)
0109 {
0110 switch (op) {
0111 case operation::connect:
0112 case operation::all:
0113 timer_.cancel();
0114 break;
0115 default: ;
0116 }
0117
0118 return 0;
0119 }
0120
0121 auto const& endpoint() const noexcept { return endpoint_;}
0122
0123 private:
0124 template <class, class> friend struct connect_op;
0125
0126 timer_type timer_;
0127 std::chrono::steady_clock::duration timeout_ = std::chrono::seconds{2};
0128 asio::ip::tcp::endpoint endpoint_;
0129 };
0130
0131 }
0132
0133 #endif