Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_PUBLISH_SEND_OP_HPP
0009 #define BOOST_MQTT5_PUBLISH_SEND_OP_HPP
0010 
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013 #include <boost/mqtt5/types.hpp>
0014 
0015 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018 #include <boost/mqtt5/detail/topic_validation.hpp>
0019 #include <boost/mqtt5/detail/utf8_mqtt.hpp>
0020 
0021 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0022 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0023 #include <boost/mqtt5/impl/disconnect_op.hpp>
0024 
0025 #include <boost/asio/associated_allocator.hpp>
0026 #include <boost/asio/associated_executor.hpp>
0027 #include <boost/asio/cancellation_type.hpp>
0028 #include <boost/asio/detached.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/asio/prepend.hpp>
0031 
0032 #include <cstdint>
0033 #include <memory>
0034 #include <string>
0035 #include <type_traits>
0036 
0037 namespace boost::mqtt5::detail {
0038 
0039 namespace asio = boost::asio;
0040 
0041 template <qos_e qos_type>
0042 using on_publish_signature = std::conditional_t<
0043     qos_type == qos_e::at_most_once,
0044         void (error_code),
0045         std::conditional_t<
0046             qos_type == qos_e::at_least_once,
0047                 void (error_code, reason_code, puback_props),
0048                 void (error_code, reason_code, pubcomp_props)
0049         >
0050 >;
0051 
0052 template <qos_e qos_type>
0053 using on_publish_props_type = std::conditional_t<
0054     qos_type == qos_e::at_most_once,
0055         void,
0056         std::conditional_t<
0057             qos_type == qos_e::at_least_once,
0058                 puback_props,
0059                 pubcomp_props
0060         >
0061 >;
0062 
0063 template <typename ClientService, typename Handler, qos_e qos_type>
0064 class publish_send_op {
0065     using client_service = ClientService;
0066 
0067     struct on_publish {};
0068     struct on_puback {};
0069     struct on_pubrec {};
0070     struct on_pubrel {};
0071     struct on_pubcomp {};
0072 
0073     std::shared_ptr<client_service> _svc_ptr;
0074 
0075     using handler_type = cancellable_handler<
0076         Handler,
0077         typename client_service::executor_type
0078     >;
0079     handler_type _handler;
0080 
0081     serial_num_t _serial_num;
0082 
0083 public:
0084     publish_send_op(
0085         std::shared_ptr<client_service> svc_ptr,
0086         Handler&& handler
0087     ) :
0088         _svc_ptr(std::move(svc_ptr)),
0089         _handler(std::move(handler), _svc_ptr->get_executor())
0090     {
0091         auto slot = asio::get_associated_cancellation_slot(_handler);
0092         if (slot.is_connected())
0093             slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0094                 svc.cancel();
0095             });
0096     }
0097 
0098     publish_send_op(publish_send_op&&) = default;
0099     publish_send_op(const publish_send_op&) = delete;
0100 
0101     publish_send_op& operator=(publish_send_op&&) = default;
0102     publish_send_op& operator=(const publish_send_op&) = delete;
0103 
0104     using allocator_type = asio::associated_allocator_t<handler_type>;
0105     allocator_type get_allocator() const noexcept {
0106         return asio::get_associated_allocator(_handler);
0107     }
0108 
0109     using executor_type = typename client_service::executor_type;
0110     executor_type get_executor() const noexcept {
0111         return _svc_ptr->get_executor();
0112     }
0113 
0114     void perform(
0115         std::string topic, std::string payload,
0116         retain_e retain, const publish_props& props
0117     ) {
0118         uint16_t packet_id = 0;
0119         if constexpr (qos_type != qos_e::at_most_once) {
0120             packet_id = _svc_ptr->allocate_pid();
0121             if (packet_id == 0)
0122                 return complete_immediate(client::error::pid_overrun, packet_id);
0123         }
0124 
0125         auto ec = validate_publish(topic, payload, retain, props);
0126         if (ec)
0127             return complete_immediate(ec, packet_id);
0128 
0129         _serial_num = _svc_ptr->next_serial_num();
0130 
0131         auto publish = control_packet<allocator_type>::of(
0132             with_pid, get_allocator(),
0133             encoders::encode_publish, packet_id,
0134             std::move(topic), std::move(payload),
0135             qos_type, retain, dup_e::no, props
0136         );
0137 
0138         auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0139                 .value_or(default_max_send_size);
0140         if (publish.size() > max_packet_size)
0141             return complete_immediate(client::error::packet_too_large, packet_id);
0142 
0143         send_publish(std::move(publish));
0144     }
0145 
0146     void send_publish(control_packet<allocator_type> publish) {
0147         auto wire_data = publish.wire_data();
0148         _svc_ptr->async_send(
0149             wire_data,
0150             _serial_num,
0151             send_flag::throttled * (qos_type != qos_e::at_most_once),
0152             asio::prepend(std::move(*this), on_publish {}, std::move(publish))
0153         );
0154     }
0155 
0156     void resend_publish(control_packet<allocator_type> publish) {
0157         if (_handler.cancelled() != asio::cancellation_type_t::none)
0158             return complete(
0159                 asio::error::operation_aborted, publish.packet_id()
0160             );
0161         send_publish(std::move(publish));
0162     }
0163 
0164     void operator()(
0165         on_publish, control_packet<allocator_type> publish,
0166         error_code ec
0167     ) {
0168         if (ec == asio::error::try_again)
0169             return resend_publish(std::move(publish));
0170 
0171         if constexpr (qos_type == qos_e::at_most_once)
0172             return complete(ec);
0173 
0174         else {
0175             auto packet_id = publish.packet_id();
0176 
0177             if (ec)
0178                 return complete(ec, packet_id);
0179 
0180             if constexpr (qos_type == qos_e::at_least_once)
0181                 _svc_ptr->async_wait_reply(
0182                     control_code_e::puback, packet_id,
0183                     asio::prepend(
0184                         std::move(*this), on_puback {}, std::move(publish)
0185                     )
0186                 );
0187 
0188             else if constexpr (qos_type == qos_e::exactly_once)
0189                 _svc_ptr->async_wait_reply(
0190                     control_code_e::pubrec, packet_id,
0191                     asio::prepend(
0192                         std::move(*this), on_pubrec {}, std::move(publish)
0193                     )
0194                 );
0195         }
0196     }
0197 
0198 
0199     template <
0200         qos_e q = qos_type,
0201         std::enable_if_t<q == qos_e::at_least_once, bool> = true
0202     >
0203     void operator()(
0204         on_puback, control_packet<allocator_type> publish,
0205         error_code ec, byte_citer first, byte_citer last
0206     ) {
0207         if (ec == asio::error::try_again) // "resend unanswered"
0208             return resend_publish(std::move(publish.set_dup()));
0209 
0210         uint16_t packet_id = publish.packet_id();
0211 
0212         if (ec)
0213             return complete(ec, packet_id);
0214 
0215         auto puback = decoders::decode_puback(
0216             static_cast<uint32_t>(std::distance(first, last)), first
0217         );
0218         if (!puback.has_value()) {
0219             on_malformed_packet("Malformed PUBACK: cannot decode");
0220             return resend_publish(std::move(publish.set_dup()));
0221         }
0222 
0223         auto& [reason_code, props] = *puback;
0224         auto rc = to_reason_code<reason_codes::category::puback>(reason_code);
0225         if (!rc) {
0226             on_malformed_packet("Malformed PUBACK: invalid Reason Code");
0227             return resend_publish(std::move(publish.set_dup()));
0228         }
0229 
0230         complete(ec, packet_id, *rc, std::move(props));
0231     }
0232 
0233     template <
0234         qos_e q = qos_type,
0235         std::enable_if_t<q == qos_e::exactly_once, bool> = true
0236     >
0237     void operator()(
0238         on_pubrec, control_packet<allocator_type> publish,
0239         error_code ec, byte_citer first, byte_citer last
0240     ) {
0241         if (ec == asio::error::try_again) // "resend unanswered"
0242             return resend_publish(std::move(publish.set_dup()));
0243 
0244         uint16_t packet_id = publish.packet_id();
0245 
0246         if (ec)
0247             return complete(ec, packet_id);
0248 
0249         auto pubrec = decoders::decode_pubrec(
0250             static_cast<uint32_t>(std::distance(first, last)), first
0251         );
0252         if (!pubrec.has_value()) {
0253             on_malformed_packet("Malformed PUBREC: cannot decode");
0254             return resend_publish(std::move(publish.set_dup()));
0255         }
0256 
0257         auto& [reason_code, props] = *pubrec;
0258 
0259         auto rc = to_reason_code<reason_codes::category::pubrec>(reason_code);
0260         if (!rc) {
0261             on_malformed_packet("Malformed PUBREC: invalid Reason Code");
0262             return resend_publish(std::move(publish.set_dup()));
0263         }
0264 
0265         if (*rc)
0266             return complete(ec, packet_id, *rc);
0267 
0268         auto pubrel = control_packet<allocator_type>::of(
0269             with_pid, get_allocator(),
0270             encoders::encode_pubrel, packet_id,
0271             0, pubrel_props {}
0272         );
0273 
0274         send_pubrel(std::move(pubrel), false);
0275     }
0276 
0277     void send_pubrel(control_packet<allocator_type> pubrel, bool throttled) {
0278         auto wire_data = pubrel.wire_data();
0279         _svc_ptr->async_send(
0280             wire_data,
0281             _serial_num,
0282             (send_flag::throttled * throttled) | send_flag::prioritized,
0283             asio::prepend(std::move(*this), on_pubrel {}, std::move(pubrel))
0284         );
0285     }
0286 
0287     template <
0288         qos_e q = qos_type,
0289         std::enable_if_t<q == qos_e::exactly_once, bool> = true
0290     >
0291     void operator()(
0292         on_pubrel, control_packet<allocator_type> pubrel, error_code ec
0293     ) {
0294         if (ec == asio::error::try_again)
0295             return send_pubrel(std::move(pubrel), true);
0296 
0297         uint16_t packet_id = pubrel.packet_id();
0298 
0299         if (ec)
0300             return complete(ec, packet_id);
0301 
0302         _svc_ptr->async_wait_reply(
0303             control_code_e::pubcomp, packet_id,
0304             asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubrel))
0305         );
0306     }
0307 
0308     template <
0309         qos_e q = qos_type,
0310         std::enable_if_t<q == qos_e::exactly_once, bool> = true
0311     >
0312     void operator()(
0313         on_pubcomp, control_packet<allocator_type> pubrel,
0314         error_code ec,
0315         byte_citer first, byte_citer last
0316     ) {
0317         if (ec == asio::error::try_again) // "resend unanswered"
0318             return send_pubrel(std::move(pubrel), true);
0319 
0320         uint16_t packet_id = pubrel.packet_id();
0321 
0322         if (ec)
0323             return complete(ec, packet_id);
0324 
0325         auto pubcomp = decoders::decode_pubcomp(
0326             static_cast<uint32_t>(std::distance(first, last)), first
0327         );
0328         if (!pubcomp.has_value()) {
0329             on_malformed_packet("Malformed PUBCOMP: cannot decode");
0330             return send_pubrel(std::move(pubrel), true);
0331         }
0332 
0333         auto& [reason_code, props] = *pubcomp;
0334 
0335         auto rc = to_reason_code<reason_codes::category::pubcomp>(reason_code);
0336         if (!rc) {
0337             on_malformed_packet("Malformed PUBCOMP: invalid Reason Code");
0338             return send_pubrel(std::move(pubrel), true);
0339         }
0340 
0341         return complete(ec, pubrel.packet_id(), *rc);
0342     }
0343 
0344 private:
0345 
0346     error_code validate_publish(
0347         const std::string& topic, const std::string& payload,
0348         retain_e retain, const publish_props& props
0349     ) const {
0350         constexpr uint8_t default_retain_available = 1;
0351         constexpr uint8_t default_maximum_qos = 2;
0352         constexpr uint8_t default_payload_format_ind = 0;
0353 
0354         auto topic_name_valid = props[prop::topic_alias].has_value() ?
0355             validate_topic_alias_name(topic) == validation_result::valid :
0356             validate_topic_name(topic) == validation_result::valid
0357         ;
0358 
0359         if (!topic_name_valid)
0360             return client::error::invalid_topic;
0361 
0362         auto max_qos = _svc_ptr->connack_property(prop::maximum_qos)
0363             .value_or(default_maximum_qos);
0364         auto retain_available = _svc_ptr->connack_property(prop::retain_available)
0365             .value_or(default_retain_available);
0366 
0367         if (uint8_t(qos_type) > max_qos)
0368             return client::error::qos_not_supported;
0369 
0370         if (retain_available == 0 && retain == retain_e::yes)
0371             return client::error::retain_not_available;
0372 
0373         auto payload_format_ind = props[prop::payload_format_indicator]
0374             .value_or(default_payload_format_ind);
0375         if (
0376             payload_format_ind == 1 &&
0377             validate_mqtt_utf8(payload) != validation_result::valid
0378         )
0379             return client::error::malformed_packet;
0380 
0381         return validate_props(props);
0382     }
0383 
0384     error_code validate_props(const publish_props& props) const {
0385         constexpr uint16_t default_topic_alias_max = 0;
0386 
0387         const auto& topic_alias = props[prop::topic_alias];
0388         if (topic_alias) {
0389             auto topic_alias_max = _svc_ptr->connack_property(prop::topic_alias_maximum)
0390                 .value_or(default_topic_alias_max);
0391 
0392             if (topic_alias_max == 0 || *topic_alias > topic_alias_max)
0393                 return client::error::topic_alias_maximum_reached;
0394             if (*topic_alias == 0 )
0395                 return client::error::malformed_packet;
0396         }
0397 
0398         const auto& response_topic = props[prop::response_topic];
0399         if (
0400             response_topic &&
0401             validate_topic_name(*response_topic) != validation_result::valid
0402         )
0403             return client::error::malformed_packet;
0404 
0405         const auto& user_properties = props[prop::user_property];
0406         for (const auto& user_property: user_properties)
0407             if (!is_valid_string_pair(user_property))
0408                 return client::error::malformed_packet;
0409 
0410         if (!props[prop::subscription_identifier].empty())
0411             return client::error::malformed_packet;
0412 
0413         const auto& content_type = props[prop::content_type];
0414         if (
0415             content_type &&
0416             validate_mqtt_utf8(*content_type) != validation_result::valid
0417         )
0418             return client::error::malformed_packet;
0419 
0420         return error_code {};
0421     }
0422 
0423     void on_malformed_packet(const std::string& reason) {
0424         auto props = disconnect_props {};
0425         props[prop::reason_string] = reason;
0426         async_disconnect(
0427             disconnect_rc_e::malformed_packet, props, _svc_ptr,
0428             asio::detached
0429         );
0430     }
0431 
0432     template <
0433         qos_e q = qos_type,
0434         std::enable_if_t<q == qos_e::at_most_once, bool> = true
0435     >
0436     void complete(error_code ec, uint16_t = 0) {
0437         _handler.complete(ec);
0438     }
0439 
0440     template <
0441         qos_e q = qos_type,
0442         std::enable_if_t<q == qos_e::at_most_once, bool> = true
0443     >
0444     void complete_immediate(error_code ec, uint16_t) {
0445         _handler.complete_immediate(ec);
0446     }
0447 
0448     template <
0449         typename Props = on_publish_props_type<qos_type>,
0450         std::enable_if_t<
0451             std::is_same_v<Props, puback_props> ||
0452             std::is_same_v<Props, pubcomp_props>,
0453             bool
0454         > = true
0455     >
0456     void complete(
0457         error_code ec, uint16_t packet_id,
0458         reason_code rc = reason_codes::empty, Props&& props = Props {}
0459     ) {
0460         _svc_ptr->free_pid(packet_id, true);
0461         _handler.complete(ec, rc, std::forward<Props>(props));
0462     }
0463 
0464     template <
0465         typename Props = on_publish_props_type<qos_type>,
0466         std::enable_if_t<
0467             std::is_same_v<Props, puback_props> ||
0468             std::is_same_v<Props, pubcomp_props>,
0469             bool
0470         > = true
0471     >
0472     void complete_immediate(error_code ec, uint16_t packet_id) {
0473         if (packet_id != 0)
0474             _svc_ptr->free_pid(packet_id, false);
0475         _handler.complete_immediate(ec, reason_codes::empty, Props {});
0476     }
0477 };
0478 
0479 template <typename ClientService, qos_e qos_type>
0480 class initiate_async_publish {
0481     std::shared_ptr<ClientService> _svc_ptr;
0482 public:
0483     explicit initiate_async_publish(std::shared_ptr<ClientService> svc_ptr) :
0484         _svc_ptr(std::move(svc_ptr))
0485     {}
0486 
0487     using executor_type = typename ClientService::executor_type;
0488     executor_type get_executor() const noexcept {
0489         return _svc_ptr->get_executor();
0490     }
0491 
0492     template <typename Handler>
0493     void operator()(
0494         Handler&& handler,
0495         std::string topic, std::string payload,
0496         retain_e retain, const publish_props& props
0497     ) {
0498         detail::publish_send_op<ClientService, Handler, qos_type> {
0499             _svc_ptr, std::move(handler) 
0500         }.perform(
0501             std::move(topic), std::move(payload), retain, props
0502         );
0503     }
0504 };
0505 
0506 } // end namespace boost::mqtt5::detail
0507 
0508 #endif // !BOOST_MQTT5_PUBLISH_SEND_OP_HPP