File indexing completed on 2025-09-17 08:38:20
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_ASSEMBLE_OP_HPP
0009 #define BOOST_MQTT5_ASSEMBLE_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012
0013 #include <boost/mqtt5/detail/control_packet.hpp>
0014 #include <boost/mqtt5/detail/internal_types.hpp>
0015
0016 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0017
0018 #include <boost/asio/append.hpp>
0019 #include <boost/asio/associated_allocator.hpp>
0020 #include <boost/asio/buffer.hpp>
0021 #include <boost/asio/completion_condition.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/asio/prepend.hpp>
0024 #include <boost/assert.hpp>
0025 #include <boost/system/error_code.hpp>
0026
0027 #include <chrono>
0028 #include <cstdint>
0029 #include <string>
0030 #include <utility>
0031
0032 namespace boost::mqtt5::detail {
0033
0034 namespace asio = boost::asio;
0035
0036 class data_span : private std::pair<byte_citer, byte_citer> {
0037 using base = std::pair<byte_citer, byte_citer>;
0038 public:
0039 using base::base;
0040
0041 auto first() const {
0042 return base::first;
0043 }
0044 auto last() const {
0045 return base::second;
0046 }
0047 void expand_suffix(size_t num_chars) {
0048 base::second += num_chars;
0049 }
0050 void remove_prefix(size_t num_chars) {
0051 base::first += num_chars;
0052 }
0053 size_t size() const {
0054 return std::distance(base::first, base::second);
0055 }
0056 };
0057
0058
0059 template <typename ClientService, typename Handler>
0060 class assemble_op {
0061 using client_service = ClientService;
0062 using handler_type = Handler;
0063
0064 struct on_read {};
0065
0066 static constexpr uint32_t max_recv_size = 65'536;
0067
0068 client_service& _svc;
0069 handler_type _handler;
0070
0071 std::string& _read_buff;
0072 data_span& _data_span;
0073
0074 public:
0075 assemble_op(
0076 client_service& svc, handler_type&& handler,
0077 std::string& read_buff, data_span& active_span
0078 ) :
0079 _svc(svc),
0080 _handler(std::move(handler)),
0081 _read_buff(read_buff), _data_span(active_span)
0082 {}
0083
0084 assemble_op(assemble_op&&) noexcept = default;
0085 assemble_op(const assemble_op&) = delete;
0086
0087 assemble_op& operator=(assemble_op&&) noexcept = default;
0088 assemble_op& operator=(const assemble_op&) = delete;
0089
0090 using allocator_type = asio::associated_allocator_t<handler_type>;
0091 allocator_type get_allocator() const noexcept {
0092 return asio::get_associated_allocator(_handler);
0093 }
0094
0095 using executor_type = asio::associated_executor_t<handler_type>;
0096 executor_type get_executor() const noexcept {
0097 return asio::get_associated_executor(_handler);
0098 }
0099
0100 template <typename CompletionCondition>
0101 void perform(CompletionCondition cc) {
0102 _read_buff.erase(
0103 _read_buff.cbegin(), _data_span.first()
0104 );
0105 _read_buff.resize(
0106 _svc.connect_property(prop::maximum_packet_size).value_or(max_recv_size)
0107 );
0108 _data_span = {
0109 _read_buff.cbegin(),
0110 _read_buff.cbegin() + _data_span.size()
0111 };
0112
0113 if (cc(error_code {}, 0) == 0 && _data_span.size()) {
0114 return asio::post(
0115 _svc.get_executor(),
0116 asio::prepend(
0117 std::move(*this), on_read {}, error_code {},
0118 0, std::move(cc)
0119 )
0120 );
0121 }
0122
0123 // Must be evaluated before this is moved
0124 auto store_begin = _read_buff.data() + _data_span.size();
0125 auto store_size = std::distance(_data_span.last(), _read_buff.cend());
0126
0127 _svc._stream.async_read_some(
0128 asio::buffer(store_begin, store_size), compute_read_timeout(),
0129 asio::prepend(
0130 asio::append(std::move(*this), std::move(cc)),
0131 on_read {}
0132 )
0133 );
0134 }
0135
0136 template <typename CompletionCondition>
0137 void operator()(
0138 on_read, error_code ec, size_t bytes_read,
0139 CompletionCondition cc
0140 ) {
0141 if (ec == asio::error::try_again) {
0142 _svc.update_session_state();
0143 _svc._async_sender.resend();
0144 _data_span = { _read_buff.cend(), _read_buff.cend() };
0145 return perform(std::move(cc));
0146 }
0147
0148 if (ec)
0149 return complete(ec, 0, {}, {});
0150
0151 _data_span.expand_suffix(bytes_read);
0152 BOOST_ASSERT(_data_span.size());
0153
0154 auto control_byte = uint8_t(*_data_span.first());
0155
0156 if ((control_byte & 0b11110000) == 0)
0157 // close the connection, cancel
0158 return complete(client::error::malformed_packet, 0, {}, {});
0159
0160 auto first = _data_span.first() + 1;
0161 auto varlen = decoders::type_parse(
0162 first, _data_span.last(), decoders::basic::varint_
0163 );
0164
0165 if (!varlen) {
0166 if (_data_span.size() < 5)
0167 return perform(asio::transfer_at_least(1));
0168 return complete(client::error::malformed_packet, 0, {}, {});
0169 }
0170
0171 auto recv_size = _svc.connect_property(prop::maximum_packet_size)
0172 .value_or(max_recv_size);
0173 if (static_cast<uint32_t>(*varlen) > recv_size - std::distance(_data_span.first(), first))
0174 return complete(client::error::malformed_packet, 0, {}, {});
0175
0176 if (std::distance(first, _data_span.last()) < *varlen)
0177 return perform(asio::transfer_at_least(1));
0178
0179 _data_span.remove_prefix(
0180 std::distance(_data_span.first(), first) + *varlen
0181 );
0182
0183 dispatch(control_byte, first, first + *varlen);
0184 }
0185
0186 private:
0187 duration compute_read_timeout() const {
0188 auto negotiated_ka = _svc.negotiated_keep_alive();
0189 return negotiated_ka ?
0190 std::chrono::milliseconds(3 * negotiated_ka * 1000 / 2) :
0191 duration((std::numeric_limits<duration::rep>::max)());
0192 }
0193
0194 static bool valid_header(uint8_t control_byte) {
0195 auto code = control_code_e(control_byte & 0b11110000);
0196
0197 if (code == control_code_e::publish)
0198 return true;
0199
0200 auto res = control_byte & 0b00001111;
0201 if (code == control_code_e::pubrel)
0202 return res == 0b00000010;
0203 return res == 0b00000000;
0204 }
0205
0206 void dispatch(
0207 uint8_t control_byte, byte_citer first, byte_citer last
0208 ) {
0209 using namespace decoders;
0210
0211 if (!valid_header(control_byte))
0212 return complete(client::error::malformed_packet, 0, {}, {});
0213
0214 auto code = control_code_e(control_byte & 0b11110000);
0215
0216 if (code == control_code_e::pingresp)
0217 return perform(asio::transfer_at_least(0));
0218
0219 bool is_reply = code != control_code_e::publish &&
0220 code != control_code_e::auth &&
0221 code != control_code_e::disconnect;
0222
0223 if (is_reply) {
0224 auto packet_id = decoders::decode_packet_id(first).value();
0225 _svc._replies.dispatch(error_code {}, code, packet_id, first, last);
0226 return perform(asio::transfer_at_least(0));
0227 }
0228
0229 complete(error_code {}, control_byte, first, last);
0230 }
0231
0232 void complete(
0233 error_code ec, uint8_t control_code,
0234 byte_citer first, byte_citer last
0235 ) {
0236 if (ec)
0237 _data_span = { _read_buff.cend(), _read_buff.cend() };
0238 std::move(_handler)(ec, control_code, first, last);
0239 }
0240 };
0241
0242 } // end namespace boost::mqtt5::detail
0243
0244 #endif // !BOOST_MQTT5_ASSEMBLE_OP_HPP