File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_PUBLISH_SEND_OP_HPP
0009 #define BOOST_MQTT5_PUBLISH_SEND_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013 #include <boost/mqtt5/types.hpp>
0014
0015 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018 #include <boost/mqtt5/detail/topic_validation.hpp>
0019 #include <boost/mqtt5/detail/utf8_mqtt.hpp>
0020
0021 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0022 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0023 #include <boost/mqtt5/impl/disconnect_op.hpp>
0024
0025 #include <boost/asio/associated_allocator.hpp>
0026 #include <boost/asio/associated_executor.hpp>
0027 #include <boost/asio/cancellation_type.hpp>
0028 #include <boost/asio/detached.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/asio/prepend.hpp>
0031
0032 #include <cstdint>
0033 #include <memory>
0034 #include <string>
0035 #include <type_traits>
0036
0037 namespace boost::mqtt5::detail {
0038
0039 namespace asio = boost::asio;
0040
0041 template <qos_e qos_type>
0042 using on_publish_signature = std::conditional_t<
0043 qos_type == qos_e::at_most_once,
0044 void (error_code),
0045 std::conditional_t<
0046 qos_type == qos_e::at_least_once,
0047 void (error_code, reason_code, puback_props),
0048 void (error_code, reason_code, pubcomp_props)
0049 >
0050 >;
0051
0052 template <qos_e qos_type>
0053 using on_publish_props_type = std::conditional_t<
0054 qos_type == qos_e::at_most_once,
0055 void,
0056 std::conditional_t<
0057 qos_type == qos_e::at_least_once,
0058 puback_props,
0059 pubcomp_props
0060 >
0061 >;
0062
0063 template <typename ClientService, typename Handler, qos_e qos_type>
0064 class publish_send_op {
0065 using client_service = ClientService;
0066
0067 struct on_publish {};
0068 struct on_puback {};
0069 struct on_pubrec {};
0070 struct on_pubrel {};
0071 struct on_pubcomp {};
0072
0073 std::shared_ptr<client_service> _svc_ptr;
0074
0075 using handler_type = cancellable_handler<
0076 Handler,
0077 typename client_service::executor_type
0078 >;
0079 handler_type _handler;
0080
0081 serial_num_t _serial_num;
0082
0083 public:
0084 publish_send_op(
0085 std::shared_ptr<client_service> svc_ptr,
0086 Handler&& handler
0087 ) :
0088 _svc_ptr(std::move(svc_ptr)),
0089 _handler(std::move(handler), _svc_ptr->get_executor())
0090 {
0091 auto slot = asio::get_associated_cancellation_slot(_handler);
0092 if (slot.is_connected())
0093 slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0094 svc.cancel();
0095 });
0096 }
0097
0098 publish_send_op(publish_send_op&&) = default;
0099 publish_send_op(const publish_send_op&) = delete;
0100
0101 publish_send_op& operator=(publish_send_op&&) = default;
0102 publish_send_op& operator=(const publish_send_op&) = delete;
0103
0104 using allocator_type = asio::associated_allocator_t<handler_type>;
0105 allocator_type get_allocator() const noexcept {
0106 return asio::get_associated_allocator(_handler);
0107 }
0108
0109 using executor_type = typename client_service::executor_type;
0110 executor_type get_executor() const noexcept {
0111 return _svc_ptr->get_executor();
0112 }
0113
0114 void perform(
0115 std::string topic, std::string payload,
0116 retain_e retain, const publish_props& props
0117 ) {
0118 uint16_t packet_id = 0;
0119 if constexpr (qos_type != qos_e::at_most_once) {
0120 packet_id = _svc_ptr->allocate_pid();
0121 if (packet_id == 0)
0122 return complete_immediate(client::error::pid_overrun, packet_id);
0123 }
0124
0125 auto ec = validate_publish(topic, payload, retain, props);
0126 if (ec)
0127 return complete_immediate(ec, packet_id);
0128
0129 _serial_num = _svc_ptr->next_serial_num();
0130
0131 auto publish = control_packet<allocator_type>::of(
0132 with_pid, get_allocator(),
0133 encoders::encode_publish, packet_id,
0134 std::move(topic), std::move(payload),
0135 qos_type, retain, dup_e::no, props
0136 );
0137
0138 auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0139 .value_or(default_max_send_size);
0140 if (publish.size() > max_packet_size)
0141 return complete_immediate(client::error::packet_too_large, packet_id);
0142
0143 send_publish(std::move(publish));
0144 }
0145
0146 void send_publish(control_packet<allocator_type> publish) {
0147 auto wire_data = publish.wire_data();
0148 _svc_ptr->async_send(
0149 wire_data,
0150 _serial_num,
0151 send_flag::throttled * (qos_type != qos_e::at_most_once),
0152 asio::prepend(std::move(*this), on_publish {}, std::move(publish))
0153 );
0154 }
0155
0156 void resend_publish(control_packet<allocator_type> publish) {
0157 if (_handler.cancelled() != asio::cancellation_type_t::none)
0158 return complete(
0159 asio::error::operation_aborted, publish.packet_id()
0160 );
0161 send_publish(std::move(publish));
0162 }
0163
0164 void operator()(
0165 on_publish, control_packet<allocator_type> publish,
0166 error_code ec
0167 ) {
0168 if (ec == asio::error::try_again)
0169 return resend_publish(std::move(publish));
0170
0171 if constexpr (qos_type == qos_e::at_most_once)
0172 return complete(ec);
0173
0174 else {
0175 auto packet_id = publish.packet_id();
0176
0177 if (ec)
0178 return complete(ec, packet_id);
0179
0180 if constexpr (qos_type == qos_e::at_least_once)
0181 _svc_ptr->async_wait_reply(
0182 control_code_e::puback, packet_id,
0183 asio::prepend(
0184 std::move(*this), on_puback {}, std::move(publish)
0185 )
0186 );
0187
0188 else if constexpr (qos_type == qos_e::exactly_once)
0189 _svc_ptr->async_wait_reply(
0190 control_code_e::pubrec, packet_id,
0191 asio::prepend(
0192 std::move(*this), on_pubrec {}, std::move(publish)
0193 )
0194 );
0195 }
0196 }
0197
0198
0199 template <
0200 qos_e q = qos_type,
0201 std::enable_if_t<q == qos_e::at_least_once, bool> = true
0202 >
0203 void operator()(
0204 on_puback, control_packet<allocator_type> publish,
0205 error_code ec, byte_citer first, byte_citer last
0206 ) {
0207 if (ec == asio::error::try_again)
0208 return resend_publish(std::move(publish.set_dup()));
0209
0210 uint16_t packet_id = publish.packet_id();
0211
0212 if (ec)
0213 return complete(ec, packet_id);
0214
0215 auto puback = decoders::decode_puback(
0216 static_cast<uint32_t>(std::distance(first, last)), first
0217 );
0218 if (!puback.has_value()) {
0219 on_malformed_packet("Malformed PUBACK: cannot decode");
0220 return resend_publish(std::move(publish.set_dup()));
0221 }
0222
0223 auto& [reason_code, props] = *puback;
0224 auto rc = to_reason_code<reason_codes::category::puback>(reason_code);
0225 if (!rc) {
0226 on_malformed_packet("Malformed PUBACK: invalid Reason Code");
0227 return resend_publish(std::move(publish.set_dup()));
0228 }
0229
0230 complete(ec, packet_id, *rc, std::move(props));
0231 }
0232
0233 template <
0234 qos_e q = qos_type,
0235 std::enable_if_t<q == qos_e::exactly_once, bool> = true
0236 >
0237 void operator()(
0238 on_pubrec, control_packet<allocator_type> publish,
0239 error_code ec, byte_citer first, byte_citer last
0240 ) {
0241 if (ec == asio::error::try_again)
0242 return resend_publish(std::move(publish.set_dup()));
0243
0244 uint16_t packet_id = publish.packet_id();
0245
0246 if (ec)
0247 return complete(ec, packet_id);
0248
0249 auto pubrec = decoders::decode_pubrec(
0250 static_cast<uint32_t>(std::distance(first, last)), first
0251 );
0252 if (!pubrec.has_value()) {
0253 on_malformed_packet("Malformed PUBREC: cannot decode");
0254 return resend_publish(std::move(publish.set_dup()));
0255 }
0256
0257 auto& [reason_code, props] = *pubrec;
0258
0259 auto rc = to_reason_code<reason_codes::category::pubrec>(reason_code);
0260 if (!rc) {
0261 on_malformed_packet("Malformed PUBREC: invalid Reason Code");
0262 return resend_publish(std::move(publish.set_dup()));
0263 }
0264
0265 if (*rc)
0266 return complete(ec, packet_id, *rc);
0267
0268 auto pubrel = control_packet<allocator_type>::of(
0269 with_pid, get_allocator(),
0270 encoders::encode_pubrel, packet_id,
0271 0, pubrel_props {}
0272 );
0273
0274 send_pubrel(std::move(pubrel), false);
0275 }
0276
0277 void send_pubrel(control_packet<allocator_type> pubrel, bool throttled) {
0278 auto wire_data = pubrel.wire_data();
0279 _svc_ptr->async_send(
0280 wire_data,
0281 _serial_num,
0282 (send_flag::throttled * throttled) | send_flag::prioritized,
0283 asio::prepend(std::move(*this), on_pubrel {}, std::move(pubrel))
0284 );
0285 }
0286
0287 template <
0288 qos_e q = qos_type,
0289 std::enable_if_t<q == qos_e::exactly_once, bool> = true
0290 >
0291 void operator()(
0292 on_pubrel, control_packet<allocator_type> pubrel, error_code ec
0293 ) {
0294 if (ec == asio::error::try_again)
0295 return send_pubrel(std::move(pubrel), true);
0296
0297 uint16_t packet_id = pubrel.packet_id();
0298
0299 if (ec)
0300 return complete(ec, packet_id);
0301
0302 _svc_ptr->async_wait_reply(
0303 control_code_e::pubcomp, packet_id,
0304 asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubrel))
0305 );
0306 }
0307
0308 template <
0309 qos_e q = qos_type,
0310 std::enable_if_t<q == qos_e::exactly_once, bool> = true
0311 >
0312 void operator()(
0313 on_pubcomp, control_packet<allocator_type> pubrel,
0314 error_code ec,
0315 byte_citer first, byte_citer last
0316 ) {
0317 if (ec == asio::error::try_again)
0318 return send_pubrel(std::move(pubrel), true);
0319
0320 uint16_t packet_id = pubrel.packet_id();
0321
0322 if (ec)
0323 return complete(ec, packet_id);
0324
0325 auto pubcomp = decoders::decode_pubcomp(
0326 static_cast<uint32_t>(std::distance(first, last)), first
0327 );
0328 if (!pubcomp.has_value()) {
0329 on_malformed_packet("Malformed PUBCOMP: cannot decode");
0330 return send_pubrel(std::move(pubrel), true);
0331 }
0332
0333 auto& [reason_code, props] = *pubcomp;
0334
0335 auto rc = to_reason_code<reason_codes::category::pubcomp>(reason_code);
0336 if (!rc) {
0337 on_malformed_packet("Malformed PUBCOMP: invalid Reason Code");
0338 return send_pubrel(std::move(pubrel), true);
0339 }
0340
0341 return complete(ec, pubrel.packet_id(), *rc);
0342 }
0343
0344 private:
0345
0346 error_code validate_publish(
0347 const std::string& topic, const std::string& payload,
0348 retain_e retain, const publish_props& props
0349 ) const {
0350 constexpr uint8_t default_retain_available = 1;
0351 constexpr uint8_t default_maximum_qos = 2;
0352 constexpr uint8_t default_payload_format_ind = 0;
0353
0354 auto topic_name_valid = props[prop::topic_alias].has_value() ?
0355 validate_topic_alias_name(topic) == validation_result::valid :
0356 validate_topic_name(topic) == validation_result::valid
0357 ;
0358
0359 if (!topic_name_valid)
0360 return client::error::invalid_topic;
0361
0362 auto max_qos = _svc_ptr->connack_property(prop::maximum_qos)
0363 .value_or(default_maximum_qos);
0364 auto retain_available = _svc_ptr->connack_property(prop::retain_available)
0365 .value_or(default_retain_available);
0366
0367 if (uint8_t(qos_type) > max_qos)
0368 return client::error::qos_not_supported;
0369
0370 if (retain_available == 0 && retain == retain_e::yes)
0371 return client::error::retain_not_available;
0372
0373 auto payload_format_ind = props[prop::payload_format_indicator]
0374 .value_or(default_payload_format_ind);
0375 if (
0376 payload_format_ind == 1 &&
0377 validate_mqtt_utf8(payload) != validation_result::valid
0378 )
0379 return client::error::malformed_packet;
0380
0381 return validate_props(props);
0382 }
0383
0384 error_code validate_props(const publish_props& props) const {
0385 constexpr uint16_t default_topic_alias_max = 0;
0386
0387 const auto& topic_alias = props[prop::topic_alias];
0388 if (topic_alias) {
0389 auto topic_alias_max = _svc_ptr->connack_property(prop::topic_alias_maximum)
0390 .value_or(default_topic_alias_max);
0391
0392 if (topic_alias_max == 0 || *topic_alias > topic_alias_max)
0393 return client::error::topic_alias_maximum_reached;
0394 if (*topic_alias == 0 )
0395 return client::error::malformed_packet;
0396 }
0397
0398 const auto& response_topic = props[prop::response_topic];
0399 if (
0400 response_topic &&
0401 validate_topic_name(*response_topic) != validation_result::valid
0402 )
0403 return client::error::malformed_packet;
0404
0405 const auto& user_properties = props[prop::user_property];
0406 for (const auto& user_property: user_properties)
0407 if (!is_valid_string_pair(user_property))
0408 return client::error::malformed_packet;
0409
0410 if (!props[prop::subscription_identifier].empty())
0411 return client::error::malformed_packet;
0412
0413 const auto& content_type = props[prop::content_type];
0414 if (
0415 content_type &&
0416 validate_mqtt_utf8(*content_type) != validation_result::valid
0417 )
0418 return client::error::malformed_packet;
0419
0420 return error_code {};
0421 }
0422
0423 void on_malformed_packet(const std::string& reason) {
0424 auto props = disconnect_props {};
0425 props[prop::reason_string] = reason;
0426 async_disconnect(
0427 disconnect_rc_e::malformed_packet, props, _svc_ptr,
0428 asio::detached
0429 );
0430 }
0431
0432 template <
0433 qos_e q = qos_type,
0434 std::enable_if_t<q == qos_e::at_most_once, bool> = true
0435 >
0436 void complete(error_code ec, uint16_t = 0) {
0437 _handler.complete(ec);
0438 }
0439
0440 template <
0441 qos_e q = qos_type,
0442 std::enable_if_t<q == qos_e::at_most_once, bool> = true
0443 >
0444 void complete_immediate(error_code ec, uint16_t) {
0445 _handler.complete_immediate(ec);
0446 }
0447
0448 template <
0449 typename Props = on_publish_props_type<qos_type>,
0450 std::enable_if_t<
0451 std::is_same_v<Props, puback_props> ||
0452 std::is_same_v<Props, pubcomp_props>,
0453 bool
0454 > = true
0455 >
0456 void complete(
0457 error_code ec, uint16_t packet_id,
0458 reason_code rc = reason_codes::empty, Props&& props = Props {}
0459 ) {
0460 _svc_ptr->free_pid(packet_id, true);
0461 _handler.complete(ec, rc, std::forward<Props>(props));
0462 }
0463
0464 template <
0465 typename Props = on_publish_props_type<qos_type>,
0466 std::enable_if_t<
0467 std::is_same_v<Props, puback_props> ||
0468 std::is_same_v<Props, pubcomp_props>,
0469 bool
0470 > = true
0471 >
0472 void complete_immediate(error_code ec, uint16_t packet_id) {
0473 if (packet_id != 0)
0474 _svc_ptr->free_pid(packet_id, false);
0475 _handler.complete_immediate(ec, reason_codes::empty, Props {});
0476 }
0477 };
0478
0479 template <typename ClientService, qos_e qos_type>
0480 class initiate_async_publish {
0481 std::shared_ptr<ClientService> _svc_ptr;
0482 public:
0483 explicit initiate_async_publish(std::shared_ptr<ClientService> svc_ptr) :
0484 _svc_ptr(std::move(svc_ptr))
0485 {}
0486
0487 using executor_type = typename ClientService::executor_type;
0488 executor_type get_executor() const noexcept {
0489 return _svc_ptr->get_executor();
0490 }
0491
0492 template <typename Handler>
0493 void operator()(
0494 Handler&& handler,
0495 std::string topic, std::string payload,
0496 retain_e retain, const publish_props& props
0497 ) {
0498 detail::publish_send_op<ClientService, Handler, qos_type> {
0499 _svc_ptr, std::move(handler)
0500 }.perform(
0501 std::move(topic), std::move(payload), retain, props
0502 );
0503 }
0504 };
0505
0506 }
0507
0508 #endif