/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
 *
 * Distributed under the Boost Software License, Version 1.0. (See
 * accompanying file LICENSE.txt)
 */

#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/usage.hpp>

#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/assert.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <memory>
#include <string_view>
#include <type_traits>
#include <functional>

namespace boost::redis::detail
{

template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
   return std::string_view{start, std::size(buf)};
}

template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
   AsyncReadStream& stream_;
   DynamicBuffer buf_;
   std::size_t size_ = 0;
   std::size_t tmp_ = 0;
   asio::coroutine coro_{};

public:
   append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
   : stream_ {stream}
   , buf_ {std::move(buf)}
   , size_{size}
   { }

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro_)
      {
         tmp_ = buf_.size();
         buf_.grow(size_);

         BOOST_ASIO_CORO_YIELD
         stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
         if (ec) {
            self.complete(ec, 0);
            return;
         }

         buf_.shrink(buf_.size() - tmp_ - n);
         self.complete({}, n);
      }
   }
};

template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto
async_append_some(
   AsyncReadStream& stream,
   DynamicBuffer buffer,
   std::size_t size,
   CompletionToken&& token)
{
   return asio::async_compose
      < CompletionToken
      , void(system::error_code, std::size_t)
      >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
}
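
// Illustrative sketch (editorial note, not part of the original source):
// async_append_some composes like any other Asio asynchronous operation,
// for example with a dynamic string buffer and a lambda completion handler.
// The socket and handler below are hypothetical.
//
//    std::string storage;
//    auto dbuf = asio::dynamic_buffer(storage);
//    detail::async_append_some(socket, dbuf, 4096,
//       [](system::error_code ec, std::size_t n) { /* n bytes appended */ });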

template <class Conn>
struct exec_op {
   using req_info_type = typename Conn::req_info;
   using adapter_type = typename Conn::adapter_type;

   Conn* conn_ = nullptr;
   std::shared_ptr<req_info_type> info_ = nullptr;
   asio::coroutine coro{};

   template <class Self>
   void operator()(Self& self , system::error_code = {}, std::size_t = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         // Check whether the user wants to wait for the connection to
         // be established.
         if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
            BOOST_ASIO_CORO_YIELD
            asio::post(std::move(self));
            return self.complete(error::not_connected, 0);
         }

         conn_->add_request_info(info_);

EXEC_OP_WAIT:
         BOOST_ASIO_CORO_YIELD
         info_->async_wait(std::move(self));

         if (info_->ec_) {
            self.complete(info_->ec_, 0);
            return;
         }

         if (info_->stop_requested()) {
            // Don't have to call remove_request as it has already
            // been done by cancel(exec).
            return self.complete(asio::error::operation_aborted, 0);
         }

         if (is_cancelled(self)) {
            if (!info_->is_waiting()) {
               using c_t = asio::cancellation_type;
               auto const c = self.get_cancellation_state().cancelled();
               if ((c & c_t::terminal) != c_t::none) {
                  // Cancellation requires closing the connection,
                  // otherwise it stays in an inconsistent state.
                  conn_->cancel(operation::run);
                  return self.complete(asio::error::operation_aborted, 0);
               } else {
                  // Can't implement other cancellation types, ignoring.
                  self.get_cancellation_state().clear();

                  // TODO: Find a better way to ignore
                  // cancellation.
                  goto EXEC_OP_WAIT;
               }
            } else {
               // Cancellation can be honored.
               conn_->remove_request(info_);
               self.complete(asio::error::operation_aborted, 0);
               return;
            }
         }

         self.complete(info_->ec_, info_->read_size_);
      }
   }
};

template <class Conn, class Logger>
struct run_op {
   Conn* conn = nullptr;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , std::array<std::size_t, 2> order = {}
                  , system::error_code ec0 = {}
                  , system::error_code ec1 = {})
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         conn->reset();

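         // Run the reader and writer concurrently. With wait_for_one()
         // the group completes as soon as either operation finishes, at
         // which point the remaining operation is cancelled.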
         BOOST_ASIO_CORO_YIELD
         asio::experimental::make_parallel_group(
            [this](auto token) { return conn->reader(logger_, token);},
            [this](auto token) { return conn->writer(logger_, token);}
         ).async_wait(
            asio::experimental::wait_for_one(),
            std::move(self));

         if (is_cancelled(self)) {
            logger_.trace("run-op: canceled. Exiting ...");
            self.complete(asio::error::operation_aborted);
            return;
         }

         logger_.on_run(ec0, ec1);

         switch (order[0]) {
           case 0: self.complete(ec0); break;
           case 1: self.complete(ec1); break;
           default: BOOST_ASSERT(false);
         }
      }
   }
};

template <class Conn, class Logger>
struct writer_op {
   Conn* conn_;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         while (conn_->coalesce_requests()) {
            if (conn_->use_ssl())
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
            else
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));

            logger_.on_write(ec, conn_->write_buffer_);

            if (ec) {
               logger_.trace("writer-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (is_cancelled(self)) {
               logger_.trace("writer-op: canceled. Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

            conn_->on_write();

            // A socket.close() may have been called while a
            // successful write was already queued, so we have to
            // check here before proceeding.
            if (!conn_->is_open()) {
               logger_.trace("writer-op: canceled (2). Exiting ...");
               self.complete({});
               return;
            }
         }

         BOOST_ASIO_CORO_YIELD
         conn_->writer_timer_.async_wait(std::move(self));
         if (!conn_->is_open() || is_cancelled(self)) {
            logger_.trace("writer-op: canceled (3). Exiting ...");
            // Notice this is not an error of the op: stopping was
            // requested from the outside, so we complete with
            // success.
            self.complete({});
            return;
         }
      }
   }
};

template <class Conn, class Logger>
struct reader_op {
   using parse_result = typename Conn::parse_result;
   using parse_ret_type = typename Conn::parse_ret_type;
   Conn* conn_;
   Logger logger_;
   parse_ret_type res_{parse_result::resp, 0};
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         // Appends some data to the buffer if necessary.
         if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
            if (conn_->use_ssl()) {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            } else {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer().next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            }

            logger_.on_read(ec, n);

            // EOF is not treated as error.
            if (ec == asio::error::eof) {
               logger_.trace("reader-op: EOF received. Exiting ...");
               conn_->cancel(operation::run);
               return self.complete({}); // EOFINAE: EOF is not an error.
            }

            // The connection is not viable after an error.
            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            // Somebody might have canceled, implicitly or explicitly,
            // while we were suspended and after queueing, so we have
            // to check.
            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled. Exiting ...");
               self.complete(ec);
               return;
            }
         }

         res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
         if (ec) {
            logger_.trace("reader-op: parse error. Exiting ...");
            conn_->cancel(operation::run);
            self.complete(ec);
            return;
         }

         if (res_.first == parse_result::push) {
            if (!conn_->receive_channel_.try_send(ec, res_.second)) {
               BOOST_ASIO_CORO_YIELD
               conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
            }

            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled (2). Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

         }
      }
   }
};

/** @brief Base class for high level Redis asynchronous connections.
 *  @ingroup high-level-api
 *
 *  @tparam Executor The executor type.
 *
 */
template <class Executor>
class connection_base {
public:
   /// Executor type
   using executor_type = Executor;

   /// Type of the next layer
   using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;

   using clock_type = std::chrono::steady_clock;
   using clock_traits_type = asio::wait_traits<clock_type>;
   using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;

   using this_type = connection_base<Executor>;

   /// Constructs from an executor.
   connection_base(
      executor_type ex,
      asio::ssl::context ctx,
      std::size_t max_read_size)
   : ctx_{std::move(ctx)}
   , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
   , writer_timer_{ex}
   , receive_channel_{ex, 256}
   , runner_{ex, {}}
   , dbuf_{read_buffer_, max_read_size}
   {
      set_receive_response(ignore);
      writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
   }

   /// Returns the ssl context.
   auto const& get_ssl_context() const noexcept
      { return ctx_;}

   /// Resets the underlying stream.
   void reset_stream()
   {
      stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
   }

   /// Returns a reference to the next layer.
   auto& next_layer() noexcept { return *stream_; }

   /// Returns a const reference to the next layer.
   auto const& next_layer() const noexcept { return *stream_; }

   /// Returns the associated executor.
   auto get_executor() {return writer_timer_.get_executor();}

   /// Cancels specific operations.
   void cancel(operation op)
   {
      runner_.cancel(op);
      if (op == operation::all) {
         cancel_impl(operation::run);
         cancel_impl(operation::receive);
         cancel_impl(operation::exec);
         return;
      }

      cancel_impl(op);
   }

   template <class Response, class CompletionToken>
   auto async_exec(request const& req, Response& resp, CompletionToken token)
   {
      using namespace boost::redis::adapter;
      auto f = boost_redis_adapt(resp);
      BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");

      auto info = std::make_shared<req_info>(req, f, get_executor());

      return asio::async_compose
         < CompletionToken
         , void(system::error_code, std::size_t)
         >(exec_op<this_type>{this, info}, token, writer_timer_);
   }
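
   // Illustrative usage sketch (editorial note, not part of the original
   // source): from user code async_exec is typically reached through the
   // public boost::redis::connection wrapper, roughly as follows.
   //
   //    request req;
   //    req.push("PING", "Hello world");
   //    response<std::string> resp;
   //    conn.async_exec(req, resp, asio::detached);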

   template <class Response, class CompletionToken>
   [[deprecated("Set the response with set_receive_response and use the other overload.")]]
   auto async_receive(Response& response, CompletionToken token)
   {
      set_receive_response(response);
      return receive_channel_.async_receive(std::move(token));
   }

   template <class CompletionToken>
   auto async_receive(CompletionToken token)
      { return receive_channel_.async_receive(std::move(token)); }

   std::size_t receive(system::error_code& ec)
   {
      std::size_t size = 0;

      auto f = [&](system::error_code const& ec2, std::size_t n)
      {
         ec = ec2;
         size = n;
      };

      auto const res = receive_channel_.try_receive(f);
      if (ec)
         return 0;

      if (!res)
         ec = error::sync_receive_push_failed;

      return size;
   }
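
   // Illustrative push-handling sketch (editorial note, not part of the
   // original source): server pushes, e.g. those produced by SUBSCRIBE,
   // are consumed by setting a response object and calling async_receive.
   //
   //    generic_response push_resp;
   //    conn.set_receive_response(push_resp);
   //    conn.async_receive([&](system::error_code ec, std::size_t) {
   //       // inspect push_resp, clear it, then call async_receive again
   //    });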

   template <class Logger, class CompletionToken>
   auto async_run(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return runner_.async_run(*this, l, std::move(token));
   }
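
   // Illustrative usage sketch (editorial note, not part of the original
   // source): a typical call through the public connection type, assuming
   // a default-constructed logger.
   //
   //    config cfg;
   //    cfg.addr.host = "127.0.0.1";
   //    cfg.addr.port = "6379";
   //    conn.async_run(cfg, {}, asio::detached);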

   template <class Response>
   void set_receive_response(Response& response)
   {
      using namespace boost::redis::adapter;
      auto g = boost_redis_adapt(response);
      receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
   }

   usage get_usage() const noexcept
      { return usage_; }

   auto run_is_canceled() const noexcept
      { return cancel_run_called_; }

private:
   using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
   using runner_type = runner<executor_type>;
   using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
   using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
   using exec_notifier_type = receive_channel_type;

   auto use_ssl() const noexcept
      { return runner_.get_config().use_ssl;}

   auto cancel_on_conn_lost() -> std::size_t
   {
      // Must return false if the request should be removed.
      auto cond = [](auto const& ptr)
      {
         BOOST_ASSERT(ptr != nullptr);

         if (ptr->is_waiting()) {
            return !ptr->req_->get_config().cancel_on_connection_lost;
         } else {
            return !ptr->req_->get_config().cancel_if_unresponded;
         }
      };

      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);

      auto const ret = std::distance(point, std::end(reqs_));

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->stop();
      });

      reqs_.erase(point, std::end(reqs_));

      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         return ptr->mark_waiting();
      });

      return ret;
   }

   auto cancel_unwritten_requests() -> std::size_t
   {
      auto f = [](auto const& ptr)
      {
         BOOST_ASSERT(ptr != nullptr);
         return !ptr->is_waiting();
      };

      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);

      auto const ret = std::distance(point, std::end(reqs_));

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->stop();
      });

      reqs_.erase(point, std::end(reqs_));
      return ret;
   }

   void cancel_impl(operation op)
   {
      switch (op) {
         case operation::exec:
         {
            cancel_unwritten_requests();
         } break;
         case operation::run:
         {
            // Protects the code below from being called more than
            // once, see https://github.com/boostorg/redis/issues/181
            if (std::exchange(cancel_run_called_, true)) {
               return;
            }

            close();
            writer_timer_.cancel();
            receive_channel_.cancel();
            cancel_on_conn_lost();
         } break;
         case operation::receive:
         {
            receive_channel_.cancel();
         } break;
         default: /* ignore */;
      }
   }

   void on_write()
   {
      // We have to clear the payload right after writing it so that it
      // can be used as a flag indicating there is no ongoing write.
      write_buffer_.clear();

      // Notice this must come before the for-each below.
      cancel_push_requests();

      // There is a small optimization possible here: traverse only the
      // partition of unwritten requests instead of all of them.
      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
         if (ptr->is_staged()) {
            ptr->mark_written();
         }
      });
   }

   struct req_info {
   public:
      using node_type = resp3::basic_node<std::string_view>;
      using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;

      explicit req_info(request const& req, adapter_type adapter, executor_type ex)
      : notifier_{ex, 1}
      , req_{&req}
      , adapter_{}
      , expected_responses_{req.get_expected_responses()}
      , status_{status::waiting}
      , ec_{{}}
      , read_size_{0}
      {
         adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
         {
            auto const i = req_->get_expected_responses() - expected_responses_;
            adapter(i, nd, ec);
         };
      }

      auto proceed()
      {
         notifier_.try_send(std::error_code{}, 0);
      }

      void stop()
      {
         notifier_.close();
      }

      [[nodiscard]] auto is_waiting() const noexcept
         { return status_ == status::waiting; }

      [[nodiscard]] auto is_written() const noexcept
         { return status_ == status::written; }

      [[nodiscard]] auto is_staged() const noexcept
         { return status_ == status::staged; }

      void mark_written() noexcept
         { status_ = status::written; }

      void mark_staged() noexcept
         { status_ = status::staged; }

      void mark_waiting() noexcept
         { status_ = status::waiting; }

      [[nodiscard]] auto stop_requested() const noexcept
         { return !notifier_.is_open();}

      template <class CompletionToken>
      auto async_wait(CompletionToken token)
      {
         return notifier_.async_receive(std::move(token));
      }

   //private:
      enum class status
      { waiting
      , staged
      , written
      };

      exec_notifier_type notifier_;
      request const* req_;
      wrapped_adapter_type adapter_;

      // Contains the number of commands that haven't been read yet.
      std::size_t expected_responses_;
      status status_;

      system::error_code ec_;
      std::size_t read_size_;
   };

   void remove_request(std::shared_ptr<req_info> const& info)
   {
      reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
   }

   using reqs_type = std::deque<std::shared_ptr<req_info>>;

   template <class, class> friend struct reader_op;
   template <class, class> friend struct writer_op;
   template <class, class> friend struct run_op;
   template <class> friend struct exec_op;
   template <class, class, class> friend struct run_all_op;

   void cancel_push_requests()
   {
      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
            return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
      });

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->proceed();
      });

      reqs_.erase(point, std::end(reqs_));
   }

   [[nodiscard]] bool is_writing() const noexcept
   {
      return !write_buffer_.empty();
   }

   void add_request_info(std::shared_ptr<req_info> const& info)
   {
      reqs_.push_back(info);

      if (info->req_->has_hello_priority()) {
         auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
               return e->is_waiting();
         });

         std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
      }

      if (is_open() && !is_writing())
         writer_timer_.cancel();
   }
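
   // Editorial note: the rotation in add_request_info above moves a
   // hello-priority request to the front of the waiting requests. For
   // example, if reqs_ holds [staged, waiting1, waiting2] and a
   // hello-priority request H is pushed, the deque first becomes
   // [staged, waiting1, waiting2, H] and the rotate then yields
   // [staged, H, waiting1, waiting2].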

   template <class CompletionToken, class Logger>
   auto reader(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   template <class CompletionToken, class Logger>
   auto writer(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   template <class Logger, class CompletionToken>
   auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   [[nodiscard]] bool coalesce_requests()
   {
      // Coalesces the requests and marks them staged. After a
      // successful write, staged requests will be marked as written.
      auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
            return !ri->is_waiting();
      });

      std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
         // Stage the request.
         write_buffer_ += ri->req_->payload();
         ri->mark_staged();
         usage_.commands_sent += ri->expected_responses_;
      });

      usage_.bytes_sent += std::size(write_buffer_);

      return point != std::cend(reqs_);
   }

   bool is_waiting_response() const noexcept
   {
      if (std::empty(reqs_))
         return false;

      // Under load and on low-latency networks we might start
      // receiving responses before the write operation has completed
      // and the request is still marked as staged and not written. See
      // https://github.com/boostorg/redis/issues/170
      return !reqs_.front()->is_waiting();
   }

   void close()
   {
      if (stream_->next_layer().is_open()) {
         system::error_code ec;
         stream_->next_layer().close(ec);
      }
   }

   auto is_open() const noexcept { return stream_->next_layer().is_open(); }
   auto& lowest_layer() noexcept { return stream_->lowest_layer(); }

   auto is_next_push()
   {
      BOOST_ASSERT(!read_buffer_.empty());

      // Useful links to understand the heuristics below.
      //
      // - https://github.com/redis/redis/issues/11784
      // - https://github.com/redis/redis/issues/6426
      // - https://github.com/boostorg/redis/issues/170

      // The message's resp3 type is a push.
      if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
         return true;

      // This is a non-push type and the request queue is empty. I have
      // noticed this is possible, for example with -MISCONF. I don't
      // know why they are not sent with a push type, which would let us
      // distinguish them from responses to commands. If we are lucky
      // enough to receive them when the command queue is empty they
      // can be treated as server pushes, otherwise it is impossible
      // to handle them properly.
      if (reqs_.empty())
         return true;

      // The request does not expect any response but we got one. This
      // may happen, for example, when a subscribe is sent with the
      // wrong syntax.
      if (reqs_.front()->expected_responses_ == 0)
         return true;

      // Added to deal with MONITOR and also to fix PR170, which
      // happens under load and on low-latency networks, where we
      // might start receiving responses before the write operation
      // has completed and the request is still marked as staged and
      // not written.
      return reqs_.front()->is_waiting();
   }

   auto get_suggested_buffer_growth() const noexcept
   {
      return parser_.get_suggested_buffer_growth(4096);
   }

   enum class parse_result { needs_more, push, resp };

   using parse_ret_type = std::pair<parse_result, std::size_t>;

   parse_ret_type on_finish_parsing(parse_result t)
   {
      if (t == parse_result::push) {
         usage_.pushes_received += 1;
         usage_.push_bytes_received += parser_.get_consumed();
      } else {
         usage_.responses_received += 1;
         usage_.response_bytes_received += parser_.get_consumed();
      }

      on_push_ = false;
      dbuf_.consume(parser_.get_consumed());
      auto const res = std::make_pair(t, parser_.get_consumed());
      parser_.reset();
      return res;
   }

   parse_ret_type on_read(std::string_view data, system::error_code& ec)
   {
      // We arrive here in two states:
      //
      //    1. While we are parsing a message. In this case we
      //       don't want to determine the type of the message in the
      //       buffer (i.e. response vs push) but leave it untouched
      //       until the parsing of a complete message ends.
      //
      //    2. On a new message, in which case we have to determine
      //       whether the next message is a push or a response.
      //
      if (!on_push_) // Prepare for new message.
         on_push_ = is_next_push();

      if (on_push_) {
         if (!resp3::parse(parser_, data, receive_adapter_, ec))
            return std::make_pair(parse_result::needs_more, 0);

         if (ec)
            return std::make_pair(parse_result::push, 0);

         return on_finish_parsing(parse_result::push);
      }

      BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
      BOOST_ASSERT(!reqs_.empty());
      BOOST_ASSERT(reqs_.front() != nullptr);
      BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);

      if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
         return std::make_pair(parse_result::needs_more, 0);

      if (ec) {
         reqs_.front()->ec_ = ec;
         reqs_.front()->proceed();
         return std::make_pair(parse_result::resp, 0);
      }

      reqs_.front()->read_size_ += parser_.get_consumed();

      if (--reqs_.front()->expected_responses_ == 0) {
         // Done with this request.
         reqs_.front()->proceed();
         reqs_.pop_front();
      }

      return on_finish_parsing(parse_result::resp);
   }

   void reset()
   {
      write_buffer_.clear();
      read_buffer_.clear();
      parser_.reset();
      on_push_ = false;
      cancel_run_called_ = false;
   }

   asio::ssl::context ctx_;
   std::unique_ptr<next_layer_type> stream_;

   // Notice we use a timer to simulate a condition variable. It is
   // also more suitable than a channel because the notify operation
   // does not suspend.
   timer_type writer_timer_;
   receive_channel_type receive_channel_;
   runner_type runner_;
   receiver_adapter_type receive_adapter_;

   using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

   std::string read_buffer_;
   dyn_buffer_type dbuf_;
   std::string write_buffer_;
   reqs_type reqs_;
   resp3::parser parser_{};
   bool on_push_ = false;
   bool cancel_run_called_ = false;

   usage usage_;
};

} // boost::redis::detail

#endif // BOOST_REDIS_CONNECTION_BASE_HPP