File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_UNSUBSCRIBE_OP_HPP
0009 #define BOOST_MQTT5_UNSUBSCRIBE_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013 #include <boost/mqtt5/types.hpp>
0014
0015 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018 #include <boost/mqtt5/detail/topic_validation.hpp>
0019
0020 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0021 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0022 #include <boost/mqtt5/impl/disconnect_op.hpp>
0023
0024 #include <boost/asio/associated_allocator.hpp>
0025 #include <boost/asio/associated_cancellation_slot.hpp>
0026 #include <boost/asio/associated_executor.hpp>
0027 #include <boost/asio/cancellation_type.hpp>
0028 #include <boost/asio/detached.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/asio/prepend.hpp>
0031
0032 #include <cstdint>
0033 #include <memory>
0034 #include <string>
0035 #include <vector>
0036
0037 namespace boost::mqtt5::detail {
0038
0039 namespace asio = boost::asio;
0040
0041 template <typename ClientService, typename Handler>
0042 class unsubscribe_op {
0043 using client_service = ClientService;
0044
0045 struct on_unsubscribe {};
0046 struct on_unsuback {};
0047
0048 std::shared_ptr<client_service> _svc_ptr;
0049
0050 using handler_type = cancellable_handler<
0051 Handler,
0052 typename client_service::executor_type
0053 >;
0054 handler_type _handler;
0055
0056 size_t _num_topics { 0 };
0057
0058 public:
0059 unsubscribe_op(
0060 std::shared_ptr<client_service> svc_ptr,
0061 Handler&& handler
0062 ) :
0063 _svc_ptr(std::move(svc_ptr)),
0064 _handler(std::move(handler), _svc_ptr->get_executor())
0065 {
0066 auto slot = asio::get_associated_cancellation_slot(_handler);
0067 if (slot.is_connected())
0068 slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0069 svc.cancel();
0070 });
0071 }
0072
0073 unsubscribe_op(unsubscribe_op&&) = default;
0074 unsubscribe_op(const unsubscribe_op&) = delete;
0075
0076 unsubscribe_op& operator=(unsubscribe_op&&) = default;
0077 unsubscribe_op& operator=(const unsubscribe_op&) = delete;
0078
0079 using allocator_type = asio::associated_allocator_t<handler_type>;
0080 allocator_type get_allocator() const noexcept {
0081 return asio::get_associated_allocator(_handler);
0082 }
0083
0084 using executor_type = typename client_service::executor_type;
0085 executor_type get_executor() const noexcept {
0086 return _svc_ptr->get_executor();
0087 }
0088
0089 void perform(
0090 const std::vector<std::string>& topics,
0091 const unsubscribe_props& props
0092 ) {
0093 _num_topics = topics.size();
0094
0095 uint16_t packet_id = _svc_ptr->allocate_pid();
0096 if (packet_id == 0)
0097 return complete_immediate(client::error::pid_overrun, packet_id);
0098
0099 if (_num_topics == 0)
0100 return complete_immediate(client::error::invalid_topic, packet_id);
0101
0102 auto ec = validate_unsubscribe(topics, props);
0103 if (ec)
0104 return complete_immediate(ec, packet_id);
0105
0106 auto unsubscribe = control_packet<allocator_type>::of(
0107 with_pid, get_allocator(),
0108 encoders::encode_unsubscribe, packet_id,
0109 topics, props
0110 );
0111
0112 auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0113 .value_or(default_max_send_size);
0114 if (unsubscribe.size() > max_packet_size)
0115 return complete_immediate(client::error::packet_too_large, packet_id);
0116
0117 send_unsubscribe(std::move(unsubscribe));
0118 }
0119
0120 void send_unsubscribe(control_packet<allocator_type> unsubscribe) {
0121 auto wire_data = unsubscribe.wire_data();
0122 _svc_ptr->async_send(
0123 wire_data,
0124 no_serial, send_flag::none,
0125 asio::prepend(
0126 std::move(*this), on_unsubscribe {}, std::move(unsubscribe)
0127 )
0128 );
0129 }
0130
0131 void resend_unsubscribe(control_packet<allocator_type> subscribe) {
0132 if (_handler.cancelled() != asio::cancellation_type_t::none)
0133 return complete(
0134 asio::error::operation_aborted, subscribe.packet_id()
0135 );
0136 send_unsubscribe(std::move(subscribe));
0137 }
0138
0139 void operator()(
0140 on_unsubscribe, control_packet<allocator_type> packet,
0141 error_code ec
0142 ) {
0143 if (ec == asio::error::try_again)
0144 return resend_unsubscribe(std::move(packet));
0145
0146 auto packet_id = packet.packet_id();
0147
0148 if (ec)
0149 return complete(ec, packet_id);
0150
0151 _svc_ptr->async_wait_reply(
0152 control_code_e::unsuback, packet_id,
0153 asio::prepend(std::move(*this), on_unsuback {}, std::move(packet))
0154 );
0155 }
0156
0157 void operator()(
0158 on_unsuback, control_packet<allocator_type> packet,
0159 error_code ec, byte_citer first, byte_citer last
0160 ) {
0161 if (ec == asio::error::try_again)
0162 return resend_unsubscribe(std::move(packet));
0163
0164 uint16_t packet_id = packet.packet_id();
0165
0166 if (ec)
0167 return complete(ec, packet_id);
0168
0169 auto unsuback = decoders::decode_unsuback(
0170 static_cast<uint32_t>(std::distance(first, last)), first
0171 );
0172 if (!unsuback.has_value()) {
0173 on_malformed_packet("Malformed UNSUBACK: cannot decode");
0174 return resend_unsubscribe(std::move(packet));
0175 }
0176
0177 auto& [props, rcs] = *unsuback;
0178 auto reason_codes = to_reason_codes(std::move(rcs));
0179 if (reason_codes.size() != _num_topics) {
0180 on_malformed_packet(
0181 "Malformed UNSUBACK: does not contain a "
0182 "valid Reason Code for every Topic Filter"
0183 );
0184 return resend_unsubscribe(std::move(packet));
0185 }
0186
0187 complete(
0188 ec, packet_id, std::move(reason_codes), std::move(props)
0189 );
0190 }
0191
0192 private:
0193
0194 static error_code validate_unsubscribe(
0195 const std::vector<std::string>& topics,
0196 const unsubscribe_props& props
0197 ) {
0198 for (const auto& topic : topics)
0199 if (validate_topic_filter(topic) != validation_result::valid)
0200 return client::error::invalid_topic;
0201
0202 const auto& user_properties = props[prop::user_property];
0203 for (const auto& user_property: user_properties)
0204 if (!is_valid_string_pair(user_property))
0205 return client::error::malformed_packet;
0206 return error_code {};
0207 }
0208
0209 static std::vector<reason_code> to_reason_codes(std::vector<uint8_t> codes) {
0210 std::vector<reason_code> ret;
0211 for (uint8_t code : codes) {
0212 auto rc = to_reason_code<reason_codes::category::unsuback>(code);
0213 if (rc)
0214 ret.push_back(*rc);
0215 }
0216 return ret;
0217 }
0218
0219 void on_malformed_packet(
0220 const std::string& reason
0221 ) {
0222 auto props = disconnect_props {};
0223 props[prop::reason_string] = reason;
0224 async_disconnect(
0225 disconnect_rc_e::malformed_packet, props, _svc_ptr,
0226 asio::detached
0227 );
0228 }
0229
0230 void complete_immediate(error_code ec, uint16_t packet_id) {
0231 if (packet_id != 0)
0232 _svc_ptr->free_pid(packet_id);
0233 _handler.complete_immediate(
0234 ec, std::vector<reason_code>(_num_topics, reason_codes::empty),
0235 unsuback_props {}
0236 );
0237 }
0238
0239 void complete(
0240 error_code ec, uint16_t packet_id,
0241 std::vector<reason_code> reason_codes = {}, unsuback_props props = {}
0242 ) {
0243 if (reason_codes.empty() && _num_topics)
0244 reason_codes = std::vector<reason_code>(_num_topics, reason_codes::empty);
0245
0246 _svc_ptr->free_pid(packet_id);
0247 _handler.complete(ec, std::move(reason_codes), std::move(props));
0248 }
0249 };
0250
0251 template <typename ClientService>
0252 class initiate_async_unsubscribe {
0253 std::shared_ptr<ClientService> _svc_ptr;
0254 public:
0255 explicit initiate_async_unsubscribe(std::shared_ptr<ClientService> svc_ptr) :
0256 _svc_ptr(std::move(svc_ptr))
0257 {}
0258
0259 using executor_type = typename ClientService::executor_type;
0260 executor_type get_executor() const noexcept {
0261 return _svc_ptr->get_executor();
0262 }
0263
0264 template <typename Handler>
0265 void operator()(
0266 Handler&& handler,
0267 const std::vector<std::string>& topics, const unsubscribe_props& props
0268 ) {
0269 detail::unsubscribe_op { _svc_ptr, std::move(handler) }
0270 .perform(topics, props);
0271 }
0272 };
0273
0274
0275 }
0276
0277 #endif