Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 10:08:25

0001 /* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_CONNECTION_HPP
0008 #define BOOST_REDIS_CONNECTION_HPP
0009 
0010 #include <boost/redis/adapter/adapt.hpp>
0011 #include <boost/redis/adapter/any_adapter.hpp>
0012 #include <boost/redis/config.hpp>
0013 #include <boost/redis/detail/connector.hpp>
0014 #include <boost/redis/detail/health_checker.hpp>
0015 #include <boost/redis/detail/helper.hpp>
0016 #include <boost/redis/detail/resolver.hpp>
0017 #include <boost/redis/detail/resp3_handshaker.hpp>
0018 #include <boost/redis/error.hpp>
0019 #include <boost/redis/logger.hpp>
0020 #include <boost/redis/operation.hpp>
0021 #include <boost/redis/request.hpp>
0022 #include <boost/redis/resp3/type.hpp>
0023 #include <boost/redis/usage.hpp>
0024 
0025 #include <boost/asio/any_completion_handler.hpp>
0026 #include <boost/asio/any_io_executor.hpp>
0027 #include <boost/asio/associated_immediate_executor.hpp>
0028 #include <boost/asio/basic_stream_socket.hpp>
0029 #include <boost/asio/bind_executor.hpp>
0030 #include <boost/asio/buffer.hpp>
0031 #include <boost/asio/cancel_after.hpp>
0032 #include <boost/asio/coroutine.hpp>
0033 #include <boost/asio/deferred.hpp>
0034 #include <boost/asio/experimental/channel.hpp>
0035 #include <boost/asio/experimental/parallel_group.hpp>
0036 #include <boost/asio/io_context.hpp>
0037 #include <boost/asio/ip/tcp.hpp>
0038 #include <boost/asio/prepend.hpp>
0039 #include <boost/asio/read_until.hpp>
0040 #include <boost/asio/ssl/stream.hpp>
0041 #include <boost/asio/steady_timer.hpp>
0042 #include <boost/asio/write.hpp>
0043 #include <boost/assert.hpp>
0044 
0045 #include <boost/core/ignore_unused.hpp>
0046 
0047 #include <algorithm>
0048 #include <array>
0049 #include <chrono>
0050 #include <cstddef>
0051 #include <deque>
0052 #include <functional>
0053 #include <limits>
0054 #include <memory>
0055 #include <string_view>
0056 #include <utility>
0057 
0058 namespace boost::redis {
0059 namespace detail
0060 {
0061 
0062 template <class DynamicBuffer>
0063 std::string_view buffer_view(DynamicBuffer buf) noexcept
0064 {
0065    char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
0066    return std::string_view{start, std::size(buf)};
0067 }
0068 
0069 template <class AsyncReadStream, class DynamicBuffer>
0070 class append_some_op {
0071 private:
0072    AsyncReadStream& stream_;
0073    DynamicBuffer buf_;
0074    std::size_t size_ = 0;
0075    std::size_t tmp_ = 0;
0076    asio::coroutine coro_{};
0077 
0078 public:
0079    append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
0080    : stream_ {stream}
0081    , buf_ {std::move(buf)}
0082    , size_{size}
0083    { }
0084 
0085    template <class Self>
0086    void operator()( Self& self
0087                   , system::error_code ec = {}
0088                   , std::size_t n = 0)
0089    {
0090       BOOST_ASIO_CORO_REENTER (coro_)
0091       {
0092          tmp_ = buf_.size();
0093          buf_.grow(size_);
0094 
0095          BOOST_ASIO_CORO_YIELD
0096          stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
0097          if (ec) {
0098             self.complete(ec, 0);
0099             return;
0100          }
0101 
0102          buf_.shrink(buf_.size() - tmp_ - n);
0103          self.complete({}, n);
0104       }
0105    }
0106 };
0107 
0108 template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
0109 auto
0110 async_append_some(
0111    AsyncReadStream& stream,
0112    DynamicBuffer buffer,
0113    std::size_t size,
0114    CompletionToken&& token)
0115 {
0116    return asio::async_compose
0117       < CompletionToken
0118       , void(system::error_code, std::size_t)
0119       >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
0120 }
0121 
0122 template <class Conn>
0123 struct exec_op {
0124    using req_info_type = typename Conn::req_info;
0125    using adapter_type = typename Conn::adapter_type;
0126 
0127    Conn* conn_ = nullptr;
0128    std::shared_ptr<req_info_type> info_ = nullptr;
0129    asio::coroutine coro{};
0130 
0131    template <class Self>
0132    void operator()(Self& self , system::error_code = {}, std::size_t = 0)
0133    {
0134       BOOST_ASIO_CORO_REENTER (coro)
0135       {
0136          // Check whether the user wants to wait for the connection to
0137          // be stablished.
0138          if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
0139             BOOST_ASIO_CORO_YIELD
0140             asio::dispatch(
0141                asio::get_associated_immediate_executor(self, self.get_io_executor()),
0142                std::move(self));
0143             return self.complete(error::not_connected, 0);
0144          }
0145 
0146          conn_->add_request_info(info_);
0147 
0148 EXEC_OP_WAIT:
0149          BOOST_ASIO_CORO_YIELD
0150          info_->async_wait(std::move(self));
0151 
0152          if (info_->ec_) {
0153             self.complete(info_->ec_, 0);
0154             return;
0155          }
0156 
0157          if (info_->stop_requested()) {
0158             // Don't have to call remove_request as it has already
0159             // been by cancel(exec).
0160             return self.complete(asio::error::operation_aborted, 0);
0161          }
0162 
0163          if (is_cancelled(self)) {
0164             if (!info_->is_waiting()) {
0165                using c_t = asio::cancellation_type;
0166                auto const c = self.get_cancellation_state().cancelled();
0167                if ((c & c_t::terminal) != c_t::none) {
0168                   // Cancellation requires closing the connection
0169                   // otherwise it stays in inconsistent state.
0170                   conn_->cancel(operation::run);
0171                   return self.complete(asio::error::operation_aborted, 0);
0172                } else {
0173                   // Can't implement other cancelation types, ignoring.
0174                   self.get_cancellation_state().clear();
0175 
0176                   // TODO: Find out a better way to ignore
0177                   // cancelation.
0178                   goto EXEC_OP_WAIT;
0179                }
0180             } else {
0181                // Cancelation can be honored.
0182                conn_->remove_request(info_);
0183                self.complete(asio::error::operation_aborted, 0);
0184                return;
0185             }
0186          }
0187 
0188          self.complete(info_->ec_, info_->read_size_);
0189       }
0190    }
0191 };
0192 
0193 template <class Conn, class Logger>
0194 struct writer_op {
0195    Conn* conn_;
0196    Logger logger_;
0197    asio::coroutine coro{};
0198 
0199    template <class Self>
0200    void operator()( Self& self
0201                   , system::error_code ec = {}
0202                   , std::size_t n = 0)
0203    {
0204       ignore_unused(n);
0205 
0206       BOOST_ASIO_CORO_REENTER (coro) for (;;)
0207       {
0208          while (conn_->coalesce_requests()) {
0209             if (conn_->use_ssl())
0210                BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0211             else
0212                BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0213 
0214             logger_.on_write(ec, conn_->write_buffer_);
0215 
0216             if (ec) {
0217                logger_.trace("writer_op (1)", ec);
0218                conn_->cancel(operation::run);
0219                self.complete(ec);
0220                return;
0221             }
0222 
0223             conn_->on_write();
0224 
0225             // A socket.close() may have been called while a
0226             // successful write might had already been queued, so we
0227             // have to check here before proceeding.
0228             if (!conn_->is_open()) {
0229                logger_.trace("writer_op (2): connection is closed.");
0230                self.complete({});
0231                return;
0232             }
0233          }
0234 
0235          BOOST_ASIO_CORO_YIELD
0236          conn_->writer_timer_.async_wait(std::move(self));
0237          if (!conn_->is_open()) {
0238             logger_.trace("writer_op (3): connection is closed.");
0239             // Notice this is not an error of the op, stoping was
0240             // requested from the outside, so we complete with
0241             // success.
0242             self.complete({});
0243             return;
0244          }
0245       }
0246    }
0247 };
0248 
0249 template <class Conn, class Logger>
0250 struct reader_op {
0251    using parse_result = typename Conn::parse_result;
0252    using parse_ret_type = typename Conn::parse_ret_type;
0253    Conn* conn_;
0254    Logger logger_;
0255    parse_ret_type res_{parse_result::resp, 0};
0256    asio::coroutine coro{};
0257 
0258    template <class Self>
0259    void operator()( Self& self
0260                   , system::error_code ec = {}
0261                   , std::size_t n = 0)
0262    {
0263       ignore_unused(n);
0264 
0265       BOOST_ASIO_CORO_REENTER (coro) for (;;)
0266       {
0267          // Appends some data to the buffer if necessary.
0268          if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
0269             if (conn_->use_ssl()) {
0270                BOOST_ASIO_CORO_YIELD
0271                async_append_some(
0272                   conn_->next_layer(),
0273                   conn_->dbuf_,
0274                   conn_->get_suggested_buffer_growth(),
0275                   std::move(self));
0276             } else {
0277                BOOST_ASIO_CORO_YIELD
0278                async_append_some(
0279                   conn_->next_layer().next_layer(),
0280                   conn_->dbuf_,
0281                   conn_->get_suggested_buffer_growth(),
0282                   std::move(self));
0283             }
0284 
0285             logger_.on_read(ec, n);
0286 
0287             // The connection is not viable after an error.
0288             if (ec) {
0289                logger_.trace("reader_op (1)", ec);
0290                conn_->cancel(operation::run);
0291                self.complete(ec);
0292                return;
0293             }
0294 
0295             // Somebody might have canceled implicitly or explicitly
0296             // while we were suspended and after queueing so we have to
0297             // check.
0298             if (!conn_->is_open()) {
0299                logger_.trace("reader_op (2): connection is closed.");
0300                self.complete(ec);
0301                return;
0302             }
0303          }
0304 
0305          res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
0306          if (ec) {
0307             logger_.trace("reader_op (3)", ec);
0308             conn_->cancel(operation::run);
0309             self.complete(ec);
0310             return;
0311          }
0312 
0313          if (res_.first == parse_result::push) {
0314             if (!conn_->receive_channel_.try_send(ec, res_.second)) {
0315                BOOST_ASIO_CORO_YIELD
0316                conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
0317             }
0318 
0319             if (ec) {
0320                logger_.trace("reader_op (4)", ec);
0321                conn_->cancel(operation::run);
0322                self.complete(ec);
0323                return;
0324             }
0325 
0326             if (!conn_->is_open()) {
0327                logger_.trace("reader_op (5): connection is closed.");
0328                self.complete(asio::error::operation_aborted);
0329                return;
0330             }
0331 
0332          }
0333       }
0334    }
0335 };
0336 
0337 template <class Conn, class Logger>
0338 class run_op {
0339 private:
0340    Conn* conn_ = nullptr;
0341    Logger logger_;
0342    asio::coroutine coro_{};
0343 
0344    using order_t = std::array<std::size_t, 5>;
0345 
0346 public:
0347    run_op(Conn* conn, Logger l)
0348    : conn_{conn}
0349    , logger_{l}
0350    {}
0351 
0352    template <class Self>
0353    void operator()( Self& self
0354                   , order_t order = {}
0355                   , system::error_code ec0 = {}
0356                   , system::error_code ec1 = {}
0357                   , system::error_code ec2 = {}
0358                   , system::error_code ec3 = {}
0359                   , system::error_code ec4 = {})
0360    {
0361       BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0362       {
0363          BOOST_ASIO_CORO_YIELD
0364          conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
0365 
0366          logger_.on_resolve(ec0, conn_->resv_.results());
0367 
0368          if (ec0) {
0369             self.complete(ec0);
0370             return;
0371          }
0372 
0373          BOOST_ASIO_CORO_YIELD
0374          conn_->ctor_.async_connect(
0375             conn_->next_layer().next_layer(),
0376             conn_->resv_.results(),
0377             asio::prepend(std::move(self), order_t {}));
0378 
0379          logger_.on_connect(ec0, conn_->ctor_.endpoint());
0380 
0381          if (ec0) {
0382             self.complete(ec0);
0383             return;
0384          }
0385 
0386          if (conn_->use_ssl()) {
0387             BOOST_ASIO_CORO_YIELD
0388             conn_->next_layer().async_handshake(
0389                asio::ssl::stream_base::client,
0390                asio::prepend(
0391                   asio::cancel_after(
0392                      conn_->cfg_.ssl_handshake_timeout,
0393                      std::move(self)
0394                   ),
0395                   order_t {}
0396                )
0397             );
0398 
0399             logger_.on_ssl_handshake(ec0);
0400 
0401             if (ec0) {
0402                self.complete(ec0);
0403                return;
0404             }
0405          }
0406 
0407          conn_->reset();
0408 
0409          // Note: Order is important here because the writer might
0410          // trigger an async_write before the async_hello thereby
0411          // causing an authentication problem.
0412          BOOST_ASIO_CORO_YIELD
0413          asio::experimental::make_parallel_group(
0414             [this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); },
0415             [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
0416             [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
0417             [this](auto token) { return conn_->reader(logger_, token);},
0418             [this](auto token) { return conn_->writer(logger_, token);}
0419          ).async_wait(
0420             asio::experimental::wait_for_one_error(),
0421             std::move(self));
0422 
0423          if (order[0] == 0 && !!ec0) {
0424             self.complete(ec0);
0425             return;
0426          }
0427 
0428          if (order[0] == 2 && ec2 == error::pong_timeout) {
0429             self.complete(ec1);
0430             return;
0431          }
0432 
0433          // The receive operation must be cancelled because channel
0434          // subscription does not survive a reconnection but requires
0435          // re-subscription.
0436          conn_->cancel(operation::receive);
0437 
0438          if (!conn_->will_reconnect()) {
0439             conn_->cancel(operation::reconnection);
0440             self.complete(ec3);
0441             return;
0442          }
0443 
0444          // It is safe to use the writer timer here because we are not
0445          // connected.
0446          conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
0447 
0448          BOOST_ASIO_CORO_YIELD
0449          conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
0450          if (ec0) {
0451             self.complete(ec0);
0452             return;
0453          }
0454 
0455          if (!conn_->will_reconnect()) {
0456             self.complete(asio::error::operation_aborted);
0457             return;
0458          }
0459 
0460          conn_->reset_stream();
0461       }
0462    }
0463 };
0464 
0465 } // boost::redis::detail
0466 
0467 /** @brief A SSL connection to the Redis server.
0468  *  @ingroup high-level-api
0469  *
0470  *  This class keeps a healthy connection to the Redis instance where
0471  *  commands can be sent at any time. For more details, please see the
0472  *  documentation of each individual function.
0473  *
0474  *  @tparam Socket The socket type e.g. asio::ip::tcp::socket.
0475  *
0476  */
0477 template <class Executor>
0478 class basic_connection {
0479 public:
0480    using this_type = basic_connection<Executor>;
0481 
0482    /// Type of the next layer
0483    using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
0484 
0485    /// Executor type
0486    using executor_type = Executor;
0487 
0488    /// Returns the associated executor.
0489    executor_type get_executor() noexcept
0490       {return writer_timer_.get_executor();}
0491 
0492    /// Rebinds the socket type to another executor.
0493    template <class Executor1>
0494    struct rebind_executor
0495    {
0496       /// The connection type when rebound to the specified executor.
0497       using other = basic_connection<Executor1>;
0498    };
0499 
0500    /** @brief Constructor
0501     *
0502     *  @param ex Executor on which connection operation will run.
0503     *  @param ctx SSL context.
0504     *  @param max_read_size Maximum read size that is passed to
0505     *  the internal `asio::dynamic_buffer` constructor.
0506     */
0507    explicit
0508    basic_connection(
0509       executor_type ex,
0510       asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
0511       std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
0512    : ctx_{std::move(ctx)}
0513    , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
0514    , writer_timer_{ex}
0515    , receive_channel_{ex, 256}
0516    , resv_{ex}
0517    , health_checker_{ex}
0518    , dbuf_{read_buffer_, max_read_size}
0519    {
0520       set_receive_response(ignore);
0521       writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0522    }
0523 
0524    /// Constructs from a context.
0525    explicit
0526    basic_connection(
0527       asio::io_context& ioc,
0528       asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
0529       std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
0530    : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
0531    { }
0532 
0533    /** @brief Starts underlying connection operations.
0534     *
0535     *  This member function provides the following functionality
0536     *
0537     *  1. Resolve the address passed on `boost::redis::config::addr`.
0538     *  2. Connect to one of the results obtained in the resolve operation.
0539     *  3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
0540     *  4. Start a health-check operation where ping commands are sent
0541     *     at intervals specified in
0542     *     `boost::redis::config::health_check_interval`.  The message passed to
0543     *     `PING` will be `boost::redis::config::health_check_id`.  Passing a
0544     *     timeout with value zero will disable health-checks.  If the Redis
0545     *     server does not respond to a health-check within two times the value
0546     *     specified here, it will be considered unresponsive and the connection
0547     *     will be closed and a new connection will be stablished.
0548     *  5. Starts read and write operations with the Redis
0549     *  server. More specifically it will trigger the write of all
0550     *  requests i.e. calls to `async_exec` that happened prior to this
0551     *  call.
0552     *
0553     *  When a connection is lost for any reason, a new one is
0554     *  stablished automatically. To disable reconnection call
0555     *  `boost::redis::connection::cancel(operation::reconnection)`.
0556     *
0557     *  @param cfg Configuration paramters.
0558     *  @param l Logger object. The interface expected is specified in the class `boost::redis::logger`.
0559     *  @param token Completion token.
0560     *
0561     *  The completion token must have the following signature
0562     *
0563     *  @code
0564     *  void f(system::error_code);
0565     *  @endcode
0566     *
0567     *  For example on how to call this function refer to
0568     *  cpp20_intro.cpp or any other example.
0569     */
0570    template <
0571       class Logger = logger,
0572       class CompletionToken = asio::default_completion_token_t<executor_type>>
0573    auto
0574    async_run(
0575       config const& cfg = {},
0576       Logger l = Logger{},
0577       CompletionToken&& token = {})
0578    {
0579       cfg_ = cfg;
0580       resv_.set_config(cfg);
0581       ctor_.set_config(cfg);
0582       health_checker_.set_config(cfg);
0583       handshaker_.set_config(cfg);
0584       l.set_prefix(cfg.log_prefix);
0585 
0586       return asio::async_compose
0587          < CompletionToken
0588          , void(system::error_code)
0589          >(detail::run_op<this_type, Logger>{this, l}, token, writer_timer_);
0590    }
0591 
0592    /** @brief Receives server side pushes asynchronously.
0593     *
0594     *  When pushes arrive and there is no `async_receive` operation in
0595     *  progress, pushed data, requests, and responses will be paused
0596     *  until `async_receive` is called again.  Apps will usually want
0597     *  to call `async_receive` in a loop. 
0598     *
0599     *  To cancel an ongoing receive operation apps should call
0600     *  `connection::cancel(operation::receive)`.
0601     *
0602     *  @param token Completion token.
0603     *
0604     *  For an example see cpp20_subscriber.cpp. The completion token must
0605     *  have the following signature
0606     *
0607     *  @code
0608     *  void f(system::error_code, std::size_t);
0609     *  @endcode
0610     *
0611     *  Where the second parameter is the size of the push received in
0612     *  bytes.
0613     */
0614    template <class CompletionToken = asio::default_completion_token_t<executor_type>>
0615    auto async_receive(CompletionToken&& token = {})
0616       { return receive_channel_.async_receive(std::forward<CompletionToken>(token)); }
0617 
0618    /** @brief Receives server pushes synchronously without blocking.
0619     *
0620     *  Receives a server push synchronously by calling `try_receive` on
0621     *  the underlying channel. If the operation fails because
0622     *  `try_receive` returns `false`, `ec` will be set to
0623     *  `boost::redis::error::sync_receive_push_failed`.
0624     *
0625     *  @param ec Contains the error if any occurred.
0626     *
0627     *  @returns The number of bytes read from the socket.
0628     */
0629    std::size_t receive(system::error_code& ec)
0630    {
0631       std::size_t size = 0;
0632 
0633       auto f = [&](system::error_code const& ec2, std::size_t n)
0634       {
0635          ec = ec2;
0636          size = n;
0637       };
0638 
0639       auto const res = receive_channel_.try_receive(f);
0640       if (ec)
0641          return 0;
0642 
0643       if (!res)
0644          ec = error::sync_receive_push_failed;
0645 
0646       return size;
0647    }
0648 
0649    /** @brief Executes commands on the Redis server asynchronously.
0650     *
0651     *  This function sends a request to the Redis server and waits for
0652     *  the responses to each individual command in the request. If the
0653     *  request contains only commands that don't expect a response,
0654     *  the completion occurs after it has been written to the
0655     *  underlying stream.  Multiple concurrent calls to this function
0656     *  will be automatically queued by the implementation.
0657     *
0658     *  @param req Request.
0659     *  @param resp Response.
0660     *  @param token Completion token.
0661     *
0662     *  For an example see cpp20_echo_server.cpp. The completion token must
0663     *  have the following signature
0664     *
0665     *  @code
0666     *  void f(system::error_code, std::size_t);
0667     *  @endcode
0668     *
0669     *  Where the second parameter is the size of the response received
0670     *  in bytes.
0671     */
0672    template <
0673       class Response = ignore_t,
0674       class CompletionToken = asio::default_completion_token_t<executor_type>
0675    >
0676    auto
0677    async_exec(
0678       request const& req,
0679       Response& resp = ignore,
0680       CompletionToken&& token = {})
0681    {
0682       return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
0683    }
0684 
0685    /** @copydoc async_exec
0686     * 
0687     * @details This function uses the type-erased @ref any_adapter class, which
0688     * encapsulates a reference to a response object.
0689     */
0690    template <class CompletionToken = asio::default_completion_token_t<executor_type>>
0691    auto
0692    async_exec(
0693       request const& req,
0694       any_adapter adapter,
0695       CompletionToken&& token = {})
0696    {
0697       auto& adapter_impl = adapter.impl_;
0698       BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");
0699 
0700       auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());
0701 
0702       return asio::async_compose
0703          < CompletionToken
0704          , void(system::error_code, std::size_t)
0705          >(detail::exec_op<this_type>{this, info}, token, writer_timer_);
0706    }
0707 
0708    /** @brief Cancel operations.
0709     *
0710     *  @li `operation::exec`: Cancels operations started with
0711     *  `async_exec`. Affects only requests that haven't been written
0712     *  yet.
0713     *  @li operation::run: Cancels the `async_run` operation.
0714     *  @li operation::receive: Cancels any ongoing calls to `async_receive`.
0715     *  @li operation::all: Cancels all operations listed above.
0716     *
0717     *  @param op: The operation to be cancelled.
0718     */
0719    void cancel(operation op = operation::all)
0720    {
0721       switch (op) {
0722          case operation::resolve:
0723             resv_.cancel();
0724             break;
0725          case operation::exec:
0726             cancel_unwritten_requests();
0727             break;
0728          case operation::reconnection:
0729             cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
0730             break;
0731          case operation::run:
0732             cancel_run();
0733             break;
0734          case operation::receive:
0735             receive_channel_.cancel();
0736             break;
0737          case operation::health_check:
0738             health_checker_.cancel();
0739             break;
0740          case operation::all:
0741             resv_.cancel();
0742             cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
0743             health_checker_.cancel();
0744             cancel_run(); // run
0745             receive_channel_.cancel(); // receive
0746             cancel_unwritten_requests(); // exec
0747             break;
0748          default: /* ignore */;
0749       }
0750    }
0751 
0752    auto run_is_canceled() const noexcept
0753       { return cancel_run_called_; }
0754 
0755    /// Returns true if the connection was canceled.
0756    bool will_reconnect() const noexcept
0757       { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
0758 
0759    /// Returns the ssl context.
0760    auto const& get_ssl_context() const noexcept
0761       { return ctx_;}
0762 
0763    /// Resets the underlying stream.
0764    void reset_stream()
0765       { stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_); }
0766 
0767    /// Returns a reference to the next layer.
0768    auto& next_layer() noexcept
0769       { return *stream_; }
0770 
0771    /// Returns a const reference to the next layer.
0772    auto const& next_layer() const noexcept
0773       { return *stream_; }
0774    /// Sets the response object of `async_receive` operations.
0775    template <class Response>
0776    void set_receive_response(Response& response)
0777    {
0778       using namespace boost::redis::adapter;
0779       auto g = boost_redis_adapt(response);
0780       receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
0781    }
0782 
0783    /// Returns connection usage information.
0784    usage get_usage() const noexcept
0785       { return usage_; }
0786 
0787 private:
0788    using clock_type = std::chrono::steady_clock;
0789    using clock_traits_type = asio::wait_traits<clock_type>;
0790    using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
0791 
0792    using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
0793    using resolver_type = detail::resolver<Executor>;
0794    using health_checker_type = detail::health_checker<Executor>;
0795    using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
0796    using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
0797    using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
0798    using exec_notifier_type = receive_channel_type;
0799 
0800    auto use_ssl() const noexcept
0801       { return cfg_.use_ssl;}
0802 
0803    auto cancel_on_conn_lost() -> std::size_t
0804    {
0805       // Must return false if the request should be removed.
0806       auto cond = [](auto const& ptr)
0807       {
0808          BOOST_ASSERT(ptr != nullptr);
0809 
0810          if (ptr->is_waiting()) {
0811             return !ptr->req_->get_config().cancel_on_connection_lost;
0812          } else {
0813             return !ptr->req_->get_config().cancel_if_unresponded;
0814          }
0815       };
0816 
0817       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
0818 
0819       auto const ret = std::distance(point, std::end(reqs_));
0820 
0821       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0822          ptr->stop();
0823       });
0824 
0825       reqs_.erase(point, std::end(reqs_));
0826 
0827       std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0828          return ptr->mark_waiting();
0829       });
0830 
0831       return ret;
0832    }
0833 
0834    auto cancel_unwritten_requests() -> std::size_t
0835    {
0836       auto f = [](auto const& ptr)
0837       {
0838          BOOST_ASSERT(ptr != nullptr);
0839          return !ptr->is_waiting();
0840       };
0841 
0842       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
0843 
0844       auto const ret = std::distance(point, std::end(reqs_));
0845 
0846       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0847          ptr->stop();
0848       });
0849 
0850       reqs_.erase(point, std::end(reqs_));
0851       return ret;
0852    }
0853 
0854    void cancel_run()
0855    {
0856       // Protects the code below from being called more than
0857       // once, see https://github.com/boostorg/redis/issues/181
0858       if (std::exchange(cancel_run_called_, true)) {
0859          return;
0860       }
0861 
0862       close();
0863       writer_timer_.cancel();
0864       receive_channel_.cancel();
0865       cancel_on_conn_lost();
0866    }
0867 
0868    void on_write()
0869    {
0870       // We have to clear the payload right after writing it to use it
0871       // as a flag that informs there is no ongoing write.
0872       write_buffer_.clear();
0873 
0874       // Notice this must come before the for-each below.
0875       cancel_push_requests();
0876 
0877       // There is small optimization possible here: traverse only the
0878       // partition of unwritten requests instead of them all.
0879       std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0880          BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
0881          if (ptr->is_staged()) {
0882             ptr->mark_written();
0883          }
0884       });
0885    }
0886 
0887    struct req_info {
0888    public:
0889       using node_type = resp3::basic_node<std::string_view>;
0890       using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
0891 
0892       explicit req_info(request const& req, adapter_type adapter, executor_type ex)
0893       : notifier_{ex, 1}
0894       , req_{&req}
0895       , adapter_{}
0896       , expected_responses_{req.get_expected_responses()}
0897       , status_{status::waiting}
0898       , ec_{{}}
0899       , read_size_{0}
0900       {
0901          adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
0902          {
0903             auto const i = req_->get_expected_responses() - expected_responses_;
0904             adapter(i, nd, ec);
0905          };
0906       }
0907 
0908       auto proceed()
0909       {
0910          notifier_.try_send(std::error_code{}, 0);
0911       }
0912 
0913       void stop()
0914       {
0915          notifier_.close();
0916       }
0917 
0918       [[nodiscard]] auto is_waiting() const noexcept
0919          { return status_ == status::waiting; }
0920 
0921       [[nodiscard]] auto is_written() const noexcept
0922          { return status_ == status::written; }
0923 
0924       [[nodiscard]] auto is_staged() const noexcept
0925          { return status_ == status::staged; }
0926 
0927       void mark_written() noexcept
0928          { status_ = status::written; }
0929 
0930       void mark_staged() noexcept
0931          { status_ = status::staged; }
0932 
0933       void mark_waiting() noexcept
0934          { status_ = status::waiting; }
0935 
0936       [[nodiscard]] auto stop_requested() const noexcept
0937          { return !notifier_.is_open();}
0938 
0939       template <class CompletionToken>
0940       auto async_wait(CompletionToken&& token)
0941       {
0942          return notifier_.async_receive(std::forward<CompletionToken>(token));
0943       }
0944 
0945    //private:
0946       enum class status
0947       { waiting
0948       , staged
0949       , written
0950       };
0951 
0952       exec_notifier_type notifier_;
0953       request const* req_;
0954       wrapped_adapter_type adapter_;
0955 
0956       // Contains the number of commands that haven't been read yet.
0957       std::size_t expected_responses_;
0958       status status_;
0959 
0960       system::error_code ec_;
0961       std::size_t read_size_;
0962    };
0963 
0964    void remove_request(std::shared_ptr<req_info> const& info)
0965    {
0966       reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
0967    }
0968 
0969    using reqs_type = std::deque<std::shared_ptr<req_info>>;
0970 
0971    template <class, class> friend struct detail::reader_op;
0972    template <class, class> friend struct detail::writer_op;
0973    template <class> friend struct detail::exec_op;
0974    template <class, class> friend class detail::run_op;
0975 
0976    void cancel_push_requests()
0977    {
0978       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0979             return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
0980       });
0981 
0982       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0983          ptr->proceed();
0984       });
0985 
0986       reqs_.erase(point, std::end(reqs_));
0987    }
0988 
0989    [[nodiscard]] bool is_writing() const noexcept
0990    {
0991       return !write_buffer_.empty();
0992    }
0993 
0994    void add_request_info(std::shared_ptr<req_info> const& info)
0995    {
0996       reqs_.push_back(info);
0997 
0998       if (info->req_->has_hello_priority()) {
0999          auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
1000                return e->is_waiting();
1001          });
1002 
1003          std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
1004       }
1005 
1006       if (is_open() && !is_writing())
1007          writer_timer_.cancel();
1008    }
1009 
1010    template <class CompletionToken, class Logger>
1011    auto reader(Logger l, CompletionToken&& token)
1012    {
1013       return asio::async_compose
1014          < CompletionToken
1015          , void(system::error_code)
1016          >(detail::reader_op<this_type, Logger>{this, l},
1017                std::forward<CompletionToken>(token), writer_timer_);
1018    }
1019 
1020    template <class CompletionToken, class Logger>
1021    auto writer(Logger l, CompletionToken&& token)
1022    {
1023       return asio::async_compose
1024          < CompletionToken
1025          , void(system::error_code)
1026          >(detail::writer_op<this_type, Logger>{this, l}, std::forward<CompletionToken>(token), writer_timer_);
1027    }
1028 
1029    [[nodiscard]] bool coalesce_requests()
1030    {
1031       // Coalesces the requests and marks them staged. After a
1032       // successful write staged requests will be marked as written.
1033       auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
1034             return !ri->is_waiting();
1035       });
1036 
1037       std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
1038          // Stage the request.
1039          write_buffer_ += ri->req_->payload();
1040          ri->mark_staged();
1041          usage_.commands_sent += ri->expected_responses_;
1042       });
1043 
1044       usage_.bytes_sent += std::size(write_buffer_);
1045 
1046       return point != std::cend(reqs_);
1047    }
1048 
1049    bool is_waiting_response() const noexcept
1050    {
1051       if (std::empty(reqs_))
1052          return false;
1053 
1054       // Under load and on low-latency networks we might start
1055       // receiving responses before the write operation completed and
1056       // the request is still maked as staged and not written.  See
1057       // https://github.com/boostorg/redis/issues/170
1058       return !reqs_.front()->is_waiting();
1059    }
1060 
1061    void close()
1062    {
1063       if (stream_->next_layer().is_open()) {
1064          system::error_code ec;
1065          stream_->next_layer().close(ec);
1066       }
1067    }
1068 
1069    auto is_open() const noexcept { return stream_->next_layer().is_open(); }
1070    auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
1071 
1072    auto is_next_push()
1073    {
1074       BOOST_ASSERT(!read_buffer_.empty());
1075 
1076       // Useful links to understand the heuristics below.
1077       //
1078       // - https://github.com/redis/redis/issues/11784
1079       // - https://github.com/redis/redis/issues/6426
1080       // - https://github.com/boostorg/redis/issues/170
1081 
1082       // The message's resp3 type is a push.
1083       if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
1084          return true;
1085 
1086       // This is non-push type and the requests queue is empty. I have
1087       // noticed this is possible, for example with -MISCONF. I don't
1088       // know why they are not sent with a push type so we can
1089       // distinguish them from responses to commands. If we are lucky
1090       // enough to receive them when the command queue is empty they
1091       // can be treated as server pushes, otherwise it is impossible
1092       // to handle them properly
1093       if (reqs_.empty())
1094          return true;
1095 
1096       // The request does not expect any response but we got one. This
1097       // may happen if for example, subscribe with wrong syntax.
1098       if (reqs_.front()->expected_responses_ == 0)
1099          return true;
1100 
1101       // Added to deal with MONITOR and also to fix PR170 which
1102       // happens under load and on low-latency networks, where we
1103       // might start receiving responses before the write operation
1104       // completed and the request is still maked as staged and not
1105       // written.
1106       return reqs_.front()->is_waiting();
1107    }
1108 
1109    auto get_suggested_buffer_growth() const noexcept
1110    {
1111       return parser_.get_suggested_buffer_growth(4096);
1112    }
1113 
1114    enum class parse_result { needs_more, push, resp };
1115 
1116    using parse_ret_type = std::pair<parse_result, std::size_t>;
1117 
1118    parse_ret_type on_finish_parsing(parse_result t)
1119    {
1120       if (t == parse_result::push) {
1121          usage_.pushes_received += 1;
1122          usage_.push_bytes_received += parser_.get_consumed();
1123       } else {
1124          usage_.responses_received += 1;
1125          usage_.response_bytes_received += parser_.get_consumed();
1126       }
1127 
1128       on_push_ = false;
1129       dbuf_.consume(parser_.get_consumed());
1130       auto const res = std::make_pair(t, parser_.get_consumed());
1131       parser_.reset();
1132       return res;
1133    }
1134 
1135    parse_ret_type on_read(std::string_view data, system::error_code& ec)
1136    {
1137       // We arrive here in two states:
1138       //
1139       //    1. While we are parsing a message. In this case we
1140       //       don't want to determine the type of the message in the
1141       //       buffer (i.e. response vs push) but leave it untouched
1142       //       until the parsing of a complete message ends.
1143       //
1144       //    2. On a new message, in which case we have to determine
1145       //       whether the next message is a push or a response.
1146       //
1147       if (!on_push_) // Prepare for new message.
1148          on_push_ = is_next_push();
1149 
1150       if (on_push_) {
1151          if (!resp3::parse(parser_, data, receive_adapter_, ec))
1152             return std::make_pair(parse_result::needs_more, 0);
1153 
1154          if (ec)
1155             return std::make_pair(parse_result::push, 0);
1156 
1157          return on_finish_parsing(parse_result::push);
1158       }
1159 
1160       BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
1161       BOOST_ASSERT(!reqs_.empty());
1162       BOOST_ASSERT(reqs_.front() != nullptr);
1163       BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
1164 
1165       if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
1166          return std::make_pair(parse_result::needs_more, 0);
1167 
1168       if (ec) {
1169          reqs_.front()->ec_ = ec;
1170          reqs_.front()->proceed();
1171          return std::make_pair(parse_result::resp, 0);
1172       }
1173 
1174       reqs_.front()->read_size_ += parser_.get_consumed();
1175 
1176       if (--reqs_.front()->expected_responses_ == 0) {
1177          // Done with this request.
1178          reqs_.front()->proceed();
1179          reqs_.pop_front();
1180       }
1181 
1182       return on_finish_parsing(parse_result::resp);
1183    }
1184 
1185    void reset()
1186    {
1187       write_buffer_.clear();
1188       read_buffer_.clear();
1189       parser_.reset();
1190       on_push_ = false;
1191       cancel_run_called_ = false;
1192    }
1193 
1194    asio::ssl::context ctx_;
1195    std::unique_ptr<next_layer_type> stream_;
1196 
1197    // Notice we use a timer to simulate a condition-variable. It is
1198    // also more suitable than a channel and the notify operation does
1199    // not suspend.
1200    timer_type writer_timer_;
1201    receive_channel_type receive_channel_;
1202    resolver_type resv_;
1203    detail::connector ctor_;
1204    health_checker_type health_checker_;
1205    resp3_handshaker_type handshaker_;
1206    receiver_adapter_type receive_adapter_;
1207 
1208    using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
1209 
1210    config cfg_;
1211    std::string read_buffer_;
1212    dyn_buffer_type dbuf_;
1213    std::string write_buffer_;
1214    reqs_type reqs_;
1215    resp3::parser parser_{};
1216    bool on_push_ = false;
1217    bool cancel_run_called_ = false;
1218 
1219    usage usage_;
1220 };
1221 
1222 /** \brief A basic_connection that type erases the executor.
1223  *  \ingroup high-level-api
1224  *
1225  *  This connection type uses the asio::any_io_executor and
1226  *  asio::any_completion_token to reduce compilation times.
1227  *
1228  *  For documentation of each member function see
1229  *  `boost::redis::basic_connection`.
1230  */
1231 class connection {
1232 public:
1233    /// Executor type.
1234    using executor_type = asio::any_io_executor;
1235 
1236    /// Contructs from an executor.
1237    explicit
1238    connection(
1239       executor_type ex,
1240       asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1241       std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1242 
1243    /// Contructs from a context.
1244    explicit
1245    connection(
1246       asio::io_context& ioc,
1247       asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1248       std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1249 
1250    /// Returns the underlying executor.
1251    executor_type get_executor() noexcept
1252       { return impl_.get_executor(); }
1253 
1254    /// Calls `boost::redis::basic_connection::async_run`.
1255    template <class CompletionToken = asio::deferred_t>
1256    auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
1257    {
1258       return asio::async_initiate<
1259          CompletionToken, void(boost::system::error_code)>(
1260             [](auto handler, connection* self, config const* cfg, logger l)
1261             {
1262                self->async_run_impl(*cfg, l, std::move(handler));
1263             }, token, this, &cfg, l);
1264    }
1265 
1266    /// Calls `boost::redis::basic_connection::async_receive`.
1267    template <class CompletionToken = asio::deferred_t>
1268    auto async_receive(CompletionToken&& token = {})
1269       { return impl_.async_receive(std::forward<CompletionToken>(token)); }
1270 
1271    /// Calls `boost::redis::basic_connection::receive`.
1272    std::size_t receive(system::error_code& ec)
1273    {
1274       return impl_.receive(ec);
1275    }
1276 
1277    /// Calls `boost::redis::basic_connection::async_exec`.
1278    template <class Response, class CompletionToken = asio::deferred_t>
1279    auto async_exec(request const& req, Response& resp, CompletionToken&& token = {})
1280    {
1281       return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
1282    }
1283 
1284    /// Calls `boost::redis::basic_connection::async_exec`.
1285    template <class CompletionToken = asio::deferred_t>
1286    auto
1287    async_exec(
1288        request const& req,
1289        any_adapter adapter,
1290        CompletionToken&& token = {})
1291    {
1292       return asio::async_initiate<
1293          CompletionToken, void(boost::system::error_code, std::size_t)>(
1294             [](auto handler, connection* self, request const* req, any_adapter&& adapter)
1295             {
1296                self->async_exec_impl(*req, std::move(adapter), std::move(handler));
1297             }, token, this, &req, std::move(adapter));
1298    }
1299 
1300    /// Calls `boost::redis::basic_connection::cancel`.
1301    void cancel(operation op = operation::all);
1302 
1303    /// Calls `boost::redis::basic_connection::will_reconnect`.
1304    bool will_reconnect() const noexcept
1305       { return impl_.will_reconnect();}
1306 
1307    /// Calls `boost::redis::basic_connection::next_layer`.
1308    auto& next_layer() noexcept
1309       { return impl_.next_layer(); }
1310 
1311    /// Calls `boost::redis::basic_connection::next_layer`.
1312    auto const& next_layer() const noexcept
1313       { return impl_.next_layer(); }
1314 
1315    /// Calls `boost::redis::basic_connection::reset_stream`.
1316    void reset_stream()
1317       { impl_.reset_stream();}
1318 
1319    /// Sets the response object of `async_receive` operations.
1320    template <class Response>
1321    void set_receive_response(Response& response)
1322       { impl_.set_receive_response(response); }
1323 
1324    /// Returns connection usage information.
1325    usage get_usage() const noexcept
1326       { return impl_.get_usage(); }
1327 
1328    /// Returns the ssl context.
1329    auto const& get_ssl_context() const noexcept
1330       { return impl_.get_ssl_context();}
1331 
1332 private:
1333    void
1334    async_run_impl(
1335       config const& cfg,
1336       logger l,
1337       asio::any_completion_handler<void(boost::system::error_code)> token);
1338    
1339    void
1340    async_exec_impl(
1341       request const& req,
1342       any_adapter&& adapter,
1343       asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);
1344 
1345    basic_connection<executor_type> impl_;
1346 };
1347 
1348 } // boost::redis
1349 
1350 #endif // BOOST_REDIS_CONNECTION_HPP