File indexing completed on 2025-12-16 10:08:25
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_REDIS_CONNECTION_HPP
0008 #define BOOST_REDIS_CONNECTION_HPP
0009
0010 #include <boost/redis/adapter/adapt.hpp>
0011 #include <boost/redis/adapter/any_adapter.hpp>
0012 #include <boost/redis/config.hpp>
0013 #include <boost/redis/detail/connector.hpp>
0014 #include <boost/redis/detail/health_checker.hpp>
0015 #include <boost/redis/detail/helper.hpp>
0016 #include <boost/redis/detail/resolver.hpp>
0017 #include <boost/redis/detail/resp3_handshaker.hpp>
0018 #include <boost/redis/error.hpp>
0019 #include <boost/redis/logger.hpp>
0020 #include <boost/redis/operation.hpp>
0021 #include <boost/redis/request.hpp>
0022 #include <boost/redis/resp3/type.hpp>
0023 #include <boost/redis/usage.hpp>
0024
0025 #include <boost/asio/any_completion_handler.hpp>
0026 #include <boost/asio/any_io_executor.hpp>
0027 #include <boost/asio/associated_immediate_executor.hpp>
0028 #include <boost/asio/basic_stream_socket.hpp>
0029 #include <boost/asio/bind_executor.hpp>
0030 #include <boost/asio/buffer.hpp>
0031 #include <boost/asio/cancel_after.hpp>
0032 #include <boost/asio/coroutine.hpp>
0033 #include <boost/asio/deferred.hpp>
0034 #include <boost/asio/experimental/channel.hpp>
0035 #include <boost/asio/experimental/parallel_group.hpp>
0036 #include <boost/asio/io_context.hpp>
0037 #include <boost/asio/ip/tcp.hpp>
0038 #include <boost/asio/prepend.hpp>
0039 #include <boost/asio/read_until.hpp>
0040 #include <boost/asio/ssl/stream.hpp>
0041 #include <boost/asio/steady_timer.hpp>
0042 #include <boost/asio/write.hpp>
0043 #include <boost/assert.hpp>
0044
0045 #include <boost/core/ignore_unused.hpp>
0046
0047 #include <algorithm>
0048 #include <array>
0049 #include <chrono>
0050 #include <cstddef>
0051 #include <deque>
0052 #include <functional>
0053 #include <limits>
0054 #include <memory>
0055 #include <string_view>
0056 #include <utility>
0057
0058 namespace boost::redis {
0059 namespace detail
0060 {
0061
/// Returns a non-owning view over every byte currently held by `buf`.
///
/// The view starts at the address reported by `buf.data(0, size).data()` and
/// spans `size` bytes, where `size` is the current size of the buffer. It is
/// only meaningful while the storage behind `buf` is left untouched.
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   auto const length = std::size(buf);
   auto const region = buf.data(0, length);
   return {static_cast<char const*>(region.data()), length};
}
0068
0069 template <class AsyncReadStream, class DynamicBuffer>
0070 class append_some_op {
0071 private:
0072 AsyncReadStream& stream_;
0073 DynamicBuffer buf_;
0074 std::size_t size_ = 0;
0075 std::size_t tmp_ = 0;
0076 asio::coroutine coro_{};
0077
0078 public:
0079 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
0080 : stream_ {stream}
0081 , buf_ {std::move(buf)}
0082 , size_{size}
0083 { }
0084
0085 template <class Self>
0086 void operator()( Self& self
0087 , system::error_code ec = {}
0088 , std::size_t n = 0)
0089 {
0090 BOOST_ASIO_CORO_REENTER (coro_)
0091 {
0092 tmp_ = buf_.size();
0093 buf_.grow(size_);
0094
0095 BOOST_ASIO_CORO_YIELD
0096 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
0097 if (ec) {
0098 self.complete(ec, 0);
0099 return;
0100 }
0101
0102 buf_.shrink(buf_.size() - tmp_ - n);
0103 self.complete({}, n);
0104 }
0105 }
0106 };
0107
0108 template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
0109 auto
0110 async_append_some(
0111 AsyncReadStream& stream,
0112 DynamicBuffer buffer,
0113 std::size_t size,
0114 CompletionToken&& token)
0115 {
0116 return asio::async_compose
0117 < CompletionToken
0118 , void(system::error_code, std::size_t)
0119 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
0120 }
0121
// Composed-operation implementation behind async_exec.
//
// It hands the request description (info_) to the connection, suspends on
// info_->async_wait() and, once resumed, turns the outcome into the
// (error_code, size) completion.
0122 template <class Conn>
0123 struct exec_op {
0124 using req_info_type = typename Conn::req_info;
0125 using adapter_type = typename Conn::adapter_type;
0126
// Connection the request is executed on (not owned).
0127 Conn* conn_ = nullptr;
// Per-request state shared with the connection's request queue.
0128 std::shared_ptr<req_info_type> info_ = nullptr;
0129 asio::coroutine coro{};
0130
// The error code and size passed on resumption are ignored: the result is
// read from info_ instead.
0131 template <class Self>
0132 void operator()(Self& self , system::error_code = {}, std::size_t = 0)
0133 {
0134 BOOST_ASIO_CORO_REENTER (coro)
0135 {
0136
0137
// Fail fast when the request is configured with cancel_if_not_connected and
// the connection is not open. The dispatch goes through the associated
// immediate executor before completing with error::not_connected.
0138 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
0139 BOOST_ASIO_CORO_YIELD
0140 asio::dispatch(
0141 asio::get_associated_immediate_executor(self, self.get_io_executor()),
0142 std::move(self));
0143 return self.complete(error::not_connected, 0);
0144 }
0145
// Hand the request over to the connection's queue.
0146 conn_->add_request_info(info_);
0147
// Suspend until the request's notifier wakes this operation up. The label
// is also the re-entry point after a non-terminal cancellation (see below).
0148 EXEC_OP_WAIT:
0149 BOOST_ASIO_CORO_YIELD
0150 info_->async_wait(std::move(self));
0151
// An error was recorded on the request: forward it.
0152 if (info_->ec_) {
0153 self.complete(info_->ec_, 0);
0154 return;
0155 }
0156
// The request was stopped (its notifier is closed): report an abort.
0157 if (info_->stop_requested()) {
0158
0159
0160 return self.complete(asio::error::operation_aborted, 0);
0161 }
0162
// NOTE(review): is_cancelled() is declared outside this file; presumably it
// reports whether cancellation was requested on this handler - confirm.
0163 if (is_cancelled(self)) {
// The request has already left the waiting state.
0164 if (!info_->is_waiting()) {
0165 using c_t = asio::cancellation_type;
0166 auto const c = self.get_cancellation_state().cancelled();
0167 if ((c & c_t::terminal) != c_t::none) {
0168
0169
// Terminal cancellation: cancel the run operation and abort this request.
0170 conn_->cancel(operation::run);
0171 return self.complete(asio::error::operation_aborted, 0);
0172 } else {
0173
// Any other cancellation type: clear the cancellation state and go back
// to waiting.
0174 self.get_cancellation_state().clear();
0175
0176
0177
0178 goto EXEC_OP_WAIT;
0179 }
0180 } else {
0181
// Still waiting: take the request out of the queue and abort.
0182 conn_->remove_request(info_);
0183 self.complete(asio::error::operation_aborted, 0);
0184 return;
0185 }
0186 }
0187
// Regular completion with the recorded error code and number of bytes read.
0188 self.complete(info_->ec_, info_->read_size_);
0189 }
0190 }
0191 };
0192
// Composed-operation implementation of the connection's write loop.
//
// Repeatedly coalesces the pending requests into the connection's write
// buffer and writes it to the stream. When nothing is left to write it
// suspends on the connection's writer_timer_. Completes with the write error,
// or with success once the connection is found closed.
0193 template <class Conn, class Logger>
0194 struct writer_op {
0195 Conn* conn_;
0196 Logger logger_;
0197 asio::coroutine coro{};
0198
0199 template <class Self>
0200 void operator()( Self& self
0201 , system::error_code ec = {}
0202 , std::size_t n = 0)
0203 {
0204 ignore_unused(n);
0205
0206 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0207 {
// Write for as long as coalesce_requests() reports something to send.
0208 while (conn_->coalesce_requests()) {
// With SSL the write goes through the SSL stream, otherwise directly
// through the layer below it.
0209 if (conn_->use_ssl())
0210 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0211 else
0212 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
0213
0214 logger_.on_write(ec, conn_->write_buffer_);
0215
// A write error cancels the run operation and ends the writer.
0216 if (ec) {
0217 logger_.trace("writer_op (1)", ec);
0218 conn_->cancel(operation::run);
0219 self.complete(ec);
0220 return;
0221 }
0222
// Let the connection update its state after a successful write.
0223 conn_->on_write();
0224
0225
0226
0227
// The connection may have been closed while the write was in flight.
0228 if (!conn_->is_open()) {
0229 logger_.trace("writer_op (2): connection is closed.");
0230 self.complete({});
0231 return;
0232 }
0233 }
0234
// Nothing to write: suspend on the writer timer. The error code of the
// wait is not inspected; only the connection state is checked afterwards.
0235 BOOST_ASIO_CORO_YIELD
0236 conn_->writer_timer_.async_wait(std::move(self));
0237 if (!conn_->is_open()) {
0238 logger_.trace("writer_op (3): connection is closed.");
0239
0240
0241
0242 self.complete({});
0243 return;
0244 }
0245 }
0246 }
0247 };
0248
// Composed-operation implementation of the connection's read loop.
//
// Reads data into the connection's dynamic buffer whenever more input is
// needed, lets the connection parse it via on_read() and forwards server
// pushes to the receive channel. Completes when an error occurs or the
// connection is found closed.
0249 template <class Conn, class Logger>
0250 struct reader_op {
0251 using parse_result = typename Conn::parse_result;
0252 using parse_ret_type = typename Conn::parse_ret_type;
0253 Conn* conn_;
0254 Logger logger_;
// Result of the most recent on_read() call.
0255 parse_ret_type res_{parse_result::resp, 0};
0256 asio::coroutine coro{};
0257
0258 template <class Self>
0259 void operator()( Self& self
0260 , system::error_code ec = {}
0261 , std::size_t n = 0)
0262 {
0263 ignore_unused(n);
0264
0265 BOOST_ASIO_CORO_REENTER (coro) for (;;)
0266 {
0267
// Read from the stream only when the last parse asked for more data or the
// read buffer is empty.
0268 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
// With SSL the read goes through the SSL stream, otherwise directly
// through the layer below it.
0269 if (conn_->use_ssl()) {
0270 BOOST_ASIO_CORO_YIELD
0271 async_append_some(
0272 conn_->next_layer(),
0273 conn_->dbuf_,
0274 conn_->get_suggested_buffer_growth(),
0275 std::move(self));
0276 } else {
0277 BOOST_ASIO_CORO_YIELD
0278 async_append_some(
0279 conn_->next_layer().next_layer(),
0280 conn_->dbuf_,
0281 conn_->get_suggested_buffer_growth(),
0282 std::move(self));
0283 }
0284
0285 logger_.on_read(ec, n);
0286
0287
// A read error cancels the run operation and ends the reader.
0288 if (ec) {
0289 logger_.trace("reader_op (1)", ec);
0290 conn_->cancel(operation::run);
0291 self.complete(ec);
0292 return;
0293 }
0294
0295
0296
0297
// The connection may have been closed while the read was in flight. ec is
// not set on this path, so this completes without an error.
0298 if (!conn_->is_open()) {
0299 logger_.trace("reader_op (2): connection is closed.");
0300 self.complete(ec);
0301 return;
0302 }
0303 }
0304
// Let the connection parse what is currently buffered.
0305 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
0306 if (ec) {
0307 logger_.trace("reader_op (3)", ec);
0308 conn_->cancel(operation::run);
0309 self.complete(ec);
0310 return;
0311 }
0312
// A complete push was parsed: pass its size to the receive channel. Try
// the non-suspending send first and fall back to an asynchronous send.
0313 if (res_.first == parse_result::push) {
0314 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
0315 BOOST_ASIO_CORO_YIELD
0316 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
0317 }
0318
0319 if (ec) {
0320 logger_.trace("reader_op (4)", ec);
0321 conn_->cancel(operation::run);
0322 self.complete(ec);
0323 return;
0324 }
0325
// Unlike "reader_op (2)", this path reports operation_aborted.
0326 if (!conn_->is_open()) {
0327 logger_.trace("reader_op (5): connection is closed.");
0328 self.complete(asio::error::operation_aborted);
0329 return;
0330 }
0331
0332 }
0333 }
0334 }
0335 };
0336
0337 template <class Conn, class Logger>
0338 class run_op {
0339 private:
0340 Conn* conn_ = nullptr;
0341 Logger logger_;
0342 asio::coroutine coro_{};
0343
0344 using order_t = std::array<std::size_t, 5>;
0345
0346 public:
0347 run_op(Conn* conn, Logger l)
0348 : conn_{conn}
0349 , logger_{l}
0350 {}
0351
0352 template <class Self>
0353 void operator()( Self& self
0354 , order_t order = {}
0355 , system::error_code ec0 = {}
0356 , system::error_code ec1 = {}
0357 , system::error_code ec2 = {}
0358 , system::error_code ec3 = {}
0359 , system::error_code ec4 = {})
0360 {
0361 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
0362 {
0363 BOOST_ASIO_CORO_YIELD
0364 conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
0365
0366 logger_.on_resolve(ec0, conn_->resv_.results());
0367
0368 if (ec0) {
0369 self.complete(ec0);
0370 return;
0371 }
0372
0373 BOOST_ASIO_CORO_YIELD
0374 conn_->ctor_.async_connect(
0375 conn_->next_layer().next_layer(),
0376 conn_->resv_.results(),
0377 asio::prepend(std::move(self), order_t {}));
0378
0379 logger_.on_connect(ec0, conn_->ctor_.endpoint());
0380
0381 if (ec0) {
0382 self.complete(ec0);
0383 return;
0384 }
0385
0386 if (conn_->use_ssl()) {
0387 BOOST_ASIO_CORO_YIELD
0388 conn_->next_layer().async_handshake(
0389 asio::ssl::stream_base::client,
0390 asio::prepend(
0391 asio::cancel_after(
0392 conn_->cfg_.ssl_handshake_timeout,
0393 std::move(self)
0394 ),
0395 order_t {}
0396 )
0397 );
0398
0399 logger_.on_ssl_handshake(ec0);
0400
0401 if (ec0) {
0402 self.complete(ec0);
0403 return;
0404 }
0405 }
0406
0407 conn_->reset();
0408
0409
0410
0411
0412 BOOST_ASIO_CORO_YIELD
0413 asio::experimental::make_parallel_group(
0414 [this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); },
0415 [this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
0416 [this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
0417 [this](auto token) { return conn_->reader(logger_, token);},
0418 [this](auto token) { return conn_->writer(logger_, token);}
0419 ).async_wait(
0420 asio::experimental::wait_for_one_error(),
0421 std::move(self));
0422
0423 if (order[0] == 0 && !!ec0) {
0424 self.complete(ec0);
0425 return;
0426 }
0427
0428 if (order[0] == 2 && ec2 == error::pong_timeout) {
0429 self.complete(ec1);
0430 return;
0431 }
0432
0433
0434
0435
0436 conn_->cancel(operation::receive);
0437
0438 if (!conn_->will_reconnect()) {
0439 conn_->cancel(operation::reconnection);
0440 self.complete(ec3);
0441 return;
0442 }
0443
0444
0445
0446 conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
0447
0448 BOOST_ASIO_CORO_YIELD
0449 conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
0450 if (ec0) {
0451 self.complete(ec0);
0452 return;
0453 }
0454
0455 if (!conn_->will_reconnect()) {
0456 self.complete(asio::error::operation_aborted);
0457 return;
0458 }
0459
0460 conn_->reset_stream();
0461 }
0462 }
0463 };
0464
0465 }
0466
0467
0468
0469
0470
0471
0472
0473
0474
0475
0476
0477 template <class Executor>
0478 class basic_connection {
0479 public:
0480 using this_type = basic_connection<Executor>;
0481
0482
0483 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
0484
0485
0486 using executor_type = Executor;
0487
0488
0489 executor_type get_executor() noexcept
0490 {return writer_timer_.get_executor();}
0491
0492
0493 template <class Executor1>
0494 struct rebind_executor
0495 {
0496
0497 using other = basic_connection<Executor1>;
0498 };
0499
0500
0501
0502
0503
0504
0505
0506
0507 explicit
0508 basic_connection(
0509 executor_type ex,
0510 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
0511 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
0512 : ctx_{std::move(ctx)}
0513 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
0514 , writer_timer_{ex}
0515 , receive_channel_{ex, 256}
0516 , resv_{ex}
0517 , health_checker_{ex}
0518 , dbuf_{read_buffer_, max_read_size}
0519 {
0520 set_receive_response(ignore);
0521 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
0522 }
0523
0524
0525 explicit
0526 basic_connection(
0527 asio::io_context& ioc,
0528 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
0529 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
0530 : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
0531 { }
0532
0533
0534
0535
0536
0537
0538
0539
0540
0541
0542
0543
0544
0545
0546
0547
0548
0549
0550
0551
0552
0553
0554
0555
0556
0557
0558
0559
0560
0561
0562
0563
0564
0565
0566
0567
0568
0569
0570 template <
0571 class Logger = logger,
0572 class CompletionToken = asio::default_completion_token_t<executor_type>>
0573 auto
0574 async_run(
0575 config const& cfg = {},
0576 Logger l = Logger{},
0577 CompletionToken&& token = {})
0578 {
0579 cfg_ = cfg;
0580 resv_.set_config(cfg);
0581 ctor_.set_config(cfg);
0582 health_checker_.set_config(cfg);
0583 handshaker_.set_config(cfg);
0584 l.set_prefix(cfg.log_prefix);
0585
0586 return asio::async_compose
0587 < CompletionToken
0588 , void(system::error_code)
0589 >(detail::run_op<this_type, Logger>{this, l}, token, writer_timer_);
0590 }
0591
0592
0593
0594
0595
0596
0597
0598
0599
0600
0601
0602
0603
0604
0605
0606
0607
0608
0609
0610
0611
0612
0613
0614 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
0615 auto async_receive(CompletionToken&& token = {})
0616 { return receive_channel_.async_receive(std::forward<CompletionToken>(token)); }
0617
0618
0619
0620
0621
0622
0623
0624
0625
0626
0627
0628
0629 std::size_t receive(system::error_code& ec)
0630 {
0631 std::size_t size = 0;
0632
0633 auto f = [&](system::error_code const& ec2, std::size_t n)
0634 {
0635 ec = ec2;
0636 size = n;
0637 };
0638
0639 auto const res = receive_channel_.try_receive(f);
0640 if (ec)
0641 return 0;
0642
0643 if (!res)
0644 ec = error::sync_receive_push_failed;
0645
0646 return size;
0647 }
0648
0649
0650
0651
0652
0653
0654
0655
0656
0657
0658
0659
0660
0661
0662
0663
0664
0665
0666
0667
0668
0669
0670
0671
0672 template <
0673 class Response = ignore_t,
0674 class CompletionToken = asio::default_completion_token_t<executor_type>
0675 >
0676 auto
0677 async_exec(
0678 request const& req,
0679 Response& resp = ignore,
0680 CompletionToken&& token = {})
0681 {
0682 return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
0683 }
0684
0685
0686
0687
0688
0689
0690 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
0691 auto
0692 async_exec(
0693 request const& req,
0694 any_adapter adapter,
0695 CompletionToken&& token = {})
0696 {
0697 auto& adapter_impl = adapter.impl_;
0698 BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");
0699
0700 auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());
0701
0702 return asio::async_compose
0703 < CompletionToken
0704 , void(system::error_code, std::size_t)
0705 >(detail::exec_op<this_type>{this, info}, token, writer_timer_);
0706 }
0707
0708
0709
0710
0711
0712
0713
0714
0715
0716
0717
0718
0719 void cancel(operation op = operation::all)
0720 {
0721 switch (op) {
0722 case operation::resolve:
0723 resv_.cancel();
0724 break;
0725 case operation::exec:
0726 cancel_unwritten_requests();
0727 break;
0728 case operation::reconnection:
0729 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
0730 break;
0731 case operation::run:
0732 cancel_run();
0733 break;
0734 case operation::receive:
0735 receive_channel_.cancel();
0736 break;
0737 case operation::health_check:
0738 health_checker_.cancel();
0739 break;
0740 case operation::all:
0741 resv_.cancel();
0742 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
0743 health_checker_.cancel();
0744 cancel_run();
0745 receive_channel_.cancel();
0746 cancel_unwritten_requests();
0747 break;
0748 default: ;
0749 }
0750 }
0751
0752 auto run_is_canceled() const noexcept
0753 { return cancel_run_called_; }
0754
0755
0756 bool will_reconnect() const noexcept
0757 { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
0758
0759
0760 auto const& get_ssl_context() const noexcept
0761 { return ctx_;}
0762
0763
0764 void reset_stream()
0765 { stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_); }
0766
0767
0768 auto& next_layer() noexcept
0769 { return *stream_; }
0770
0771
0772 auto const& next_layer() const noexcept
0773 { return *stream_; }
0774
0775 template <class Response>
0776 void set_receive_response(Response& response)
0777 {
0778 using namespace boost::redis::adapter;
0779 auto g = boost_redis_adapt(response);
0780 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
0781 }
0782
0783
0784 usage get_usage() const noexcept
0785 { return usage_; }
0786
0787 private:
0788 using clock_type = std::chrono::steady_clock;
0789 using clock_traits_type = asio::wait_traits<clock_type>;
0790 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
0791
0792 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
0793 using resolver_type = detail::resolver<Executor>;
0794 using health_checker_type = detail::health_checker<Executor>;
0795 using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
0796 using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
0797 using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
0798 using exec_notifier_type = receive_channel_type;
0799
0800 auto use_ssl() const noexcept
0801 { return cfg_.use_ssl;}
0802
0803 auto cancel_on_conn_lost() -> std::size_t
0804 {
0805
0806 auto cond = [](auto const& ptr)
0807 {
0808 BOOST_ASSERT(ptr != nullptr);
0809
0810 if (ptr->is_waiting()) {
0811 return !ptr->req_->get_config().cancel_on_connection_lost;
0812 } else {
0813 return !ptr->req_->get_config().cancel_if_unresponded;
0814 }
0815 };
0816
0817 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
0818
0819 auto const ret = std::distance(point, std::end(reqs_));
0820
0821 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0822 ptr->stop();
0823 });
0824
0825 reqs_.erase(point, std::end(reqs_));
0826
0827 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0828 return ptr->mark_waiting();
0829 });
0830
0831 return ret;
0832 }
0833
0834 auto cancel_unwritten_requests() -> std::size_t
0835 {
0836 auto f = [](auto const& ptr)
0837 {
0838 BOOST_ASSERT(ptr != nullptr);
0839 return !ptr->is_waiting();
0840 };
0841
0842 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
0843
0844 auto const ret = std::distance(point, std::end(reqs_));
0845
0846 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0847 ptr->stop();
0848 });
0849
0850 reqs_.erase(point, std::end(reqs_));
0851 return ret;
0852 }
0853
0854 void cancel_run()
0855 {
0856
0857
0858 if (std::exchange(cancel_run_called_, true)) {
0859 return;
0860 }
0861
0862 close();
0863 writer_timer_.cancel();
0864 receive_channel_.cancel();
0865 cancel_on_conn_lost();
0866 }
0867
0868 void on_write()
0869 {
0870
0871
0872 write_buffer_.clear();
0873
0874
0875 cancel_push_requests();
0876
0877
0878
0879 std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0880 BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
0881 if (ptr->is_staged()) {
0882 ptr->mark_written();
0883 }
0884 });
0885 }
0886
0887 struct req_info {
0888 public:
0889 using node_type = resp3::basic_node<std::string_view>;
0890 using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
0891
0892 explicit req_info(request const& req, adapter_type adapter, executor_type ex)
0893 : notifier_{ex, 1}
0894 , req_{&req}
0895 , adapter_{}
0896 , expected_responses_{req.get_expected_responses()}
0897 , status_{status::waiting}
0898 , ec_{{}}
0899 , read_size_{0}
0900 {
0901 adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
0902 {
0903 auto const i = req_->get_expected_responses() - expected_responses_;
0904 adapter(i, nd, ec);
0905 };
0906 }
0907
0908 auto proceed()
0909 {
0910 notifier_.try_send(std::error_code{}, 0);
0911 }
0912
0913 void stop()
0914 {
0915 notifier_.close();
0916 }
0917
0918 [[nodiscard]] auto is_waiting() const noexcept
0919 { return status_ == status::waiting; }
0920
0921 [[nodiscard]] auto is_written() const noexcept
0922 { return status_ == status::written; }
0923
0924 [[nodiscard]] auto is_staged() const noexcept
0925 { return status_ == status::staged; }
0926
0927 void mark_written() noexcept
0928 { status_ = status::written; }
0929
0930 void mark_staged() noexcept
0931 { status_ = status::staged; }
0932
0933 void mark_waiting() noexcept
0934 { status_ = status::waiting; }
0935
0936 [[nodiscard]] auto stop_requested() const noexcept
0937 { return !notifier_.is_open();}
0938
0939 template <class CompletionToken>
0940 auto async_wait(CompletionToken&& token)
0941 {
0942 return notifier_.async_receive(std::forward<CompletionToken>(token));
0943 }
0944
0945
0946 enum class status
0947 { waiting
0948 , staged
0949 , written
0950 };
0951
0952 exec_notifier_type notifier_;
0953 request const* req_;
0954 wrapped_adapter_type adapter_;
0955
0956
0957 std::size_t expected_responses_;
0958 status status_;
0959
0960 system::error_code ec_;
0961 std::size_t read_size_;
0962 };
0963
0964 void remove_request(std::shared_ptr<req_info> const& info)
0965 {
0966 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
0967 }
0968
0969 using reqs_type = std::deque<std::shared_ptr<req_info>>;
0970
0971 template <class, class> friend struct detail::reader_op;
0972 template <class, class> friend struct detail::writer_op;
0973 template <class> friend struct detail::exec_op;
0974 template <class, class> friend class detail::run_op;
0975
0976 void cancel_push_requests()
0977 {
0978 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
0979 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
0980 });
0981
0982 std::for_each(point, std::end(reqs_), [](auto const& ptr) {
0983 ptr->proceed();
0984 });
0985
0986 reqs_.erase(point, std::end(reqs_));
0987 }
0988
0989 [[nodiscard]] bool is_writing() const noexcept
0990 {
0991 return !write_buffer_.empty();
0992 }
0993
0994 void add_request_info(std::shared_ptr<req_info> const& info)
0995 {
0996 reqs_.push_back(info);
0997
0998 if (info->req_->has_hello_priority()) {
0999 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
1000 return e->is_waiting();
1001 });
1002
1003 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
1004 }
1005
1006 if (is_open() && !is_writing())
1007 writer_timer_.cancel();
1008 }
1009
1010 template <class CompletionToken, class Logger>
1011 auto reader(Logger l, CompletionToken&& token)
1012 {
1013 return asio::async_compose
1014 < CompletionToken
1015 , void(system::error_code)
1016 >(detail::reader_op<this_type, Logger>{this, l},
1017 std::forward<CompletionToken>(token), writer_timer_);
1018 }
1019
1020 template <class CompletionToken, class Logger>
1021 auto writer(Logger l, CompletionToken&& token)
1022 {
1023 return asio::async_compose
1024 < CompletionToken
1025 , void(system::error_code)
1026 >(detail::writer_op<this_type, Logger>{this, l}, std::forward<CompletionToken>(token), writer_timer_);
1027 }
1028
1029 [[nodiscard]] bool coalesce_requests()
1030 {
1031
1032
1033 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
1034 return !ri->is_waiting();
1035 });
1036
1037 std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
1038
1039 write_buffer_ += ri->req_->payload();
1040 ri->mark_staged();
1041 usage_.commands_sent += ri->expected_responses_;
1042 });
1043
1044 usage_.bytes_sent += std::size(write_buffer_);
1045
1046 return point != std::cend(reqs_);
1047 }
1048
1049 bool is_waiting_response() const noexcept
1050 {
1051 if (std::empty(reqs_))
1052 return false;
1053
1054
1055
1056
1057
1058 return !reqs_.front()->is_waiting();
1059 }
1060
1061 void close()
1062 {
1063 if (stream_->next_layer().is_open()) {
1064 system::error_code ec;
1065 stream_->next_layer().close(ec);
1066 }
1067 }
1068
1069 auto is_open() const noexcept { return stream_->next_layer().is_open(); }
1070 auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
1071
1072 auto is_next_push()
1073 {
1074 BOOST_ASSERT(!read_buffer_.empty());
1075
1076
1077
1078
1079
1080
1081
1082
1083 if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
1084 return true;
1085
1086
1087
1088
1089
1090
1091
1092
1093 if (reqs_.empty())
1094 return true;
1095
1096
1097
1098 if (reqs_.front()->expected_responses_ == 0)
1099 return true;
1100
1101
1102
1103
1104
1105
1106 return reqs_.front()->is_waiting();
1107 }
1108
1109 auto get_suggested_buffer_growth() const noexcept
1110 {
1111 return parser_.get_suggested_buffer_growth(4096);
1112 }
1113
1114 enum class parse_result { needs_more, push, resp };
1115
1116 using parse_ret_type = std::pair<parse_result, std::size_t>;
1117
1118 parse_ret_type on_finish_parsing(parse_result t)
1119 {
1120 if (t == parse_result::push) {
1121 usage_.pushes_received += 1;
1122 usage_.push_bytes_received += parser_.get_consumed();
1123 } else {
1124 usage_.responses_received += 1;
1125 usage_.response_bytes_received += parser_.get_consumed();
1126 }
1127
1128 on_push_ = false;
1129 dbuf_.consume(parser_.get_consumed());
1130 auto const res = std::make_pair(t, parser_.get_consumed());
1131 parser_.reset();
1132 return res;
1133 }
1134
1135 parse_ret_type on_read(std::string_view data, system::error_code& ec)
1136 {
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147 if (!on_push_)
1148 on_push_ = is_next_push();
1149
1150 if (on_push_) {
1151 if (!resp3::parse(parser_, data, receive_adapter_, ec))
1152 return std::make_pair(parse_result::needs_more, 0);
1153
1154 if (ec)
1155 return std::make_pair(parse_result::push, 0);
1156
1157 return on_finish_parsing(parse_result::push);
1158 }
1159
1160 BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
1161 BOOST_ASSERT(!reqs_.empty());
1162 BOOST_ASSERT(reqs_.front() != nullptr);
1163 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
1164
1165 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
1166 return std::make_pair(parse_result::needs_more, 0);
1167
1168 if (ec) {
1169 reqs_.front()->ec_ = ec;
1170 reqs_.front()->proceed();
1171 return std::make_pair(parse_result::resp, 0);
1172 }
1173
1174 reqs_.front()->read_size_ += parser_.get_consumed();
1175
1176 if (--reqs_.front()->expected_responses_ == 0) {
1177
1178 reqs_.front()->proceed();
1179 reqs_.pop_front();
1180 }
1181
1182 return on_finish_parsing(parse_result::resp);
1183 }
1184
1185 void reset()
1186 {
1187 write_buffer_.clear();
1188 read_buffer_.clear();
1189 parser_.reset();
1190 on_push_ = false;
1191 cancel_run_called_ = false;
1192 }
1193
// SSL context. Declared before stream_ because the constructor builds the
// stream from it.
1194 asio::ssl::context ctx_;
// The stream lives behind a pointer so reset_stream() can replace it with a
// freshly constructed one.
1195 std::unique_ptr<next_layer_type> stream_;
1196
1197
1198
1199
// Timer the writer operation suspends on when there is nothing to write.
// add_request_info() and cancel_run() cancel it; the run operation also uses
// it to wait before reconnecting, and get_executor() reads its executor.
1200 timer_type writer_timer_;
// Channel through which the reader publishes the size of received pushes.
1201 receive_channel_type receive_channel_;
1202 resolver_type resv_;
1203 detail::connector ctor_;
1204 health_checker_type health_checker_;
1205 resp3_handshaker_type handshaker_;
// Adapter used to parse server pushes; see set_receive_response().
1206 receiver_adapter_type receive_adapter_;
1207
1208 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
1209
// Configuration stored by async_run().
1210 config cfg_;
// Storage for incoming data. Declared before dbuf_, which is constructed
// from it.
1211 std::string read_buffer_;
1212 dyn_buffer_type dbuf_;
// Payloads of the staged requests; cleared by on_write().
1213 std::string write_buffer_;
// Queue of requests being executed.
1214 reqs_type reqs_;
1215 resp3::parser parser_{};
// True while the message being parsed is treated as a server push.
1216 bool on_push_ = false;
// Set by cancel_run() and cleared by reset().
1217 bool cancel_run_called_ = false;
1218
// Usage counters returned by get_usage().
1219 usage usage_;
1220 };
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231 class connection {
1232 public:
1233
1234 using executor_type = asio::any_io_executor;
1235
1236
1237 explicit
1238 connection(
1239 executor_type ex,
1240 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1241 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1242
1243
1244 explicit
1245 connection(
1246 asio::io_context& ioc,
1247 asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
1248 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
1249
1250
1251 executor_type get_executor() noexcept
1252 { return impl_.get_executor(); }
1253
1254
1255 template <class CompletionToken = asio::deferred_t>
1256 auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
1257 {
1258 return asio::async_initiate<
1259 CompletionToken, void(boost::system::error_code)>(
1260 [](auto handler, connection* self, config const* cfg, logger l)
1261 {
1262 self->async_run_impl(*cfg, l, std::move(handler));
1263 }, token, this, &cfg, l);
1264 }
1265
1266
1267 template <class CompletionToken = asio::deferred_t>
1268 auto async_receive(CompletionToken&& token = {})
1269 { return impl_.async_receive(std::forward<CompletionToken>(token)); }
1270
1271
1272 std::size_t receive(system::error_code& ec)
1273 {
1274 return impl_.receive(ec);
1275 }
1276
1277
1278 template <class Response, class CompletionToken = asio::deferred_t>
1279 auto async_exec(request const& req, Response& resp, CompletionToken&& token = {})
1280 {
1281 return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
1282 }
1283
1284
1285 template <class CompletionToken = asio::deferred_t>
1286 auto
1287 async_exec(
1288 request const& req,
1289 any_adapter adapter,
1290 CompletionToken&& token = {})
1291 {
1292 return asio::async_initiate<
1293 CompletionToken, void(boost::system::error_code, std::size_t)>(
1294 [](auto handler, connection* self, request const* req, any_adapter&& adapter)
1295 {
1296 self->async_exec_impl(*req, std::move(adapter), std::move(handler));
1297 }, token, this, &req, std::move(adapter));
1298 }
1299
1300
1301 void cancel(operation op = operation::all);
1302
1303
1304 bool will_reconnect() const noexcept
1305 { return impl_.will_reconnect();}
1306
1307
1308 auto& next_layer() noexcept
1309 { return impl_.next_layer(); }
1310
1311
1312 auto const& next_layer() const noexcept
1313 { return impl_.next_layer(); }
1314
1315
1316 void reset_stream()
1317 { impl_.reset_stream();}
1318
1319
1320 template <class Response>
1321 void set_receive_response(Response& response)
1322 { impl_.set_receive_response(response); }
1323
1324
1325 usage get_usage() const noexcept
1326 { return impl_.get_usage(); }
1327
1328
1329 auto const& get_ssl_context() const noexcept
1330 { return impl_.get_ssl_context();}
1331
1332 private:
1333 void
1334 async_run_impl(
1335 config const& cfg,
1336 logger l,
1337 asio::any_completion_handler<void(boost::system::error_code)> token);
1338
1339 void
1340 async_exec_impl(
1341 request const& req,
1342 any_adapter&& adapter,
1343 asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);
1344
1345 basic_connection<executor_type> impl_;
1346 };
1347
1348 }
1349
1350 #endif