Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:20

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_ASSEMBLE_OP_HPP
0009 #define BOOST_MQTT5_ASSEMBLE_OP_HPP
0010 
0011 #include <boost/mqtt5/error.hpp>
0012 
0013 #include <boost/mqtt5/detail/control_packet.hpp>
0014 #include <boost/mqtt5/detail/internal_types.hpp>
0015 
0016 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0017 
0018 #include <boost/asio/append.hpp>
0019 #include <boost/asio/associated_allocator.hpp>
0020 #include <boost/asio/buffer.hpp>
0021 #include <boost/asio/completion_condition.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/asio/prepend.hpp>
0024 #include <boost/assert.hpp>
0025 #include <boost/system/error_code.hpp>
0026 
0027 #include <chrono>
0028 #include <cstdint>
0029 #include <string>
0030 #include <utility>
0031 
0032 namespace boost::mqtt5::detail {
0033 
0034 namespace asio = boost::asio;
0035 
0036 class data_span : private std::pair<byte_citer, byte_citer> {
         // A window of bytes described by two iterators: first() is where the
         // window starts and last() is where it stops. The iterator pair is a
         // private base, so callers only see the members declared below.
         // Because the accessor first() reuses the name of the inherited data
         // member, the pair's fields are always reached through `base::`.
0037     using base = std::pair<byte_citer, byte_citer>;
0038 public:
         // Constructible exactly like the underlying iterator pair.
0039     using base::base;
0040 
         // Iterator to the start of the window.
0041     byte_citer first() const {
0042         return this->base::first;
0043     }
         // Iterator to the end of the window.
0044     byte_citer last() const {
0045         return this->base::second;
0046     }
         // Grows the window by moving its end forward by num_chars positions.
0047     void expand_suffix(size_t num_chars) {
0048         std::advance(base::second, num_chars);
0049     }
         // Shrinks the window by moving its start forward by num_chars positions.
0050     void remove_prefix(size_t num_chars) {
0051         std::advance(base::first, num_chars);
0052     }
         // Number of positions between the start and the end of the window.
0053     size_t size() const {
0054         return static_cast<size_t>(base::second - base::first);
0055     }
0056 };
0057 
0058 
0059 template <typename ClientService, typename Handler>
     // Asynchronous operation that reads bytes from the service's stream into a
     // string buffer owned elsewhere and cuts that byte stream into packets.
     // Each packet is framed as one control byte, a variable-length integer
     // giving the length of what follows, and then that many bytes.
     //
     // A fully received packet is either handed to the service's reply
     // dispatcher (after which reading continues) or delivered to the stored
     // handler as (error_code, control byte, payload begin, payload end).
     // Errors are delivered to the handler with a zero control byte and
     // value-initialized iterators.
0060 class assemble_op {
0061     using client_service = ClientService;
0062     using handler_type = Handler;
0063 
         // Tag type: the leading argument that selects the read-completion
         // overload of operator().
0064     struct on_read {};
0065 
         // Used both as buffer size and as packet size limit whenever the
         // connect property maximum_packet_size has no value.
0066     static constexpr uint32_t max_recv_size = 65'536;
0067 
0068     client_service& _svc;
0069     handler_type _handler;
0070 
         // Both are references: the buffer and the window of not yet consumed
         // bytes inside it live outside this operation.
0071     std::string& _read_buff;
0072     data_span& _data_span;
0073 
0074 public:
         // Stores references to the service, the read buffer and the active
         // window, and takes ownership of the handler.
0075     assemble_op(
0076         client_service& svc, handler_type&& handler,
0077         std::string& read_buff, data_span& active_span
0078     ) :
0079         _svc(svc),
0080         _handler(std::move(handler)),
0081         _read_buff(read_buff), _data_span(active_span)
0082     {}
0083 
0084     assemble_op(assemble_op&&) noexcept = default;
0085     assemble_op(const assemble_op&) = delete;
0086 
         // NOTE(review): the class has reference members, so this defaulted
         // move assignment is implicitly defined as deleted.
0087     assemble_op& operator=(assemble_op&&) noexcept = default;
0088     assemble_op& operator=(const assemble_op&) = delete;
0089 
         // Allocator and executor are those associated with the stored handler.
0090     using allocator_type = asio::associated_allocator_t<handler_type>;
0091     allocator_type get_allocator() const noexcept {
0092         return asio::get_associated_allocator(_handler);
0093     }
0094 
0095     using executor_type = asio::associated_executor_t<handler_type>;
0096     executor_type get_executor() const noexcept {
0097         return asio::get_associated_executor(_handler);
0098     }
0099 
         // Starts (or continues) assembling a packet.
         //
         // cc is a completion condition callable as cc(error_code, size_t).
         // If it returns 0 for zero transferred bytes and unconsumed bytes are
         // still buffered, no read is issued and the buffered bytes are parsed
         // from a posted completion. Otherwise one read is issued.
0100     template <typename CompletionCondition>
0101     void perform(CompletionCondition cc) {
             // Compact the buffer: drop everything in front of the unconsumed
             // window, then size the buffer to the packet size limit.
0102         _read_buff.erase(
0103             _read_buff.cbegin(), _data_span.first()
0104         );
0105         _read_buff.resize(
0106             _svc.connect_property(prop::maximum_packet_size).value_or(max_recv_size)
0107         );
             // Re-anchor the window at the start of the buffer, same length.
             // NOTE(review): _data_span.size() is computed here from iterators
             // obtained before erase()/resize(); std::string is allowed to
             // invalidate them. Capturing the size before erase() would avoid
             // relying on that -- confirm and consider changing.
0108         _data_span = {
0109             _read_buff.cbegin(),
0110             _read_buff.cbegin() + _data_span.size()
0111         };
0112 
             // Nothing needs to be read and there is buffered data: re-enter
             // operator() through the service's executor with an empty error
             // code and zero bytes read instead of calling it inline.
0113         if (cc(error_code {}, 0) == 0 && _data_span.size()) {
0114             return asio::post(
0115                 _svc.get_executor(),
0116                 asio::prepend(
0117                     std::move(*this), on_read {}, error_code {},
0118                     0, std::move(cc)
0119                 )
0120             );
0121         }
0122 
0123         // Must be evaluated before this is moved
             // The read target is the unused tail of the buffer, directly
             // after the bytes that are already buffered.
0124         auto store_begin = _read_buff.data() + _data_span.size();
0125         auto store_size = std::distance(_data_span.last(), _read_buff.cend());
0126 
             // The completion is invoked as (on_read, <read results>, cc):
             // prepend adds the tag in front, append adds cc at the back.
0127         _svc._stream.async_read_some(
0128             asio::buffer(store_begin, store_size), compute_read_timeout(),
0129             asio::prepend(
0130                 asio::append(std::move(*this), std::move(cc)),
0131                 on_read {}
0132             )
0133         );
0134     }
0135 
         // Read completion: parses the buffered bytes and decides whether to
         // read more, report an error, or dispatch a complete packet.
         //
         // ec         - result of the read (or an empty code when posted
         //              from perform).
         // bytes_read - number of bytes appended behind the current window.
         // cc         - completion condition; only reused on the try_again
         //              path below.
0136     template <typename CompletionCondition>
0137     void operator()(
0138         on_read, error_code ec, size_t bytes_read,
0139         CompletionCondition cc
0140     ) {
             // try_again: refresh the session state, resend through the async
             // sender, throw away all buffered bytes and start reading anew.
             // NOTE(review): presumably the stream reports try_again after it
             // has reconnected -- confirm against the stream implementation.
0141         if (ec == asio::error::try_again) {
0142             _svc.update_session_state();
0143             _svc._async_sender.resend();
0144             _data_span = { _read_buff.cend(), _read_buff.cend() };
0145             return perform(std::move(cc));
0146         }
0147 
0148         if (ec)
0149             return complete(ec, 0, {}, {});
0150 
             // The newly read bytes now belong to the unconsumed window.
0151         _data_span.expand_suffix(bytes_read);
0152         BOOST_ASSERT(_data_span.size());
0153 
0154         auto control_byte = uint8_t(*_data_span.first());
0155 
             // A control byte whose upper four bits are all zero is rejected.
0156         if ((control_byte & 0b11110000) == 0)
0157             // close the connection, cancel
0158             return complete(client::error::malformed_packet, 0, {}, {});
0159 
             // Parse the variable-length integer that follows the control byte.
             // The code below treats `first` as pointing past that integer
             // afterwards, i.e. type_parse is expected to advance it.
0160         auto first = _data_span.first() + 1;
0161         auto varlen = decoders::type_parse(
0162             first, _data_span.last(), decoders::basic::varint_
0163         );
0164 
             // Parse failure with fewer than 5 buffered bytes is treated as
             // "header not complete yet" and triggers another read; with 5 or
             // more bytes it is a malformed packet.
             // (presumably 5 = control byte + longest length encoding)
0165         if (!varlen) {
0166             if (_data_span.size() < 5)
0167                 return perform(asio::transfer_at_least(1));
0168             return complete(client::error::malformed_packet, 0, {}, {});
0169         }
0170 
             // Reject packets whose header plus announced length would exceed
             // the packet size limit (which is also the buffer size).
0171         auto recv_size = _svc.connect_property(prop::maximum_packet_size)
0172             .value_or(max_recv_size);
0173         if (static_cast<uint32_t>(*varlen) > recv_size - std::distance(_data_span.first(), first))
0174             return complete(client::error::malformed_packet, 0, {}, {});
0175 
             // Body not fully buffered yet: keep reading.
0176         if (std::distance(first, _data_span.last()) < *varlen)
0177             return perform(asio::transfer_at_least(1));
0178 
             // Mark header and body as consumed; bytes behind them stay in the
             // window for the next packet.
0179         _data_span.remove_prefix(
0180             std::distance(_data_span.first(), first) + *varlen
0181         );
0182 
0183         dispatch(control_byte, first, first + *varlen);
0184     }
0185 
0186 private:
         // Timeout handed to the stream read: 1.5 times the negotiated keep
         // alive (the value is multiplied by 1000 to obtain milliseconds), or
         // the largest representable duration when the keep alive is zero.
         // NOTE(review): std::numeric_limits is used here but <limits> is not
         // among the includes visible in this file; it presumably arrives
         // transitively -- confirm.
0187     duration compute_read_timeout() const {
0188         auto negotiated_ka = _svc.negotiated_keep_alive();
0189         return negotiated_ka ?
0190             std::chrono::milliseconds(3 * negotiated_ka * 1000 / 2) :
0191             duration((std::numeric_limits<duration::rep>::max)());
0192     }
0193 
         // Checks the lower four bits of the control byte against the packet
         // type stored in the upper four bits: publish accepts any value,
         // pubrel requires 0b0010, every other type requires 0.
0194     static bool valid_header(uint8_t control_byte) {
0195         auto code = control_code_e(control_byte & 0b11110000);
0196 
0197         if (code == control_code_e::publish)
0198             return true;
0199 
0200         auto res = control_byte & 0b00001111;
0201         if (code == control_code_e::pubrel)
0202             return res == 0b00000010;
0203         return res == 0b00000000;
0204     }
0205 
         // Routes one complete packet whose body is [first, last).
         //  - invalid header bits: handler gets malformed_packet
         //  - pingresp: nothing is delivered, assembling simply continues
         //  - any type other than publish, auth and disconnect: forwarded to
         //    the reply dispatcher together with its packet id, then
         //    assembling continues
         //  - publish, auth, disconnect: delivered to the handler
0206     void dispatch(
0207         uint8_t control_byte, byte_citer first, byte_citer last
0208     ) {
0209         using namespace decoders;
0210 
0211         if (!valid_header(control_byte))
0212             return complete(client::error::malformed_packet, 0, {}, {});
0213 
0214         auto code = control_code_e(control_byte & 0b11110000);
0215 
             // transfer_at_least(0) lets perform() continue with bytes that
             // are already buffered without issuing a read.
0216         if (code == control_code_e::pingresp)
0217             return perform(asio::transfer_at_least(0));
0218 
0219         bool is_reply = code != control_code_e::publish &&
0220             code != control_code_e::auth &&
0221             code != control_code_e::disconnect;
0222 
0223         if (is_reply) {
                 // NOTE(review): no check is visible here that the body is
                 // long enough to hold a packet id, and .value() presumably
                 // throws when decode_packet_id yields no value -- confirm how
                 // short or malformed reply packets are handled.
0224             auto packet_id = decoders::decode_packet_id(first).value();
0225             _svc._replies.dispatch(error_code {}, code, packet_id, first, last);
0226             return perform(asio::transfer_at_least(0));
0227         }
0228 
0229         complete(error_code {}, control_byte, first, last);
0230     }
0231 
         // Finishes the operation by invoking the handler as an rvalue.
         // On error all buffered bytes are discarded first by collapsing the
         // window to the end of the buffer.
0232     void complete(
0233         error_code ec, uint8_t control_code,
0234         byte_citer first, byte_citer last
0235     ) {
0236         if (ec)
0237             _data_span = { _read_buff.cend(), _read_buff.cend() };
0238         std::move(_handler)(ec, control_code, first, last);
0239     }
0240 };
0241 
0242 } // end namespace boost::mqtt5::detail
0243 
0244 #endif // !BOOST_MQTT5_ASSEMBLE_OP_HPP