File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_READ_OP_HPP
0009 #define BOOST_MQTT5_READ_OP_HPP
0010
0011 #include <boost/mqtt5/detail/internal_types.hpp>
0012
0013 #include <boost/asio/associated_allocator.hpp>
0014 #include <boost/asio/associated_executor.hpp>
0015 #include <boost/asio/deferred.hpp>
0016 #include <boost/asio/error.hpp>
0017 #include <boost/asio/experimental/parallel_group.hpp>
0018 #include <boost/asio/post.hpp>
0019 #include <boost/asio/prepend.hpp>
0020
0021 #include <array>
0022
0023 namespace boost::mqtt5::detail {
0024
0025 namespace asio = boost::asio;
0026 namespace asioex = boost::asio::experimental;
0027
0028 template <typename Owner, typename Handler>
0029 class read_op {
0030 struct on_read {};
0031 struct on_reconnect {};
0032
0033 Owner& _owner;
0034
0035 using handler_type = Handler;
0036 handler_type _handler;
0037
0038 public:
0039 read_op(Owner& owner, Handler&& handler) :
0040 _owner(owner), _handler(std::move(handler))
0041 {}
0042
0043 read_op(read_op&&) = default;
0044 read_op(const read_op&) = delete;
0045
0046 read_op& operator=(read_op&&) = default;
0047 read_op& operator=(const read_op&) = delete;
0048
0049 using allocator_type = asio::associated_allocator_t<handler_type>;
0050 allocator_type get_allocator() const noexcept {
0051 return asio::get_associated_allocator(_handler);
0052 }
0053
0054 using executor_type = asio::associated_executor_t<handler_type>;
0055 executor_type get_executor() const noexcept {
0056 return asio::get_associated_executor(_handler);
0057 }
0058
0059 template <typename BufferType>
0060 void perform(
0061 const BufferType& buffer, duration wait_for
0062 ) {
0063 auto stream_ptr = _owner._stream_ptr;
0064
0065 if (_owner.was_connected()) {
0066 _owner._read_timer.expires_after(wait_for);
0067
0068 auto timed_read = asioex::make_parallel_group(
0069 stream_ptr->async_read_some(buffer, asio::deferred),
0070 _owner._read_timer.async_wait(asio::deferred)
0071 );
0072
0073 timed_read.async_wait(
0074 asioex::wait_for_one(),
0075 asio::prepend(std::move(*this), on_read {}, stream_ptr)
0076 );
0077 }
0078 else
0079 asio::post(
0080 _owner.get_executor(),
0081 asio::prepend(
0082 std::move(*this), on_read {}, stream_ptr,
0083 std::array<size_t, 2> { 0, 1 },
0084 asio::error::not_connected, 0, error_code {}
0085 )
0086 );
0087 }
0088
0089 void operator()(
0090 on_read, typename Owner::stream_ptr stream_ptr,
0091 std::array<std::size_t, 2> ord, error_code read_ec, size_t bytes_read,
0092 error_code
0093 ) {
0094 if (!_owner.is_open())
0095 return complete(asio::error::operation_aborted, bytes_read);
0096
0097 error_code ec = ord[0] == 1 ? asio::error::timed_out : read_ec;
0098 bytes_read = ord[0] == 0 ? bytes_read : 0;
0099
0100 if (!ec)
0101 return complete(ec, bytes_read);
0102
0103
0104 if (should_reconnect(ec) || ec == asio::error::operation_aborted)
0105 return _owner.async_reconnect(
0106 stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
0107 );
0108
0109 return complete(asio::error::no_recovery, bytes_read);
0110 }
0111
0112 void operator()(on_reconnect, error_code ec) {
0113 if ((ec == asio::error::operation_aborted && _owner.is_open()) || !ec)
0114 ec = asio::error::try_again;
0115
0116 return complete(ec, 0);
0117 }
0118
0119 private:
0120 void complete(error_code ec, size_t bytes_read) {
0121 std::move(_handler)(ec, bytes_read);
0122 }
0123
0124 static bool should_reconnect(error_code ec) {
0125 using namespace asio::error;
0126
0127 return ec.value() == 1236L ||
0128 ec.value() == 121L ||
0129 ec == connection_aborted || ec == not_connected ||
0130 ec == timed_out || ec == connection_reset ||
0131 ec == broken_pipe || ec == asio::error::eof;
0132 }
0133
0134 };
0135
0136
0137 }
0138
0139 #endif