File indexing completed on 2025-07-15 08:49:04
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_CONNECTION_BASE_HPP
0008 #define BOOST_REDIS_CONNECTION_BASE_HPP
0009
0010 #include <boost/redis/adapter/adapt.hpp>
0011 #include <boost/redis/detail/helper.hpp>
0012 #include <boost/redis/error.hpp>
0013 #include <boost/redis/operation.hpp>
0014 #include <boost/redis/request.hpp>
0015 #include <boost/redis/resp3/type.hpp>
0016 #include <boost/redis/config.hpp>
0017 #include <boost/redis/detail/runner.hpp>
0018 #include <boost/redis/usage.hpp>
0019
0020 #include <boost/system.hpp>
0021 #include <boost/asio/basic_stream_socket.hpp>
0022 #include <boost/asio/bind_executor.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <boost/asio/write.hpp>
0027 #include <boost/assert.hpp>
0028 #include <boost/core/ignore_unused.hpp>
0029 #include <boost/asio/ssl/stream.hpp>
0030 #include <boost/asio/read_until.hpp>
0031 #include <boost/asio/buffer.hpp>
0032 #include <boost/asio/experimental/channel.hpp>
0033
0034 #include <algorithm>
0035 #include <array>
0036 #include <chrono>
0037 #include <deque>
0038 #include <memory>
0039 #include <string_view>
0040 #include <type_traits>
0041 #include <functional>
0042
0043 namespace boost::redis::detail
0044 {
0045
0046 template <class DynamicBuffer>
0047 std::string_view buffer_view(DynamicBuffer buf) noexcept
0048 {
0049 char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
0050 return std::string_view{start, std::size(buf)};
0051 }
0052
0053 template <class AsyncReadStream, class DynamicBuffer>
0054 class append_some_op {
0055 private:
0056 AsyncReadStream& stream_;
0057 DynamicBuffer buf_;
0058 std::size_t size_ = 0;
0059 std::size_t tmp_ = 0;
0060 asio::coroutine coro_{};
0061
0062 public:
0063 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
0064 : stream_ {stream}
0065 , buf_ {std::move(buf)}
0066 , size_{size}
0067 { }
0068
0069 template <class Self>
0070 void operator()( Self& self
0071 , system::error_code ec = {}
0072 , std::size_t n = 0)
0073 {
0074 BOOST_ASIO_CORO_REENTER (coro_)
0075 {
0076 tmp_ = buf_.size();
0077 buf_.grow(size_);
0078
0079 BOOST_ASIO_CORO_YIELD
0080 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
0081 if (ec) {
0082 self.complete(ec, 0);
0083 return;
0084 }
0085
0086 buf_.shrink(buf_.size() - tmp_ - n);
0087 self.complete({}, n);
0088 }
0089 }
0090 };
0091
0092 template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
0093 auto
0094 async_append_some(
0095 AsyncReadStream& stream,
0096 DynamicBuffer buffer,
0097 std::size_t size,
0098 CompletionToken&& token)
0099 {
0100 return asio::async_compose
0101 < CompletionToken
0102 , void(system::error_code, std::size_t)
0103 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
0104 }
0105
0106 template <class Conn>
0107 struct exec_op {
0108 using req_info_type = typename Conn::req_info;
0109 using adapter_type = typename Conn::adapter_type;
0110
0111 Conn* conn_ = nullptr;
0112 std::shared_ptr<req_info_type> info_ = nullptr;
0113 asio::coroutine coro{};
0114
0115 template <class Self>
0116 void operator()(Self& self , system::error_code = {}, std::size_t = 0)
0117 {
0118 BOOST_ASIO_CORO_REENTER (coro)
0119 {
0120
0121
0122 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
0123 BOOST_ASIO_CORO_YIELD
0124 asio::post(std::move(self));
0125 return self.complete(error::not_connected, 0);
0126 }
0127
0128 conn_->add_request_info(info_);
0129
0130 EXEC_OP_WAIT:
0131 BOOST_ASIO_CORO_YIELD
0132 info_->async_wait(std::move(self));
0133
0134 if (info_->ec_) {
0135 self.complete(info_->ec_, 0);
0136 return;
0137 }
0138
0139 if (info_->stop_requested()) {
0140
0141
0142 return self.complete(asio::error::operation_aborted, 0);
0143 }
0144
0145 if (is_cancelled(self)) {
0146 if (!info_->is_waiting()) {
0147 using c_t = asio::cancellation_type;
0148 auto const c = self.get_cancellation_state().cancelled();
0149 if ((c & c_t::terminal) != c_t::none) {
0150
0151
0152 conn_->cancel(operation::run);
0153 return self.complete(asio::error::operation_aborted, 0);
0154 } else {
0155
0156 self.get_cancellation_state().clear();
0157
0158
0159
0160 goto EXEC_OP_WAIT;
0161 }
0162 } else {
0163
0164 conn_->remove_request(info_);
0165 self.complete(asio::error::operation_aborted, 0);
0166 return;
0167 }
0168 }
0169
0170 self.complete(info_->ec_, info_->read_size_);
0171 }
0172 }
0173 };
0174
0175 template <class Conn, class Logger>
0176 struct run_op {
0177 Conn* conn = nullptr;
0178 Logger logger_;
0179 asio::coroutine coro{};
0180
0181 template <class Self>
0182 void operator()( Self& self
0183 , std::array<std::size_t, 2> order = {}
0184 , system::error_code ec0 = {}
0185 , system::error_code ec1 = {})
0186 {
0187 BOOST_ASIO_CORO_REENTER (coro)
0188 {
0189 conn->reset();
0190
0191 BOOST_ASIO_CORO_YIELD
0192 asio::experimental::make_parallel_group(
0193 [this](auto token) { return conn->reader(logger_, token);},
0194 [this](auto token) { return conn->writer(logger_, token);}
0195 ).async_wait(
0196 asio::experimental::wait_for_one(),
0197 std::move(self));
0198
0199 if (is_cancelled(self)) {
0200 logger_.trace("run-op: canceled. Exiting ...");
0201 self.complete(asio::error::operation_aborted);
0202 return;
0203 }
0204
0205 logger_.on_run(ec0, ec1);
0206
0207 switch (order[0]) {
0208 case 0: self.complete(ec0); break;
0209 case 1: self.complete(ec1); break;
0210 default: BOOST_ASSERT(false);
0211 }
0212 }
0213 }
0214 };
0215
0216 template <class Conn, class Logger>
0217 struct writer_op {
0218 Conn* conn_;
0219 Logger logger_;
0220 asio::coroutine coro{};
0221
0222 template <class Self>
0223 void operator()( Self& self
0224 , system::error_code ec = {}
0225 , std::size_t n = 0)
0226 {
0227 ignore_unused(n);
0228
0229 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0230 {
0231 while (conn_->coalesce_requests()) {
0232 if (conn_->use_ssl())
0233 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0234 else
0235 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0236
0237 logger_.on_write(ec, conn_->write_buffer_);
0238
0239 if (ec) {
0240 logger_.trace("writer-op: error. Exiting ...");
0241 conn_->cancel(operation::run);
0242 self.complete(ec);
0243 return;
0244 }
0245
0246 if (is_cancelled(self)) {
0247 logger_.trace("writer-op: canceled. Exiting ...");
0248 self.complete(asio::error::operation_aborted);
0249 return;
0250 }
0251
0252 conn_->on_write();
0253
0254
0255
0256
0257 if (!conn_->is_open()) {
0258 logger_.trace("writer-op: canceled (2). Exiting ...");
0259 self.complete({});
0260 return;
0261 }
0262 }
0263
0264 BOOST_ASIO_CORO_YIELD
0265 conn_->writer_timer_.async_wait(std::move(self));
0266 if (!conn_->is_open() || is_cancelled(self)) {
0267 logger_.trace("writer-op: canceled (3). Exiting ...");
0268
0269
0270
0271 self.complete({});
0272 return;
0273 }
0274 }
0275 }
0276 };
0277
0278 template <class Conn, class Logger>
0279 struct reader_op {
0280 using parse_result = typename Conn::parse_result;
0281 using parse_ret_type = typename Conn::parse_ret_type;
0282 Conn* conn_;
0283 Logger logger_;
0284 parse_ret_type res_{parse_result::resp, 0};
0285 asio::coroutine coro{};
0286
0287 template <class Self>
0288 void operator()( Self& self
0289 , system::error_code ec = {}
0290 , std::size_t n = 0)
0291 {
0292 ignore_unused(n);
0293
0294 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0295 {
0296
0297 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
0298 if (conn_->use_ssl()) {
0299 BOOST_ASIO_CORO_YIELD
0300 async_append_some(
0301 conn_->next_layer(),
0302 conn_->dbuf_,
0303 conn_->get_suggested_buffer_growth(),
0304 std::move(self));
0305 } else {
0306 BOOST_ASIO_CORO_YIELD
0307 async_append_some(
0308 conn_->next_layer().next_layer(),
0309 conn_->dbuf_,
0310 conn_->get_suggested_buffer_growth(),
0311 std::move(self));
0312 }
0313
0314 logger_.on_read(ec, n);
0315
0316
0317 if (ec == asio::error::eof) {
0318 logger_.trace("reader-op: EOF received. Exiting ...");
0319 conn_->cancel(operation::run);
0320 return self.complete({});
0321 }
0322
0323
0324 if (ec) {
0325 logger_.trace("reader-op: error. Exiting ...");
0326 conn_->cancel(operation::run);
0327 self.complete(ec);
0328 return;
0329 }
0330
0331
0332
0333
0334 if (!conn_->is_open() || is_cancelled(self)) {
0335 logger_.trace("reader-op: canceled. Exiting ...");
0336 self.complete(ec);
0337 return;
0338 }
0339 }
0340
0341 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
0342 if (ec) {
0343 logger_.trace("reader-op: parse error. Exiting ...");
0344 conn_->cancel(operation::run);
0345 self.complete(ec);
0346 return;
0347 }
0348
0349 if (res_.first == parse_result::push) {
0350 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
0351 BOOST_ASIO_CORO_YIELD
0352 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
0353 }
0354
0355 if (ec) {
0356 logger_.trace("reader-op: error. Exiting ...");
0357 conn_->cancel(operation::run);
0358 self.complete(ec);
0359 return;
0360 }
0361
0362 if (!conn_->is_open() || is_cancelled(self)) {
0363 logger_.trace("reader-op: canceled (2). Exiting ...");
0364 self.complete(asio::error::operation_aborted);
0365 return;
0366 }
0367
0368 }
0369 }
0370 }
0371 };
0372
0373
0374
0375
0376
0377
0378
0379 template <class Executor>
0380 class connection_base {
0381 public:
0382
0383 using executor_type = Executor;
0384
0385
0386 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
0387
0388 using clock_type = std::chrono::steady_clock;
0389 using clock_traits_type = asio::wait_traits<clock_type>;
0390 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
0391
0392 using this_type = connection_base<Executor>;
0393
0394
0395 connection_base(
0396 executor_type ex,
0397 asio::ssl::context ctx,
0398 std::size_t max_read_size)
0399 : ctx_{std::move(ctx)}
0400 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
0401 , writer_timer_{ex}
0402 , receive_channel_{ex, 256}
0403 , runner_{ex, {}}
0404 , dbuf_{read_buffer_, max_read_size}
0405 {
0406 set_receive_response(ignore);
0407 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0408 }
0409
0410
0411 auto const& get_ssl_context() const noexcept
0412 { return ctx_;}
0413
0414
0415 void reset_stream()
0416 {
0417 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
0418 }
0419
0420
0421 auto& next_layer() noexcept { return *stream_; }
0422
0423
0424 auto const& next_layer() const noexcept { return *stream_; }
0425
0426
0427 auto get_executor() {return writer_timer_.get_executor();}
0428
0429
0430 void cancel(operation op)
0431 {
0432 runner_.cancel(op);
0433 if (op == operation::all) {
0434 cancel_impl(operation::run);
0435 cancel_impl(operation::receive);
0436 cancel_impl(operation::exec);
0437 return;
0438 }
0439
0440 cancel_impl(op);
0441 }
0442
0443 template <class Response, class CompletionToken>
0444 auto async_exec(request const& req, Response& resp, CompletionToken token)
0445 {
0446 using namespace boost::redis::adapter;
0447 auto f = boost_redis_adapt(resp);
0448 BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
0449
0450 auto info = std::make_shared<req_info>(req, f, get_executor());
0451
0452 return asio::async_compose
0453 < CompletionToken
0454 , void(system::error_code, std::size_t)
0455 >(exec_op<this_type>{this, info}, token, writer_timer_);
0456 }
0457
0458 template <class Response, class CompletionToken>
0459 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
0460 auto async_receive(Response& response, CompletionToken token)
0461 {
0462 set_receive_response(response);
0463 return receive_channel_.async_receive(std::move(token));
0464 }
0465
0466 template <class CompletionToken>
0467 auto async_receive(CompletionToken token)
0468 { return receive_channel_.async_receive(std::move(token)); }
0469
0470 std::size_t receive(system::error_code& ec)
0471 {
0472 std::size_t size = 0;
0473
0474 auto f = [&](system::error_code const& ec2, std::size_t n)
0475 {
0476 ec = ec2;
0477 size = n;
0478 };
0479
0480 auto const res = receive_channel_.try_receive(f);
0481 if (ec)
0482 return 0;
0483
0484 if (!res)
0485 ec = error::sync_receive_push_failed;
0486
0487 return size;
0488 }
0489
0490 template <class Logger, class CompletionToken>
0491 auto async_run(config const& cfg, Logger l, CompletionToken token)
0492 {
0493 runner_.set_config(cfg);
0494 l.set_prefix(runner_.get_config().log_prefix);
0495 return runner_.async_run(*this, l, std::move(token));
0496 }
0497
0498 template <class Response>
0499 void set_receive_response(Response& response)
0500 {
0501 using namespace boost::redis::adapter;
0502 auto g = boost_redis_adapt(response);
0503 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
0504 }
0505
0506 usage get_usage() const noexcept
0507 { return usage_; }
0508
0509 auto run_is_canceled() const noexcept
0510 { return cancel_run_called_; }
0511
0512 private:
0513 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
0514 using runner_type = runner<executor_type>;
0515 using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
0516 using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
0517 using exec_notifier_type = receive_channel_type;
0518
0519 auto use_ssl() const noexcept
0520 { return runner_.get_config().use_ssl;}
0521
0522 auto cancel_on_conn_lost() -> std::size_t
0523 {
0524
0525 auto cond = [](auto const& ptr)
0526 {
0527 BOOST_ASSERT(ptr != nullptr);
0528
0529 if (ptr->is_waiting()) {
0530 return !ptr->req_->get_config().cancel_on_connection_lost;
0531 } else {
0532 return !ptr->req_->get_config().cancel_if_unresponded;
0533 }
0534 };
0535
0536 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
0537
0538 auto const ret = std::distance(point, std::end(reqs_));
0539
0540 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0541 ptr->stop();
0542 });
0543
0544 reqs_.erase(point, std::end(reqs_));
0545
0546 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0547 return ptr->mark_waiting();
0548 });
0549
0550 return ret;
0551 }
0552
0553 auto cancel_unwritten_requests() -> std::size_t
0554 {
0555 auto f = [](auto const& ptr)
0556 {
0557 BOOST_ASSERT(ptr != nullptr);
0558 return !ptr->is_waiting();
0559 };
0560
0561 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
0562
0563 auto const ret = std::distance(point, std::end(reqs_));
0564
0565 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0566 ptr->stop();
0567 });
0568
0569 reqs_.erase(point, std::end(reqs_));
0570 return ret;
0571 }
0572
0573 void cancel_impl(operation op)
0574 {
0575 switch (op) {
0576 case operation::exec:
0577 {
0578 cancel_unwritten_requests();
0579 } break;
0580 case operation::run:
0581 {
0582
0583
0584 if (std::exchange(cancel_run_called_, true)) {
0585 return;
0586 }
0587
0588 close();
0589 writer_timer_.cancel();
0590 receive_channel_.cancel();
0591 cancel_on_conn_lost();
0592 } break;
0593 case operation::receive:
0594 {
0595 receive_channel_.cancel();
0596 } break;
0597 default: ;
0598 }
0599 }
0600
0601 void on_write()
0602 {
0603
0604
0605 write_buffer_.clear();
0606
0607
0608 cancel_push_requests();
0609
0610
0611
0612 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0613 BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
0614 if (ptr->is_staged()) {
0615 ptr->mark_written();
0616 }
0617 });
0618 }
0619
0620 struct req_info {
0621 public:
0622 using node_type = resp3::basic_node<std::string_view>;
0623 using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
0624
0625 explicit req_info(request const& req, adapter_type adapter, executor_type ex)
0626 : notifier_{ex, 1}
0627 , req_{&req}
0628 , adapter_{}
0629 , expected_responses_{req.get_expected_responses()}
0630 , status_{status::waiting}
0631 , ec_{{}}
0632 , read_size_{0}
0633 {
0634 adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
0635 {
0636 auto const i = req_->get_expected_responses() - expected_responses_;
0637 adapter(i, nd, ec);
0638 };
0639 }
0640
0641 auto proceed()
0642 {
0643 notifier_.try_send(std::error_code{}, 0);
0644 }
0645
0646 void stop()
0647 {
0648 notifier_.close();
0649 }
0650
0651 [[nodiscard]] auto is_waiting() const noexcept
0652 { return status_ == status::waiting; }
0653
0654 [[nodiscard]] auto is_written() const noexcept
0655 { return status_ == status::written; }
0656
0657 [[nodiscard]] auto is_staged() const noexcept
0658 { return status_ == status::staged; }
0659
0660 void mark_written() noexcept
0661 { status_ = status::written; }
0662
0663 void mark_staged() noexcept
0664 { status_ = status::staged; }
0665
0666 void mark_waiting() noexcept
0667 { status_ = status::waiting; }
0668
0669 [[nodiscard]] auto stop_requested() const noexcept
0670 { return !notifier_.is_open();}
0671
0672 template <class CompletionToken>
0673 auto async_wait(CompletionToken token)
0674 {
0675 return notifier_.async_receive(std::move(token));
0676 }
0677
0678
0679 enum class status
0680 { waiting
0681 , staged
0682 , written
0683 };
0684
0685 exec_notifier_type notifier_;
0686 request const* req_;
0687 wrapped_adapter_type adapter_;
0688
0689
0690 std::size_t expected_responses_;
0691 status status_;
0692
0693 system::error_code ec_;
0694 std::size_t read_size_;
0695 };
0696
0697 void remove_request(std::shared_ptr<req_info> const& info)
0698 {
0699 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
0700 }
0701
0702 using reqs_type = std::deque<std::shared_ptr<req_info>>;
0703
0704 template <class, class> friend struct reader_op;
0705 template <class, class> friend struct writer_op;
0706 template <class, class> friend struct run_op;
0707 template <class> friend struct exec_op;
0708 template <class, class, class> friend struct run_all_op;
0709
0710 void cancel_push_requests()
0711 {
0712 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0713 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
0714 });
0715
0716 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0717 ptr->proceed();
0718 });
0719
0720 reqs_.erase(point, std::end(reqs_));
0721 }
0722
0723 [[nodiscard]] bool is_writing() const noexcept
0724 {
0725 return !write_buffer_.empty();
0726 }
0727
0728 void add_request_info(std::shared_ptr<req_info> const& info)
0729 {
0730 reqs_.push_back(info);
0731
0732 if (info->req_->has_hello_priority()) {
0733 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
0734 return e->is_waiting();
0735 });
0736
0737 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
0738 }
0739
0740 if (is_open() && !is_writing())
0741 writer_timer_.cancel();
0742 }
0743
0744 template <class CompletionToken, class Logger>
0745 auto reader(Logger l, CompletionToken&& token)
0746 {
0747 return asio::async_compose
0748 < CompletionToken
0749 , void(system::error_code)
0750 >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
0751 }
0752
0753 template <class CompletionToken, class Logger>
0754 auto writer(Logger l, CompletionToken&& token)
0755 {
0756 return asio::async_compose
0757 < CompletionToken
0758 , void(system::error_code)
0759 >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
0760 }
0761
0762 template <class Logger, class CompletionToken>
0763 auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
0764 {
0765 runner_.set_config(cfg);
0766 l.set_prefix(runner_.get_config().log_prefix);
0767 return asio::async_compose
0768 < CompletionToken
0769 , void(system::error_code)
0770 >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
0771 }
0772
0773 [[nodiscard]] bool coalesce_requests()
0774 {
0775
0776
0777 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
0778 return !ri->is_waiting();
0779 });
0780
0781 std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
0782
0783 write_buffer_ += ri->req_->payload();
0784 ri->mark_staged();
0785 usage_.commands_sent += ri->expected_responses_;
0786 });
0787
0788 usage_.bytes_sent += std::size(write_buffer_);
0789
0790 return point != std::cend(reqs_);
0791 }
0792
0793 bool is_waiting_response() const noexcept
0794 {
0795 if (std::empty(reqs_))
0796 return false;
0797
0798
0799
0800
0801
0802 return !reqs_.front()->is_waiting();
0803 }
0804
0805 void close()
0806 {
0807 if (stream_->next_layer().is_open()) {
0808 system::error_code ec;
0809 stream_->next_layer().close(ec);
0810 }
0811 }
0812
0813 auto is_open() const noexcept { return stream_->next_layer().is_open(); }
0814 auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
0815
0816 auto is_next_push()
0817 {
0818 BOOST_ASSERT(!read_buffer_.empty());
0819
0820
0821
0822
0823
0824
0825
0826
0827 if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
0828 return true;
0829
0830
0831
0832
0833
0834
0835
0836
0837 if (reqs_.empty())
0838 return true;
0839
0840
0841
0842 if (reqs_.front()->expected_responses_ == 0)
0843 return true;
0844
0845
0846
0847
0848
0849
0850 return reqs_.front()->is_waiting();
0851 }
0852
0853 auto get_suggested_buffer_growth() const noexcept
0854 {
0855 return parser_.get_suggested_buffer_growth(4096);
0856 }
0857
0858 enum class parse_result { needs_more, push, resp };
0859
0860 using parse_ret_type = std::pair<parse_result, std::size_t>;
0861
0862 parse_ret_type on_finish_parsing(parse_result t)
0863 {
0864 if (t == parse_result::push) {
0865 usage_.pushes_received += 1;
0866 usage_.push_bytes_received += parser_.get_consumed();
0867 } else {
0868 usage_.responses_received += 1;
0869 usage_.response_bytes_received += parser_.get_consumed();
0870 }
0871
0872 on_push_ = false;
0873 dbuf_.consume(parser_.get_consumed());
0874 auto const res = std::make_pair(t, parser_.get_consumed());
0875 parser_.reset();
0876 return res;
0877 }
0878
0879 parse_ret_type on_read(std::string_view data, system::error_code& ec)
0880 {
0881
0882
0883
0884
0885
0886
0887
0888
0889
0890
0891 if (!on_push_)
0892 on_push_ = is_next_push();
0893
0894 if (on_push_) {
0895 if (!resp3::parse(parser_, data, receive_adapter_, ec))
0896 return std::make_pair(parse_result::needs_more, 0);
0897
0898 if (ec)
0899 return std::make_pair(parse_result::push, 0);
0900
0901 return on_finish_parsing(parse_result::push);
0902 }
0903
0904 BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
0905 BOOST_ASSERT(!reqs_.empty());
0906 BOOST_ASSERT(reqs_.front() != nullptr);
0907 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
0908
0909 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
0910 return std::make_pair(parse_result::needs_more, 0);
0911
0912 if (ec) {
0913 reqs_.front()->ec_ = ec;
0914 reqs_.front()->proceed();
0915 return std::make_pair(parse_result::resp, 0);
0916 }
0917
0918 reqs_.front()->read_size_ += parser_.get_consumed();
0919
0920 if (--reqs_.front()->expected_responses_ == 0) {
0921
0922 reqs_.front()->proceed();
0923 reqs_.pop_front();
0924 }
0925
0926 return on_finish_parsing(parse_result::resp);
0927 }
0928
0929 void reset()
0930 {
0931 write_buffer_.clear();
0932 read_buffer_.clear();
0933 parser_.reset();
0934 on_push_ = false;
0935 cancel_run_called_ = false;
0936 }
0937
0938 asio::ssl::context ctx_;
0939 std::unique_ptr<next_layer_type> stream_;
0940
0941
0942
0943
0944 timer_type writer_timer_;
0945 receive_channel_type receive_channel_;
0946 runner_type runner_;
0947 receiver_adapter_type receive_adapter_;
0948
0949 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
0950
0951 std::string read_buffer_;
0952 dyn_buffer_type dbuf_;
0953 std::string write_buffer_;
0954 reqs_type reqs_;
0955 resp3::parser parser_{};
0956 bool on_push_ = false;
0957 bool cancel_run_called_ = false;
0958
0959 usage usage_;
0960 };
0961
0962 }
0963
0964 #endif