File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_PUBLISH_REC_OP_HPP
0009 #define BOOST_MQTT5_PUBLISH_REC_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/property_types.hpp>
0013 #include <boost/mqtt5/reason_codes.hpp>
0014 #include <boost/mqtt5/types.hpp>
0015
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018
0019 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0020 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0021 #include <boost/mqtt5/impl/disconnect_op.hpp>
0022
0023 #include <boost/asio/consign.hpp>
0024 #include <boost/asio/detached.hpp>
0025 #include <boost/asio/prepend.hpp>
0026 #include <boost/asio/recycling_allocator.hpp>
0027
0028 #include <cstdint>
0029 #include <memory>
0030 #include <string>
0031
0032 namespace boost::mqtt5::detail {
0033
0034 namespace asio = boost::asio;
0035
0036 template <typename ClientService>
0037 class publish_rec_op {
0038 using client_service = ClientService;
0039
0040 struct on_puback {};
0041 struct on_pubrec {};
0042 struct on_pubrel {};
0043 struct on_pubcomp {};
0044
0045 std::shared_ptr<client_service> _svc_ptr;
0046 decoders::publish_message _message;
0047
0048 public:
0049 explicit publish_rec_op(std::shared_ptr<client_service> svc_ptr) :
0050 _svc_ptr(std::move(svc_ptr))
0051 {}
0052
0053 publish_rec_op(publish_rec_op&&) noexcept = default;
0054 publish_rec_op(const publish_rec_op&) = delete;
0055
0056 publish_rec_op& operator=(publish_rec_op&&) noexcept = default;
0057 publish_rec_op& operator=(const publish_rec_op&) = delete;
0058
0059 using allocator_type = asio::recycling_allocator<void>;
0060 allocator_type get_allocator() const noexcept {
0061 return allocator_type {};
0062 }
0063
0064 using executor_type = typename client_service::executor_type;
0065 executor_type get_executor() const noexcept {
0066 return _svc_ptr->get_executor();
0067 }
0068
0069 void perform(decoders::publish_message message) {
0070 auto flags = std::get<2>(message);
0071 auto qos_bits = (flags >> 1) & 0b11;
0072 if (qos_bits == 0b11)
0073 return on_malformed_packet(
0074 "Malformed PUBLISH received: QoS bits set to 0b11"
0075 );
0076
0077 auto qos = qos_e(qos_bits);
0078 _message = std::move(message);
0079
0080 if (qos == qos_e::at_most_once)
0081 return complete();
0082
0083 auto packet_id = std::get<1>(_message);
0084
0085 if (qos == qos_e::at_least_once) {
0086 auto puback = control_packet<allocator_type>::of(
0087 with_pid, get_allocator(),
0088 encoders::encode_puback, *packet_id,
0089 uint8_t(0), puback_props {}
0090 );
0091 return send_puback(std::move(puback));
0092 }
0093
0094
0095 auto pubrec = control_packet<allocator_type>::of(
0096 with_pid, get_allocator(),
0097 encoders::encode_pubrec, *packet_id,
0098 uint8_t(0), pubrec_props {}
0099 );
0100
0101 return send_pubrec(std::move(pubrec));
0102 }
0103
0104 void send_puback(control_packet<allocator_type> puback) {
0105 auto wire_data = puback.wire_data();
0106 _svc_ptr->async_send(
0107 wire_data,
0108 no_serial, send_flag::none,
0109 asio::consign(
0110 asio::prepend(std::move(*this), on_puback {}),
0111 std::move(puback)
0112 )
0113 );
0114 }
0115
0116 void operator()(on_puback, error_code ec) {
0117 if (ec)
0118 return;
0119
0120 complete();
0121 }
0122
0123 void send_pubrec(control_packet<allocator_type> pubrec) {
0124 auto wire_data = pubrec.wire_data();
0125 _svc_ptr->async_send(
0126 wire_data,
0127 no_serial, send_flag::none,
0128 asio::prepend(std::move(*this), on_pubrec {}, std::move(pubrec))
0129 );
0130 }
0131
0132 void operator()(
0133 on_pubrec, control_packet<allocator_type> packet,
0134 error_code ec
0135 ) {
0136 if (ec)
0137 return;
0138
0139 wait_pubrel(packet.packet_id());
0140 }
0141
0142 void wait_pubrel(uint16_t packet_id) {
0143 _svc_ptr->async_wait_reply(
0144 control_code_e::pubrel, packet_id,
0145 asio::prepend(std::move(*this), on_pubrel {}, packet_id)
0146 );
0147 }
0148
0149 void operator()(
0150 on_pubrel, uint16_t packet_id,
0151 error_code ec, byte_citer first, byte_citer last
0152 ) {
0153 if (ec == asio::error::try_again)
0154 return wait_pubrel(packet_id);
0155
0156 if (ec)
0157 return;
0158
0159 auto pubrel = decoders::decode_pubrel(static_cast<uint32_t>(std::distance(first, last)), first);
0160 if (!pubrel.has_value()) {
0161 on_malformed_packet("Malformed PUBREL received: cannot decode");
0162 return wait_pubrel(packet_id);
0163 }
0164
0165 auto& [reason_code, props] = *pubrel;
0166 auto rc = to_reason_code<reason_codes::category::pubrel>(reason_code);
0167 if (!rc) {
0168 on_malformed_packet("Malformed PUBREL received: invalid Reason Code");
0169 return wait_pubrel(packet_id);
0170 }
0171
0172 auto pubcomp = control_packet<allocator_type>::of(
0173 with_pid, get_allocator(),
0174 encoders::encode_pubcomp, packet_id,
0175 uint8_t(0), pubcomp_props{}
0176 );
0177 send_pubcomp(std::move(pubcomp));
0178 }
0179
0180 void send_pubcomp(control_packet<allocator_type> pubcomp) {
0181 auto wire_data = pubcomp.wire_data();
0182 _svc_ptr->async_send(
0183 wire_data,
0184 no_serial, send_flag::none,
0185 asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubcomp))
0186 );
0187 }
0188
0189 void operator()(
0190 on_pubcomp, control_packet<allocator_type> packet,
0191 error_code ec
0192 ) {
0193 if (ec == asio::error::try_again)
0194 return wait_pubrel(packet.packet_id());
0195
0196 if (ec)
0197 return;
0198
0199 complete();
0200 }
0201
0202 private:
0203 void on_malformed_packet(const std::string& reason) {
0204 auto props = disconnect_props {};
0205 props[prop::reason_string] = reason;
0206 return async_disconnect(
0207 disconnect_rc_e::malformed_packet, props,
0208 _svc_ptr, asio::detached
0209 );
0210 }
0211
0212 void complete() {
0213 _svc_ptr->channel_store(std::move(_message));
0214 }
0215 };
0216
0217 }
0218
0219 #endif