Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_READ_MESSAGE_OP_HPP
0009 #define BOOST_MQTT5_READ_MESSAGE_OP_HPP
0010 
0011 #include <boost/mqtt5/reason_codes.hpp>
0012 #include <boost/mqtt5/types.hpp>
0013 
0014 #include <boost/mqtt5/detail/control_packet.hpp>
0015 
0016 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0017 #include <boost/mqtt5/impl/disconnect_op.hpp>
0018 #include <boost/mqtt5/impl/publish_rec_op.hpp>
0019 #include <boost/mqtt5/impl/re_auth_op.hpp>
0020 
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/prepend.hpp>
0023 #include <boost/asio/recycling_allocator.hpp>
0024 #include <boost/assert.hpp>
0025 
0026 #include <cstdint>
0027 #include <memory>
0028 
0029 namespace boost::mqtt5::detail {
0030 
0031 namespace asio = boost::asio;
0032 
0033 template <typename ClientService, typename Handler>
0034 class read_message_op {
0035     using client_service = ClientService;
0036     using handler_type = Handler;
0037 
0038     struct on_message {};
0039     struct on_disconnect {};
0040 
0041     std::shared_ptr<client_service> _svc_ptr;
0042     handler_type _handler;
0043 
0044 public:
0045     read_message_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler)
0046         : _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
0047     {}
0048 
0049     read_message_op(read_message_op&&) noexcept = default;
0050     read_message_op(const read_message_op&) = delete;
0051 
0052     read_message_op& operator=(read_message_op&&) noexcept = default;
0053     read_message_op& operator=(const read_message_op&) = delete;
0054 
0055     using allocator_type = asio::associated_allocator_t<handler_type>;
0056     allocator_type get_allocator() const noexcept {
0057         return asio::get_associated_allocator(_handler);
0058     }
0059 
0060     using executor_type = typename client_service::executor_type;
0061     executor_type get_executor() const noexcept {
0062         return _svc_ptr->get_executor();
0063     }
0064 
0065     void perform() {
0066         _svc_ptr->async_assemble(
0067             asio::prepend(std::move(*this), on_message {})
0068         );
0069     }
0070 
0071     void operator()(
0072         on_message, error_code ec,
0073         uint8_t control_code,
0074         byte_citer first, byte_citer last
0075     ) {
0076         if (ec == client::error::malformed_packet)
0077             return on_malformed_packet(
0078                 "Malformed Packet received from the Server"
0079             );
0080 
0081         if (ec == asio::error::no_recovery)
0082             _svc_ptr->cancel();
0083 
0084         if (ec)
0085             return complete();
0086 
0087         dispatch(control_code, first, last);
0088     }
0089 
0090     void operator()(on_disconnect, error_code ec) {
0091         if (ec)
0092             return complete();
0093 
0094         perform();
0095     }
0096 
0097 private:
0098     void dispatch(
0099         uint8_t control_byte,
0100         byte_citer first, byte_citer last
0101     ) {
0102         auto code = control_code_e(control_byte & 0b11110000);
0103 
0104         switch (code) {
0105             case control_code_e::publish: {
0106                 auto msg = decoders::decode_publish(
0107                     control_byte, static_cast<uint32_t>(std::distance(first, last)), first
0108                 );
0109                 if (!msg.has_value())
0110                     return on_malformed_packet(
0111                         "Malformed PUBLISH received: cannot decode"
0112                     );
0113 
0114                 publish_rec_op { _svc_ptr }.perform(std::move(*msg));
0115             }
0116             break;
0117             case control_code_e::disconnect: {
0118                 auto rv = decoders::decode_disconnect(
0119                     static_cast<uint32_t>(std::distance(first, last)), first
0120                 );
0121                 if (!rv.has_value())
0122                     return on_malformed_packet(
0123                         "Malformed DISCONNECT received: cannot decode"
0124                     );
0125 
0126                 const auto& [rc, props] = *rv;
0127                 _svc_ptr->log().at_disconnect(
0128                     to_reason_code<reason_codes::category::disconnect>(rc)
0129                         .value_or(reason_codes::unspecified_error),
0130                     props
0131                 );
0132                 return _svc_ptr->async_shutdown(
0133                     asio::prepend(std::move(*this), on_disconnect {})
0134                 );
0135             }
0136             break;
0137             case control_code_e::auth: {
0138                 auto rv = decoders::decode_auth(
0139                     static_cast<uint32_t>(std::distance(first, last)), first
0140                 );
0141                 if (!rv.has_value())
0142                     return on_malformed_packet(
0143                         "Malformed AUTH received: cannot decode"
0144                     );
0145 
0146                 re_auth_op { _svc_ptr }.perform(std::move(*rv));
0147             }
0148             break;
0149             default:
0150                 BOOST_ASSERT(false);
0151         }
0152 
0153         perform();
0154     }
0155 
0156     void on_malformed_packet(const std::string& reason) {
0157         auto props = disconnect_props {};
0158         props[prop::reason_string] = reason;
0159         auto svc_ptr = _svc_ptr; // copy before this is moved
0160 
0161         async_disconnect(
0162             disconnect_rc_e::malformed_packet, props, svc_ptr,
0163             asio::prepend(std::move(*this), on_disconnect {})
0164         );
0165     }
0166 
0167     void complete() {
0168         return std::move(_handler)();
0169     }
0170 };
0171 
0172 
0173 } // end namespace boost::mqtt5::detail
0174 
0175 #endif // !BOOST_MQTT5_READ_MESSAGE_OP_HPP