File indexing completed on 2025-01-18 09:51:19
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_CONNECTION_BASE_HPP
0008 #define BOOST_REDIS_CONNECTION_BASE_HPP
0009
0010 #include <boost/redis/adapter/adapt.hpp>
0011 #include <boost/redis/detail/helper.hpp>
0012 #include <boost/redis/error.hpp>
0013 #include <boost/redis/operation.hpp>
0014 #include <boost/redis/request.hpp>
0015 #include <boost/redis/resp3/type.hpp>
0016 #include <boost/redis/config.hpp>
0017 #include <boost/redis/detail/runner.hpp>
0018 #include <boost/redis/usage.hpp>
0019
0020 #include <boost/system.hpp>
0021 #include <boost/asio/basic_stream_socket.hpp>
0022 #include <boost/asio/bind_executor.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/steady_timer.hpp>
0026 #include <boost/asio/write.hpp>
0027 #include <boost/assert.hpp>
0028 #include <boost/core/ignore_unused.hpp>
0029 #include <boost/asio/ssl/stream.hpp>
0030 #include <boost/asio/read_until.hpp>
0031 #include <boost/asio/buffer.hpp>
0032 #include <boost/asio/experimental/channel.hpp>
0033
0034 #include <algorithm>
0035 #include <array>
0036 #include <chrono>
0037 #include <deque>
0038 #include <memory>
0039 #include <string_view>
0040 #include <type_traits>
0041 #include <functional>
0042
0043 namespace boost::redis::detail
0044 {
0045
0046 template <class DynamicBuffer>
0047 std::string_view buffer_view(DynamicBuffer buf) noexcept
0048 {
0049 char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
0050 return std::string_view{start, std::size(buf)};
0051 }
0052
0053 template <class AsyncReadStream, class DynamicBuffer>
0054 class append_some_op {
0055 private:
0056 AsyncReadStream& stream_;
0057 DynamicBuffer buf_;
0058 std::size_t size_ = 0;
0059 std::size_t tmp_ = 0;
0060 asio::coroutine coro_{};
0061
0062 public:
0063 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
0064 : stream_ {stream}
0065 , buf_ {std::move(buf)}
0066 , size_{size}
0067 { }
0068
0069 template <class Self>
0070 void operator()( Self& self
0071 , system::error_code ec = {}
0072 , std::size_t n = 0)
0073 {
0074 BOOST_ASIO_CORO_REENTER (coro_)
0075 {
0076 tmp_ = buf_.size();
0077 buf_.grow(size_);
0078
0079 BOOST_ASIO_CORO_YIELD
0080 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
0081 if (ec) {
0082 self.complete(ec, 0);
0083 return;
0084 }
0085
0086 buf_.shrink(buf_.size() - tmp_ - n);
0087 self.complete({}, n);
0088 }
0089 }
0090 };
0091
0092 template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
0093 auto
0094 async_append_some(
0095 AsyncReadStream& stream,
0096 DynamicBuffer buffer,
0097 std::size_t size,
0098 CompletionToken&& token)
0099 {
0100 return asio::async_compose
0101 < CompletionToken
0102 , void(system::error_code, std::size_t)
0103 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
0104 }
0105
0106 template <class Conn>
0107 struct exec_op {
0108 using req_info_type = typename Conn::req_info;
0109 using adapter_type = typename Conn::adapter_type;
0110
0111 Conn* conn_ = nullptr;
0112 std::shared_ptr<req_info_type> info_ = nullptr;
0113 asio::coroutine coro{};
0114
0115 template <class Self>
0116 void operator()(Self& self , system::error_code ec = {})
0117 {
0118 BOOST_ASIO_CORO_REENTER (coro)
0119 {
0120
0121
0122 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
0123 BOOST_ASIO_CORO_YIELD
0124 asio::post(std::move(self));
0125 return self.complete(error::not_connected, 0);
0126 }
0127
0128 conn_->add_request_info(info_);
0129
0130 EXEC_OP_WAIT:
0131 BOOST_ASIO_CORO_YIELD
0132 info_->async_wait(std::move(self));
0133 BOOST_ASSERT(ec == asio::error::operation_aborted);
0134
0135 if (info_->ec_) {
0136 self.complete(info_->ec_, 0);
0137 return;
0138 }
0139
0140 if (info_->stop_requested()) {
0141
0142
0143 return self.complete(ec, 0);
0144 }
0145
0146 if (is_cancelled(self)) {
0147 if (info_->is_written()) {
0148 using c_t = asio::cancellation_type;
0149 auto const c = self.get_cancellation_state().cancelled();
0150 if ((c & c_t::terminal) != c_t::none) {
0151
0152
0153 conn_->cancel(operation::run);
0154 return self.complete(ec, 0);
0155 } else {
0156
0157 self.get_cancellation_state().clear();
0158
0159
0160
0161 goto EXEC_OP_WAIT;
0162 }
0163 } else {
0164
0165 conn_->remove_request(info_);
0166 self.complete(ec, 0);
0167 return;
0168 }
0169 }
0170
0171 self.complete(info_->ec_, info_->read_size_);
0172 }
0173 }
0174 };
0175
0176 template <class Conn, class Logger>
0177 struct run_op {
0178 Conn* conn = nullptr;
0179 Logger logger_;
0180 asio::coroutine coro{};
0181
0182 template <class Self>
0183 void operator()( Self& self
0184 , std::array<std::size_t, 2> order = {}
0185 , system::error_code ec0 = {}
0186 , system::error_code ec1 = {})
0187 {
0188 BOOST_ASIO_CORO_REENTER (coro)
0189 {
0190 conn->reset();
0191
0192 BOOST_ASIO_CORO_YIELD
0193 asio::experimental::make_parallel_group(
0194 [this](auto token) { return conn->reader(logger_, token);},
0195 [this](auto token) { return conn->writer(logger_, token);}
0196 ).async_wait(
0197 asio::experimental::wait_for_one(),
0198 std::move(self));
0199
0200 if (is_cancelled(self)) {
0201 logger_.trace("run-op: canceled. Exiting ...");
0202 self.complete(asio::error::operation_aborted);
0203 return;
0204 }
0205
0206 logger_.on_run(ec0, ec1);
0207
0208 switch (order[0]) {
0209 case 0: self.complete(ec0); break;
0210 case 1: self.complete(ec1); break;
0211 default: BOOST_ASSERT(false);
0212 }
0213 }
0214 }
0215 };
0216
0217 template <class Conn, class Logger>
0218 struct writer_op {
0219 Conn* conn_;
0220 Logger logger_;
0221 asio::coroutine coro{};
0222
0223 template <class Self>
0224 void operator()( Self& self
0225 , system::error_code ec = {}
0226 , std::size_t n = 0)
0227 {
0228 ignore_unused(n);
0229
0230 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0231 {
0232 while (conn_->coalesce_requests()) {
0233 if (conn_->use_ssl())
0234 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0235 else
0236 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0237
0238 logger_.on_write(ec, conn_->write_buffer_);
0239
0240 if (ec) {
0241 logger_.trace("writer-op: error. Exiting ...");
0242 conn_->cancel(operation::run);
0243 self.complete(ec);
0244 return;
0245 }
0246
0247 if (is_cancelled(self)) {
0248 logger_.trace("writer-op: canceled. Exiting ...");
0249 self.complete(asio::error::operation_aborted);
0250 return;
0251 }
0252
0253 conn_->on_write();
0254
0255
0256
0257
0258 if (!conn_->is_open()) {
0259 logger_.trace("writer-op: canceled (2). Exiting ...");
0260 self.complete({});
0261 return;
0262 }
0263 }
0264
0265 BOOST_ASIO_CORO_YIELD
0266 conn_->writer_timer_.async_wait(std::move(self));
0267 if (!conn_->is_open() || is_cancelled(self)) {
0268 logger_.trace("writer-op: canceled (3). Exiting ...");
0269
0270
0271
0272 self.complete({});
0273 return;
0274 }
0275 }
0276 }
0277 };
0278
0279 template <class Conn, class Logger>
0280 struct reader_op {
0281 using parse_result = typename Conn::parse_result;
0282 using parse_ret_type = typename Conn::parse_ret_type;
0283 Conn* conn_;
0284 Logger logger_;
0285 parse_ret_type res_{parse_result::resp, 0};
0286 asio::coroutine coro{};
0287
0288 template <class Self>
0289 void operator()( Self& self
0290 , system::error_code ec = {}
0291 , std::size_t n = 0)
0292 {
0293 ignore_unused(n);
0294
0295 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0296 {
0297
0298 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
0299 if (conn_->use_ssl()) {
0300 BOOST_ASIO_CORO_YIELD
0301 async_append_some(
0302 conn_->next_layer(),
0303 conn_->dbuf_,
0304 conn_->get_suggested_buffer_growth(),
0305 std::move(self));
0306 } else {
0307 BOOST_ASIO_CORO_YIELD
0308 async_append_some(
0309 conn_->next_layer().next_layer(),
0310 conn_->dbuf_,
0311 conn_->get_suggested_buffer_growth(),
0312 std::move(self));
0313 }
0314
0315 logger_.on_read(ec, n);
0316
0317
0318 if (ec == asio::error::eof) {
0319 logger_.trace("reader-op: EOF received. Exiting ...");
0320 conn_->cancel(operation::run);
0321 return self.complete({});
0322 }
0323
0324
0325 if (ec) {
0326 logger_.trace("reader-op: error. Exiting ...");
0327 conn_->cancel(operation::run);
0328 self.complete(ec);
0329 return;
0330 }
0331
0332
0333
0334
0335 if (!conn_->is_open() || is_cancelled(self)) {
0336 logger_.trace("reader-op: canceled. Exiting ...");
0337 self.complete(ec);
0338 return;
0339 }
0340 }
0341
0342 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
0343 if (ec) {
0344 logger_.trace("reader-op: parse error. Exiting ...");
0345 conn_->cancel(operation::run);
0346 self.complete(ec);
0347 return;
0348 }
0349
0350 if (res_.first == parse_result::push) {
0351 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
0352 BOOST_ASIO_CORO_YIELD
0353 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
0354 }
0355
0356 if (ec) {
0357 logger_.trace("reader-op: error. Exiting ...");
0358 conn_->cancel(operation::run);
0359 self.complete(ec);
0360 return;
0361 }
0362
0363 if (!conn_->is_open() || is_cancelled(self)) {
0364 logger_.trace("reader-op: canceled (2). Exiting ...");
0365 self.complete(asio::error::operation_aborted);
0366 return;
0367 }
0368
0369 }
0370 }
0371 }
0372 };
0373
0374
0375
0376
0377
0378
0379
0380 template <class Executor>
0381 class connection_base {
0382 public:
0383
0384 using executor_type = Executor;
0385
0386
0387 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
0388
0389 using clock_type = std::chrono::steady_clock;
0390 using clock_traits_type = asio::wait_traits<clock_type>;
0391 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
0392
0393 using this_type = connection_base<Executor>;
0394
0395
0396 connection_base(
0397 executor_type ex,
0398 asio::ssl::context::method method,
0399 std::size_t max_read_size)
0400 : ctx_{method}
0401 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
0402 , writer_timer_{ex}
0403 , receive_channel_{ex, 256}
0404 , runner_{ex, {}}
0405 , dbuf_{read_buffer_, max_read_size}
0406 {
0407 set_receive_response(ignore);
0408 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0409 }
0410
0411
0412 auto const& get_ssl_context() const noexcept
0413 { return ctx_;}
0414
0415
0416 auto& get_ssl_context() noexcept
0417 { return ctx_;}
0418
0419
0420 void reset_stream()
0421 {
0422 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
0423 }
0424
0425
0426 auto& next_layer() noexcept { return *stream_; }
0427
0428
0429 auto const& next_layer() const noexcept { return *stream_; }
0430
0431
0432 auto get_executor() {return writer_timer_.get_executor();}
0433
0434
0435 void cancel(operation op)
0436 {
0437 runner_.cancel(op);
0438 if (op == operation::all) {
0439 cancel_impl(operation::run);
0440 cancel_impl(operation::receive);
0441 cancel_impl(operation::exec);
0442 return;
0443 }
0444
0445 cancel_impl(op);
0446 }
0447
0448 template <class Response, class CompletionToken>
0449 auto async_exec(request const& req, Response& resp, CompletionToken token)
0450 {
0451 using namespace boost::redis::adapter;
0452 auto f = boost_redis_adapt(resp);
0453 BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
0454
0455 auto info = std::make_shared<req_info>(req, f, get_executor());
0456
0457 return asio::async_compose
0458 < CompletionToken
0459 , void(system::error_code, std::size_t)
0460 >(exec_op<this_type>{this, info}, token, writer_timer_);
0461 }
0462
0463 template <class Response, class CompletionToken>
0464 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
0465 auto async_receive(Response& response, CompletionToken token)
0466 {
0467 set_receive_response(response);
0468 return receive_channel_.async_receive(std::move(token));
0469 }
0470
0471 template <class CompletionToken>
0472 auto async_receive(CompletionToken token)
0473 { return receive_channel_.async_receive(std::move(token)); }
0474
0475 std::size_t receive(system::error_code& ec)
0476 {
0477 std::size_t size = 0;
0478
0479 auto f = [&](system::error_code const& ec2, std::size_t n)
0480 {
0481 ec = ec2;
0482 size = n;
0483 };
0484
0485 auto const res = receive_channel_.try_receive(f);
0486 if (ec)
0487 return 0;
0488
0489 if (!res)
0490 ec = error::sync_receive_push_failed;
0491
0492 return size;
0493 }
0494
0495 template <class Logger, class CompletionToken>
0496 auto async_run(config const& cfg, Logger l, CompletionToken token)
0497 {
0498 runner_.set_config(cfg);
0499 l.set_prefix(runner_.get_config().log_prefix);
0500 return runner_.async_run(*this, l, std::move(token));
0501 }
0502
0503 template <class Response>
0504 void set_receive_response(Response& response)
0505 {
0506 using namespace boost::redis::adapter;
0507 auto g = boost_redis_adapt(response);
0508 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
0509 }
0510
0511 usage get_usage() const noexcept
0512 { return usage_; }
0513
0514 private:
0515 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
0516 using runner_type = runner<executor_type>;
0517 using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
0518 using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
0519
0520 auto use_ssl() const noexcept
0521 { return runner_.get_config().use_ssl;}
0522
0523 auto cancel_on_conn_lost() -> std::size_t
0524 {
0525
0526 auto cond = [](auto const& ptr)
0527 {
0528 BOOST_ASSERT(ptr != nullptr);
0529
0530 if (ptr->is_written()) {
0531 return !ptr->req_->get_config().cancel_if_unresponded;
0532 } else {
0533 return !ptr->req_->get_config().cancel_on_connection_lost;
0534 }
0535 };
0536
0537 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
0538
0539 auto const ret = std::distance(point, std::end(reqs_));
0540
0541 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0542 ptr->stop();
0543 });
0544
0545 reqs_.erase(point, std::end(reqs_));
0546 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0547 return ptr->reset_status();
0548 });
0549
0550 return ret;
0551 }
0552
0553 auto cancel_unwritten_requests() -> std::size_t
0554 {
0555 auto f = [](auto const& ptr)
0556 {
0557 BOOST_ASSERT(ptr != nullptr);
0558 return ptr->is_written();
0559 };
0560
0561 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
0562
0563 auto const ret = std::distance(point, std::end(reqs_));
0564
0565 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0566 ptr->stop();
0567 });
0568
0569 reqs_.erase(point, std::end(reqs_));
0570 return ret;
0571 }
0572
0573 void cancel_impl(operation op)
0574 {
0575 switch (op) {
0576 case operation::exec:
0577 {
0578 cancel_unwritten_requests();
0579 } break;
0580 case operation::run:
0581 {
0582 close();
0583 writer_timer_.cancel();
0584 receive_channel_.cancel();
0585 cancel_on_conn_lost();
0586 } break;
0587 case operation::receive:
0588 {
0589 receive_channel_.cancel();
0590 } break;
0591 default: ;
0592 }
0593 }
0594
0595 void on_write()
0596 {
0597
0598
0599 write_buffer_.clear();
0600
0601
0602 cancel_push_requests();
0603
0604
0605
0606 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0607 BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
0608 if (ptr->is_staged())
0609 ptr->mark_written();
0610 });
0611 }
0612
0613 struct req_info {
0614 public:
0615 using node_type = resp3::basic_node<std::string_view>;
0616 using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
0617
0618 enum class action
0619 {
0620 stop,
0621 proceed,
0622 none,
0623 };
0624
0625 explicit req_info(request const& req, adapter_type adapter, executor_type ex)
0626 : timer_{ex}
0627 , action_{action::none}
0628 , req_{&req}
0629 , adapter_{}
0630 , expected_responses_{req.get_expected_responses()}
0631 , status_{status::none}
0632 , ec_{{}}
0633 , read_size_{0}
0634 {
0635 timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0636
0637 adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
0638 {
0639 auto const i = req_->get_expected_responses() - expected_responses_;
0640 adapter(i, nd, ec);
0641 };
0642 }
0643
0644 auto proceed()
0645 {
0646 timer_.cancel();
0647 action_ = action::proceed;
0648 }
0649
0650 void stop()
0651 {
0652 timer_.cancel();
0653 action_ = action::stop;
0654 }
0655
0656 [[nodiscard]] auto is_waiting_write() const noexcept
0657 { return !is_written() && !is_staged(); }
0658
0659 [[nodiscard]] auto is_written() const noexcept
0660 { return status_ == status::written; }
0661
0662 [[nodiscard]] auto is_staged() const noexcept
0663 { return status_ == status::staged; }
0664
0665 void mark_written() noexcept
0666 { status_ = status::written; }
0667
0668 void mark_staged() noexcept
0669 { status_ = status::staged; }
0670
0671 void reset_status() noexcept
0672 { status_ = status::none; }
0673
0674 [[nodiscard]] auto stop_requested() const noexcept
0675 { return action_ == action::stop;}
0676
0677 template <class CompletionToken>
0678 auto async_wait(CompletionToken token)
0679 {
0680 return timer_.async_wait(std::move(token));
0681 }
0682
0683
0684 enum class status
0685 { none
0686 , staged
0687 , written
0688 };
0689
0690 timer_type timer_;
0691 action action_;
0692 request const* req_;
0693 wrapped_adapter_type adapter_;
0694
0695
0696 std::size_t expected_responses_;
0697 status status_;
0698
0699 system::error_code ec_;
0700 std::size_t read_size_;
0701 };
0702
0703 void remove_request(std::shared_ptr<req_info> const& info)
0704 {
0705 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
0706 }
0707
0708 using reqs_type = std::deque<std::shared_ptr<req_info>>;
0709
0710 template <class, class> friend struct reader_op;
0711 template <class, class> friend struct writer_op;
0712 template <class, class> friend struct run_op;
0713 template <class> friend struct exec_op;
0714 template <class, class, class> friend struct run_all_op;
0715
0716 void cancel_push_requests()
0717 {
0718 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0719 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
0720 });
0721
0722 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0723 ptr->proceed();
0724 });
0725
0726 reqs_.erase(point, std::end(reqs_));
0727 }
0728
0729 [[nodiscard]] bool is_writing() const noexcept
0730 {
0731 return !write_buffer_.empty();
0732 }
0733
0734 void add_request_info(std::shared_ptr<req_info> const& info)
0735 {
0736 reqs_.push_back(info);
0737
0738 if (info->req_->has_hello_priority()) {
0739 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
0740 return e->is_waiting_write();
0741 });
0742
0743 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
0744 }
0745
0746 if (is_open() && !is_writing())
0747 writer_timer_.cancel();
0748 }
0749
0750 template <class CompletionToken, class Logger>
0751 auto reader(Logger l, CompletionToken&& token)
0752 {
0753 return asio::async_compose
0754 < CompletionToken
0755 , void(system::error_code)
0756 >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
0757 }
0758
0759 template <class CompletionToken, class Logger>
0760 auto writer(Logger l, CompletionToken&& token)
0761 {
0762 return asio::async_compose
0763 < CompletionToken
0764 , void(system::error_code)
0765 >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
0766 }
0767
0768 template <class Logger, class CompletionToken>
0769 auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
0770 {
0771 runner_.set_config(cfg);
0772 l.set_prefix(runner_.get_config().log_prefix);
0773 return asio::async_compose
0774 < CompletionToken
0775 , void(system::error_code)
0776 >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
0777 }
0778
0779 [[nodiscard]] bool coalesce_requests()
0780 {
0781
0782
0783 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
0784 return !ri->is_waiting_write();
0785 });
0786
0787 std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
0788
0789 write_buffer_ += ri->req_->payload();
0790 ri->mark_staged();
0791 usage_.commands_sent += ri->expected_responses_;
0792 });
0793
0794 usage_.bytes_sent += std::size(write_buffer_);
0795
0796 return point != std::cend(reqs_);
0797 }
0798
0799 bool is_waiting_response() const noexcept
0800 {
0801 return !std::empty(reqs_) && reqs_.front()->is_written();
0802 }
0803
0804 void close()
0805 {
0806 if (stream_->next_layer().is_open()) {
0807 system::error_code ec;
0808 stream_->next_layer().close(ec);
0809 }
0810 }
0811
0812 auto is_open() const noexcept { return stream_->next_layer().is_open(); }
0813 auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
0814
0815 auto is_next_push()
0816 {
0817
0818
0819
0820
0821
0822
0823
0824
0825
0826
0827
0828
0829
0830
0831
0832
0833
0834
0835
0836
0837
0838
0839
0840 BOOST_ASSERT(!read_buffer_.empty());
0841
0842 return
0843 (resp3::to_type(read_buffer_.front()) == resp3::type::push)
0844 || reqs_.empty()
0845 || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
0846 || !is_waiting_response();
0847 }
0848
0849 auto get_suggested_buffer_growth() const noexcept
0850 {
0851 return parser_.get_suggested_buffer_growth(4096);
0852 }
0853
0854 enum class parse_result { needs_more, push, resp };
0855
0856 using parse_ret_type = std::pair<parse_result, std::size_t>;
0857
0858 parse_ret_type on_finish_parsing(parse_result t)
0859 {
0860 if (t == parse_result::push) {
0861 usage_.pushes_received += 1;
0862 usage_.push_bytes_received += parser_.get_consumed();
0863 } else {
0864 usage_.responses_received += 1;
0865 usage_.response_bytes_received += parser_.get_consumed();
0866 }
0867
0868 on_push_ = false;
0869 dbuf_.consume(parser_.get_consumed());
0870 auto const res = std::make_pair(t, parser_.get_consumed());
0871 parser_.reset();
0872 return res;
0873 }
0874
0875 parse_ret_type on_read(std::string_view data, system::error_code& ec)
0876 {
0877
0878
0879
0880
0881
0882
0883
0884
0885
0886
0887 if (!on_push_)
0888 on_push_ = is_next_push();
0889
0890 if (on_push_) {
0891 if (!resp3::parse(parser_, data, receive_adapter_, ec))
0892 return std::make_pair(parse_result::needs_more, 0);
0893
0894 if (ec)
0895 return std::make_pair(parse_result::push, 0);
0896
0897 return on_finish_parsing(parse_result::push);
0898 }
0899
0900 BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
0901 BOOST_ASSERT(!reqs_.empty());
0902 BOOST_ASSERT(reqs_.front() != nullptr);
0903 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
0904
0905 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
0906 return std::make_pair(parse_result::needs_more, 0);
0907
0908 if (ec) {
0909 reqs_.front()->ec_ = ec;
0910 reqs_.front()->proceed();
0911 return std::make_pair(parse_result::resp, 0);
0912 }
0913
0914 reqs_.front()->read_size_ += parser_.get_consumed();
0915
0916 if (--reqs_.front()->expected_responses_ == 0) {
0917
0918 reqs_.front()->proceed();
0919 reqs_.pop_front();
0920 }
0921
0922 return on_finish_parsing(parse_result::resp);
0923 }
0924
0925 void reset()
0926 {
0927 write_buffer_.clear();
0928 read_buffer_.clear();
0929 parser_.reset();
0930 on_push_ = false;
0931 }
0932
0933 asio::ssl::context ctx_;
0934 std::unique_ptr<next_layer_type> stream_;
0935
0936
0937
0938
0939 timer_type writer_timer_;
0940 receive_channel_type receive_channel_;
0941 runner_type runner_;
0942 receiver_adapter_type receive_adapter_;
0943
0944 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
0945
0946 std::string read_buffer_;
0947 dyn_buffer_type dbuf_;
0948 std::string write_buffer_;
0949 reqs_type reqs_;
0950 resp3::parser parser_{};
0951 bool on_push_ = false;
0952
0953 usage usage_;
0954 };
0955
0956 }
0957
0958 #endif