Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:51:19

0001 /* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
0002  *
0003  * Distributed under the Boost Software License, Version 1.0. (See
0004  * accompanying file LICENSE.txt)
0005  */
0006 
0007 #ifndef BOOST_REDIS_CONNECTION_BASE_HPP
0008 #define BOOST_REDIS_CONNECTION_BASE_HPP
0009 
0010 #include <boost/redis/adapter/adapt.hpp>
0011 #include <boost/redis/detail/helper.hpp>
0012 #include <boost/redis/error.hpp>
0013 #include <boost/redis/operation.hpp>
0014 #include <boost/redis/request.hpp>
0015 #include <boost/redis/resp3/type.hpp>
0016 #include <boost/redis/config.hpp>
0017 #include <boost/redis/detail/runner.hpp>
0018 #include <boost/redis/usage.hpp>
0019 
0020 #include <boost/system.hpp>
0021 #include <boost/asio/basic_stream_socket.hpp>
0022 #include <boost/asio/bind_executor.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <boost/asio/write.hpp>
0027 #include <boost/assert.hpp>
0028 #include <boost/core/ignore_unused.hpp>
0029 #include <boost/asio/ssl/stream.hpp>
0030 #include <boost/asio/read_until.hpp>
0031 #include <boost/asio/buffer.hpp>
0032 #include <boost/asio/experimental/channel.hpp>
0033 
0034 #include <algorithm>
0035 #include <array>
0036 #include <chrono>
0037 #include <deque>
0038 #include <memory>
0039 #include <string_view>
0040 #include <type_traits>
0041 #include <functional>
0042 
0043 namespace boost::redis::detail
0044 {
0045 
/// Returns a read-only view over every byte currently held by `buf`.
///
/// The view points at the memory returned by `buf.data(0, buf.size())`;
/// nothing is copied, so it is only meaningful while that memory is.
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   auto const len = buf.size();
   auto const* first = static_cast<char const*>(buf.data(0, len).data());
   return std::string_view{first, len};
}
0052 
0053 template <class AsyncReadStream, class DynamicBuffer>
0054 class append_some_op {
0055 private:
0056    AsyncReadStream& stream_;
0057    DynamicBuffer buf_;
0058    std::size_t size_ = 0;
0059    std::size_t tmp_ = 0;
0060    asio::coroutine coro_{};
0061 
0062 public:
0063    append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
0064    : stream_ {stream}
0065    , buf_ {std::move(buf)}
0066    , size_{size}
0067    { }
0068 
0069    template <class Self>
0070    void operator()( Self& self
0071                   , system::error_code ec = {}
0072                   , std::size_t n = 0)
0073    {
0074       BOOST_ASIO_CORO_REENTER (coro_)
0075       {
0076          tmp_ = buf_.size();
0077          buf_.grow(size_);
0078 
0079          BOOST_ASIO_CORO_YIELD
0080          stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
0081          if (ec) {
0082             self.complete(ec, 0);
0083             return;
0084          }
0085 
0086          buf_.shrink(buf_.size() - tmp_ - n);
0087          self.complete({}, n);
0088       }
0089    }
0090 };
0091 
0092 template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
0093 auto
0094 async_append_some(
0095    AsyncReadStream& stream,
0096    DynamicBuffer buffer,
0097    std::size_t size,
0098    CompletionToken&& token)
0099 {
0100    return asio::async_compose
0101       < CompletionToken
0102       , void(system::error_code, std::size_t)
0103       >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
0104 }
0105 
0106 template <class Conn>
0107 struct exec_op {
0108    using req_info_type = typename Conn::req_info;
0109    using adapter_type = typename Conn::adapter_type;
0110 
0111    Conn* conn_ = nullptr;
0112    std::shared_ptr<req_info_type> info_ = nullptr;
0113    asio::coroutine coro{};
0114 
0115    template <class Self>
0116    void operator()(Self& self , system::error_code ec = {})
0117    {
0118       BOOST_ASIO_CORO_REENTER (coro)
0119       {
0120          // Check whether the user wants to wait for the connection to
0121          // be stablished.
0122          if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
0123             BOOST_ASIO_CORO_YIELD
0124             asio::post(std::move(self));
0125             return self.complete(error::not_connected, 0);
0126          }
0127 
0128          conn_->add_request_info(info_);
0129 
0130 EXEC_OP_WAIT:
0131          BOOST_ASIO_CORO_YIELD
0132          info_->async_wait(std::move(self));
0133          BOOST_ASSERT(ec == asio::error::operation_aborted);
0134 
0135          if (info_->ec_) {
0136             self.complete(info_->ec_, 0);
0137             return;
0138          }
0139 
0140          if (info_->stop_requested()) {
0141             // Don't have to call remove_request as it has already
0142             // been by cancel(exec).
0143             return self.complete(ec, 0);
0144          }
0145 
0146          if (is_cancelled(self)) {
0147             if (info_->is_written()) {
0148                using c_t = asio::cancellation_type;
0149                auto const c = self.get_cancellation_state().cancelled();
0150                if ((c & c_t::terminal) != c_t::none) {
0151                   // Cancellation requires closing the connection
0152                   // otherwise it stays in inconsistent state.
0153                   conn_->cancel(operation::run);
0154                   return self.complete(ec, 0);
0155                } else {
0156                   // Can't implement other cancelation types, ignoring.
0157                   self.get_cancellation_state().clear();
0158 
0159                   // TODO: Find out a better way to ignore
0160                   // cancelation.
0161                   goto EXEC_OP_WAIT;
0162                }
0163             } else {
0164                // Cancelation can be honored.
0165                conn_->remove_request(info_);
0166                self.complete(ec, 0);
0167                return;
0168             }
0169          }
0170 
0171          self.complete(info_->ec_, info_->read_size_);
0172       }
0173    }
0174 };
0175 
0176 template <class Conn, class Logger>
0177 struct run_op {
0178    Conn* conn = nullptr;
0179    Logger logger_;
0180    asio::coroutine coro{};
0181 
0182    template <class Self>
0183    void operator()( Self& self
0184                   , std::array<std::size_t, 2> order = {}
0185                   , system::error_code ec0 = {}
0186                   , system::error_code ec1 = {})
0187    {
0188       BOOST_ASIO_CORO_REENTER (coro)
0189       {
0190          conn->reset();
0191 
0192          BOOST_ASIO_CORO_YIELD
0193          asio::experimental::make_parallel_group(
0194             [this](auto token) { return conn->reader(logger_, token);},
0195             [this](auto token) { return conn->writer(logger_, token);}
0196          ).async_wait(
0197             asio::experimental::wait_for_one(),
0198             std::move(self));
0199 
0200          if (is_cancelled(self)) {
0201             logger_.trace("run-op: canceled. Exiting ...");
0202             self.complete(asio::error::operation_aborted);
0203             return;
0204          }
0205 
0206          logger_.on_run(ec0, ec1);
0207 
0208          switch (order[0]) {
0209            case 0: self.complete(ec0); break;
0210            case 1: self.complete(ec1); break;
0211            default: BOOST_ASSERT(false);
0212          }
0213       }
0214    }
0215 };
0216 
0217 template <class Conn, class Logger>
0218 struct writer_op {
0219    Conn* conn_;
0220    Logger logger_;
0221    asio::coroutine coro{};
0222 
0223    template <class Self>
0224    void operator()( Self& self
0225                   , system::error_code ec = {}
0226                   , std::size_t n = 0)
0227    {
0228       ignore_unused(n);
0229 
0230       BOOST_ASIO_CORO_REENTER (coro) for (;;)
0231       {
0232          while (conn_->coalesce_requests()) {
0233             if (conn_->use_ssl())
0234                BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0235             else
0236                BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0237 
0238             logger_.on_write(ec, conn_->write_buffer_);
0239 
0240             if (ec) {
0241                logger_.trace("writer-op: error. Exiting ...");
0242                conn_->cancel(operation::run);
0243                self.complete(ec);
0244                return;
0245             }
0246 
0247             if (is_cancelled(self)) {
0248                logger_.trace("writer-op: canceled. Exiting ...");
0249                self.complete(asio::error::operation_aborted);
0250                return;
0251             }
0252 
0253             conn_->on_write();
0254 
0255             // A socket.close() may have been called while a
0256             // successful write might had already been queued, so we
0257             // have to check here before proceeding.
0258             if (!conn_->is_open()) {
0259                logger_.trace("writer-op: canceled (2). Exiting ...");
0260                self.complete({});
0261                return;
0262             }
0263          }
0264 
0265          BOOST_ASIO_CORO_YIELD
0266          conn_->writer_timer_.async_wait(std::move(self));
0267          if (!conn_->is_open() || is_cancelled(self)) {
0268             logger_.trace("writer-op: canceled (3). Exiting ...");
0269             // Notice this is not an error of the op, stoping was
0270             // requested from the outside, so we complete with
0271             // success.
0272             self.complete({});
0273             return;
0274          }
0275       }
0276    }
0277 };
0278 
0279 template <class Conn, class Logger>
0280 struct reader_op {
0281    using parse_result = typename Conn::parse_result;
0282    using parse_ret_type = typename Conn::parse_ret_type;
0283    Conn* conn_;
0284    Logger logger_;
0285    parse_ret_type res_{parse_result::resp, 0};
0286    asio::coroutine coro{};
0287 
0288    template <class Self>
0289    void operator()( Self& self
0290                   , system::error_code ec = {}
0291                   , std::size_t n = 0)
0292    {
0293       ignore_unused(n);
0294 
0295       BOOST_ASIO_CORO_REENTER (coro) for (;;)
0296       {
0297          // Appends some data to the buffer if necessary.
0298          if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
0299             if (conn_->use_ssl()) {
0300                BOOST_ASIO_CORO_YIELD
0301                async_append_some(
0302                   conn_->next_layer(),
0303                   conn_->dbuf_,
0304                   conn_->get_suggested_buffer_growth(),
0305                   std::move(self));
0306             } else {
0307                BOOST_ASIO_CORO_YIELD
0308                async_append_some(
0309                   conn_->next_layer().next_layer(),
0310                   conn_->dbuf_,
0311                   conn_->get_suggested_buffer_growth(),
0312                   std::move(self));
0313             }
0314 
0315             logger_.on_read(ec, n);
0316 
0317             // EOF is not treated as error.
0318             if (ec == asio::error::eof) {
0319                logger_.trace("reader-op: EOF received. Exiting ...");
0320                conn_->cancel(operation::run);
0321                return self.complete({}); // EOFINAE: EOF is not an error.
0322             }
0323 
0324             // The connection is not viable after an error.
0325             if (ec) {
0326                logger_.trace("reader-op: error. Exiting ...");
0327                conn_->cancel(operation::run);
0328                self.complete(ec);
0329                return;
0330             }
0331 
0332             // Somebody might have canceled implicitly or explicitly
0333             // while we were suspended and after queueing so we have to
0334             // check.
0335             if (!conn_->is_open() || is_cancelled(self)) {
0336                logger_.trace("reader-op: canceled. Exiting ...");
0337                self.complete(ec);
0338                return;
0339             }
0340          }
0341 
0342          res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
0343          if (ec) {
0344             logger_.trace("reader-op: parse error. Exiting ...");
0345             conn_->cancel(operation::run);
0346             self.complete(ec);
0347             return;
0348          }
0349 
0350          if (res_.first == parse_result::push) {
0351             if (!conn_->receive_channel_.try_send(ec, res_.second)) {
0352                BOOST_ASIO_CORO_YIELD
0353                conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
0354             }
0355 
0356             if (ec) {
0357                logger_.trace("reader-op: error. Exiting ...");
0358                conn_->cancel(operation::run);
0359                self.complete(ec);
0360                return;
0361             }
0362 
0363             if (!conn_->is_open() || is_cancelled(self)) {
0364                logger_.trace("reader-op: canceled (2). Exiting ...");
0365                self.complete(asio::error::operation_aborted);
0366                return;
0367             }
0368 
0369          }
0370       }
0371    }
0372 };
0373 
0374 /** @brief Base class for high level Redis asynchronous connections.
0375  *  @ingroup high-level-api
0376  *
0377  *  @tparam Executor The executor type.
0378  *
0379  */
0380 template <class Executor>
0381 class connection_base {
0382 public:
0383    /// Executor type
0384    using executor_type = Executor;
0385 
0386    /// Type of the next layer
0387    using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
0388 
0389    using clock_type = std::chrono::steady_clock;
0390    using clock_traits_type = asio::wait_traits<clock_type>;
0391    using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
0392 
0393    using this_type = connection_base<Executor>;
0394 
0395    /// Constructs from an executor.
0396    connection_base(
0397       executor_type ex,
0398       asio::ssl::context::method method,
0399       std::size_t max_read_size)
0400    : ctx_{method}
0401    , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
0402    , writer_timer_{ex}
0403    , receive_channel_{ex, 256}
0404    , runner_{ex, {}}
0405    , dbuf_{read_buffer_, max_read_size}
0406    {
0407       set_receive_response(ignore);
0408       writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0409    }
0410 
0411    /// Returns the ssl context.
0412    auto const& get_ssl_context() const noexcept
0413       { return ctx_;}
0414 
0415    /// Returns the ssl context.
0416    auto& get_ssl_context() noexcept
0417       { return ctx_;}
0418 
0419    /// Resets the underlying stream.
0420    void reset_stream()
0421    {
0422       stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
0423    }
0424 
0425    /// Returns a reference to the next layer.
0426    auto& next_layer() noexcept { return *stream_; }
0427 
0428    /// Returns a const reference to the next layer.
0429    auto const& next_layer() const noexcept { return *stream_; }
0430 
0431    /// Returns the associated executor.
0432    auto get_executor() {return writer_timer_.get_executor();}
0433 
0434    /// Cancels specific operations.
0435    void cancel(operation op)
0436    {
0437       runner_.cancel(op);
0438       if (op == operation::all) {
0439          cancel_impl(operation::run);
0440          cancel_impl(operation::receive);
0441          cancel_impl(operation::exec);
0442          return;
0443       } 
0444 
0445       cancel_impl(op);
0446    }
0447 
0448    template <class Response, class CompletionToken>
0449    auto async_exec(request const& req, Response& resp, CompletionToken token)
0450    {
0451       using namespace boost::redis::adapter;
0452       auto f = boost_redis_adapt(resp);
0453       BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
0454 
0455       auto info = std::make_shared<req_info>(req, f, get_executor());
0456 
0457       return asio::async_compose
0458          < CompletionToken
0459          , void(system::error_code, std::size_t)
0460          >(exec_op<this_type>{this, info}, token, writer_timer_);
0461    }
0462 
0463    template <class Response, class CompletionToken>
0464    [[deprecated("Set the response with set_receive_response and use the other overload.")]]
0465    auto async_receive(Response& response, CompletionToken token)
0466    {
0467       set_receive_response(response);
0468       return receive_channel_.async_receive(std::move(token));
0469    }
0470 
0471    template <class CompletionToken>
0472    auto async_receive(CompletionToken token)
0473       { return receive_channel_.async_receive(std::move(token)); }
0474 
0475    std::size_t receive(system::error_code& ec)
0476    {
0477       std::size_t size = 0;
0478 
0479       auto f = [&](system::error_code const& ec2, std::size_t n)
0480       {
0481          ec = ec2;
0482          size = n;
0483       };
0484 
0485       auto const res = receive_channel_.try_receive(f);
0486       if (ec)
0487          return 0;
0488 
0489       if (!res)
0490          ec = error::sync_receive_push_failed;
0491 
0492       return size;
0493    }
0494 
0495    template <class Logger, class CompletionToken>
0496    auto async_run(config const& cfg, Logger l, CompletionToken token)
0497    {
0498       runner_.set_config(cfg);
0499       l.set_prefix(runner_.get_config().log_prefix);
0500       return runner_.async_run(*this, l, std::move(token));
0501    }
0502 
0503    template <class Response>
0504    void set_receive_response(Response& response)
0505    {
0506       using namespace boost::redis::adapter;
0507       auto g = boost_redis_adapt(response);
0508       receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
0509    }
0510 
0511    usage get_usage() const noexcept
0512       { return usage_; }
0513 
0514 private:
0515    using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
0516    using runner_type = runner<executor_type>;
0517    using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
0518    using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
0519 
0520    auto use_ssl() const noexcept
0521       { return runner_.get_config().use_ssl;}
0522 
0523    auto cancel_on_conn_lost() -> std::size_t
0524    {
0525       // Must return false if the request should be removed.
0526       auto cond = [](auto const& ptr)
0527       {
0528          BOOST_ASSERT(ptr != nullptr);
0529 
0530          if (ptr->is_written()) {
0531             return !ptr->req_->get_config().cancel_if_unresponded;
0532          } else {
0533             return !ptr->req_->get_config().cancel_on_connection_lost;
0534          }
0535       };
0536 
0537       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
0538 
0539       auto const ret = std::distance(point, std::end(reqs_));
0540 
0541       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0542          ptr->stop();
0543       });
0544 
0545       reqs_.erase(point, std::end(reqs_));
0546       std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0547          return ptr->reset_status();
0548       });
0549 
0550       return ret;
0551    }
0552 
0553    auto cancel_unwritten_requests() -> std::size_t
0554    {
0555       auto f = [](auto const& ptr)
0556       {
0557          BOOST_ASSERT(ptr != nullptr);
0558          return ptr->is_written();
0559       };
0560 
0561       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
0562 
0563       auto const ret = std::distance(point, std::end(reqs_));
0564 
0565       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0566          ptr->stop();
0567       });
0568 
0569       reqs_.erase(point, std::end(reqs_));
0570       return ret;
0571    }
0572 
0573    void cancel_impl(operation op)
0574    {
0575       switch (op) {
0576          case operation::exec:
0577          {
0578             cancel_unwritten_requests();
0579          } break;
0580          case operation::run:
0581          {
0582             close();
0583             writer_timer_.cancel();
0584             receive_channel_.cancel();
0585             cancel_on_conn_lost();
0586          } break;
0587          case operation::receive:
0588          {
0589             receive_channel_.cancel();
0590          } break;
0591          default: /* ignore */;
0592       }
0593    }
0594 
0595    void on_write()
0596    {
0597       // We have to clear the payload right after writing it to use it
0598       // as a flag that informs there is no ongoing write.
0599       write_buffer_.clear();
0600 
0601       // Notice this must come before the for-each below.
0602       cancel_push_requests();
0603 
0604       // There is small optimization possible here: traverse only the
0605       // partition of unwritten requests instead of them all.
0606       std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0607          BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
0608          if (ptr->is_staged())
0609             ptr->mark_written();
0610       });
0611    }
0612 
0613    struct req_info {
0614    public:
0615       using node_type = resp3::basic_node<std::string_view>;
0616       using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
0617 
0618       enum class action
0619       {
0620          stop,
0621          proceed,
0622          none,
0623       };
0624 
0625       explicit req_info(request const& req, adapter_type adapter, executor_type ex)
0626       : timer_{ex}
0627       , action_{action::none}
0628       , req_{&req}
0629       , adapter_{}
0630       , expected_responses_{req.get_expected_responses()}
0631       , status_{status::none}
0632       , ec_{{}}
0633       , read_size_{0}
0634       {
0635          timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0636 
0637          adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
0638          {
0639             auto const i = req_->get_expected_responses() - expected_responses_;
0640             adapter(i, nd, ec);
0641          };
0642       }
0643 
0644       auto proceed()
0645       {
0646          timer_.cancel();
0647          action_ = action::proceed;
0648       }
0649 
0650       void stop()
0651       {
0652          timer_.cancel();
0653          action_ = action::stop;
0654       }
0655 
0656       [[nodiscard]] auto is_waiting_write() const noexcept
0657          { return !is_written() && !is_staged(); }
0658 
0659       [[nodiscard]] auto is_written() const noexcept
0660          { return status_ == status::written; }
0661 
0662       [[nodiscard]] auto is_staged() const noexcept
0663          { return status_ == status::staged; }
0664 
0665       void mark_written() noexcept
0666          { status_ = status::written; }
0667 
0668       void mark_staged() noexcept
0669          { status_ = status::staged; }
0670 
0671       void reset_status() noexcept
0672          { status_ = status::none; }
0673 
0674       [[nodiscard]] auto stop_requested() const noexcept
0675          { return action_ == action::stop;}
0676 
0677       template <class CompletionToken>
0678       auto async_wait(CompletionToken token)
0679       {
0680          return timer_.async_wait(std::move(token));
0681       }
0682 
0683    //private:
0684       enum class status
0685       { none
0686       , staged
0687       , written
0688       };
0689 
0690       timer_type timer_;
0691       action action_;
0692       request const* req_;
0693       wrapped_adapter_type adapter_;
0694 
0695       // Contains the number of commands that haven't been read yet.
0696       std::size_t expected_responses_;
0697       status status_;
0698 
0699       system::error_code ec_;
0700       std::size_t read_size_;
0701    };
0702 
0703    void remove_request(std::shared_ptr<req_info> const& info)
0704    {
0705       reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
0706    }
0707 
0708    using reqs_type = std::deque<std::shared_ptr<req_info>>;
0709 
0710    template <class, class> friend struct reader_op;
0711    template <class, class> friend struct writer_op;
0712    template <class, class> friend struct run_op;
0713    template <class> friend struct exec_op;
0714    template <class, class, class> friend struct run_all_op;
0715 
0716    void cancel_push_requests()
0717    {
0718       auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0719          return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
0720       });
0721 
0722       std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0723          ptr->proceed();
0724       });
0725 
0726       reqs_.erase(point, std::end(reqs_));
0727    }
0728 
0729    [[nodiscard]] bool is_writing() const noexcept
0730    {
0731       return !write_buffer_.empty();
0732    }
0733 
0734    void add_request_info(std::shared_ptr<req_info> const& info)
0735    {
0736       reqs_.push_back(info);
0737 
0738       if (info->req_->has_hello_priority()) {
0739          auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
0740                return e->is_waiting_write();
0741          });
0742 
0743          std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
0744       }
0745 
0746       if (is_open() && !is_writing())
0747          writer_timer_.cancel();
0748    }
0749 
0750    template <class CompletionToken, class Logger>
0751    auto reader(Logger l, CompletionToken&& token)
0752    {
0753       return asio::async_compose
0754          < CompletionToken
0755          , void(system::error_code)
0756          >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
0757    }
0758 
0759    template <class CompletionToken, class Logger>
0760    auto writer(Logger l, CompletionToken&& token)
0761    {
0762       return asio::async_compose
0763          < CompletionToken
0764          , void(system::error_code)
0765          >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
0766    }
0767 
0768    template <class Logger, class CompletionToken>
0769    auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
0770    {
0771       runner_.set_config(cfg);
0772       l.set_prefix(runner_.get_config().log_prefix);
0773       return asio::async_compose
0774          < CompletionToken
0775          , void(system::error_code)
0776          >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
0777    }
0778 
0779    [[nodiscard]] bool coalesce_requests()
0780    {
0781       // Coalesces the requests and marks them staged. After a
0782       // successful write staged requests will be marked as written.
0783       auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
0784             return !ri->is_waiting_write();
0785       });
0786 
0787       std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
0788          // Stage the request.
0789          write_buffer_ += ri->req_->payload();
0790          ri->mark_staged();
0791          usage_.commands_sent += ri->expected_responses_;
0792       });
0793 
0794       usage_.bytes_sent += std::size(write_buffer_);
0795 
0796       return point != std::cend(reqs_);
0797    }
0798 
0799    bool is_waiting_response() const noexcept
0800    {
0801       return !std::empty(reqs_) && reqs_.front()->is_written();
0802    }
0803 
0804    void close()
0805    {
0806       if (stream_->next_layer().is_open()) {
0807          system::error_code ec;
0808          stream_->next_layer().close(ec);
0809       }
0810    }
0811 
0812    auto is_open() const noexcept { return stream_->next_layer().is_open(); }
0813    auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
0814 
0815    auto is_next_push()
0816    {
0817       // We handle unsolicited events in the following way
0818       //
0819       // 1. Its resp3 type is a push.
0820       //
0821       // 2. A non-push type is received with an empty requests
0822       //    queue. I have noticed this is possible (e.g. -MISCONF).
0823       //    I expect them to have type push so we can distinguish
0824       //    them from responses to commands, but it is a
0825       //    simple-error. If we are lucky enough to receive them
0826       //    when the command queue is empty we can treat them as
0827       //    server pushes, otherwise it is impossible to handle
0828       //    them properly
0829       //
0830       // 3. The request does not expect any response but we got
0831       //    one. This may happen if for example, subscribe with
0832       //    wrong syntax.
0833       //
0834       // Useful links:
0835       //
0836       // - https://github.com/redis/redis/issues/11784
0837       // - https://github.com/redis/redis/issues/6426
0838       //
0839 
0840       BOOST_ASSERT(!read_buffer_.empty());
0841 
0842       return
0843          (resp3::to_type(read_buffer_.front()) == resp3::type::push)
0844           || reqs_.empty()
0845           || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
0846           || !is_waiting_response(); // Added to deal with MONITOR.
0847    }
0848 
0849    auto get_suggested_buffer_growth() const noexcept
0850    {
0851       return parser_.get_suggested_buffer_growth(4096);
0852    }
0853 
0854    enum class parse_result { needs_more, push, resp };
0855 
0856    using parse_ret_type = std::pair<parse_result, std::size_t>;
0857 
0858    parse_ret_type on_finish_parsing(parse_result t)
0859    {
0860       if (t == parse_result::push) {
0861          usage_.pushes_received += 1;
0862          usage_.push_bytes_received += parser_.get_consumed();
0863       } else {
0864          usage_.responses_received += 1;
0865          usage_.response_bytes_received += parser_.get_consumed();
0866       }
0867 
0868       on_push_ = false;
0869       dbuf_.consume(parser_.get_consumed());
0870       auto const res = std::make_pair(t, parser_.get_consumed());
0871       parser_.reset();
0872       return res;
0873    }
0874 
0875    parse_ret_type on_read(std::string_view data, system::error_code& ec)
0876    {
0877       // We arrive here in two states:
0878       //
0879       //    1. While we are parsing a message. In this case we
0880       //       don't want to determine the type of the message in the
0881       //       buffer (i.e. response vs push) but leave it untouched
0882       //       until the parsing of a complete message ends.
0883       //
0884       //    2. On a new message, in which case we have to determine
0885       //       whether the next messag is a push or a response.
0886       //
0887       if (!on_push_) // Prepare for new message.
0888          on_push_ = is_next_push();
0889 
0890       if (on_push_) {
0891          if (!resp3::parse(parser_, data, receive_adapter_, ec))
0892             return std::make_pair(parse_result::needs_more, 0);
0893 
0894          if (ec)
0895             return std::make_pair(parse_result::push, 0);
0896 
0897          return on_finish_parsing(parse_result::push);
0898       }
0899 
0900       BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
0901       BOOST_ASSERT(!reqs_.empty());
0902       BOOST_ASSERT(reqs_.front() != nullptr);
0903       BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
0904 
0905       if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
0906          return std::make_pair(parse_result::needs_more, 0);
0907 
0908       if (ec) {
0909          reqs_.front()->ec_ = ec;
0910          reqs_.front()->proceed();
0911          return std::make_pair(parse_result::resp, 0);
0912       }
0913 
0914       reqs_.front()->read_size_ += parser_.get_consumed();
0915 
0916       if (--reqs_.front()->expected_responses_ == 0) {
0917          // Done with this request.
0918          reqs_.front()->proceed();
0919          reqs_.pop_front();
0920       }
0921 
0922       return on_finish_parsing(parse_result::resp);
0923    }
0924 
0925    void reset()
0926    {
0927       write_buffer_.clear();
0928       read_buffer_.clear();
0929       parser_.reset();
0930       on_push_ = false;
0931    }
0932 
0933    asio::ssl::context ctx_;
0934    std::unique_ptr<next_layer_type> stream_;
0935 
0936    // Notice we use a timer to simulate a condition-variable. It is
0937    // also more suitable than a channel and the notify operation does
0938    // not suspend.
0939    timer_type writer_timer_;
0940    receive_channel_type receive_channel_;
0941    runner_type runner_;
0942    receiver_adapter_type receive_adapter_;
0943 
0944    using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
0945 
0946    std::string read_buffer_;
0947    dyn_buffer_type dbuf_;
0948    std::string write_buffer_;
0949    reqs_type reqs_;
0950    resp3::parser parser_{};
0951    bool on_push_ = false;
0952 
0953    usage usage_;
0954 };
0955 
0956 } // boost::redis::detail
0957 
0958 #endif // BOOST_REDIS_CONNECTION_BASE_HPP