File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_READ_MESSAGE_OP_HPP
0009 #define BOOST_MQTT5_READ_MESSAGE_OP_HPP
0010
0011 #include <boost/mqtt5/reason_codes.hpp>
0012 #include <boost/mqtt5/types.hpp>
0013
0014 #include <boost/mqtt5/detail/control_packet.hpp>
0015
0016 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0017 #include <boost/mqtt5/impl/disconnect_op.hpp>
0018 #include <boost/mqtt5/impl/publish_rec_op.hpp>
0019 #include <boost/mqtt5/impl/re_auth_op.hpp>
0020
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/prepend.hpp>
0023 #include <boost/asio/recycling_allocator.hpp>
0024 #include <boost/assert.hpp>
0025
0026 #include <cstdint>
0027 #include <memory>
0028
0029 namespace boost::mqtt5::detail {
0030
0031 namespace asio = boost::asio;
0032
0033 template <typename ClientService, typename Handler>
0034 class read_message_op {
0035 using client_service = ClientService;
0036 using handler_type = Handler;
0037
0038 struct on_message {};
0039 struct on_disconnect {};
0040
0041 std::shared_ptr<client_service> _svc_ptr;
0042 handler_type _handler;
0043
0044 public:
0045 read_message_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler)
0046 : _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
0047 {}
0048
0049 read_message_op(read_message_op&&) noexcept = default;
0050 read_message_op(const read_message_op&) = delete;
0051
0052 read_message_op& operator=(read_message_op&&) noexcept = default;
0053 read_message_op& operator=(const read_message_op&) = delete;
0054
0055 using allocator_type = asio::associated_allocator_t<handler_type>;
0056 allocator_type get_allocator() const noexcept {
0057 return asio::get_associated_allocator(_handler);
0058 }
0059
0060 using executor_type = typename client_service::executor_type;
0061 executor_type get_executor() const noexcept {
0062 return _svc_ptr->get_executor();
0063 }
0064
0065 void perform() {
0066 _svc_ptr->async_assemble(
0067 asio::prepend(std::move(*this), on_message {})
0068 );
0069 }
0070
0071 void operator()(
0072 on_message, error_code ec,
0073 uint8_t control_code,
0074 byte_citer first, byte_citer last
0075 ) {
0076 if (ec == client::error::malformed_packet)
0077 return on_malformed_packet(
0078 "Malformed Packet received from the Server"
0079 );
0080
0081 if (ec == asio::error::no_recovery)
0082 _svc_ptr->cancel();
0083
0084 if (ec)
0085 return complete();
0086
0087 dispatch(control_code, first, last);
0088 }
0089
0090 void operator()(on_disconnect, error_code ec) {
0091 if (ec)
0092 return complete();
0093
0094 perform();
0095 }
0096
0097 private:
0098 void dispatch(
0099 uint8_t control_byte,
0100 byte_citer first, byte_citer last
0101 ) {
0102 auto code = control_code_e(control_byte & 0b11110000);
0103
0104 switch (code) {
0105 case control_code_e::publish: {
0106 auto msg = decoders::decode_publish(
0107 control_byte, static_cast<uint32_t>(std::distance(first, last)), first
0108 );
0109 if (!msg.has_value())
0110 return on_malformed_packet(
0111 "Malformed PUBLISH received: cannot decode"
0112 );
0113
0114 publish_rec_op { _svc_ptr }.perform(std::move(*msg));
0115 }
0116 break;
0117 case control_code_e::disconnect: {
0118 auto rv = decoders::decode_disconnect(
0119 static_cast<uint32_t>(std::distance(first, last)), first
0120 );
0121 if (!rv.has_value())
0122 return on_malformed_packet(
0123 "Malformed DISCONNECT received: cannot decode"
0124 );
0125
0126 const auto& [rc, props] = *rv;
0127 _svc_ptr->log().at_disconnect(
0128 to_reason_code<reason_codes::category::disconnect>(rc)
0129 .value_or(reason_codes::unspecified_error),
0130 props
0131 );
0132 return _svc_ptr->async_shutdown(
0133 asio::prepend(std::move(*this), on_disconnect {})
0134 );
0135 }
0136 break;
0137 case control_code_e::auth: {
0138 auto rv = decoders::decode_auth(
0139 static_cast<uint32_t>(std::distance(first, last)), first
0140 );
0141 if (!rv.has_value())
0142 return on_malformed_packet(
0143 "Malformed AUTH received: cannot decode"
0144 );
0145
0146 re_auth_op { _svc_ptr }.perform(std::move(*rv));
0147 }
0148 break;
0149 default:
0150 BOOST_ASSERT(false);
0151 }
0152
0153 perform();
0154 }
0155
0156 void on_malformed_packet(const std::string& reason) {
0157 auto props = disconnect_props {};
0158 props[prop::reason_string] = reason;
0159 auto svc_ptr = _svc_ptr;
0160
0161 async_disconnect(
0162 disconnect_rc_e::malformed_packet, props, svc_ptr,
0163 asio::prepend(std::move(*this), on_disconnect {})
0164 );
0165 }
0166
0167 void complete() {
0168 return std::move(_handler)();
0169 }
0170 };
0171
0172
0173 }
0174
0175 #endif