Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_PUBLISH_REC_OP_HPP
0009 #define BOOST_MQTT5_PUBLISH_REC_OP_HPP
0010 
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/property_types.hpp>
0013 #include <boost/mqtt5/reason_codes.hpp>
0014 #include <boost/mqtt5/types.hpp>
0015 
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018 
0019 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0020 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0021 #include <boost/mqtt5/impl/disconnect_op.hpp>
0022 
0023 #include <boost/asio/consign.hpp>
0024 #include <boost/asio/detached.hpp>
0025 #include <boost/asio/prepend.hpp>
0026 #include <boost/asio/recycling_allocator.hpp>
0027 
0028 #include <cstdint>
0029 #include <memory>
0030 #include <string>
0031 
0032 namespace boost::mqtt5::detail {
0033 
0034 namespace asio = boost::asio;
0035 
0036 template <typename ClientService>
0037 class publish_rec_op {
0038     using client_service = ClientService;
0039 
0040     struct on_puback {};
0041     struct on_pubrec {};
0042     struct on_pubrel {};
0043     struct on_pubcomp {};
0044 
0045     std::shared_ptr<client_service> _svc_ptr;
0046     decoders::publish_message _message;
0047 
0048 public:
0049     explicit publish_rec_op(std::shared_ptr<client_service> svc_ptr) :
0050         _svc_ptr(std::move(svc_ptr))
0051     {}
0052 
0053     publish_rec_op(publish_rec_op&&) noexcept = default;
0054     publish_rec_op(const publish_rec_op&) = delete;
0055 
0056     publish_rec_op& operator=(publish_rec_op&&) noexcept = default;
0057     publish_rec_op& operator=(const publish_rec_op&) = delete;
0058 
0059     using allocator_type = asio::recycling_allocator<void>;
0060     allocator_type get_allocator() const noexcept {
0061         return allocator_type {};
0062     }
0063 
0064     using executor_type = typename client_service::executor_type;
0065     executor_type get_executor() const noexcept {
0066         return _svc_ptr->get_executor();
0067     }
0068 
0069     void perform(decoders::publish_message message) {
0070         auto flags = std::get<2>(message);
0071         auto qos_bits = (flags >> 1) & 0b11;
0072         if (qos_bits == 0b11)
0073             return on_malformed_packet(
0074                 "Malformed PUBLISH received: QoS bits set to 0b11"
0075             );
0076 
0077         auto qos = qos_e(qos_bits);
0078         _message = std::move(message);
0079 
0080         if (qos == qos_e::at_most_once)
0081             return complete();
0082 
0083         auto packet_id = std::get<1>(_message);
0084 
0085         if (qos == qos_e::at_least_once) {
0086             auto puback = control_packet<allocator_type>::of(
0087                 with_pid, get_allocator(),
0088                 encoders::encode_puback, *packet_id,
0089                 uint8_t(0), puback_props {}
0090             );
0091             return send_puback(std::move(puback));
0092         }
0093 
0094         // qos == qos_e::exactly_once
0095         auto pubrec = control_packet<allocator_type>::of(
0096             with_pid, get_allocator(),
0097             encoders::encode_pubrec, *packet_id,
0098             uint8_t(0), pubrec_props {}
0099         );
0100 
0101         return send_pubrec(std::move(pubrec));
0102     }
0103 
0104     void send_puback(control_packet<allocator_type> puback) {
0105         auto wire_data = puback.wire_data();
0106         _svc_ptr->async_send(
0107             wire_data,
0108             no_serial, send_flag::none,
0109             asio::consign(
0110                 asio::prepend(std::move(*this), on_puback {}),
0111                 std::move(puback)
0112             )
0113         );
0114     }
0115 
0116     void operator()(on_puback, error_code ec) {
0117         if (ec)
0118             return;
0119 
0120         complete();
0121     }
0122 
0123     void send_pubrec(control_packet<allocator_type> pubrec) {
0124         auto wire_data = pubrec.wire_data();
0125         _svc_ptr->async_send(
0126             wire_data,
0127             no_serial, send_flag::none,
0128             asio::prepend(std::move(*this), on_pubrec {}, std::move(pubrec))
0129         );
0130     }
0131 
0132     void operator()(
0133         on_pubrec, control_packet<allocator_type> packet,
0134         error_code ec
0135     ) {
0136         if (ec)
0137             return;
0138 
0139         wait_pubrel(packet.packet_id());
0140     }
0141 
0142     void wait_pubrel(uint16_t packet_id) {
0143         _svc_ptr->async_wait_reply(
0144             control_code_e::pubrel, packet_id,
0145             asio::prepend(std::move(*this), on_pubrel {}, packet_id)
0146         );
0147     }
0148 
0149     void operator()(
0150         on_pubrel, uint16_t packet_id,
0151         error_code ec, byte_citer first, byte_citer last
0152     ) {
0153         if (ec == asio::error::try_again) // "resend unanswered"
0154             return wait_pubrel(packet_id);
0155 
0156         if (ec)
0157             return;
0158 
0159         auto pubrel = decoders::decode_pubrel(static_cast<uint32_t>(std::distance(first, last)), first);
0160         if (!pubrel.has_value()) {
0161             on_malformed_packet("Malformed PUBREL received: cannot decode");
0162             return wait_pubrel(packet_id);
0163         }
0164 
0165         auto& [reason_code, props] = *pubrel;
0166         auto rc = to_reason_code<reason_codes::category::pubrel>(reason_code);
0167         if (!rc) {
0168             on_malformed_packet("Malformed PUBREL received: invalid Reason Code");
0169             return wait_pubrel(packet_id);
0170         }
0171 
0172         auto pubcomp = control_packet<allocator_type>::of(
0173             with_pid, get_allocator(),
0174             encoders::encode_pubcomp, packet_id,
0175             uint8_t(0), pubcomp_props{}
0176         );
0177         send_pubcomp(std::move(pubcomp));
0178     }
0179 
0180     void send_pubcomp(control_packet<allocator_type> pubcomp) {
0181         auto wire_data = pubcomp.wire_data();
0182         _svc_ptr->async_send(
0183             wire_data,
0184             no_serial, send_flag::none,
0185             asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubcomp))
0186         );
0187     }
0188 
0189     void operator()(
0190         on_pubcomp, control_packet<allocator_type> packet,
0191         error_code ec
0192     ) {
0193         if (ec == asio::error::try_again)
0194             return wait_pubrel(packet.packet_id());
0195 
0196         if (ec)
0197             return;
0198 
0199         complete();
0200     }
0201 
0202 private:
0203     void on_malformed_packet(const std::string& reason) {
0204         auto props = disconnect_props {};
0205         props[prop::reason_string] = reason;
0206         return async_disconnect(
0207             disconnect_rc_e::malformed_packet, props,
0208             _svc_ptr, asio::detached
0209         );
0210     }
0211 
0212     void complete() {
0213         /* auto rv = */_svc_ptr->channel_store(std::move(_message));
0214     }
0215 };
0216 
0217 } // end namespace boost::mqtt5::detail
0218 
0219 #endif // !BOOST_MQTT5_PUBLISH_REC_OP_HPP