File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_SUBSCRIBE_OP_HPP
0009 #define BOOST_MQTT5_SUBSCRIBE_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013 #include <boost/mqtt5/types.hpp>
0014
0015 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0016 #include <boost/mqtt5/detail/control_packet.hpp>
0017 #include <boost/mqtt5/detail/internal_types.hpp>
0018 #include <boost/mqtt5/detail/topic_validation.hpp>
0019
0020 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0021 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0022 #include <boost/mqtt5/impl/disconnect_op.hpp>
0023
0024 #include <boost/asio/associated_allocator.hpp>
0025 #include <boost/asio/associated_cancellation_slot.hpp>
0026 #include <boost/asio/associated_executor.hpp>
0027 #include <boost/asio/cancellation_type.hpp>
0028 #include <boost/asio/detached.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/asio/prepend.hpp>
0031
0032 #include <cstdint>
0033 #include <memory>
0034 #include <string>
0035 #include <vector>
0036
0037 namespace boost::mqtt5::detail {
0038
0039 namespace asio = boost::asio;
0040
0041 template <typename ClientService, typename Handler>
0042 class subscribe_op {
0043 using client_service = ClientService;
0044
0045 struct on_subscribe {};
0046 struct on_suback {};
0047
0048 std::shared_ptr<client_service> _svc_ptr;
0049
0050 using handler_type = cancellable_handler<
0051 Handler,
0052 typename client_service::executor_type
0053 >;
0054 handler_type _handler;
0055
0056 size_t _num_topics { 0 };
0057
0058 public:
0059 subscribe_op(
0060 std::shared_ptr<client_service> svc_ptr,
0061 Handler&& handler
0062 ) :
0063 _svc_ptr(std::move(svc_ptr)),
0064 _handler(std::move(handler), _svc_ptr->get_executor())
0065 {
0066 auto slot = asio::get_associated_cancellation_slot(_handler);
0067 if (slot.is_connected())
0068 slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0069 svc.cancel();
0070 });
0071 }
0072
0073 subscribe_op(subscribe_op&&) = default;
0074 subscribe_op(const subscribe_op&) = delete;
0075
0076 subscribe_op& operator=(subscribe_op&&) = default;
0077 subscribe_op& operator=(const subscribe_op&) = delete;
0078
0079 using allocator_type = asio::associated_allocator_t<handler_type>;
0080 allocator_type get_allocator() const noexcept {
0081 return asio::get_associated_allocator(_handler);
0082 }
0083
0084 using executor_type = typename client_service::executor_type;
0085 executor_type get_executor() const noexcept {
0086 return _svc_ptr->get_executor();
0087 }
0088
0089 void perform(
0090 const std::vector<subscribe_topic>& topics,
0091 const subscribe_props& props
0092 ) {
0093 _num_topics = topics.size();
0094
0095 uint16_t packet_id = _svc_ptr->allocate_pid();
0096 if (packet_id == 0)
0097 return complete_immediate(client::error::pid_overrun, packet_id);
0098
0099 if (_num_topics == 0)
0100 return complete_immediate(client::error::invalid_topic, packet_id);
0101
0102 auto ec = validate_subscribe(topics, props);
0103 if (ec)
0104 return complete_immediate(ec, packet_id);
0105
0106 auto subscribe = control_packet<allocator_type>::of(
0107 with_pid, get_allocator(),
0108 encoders::encode_subscribe, packet_id,
0109 topics, props
0110 );
0111
0112 auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0113 .value_or(default_max_send_size);
0114 if (subscribe.size() > max_packet_size)
0115 return complete_immediate(client::error::packet_too_large, packet_id);
0116
0117 send_subscribe(std::move(subscribe));
0118 }
0119
0120 void send_subscribe(control_packet<allocator_type> subscribe) {
0121 auto wire_data = subscribe.wire_data();
0122 _svc_ptr->async_send(
0123 wire_data,
0124 no_serial, send_flag::none,
0125 asio::prepend(
0126 std::move(*this), on_subscribe {}, std::move(subscribe)
0127 )
0128 );
0129 }
0130
0131 void resend_subscribe(control_packet<allocator_type> subscribe) {
0132 if (_handler.cancelled() != asio::cancellation_type_t::none)
0133 return complete(
0134 asio::error::operation_aborted, subscribe.packet_id()
0135 );
0136 send_subscribe(std::move(subscribe));
0137 }
0138
0139 void operator()(
0140 on_subscribe, control_packet<allocator_type> packet,
0141 error_code ec
0142 ) {
0143 if (ec == asio::error::try_again)
0144 return resend_subscribe(std::move(packet));
0145
0146 auto packet_id = packet.packet_id();
0147
0148 if (ec)
0149 return complete(ec, packet_id);
0150
0151 _svc_ptr->async_wait_reply(
0152 control_code_e::suback, packet_id,
0153 asio::prepend(std::move(*this), on_suback {}, std::move(packet))
0154 );
0155 }
0156
0157 void operator()(
0158 on_suback, control_packet<allocator_type> packet,
0159 error_code ec, byte_citer first, byte_citer last
0160 ) {
0161 if (ec == asio::error::try_again)
0162 return resend_subscribe(std::move(packet));
0163
0164 uint16_t packet_id = packet.packet_id();
0165
0166 if (ec)
0167 return complete(ec, packet_id);
0168
0169 auto suback = decoders::decode_suback(
0170 static_cast<uint32_t>(std::distance(first, last)), first
0171 );
0172 if (!suback.has_value()) {
0173 on_malformed_packet("Malformed SUBACK: cannot decode");
0174 return resend_subscribe(std::move(packet));
0175 }
0176
0177 auto& [props, rcs] = *suback;
0178 auto reason_codes = to_reason_codes(std::move(rcs));
0179 if (reason_codes.size() != _num_topics) {
0180 on_malformed_packet(
0181 "Malformed SUBACK: does not contain a "
0182 "valid Reason Code for every Topic Filter"
0183 );
0184 return resend_subscribe(std::move(packet));
0185 }
0186
0187 complete(
0188 ec, packet_id, std::move(reason_codes), std::move(props)
0189 );
0190 }
0191
0192 private:
0193
0194 error_code validate_subscribe(
0195 const std::vector<subscribe_topic>& topics, const subscribe_props& props
0196 ) const {
0197 error_code ec;
0198 for (const auto& topic: topics) {
0199 ec = validate_topic(topic);
0200 if (ec)
0201 return ec;
0202 }
0203
0204 ec = validate_props(props);
0205 return ec;
0206 }
0207
0208 error_code validate_topic(const subscribe_topic& topic) const {
0209 auto wildcard_available = _svc_ptr->connack_property(
0210 prop::wildcard_subscription_available
0211 ).value_or(1);
0212 auto shared_available = _svc_ptr->connack_property(
0213 prop::shared_subscription_available
0214 ).value_or(1);
0215
0216 std::string_view topic_filter = topic.topic_filter;
0217
0218 validation_result result = validation_result::valid;
0219 if (
0220 topic_filter.compare(0, shared_sub_prefix.size(), shared_sub_prefix) == 0
0221 ) {
0222 if (!shared_available)
0223 return client::error::shared_subscription_not_available;
0224
0225 result = validate_shared_topic_filter(topic_filter, wildcard_available);
0226 } else
0227 result = wildcard_available ?
0228 validate_topic_filter(topic_filter) :
0229 validate_topic_name(topic_filter);
0230
0231 if (result == validation_result::invalid)
0232 return client::error::invalid_topic;
0233 if (!wildcard_available && result != validation_result::valid)
0234 return client::error::wildcard_subscription_not_available;
0235 return error_code {};
0236 }
0237
0238 error_code validate_props(const subscribe_props& props) const {
0239 const auto& user_properties = props[prop::user_property];
0240 for (const auto& user_property: user_properties)
0241 if (!is_valid_string_pair(user_property))
0242 return client::error::malformed_packet;
0243
0244 const auto& sub_id = props[prop::subscription_identifier];
0245 if (!sub_id.has_value())
0246 return error_code {};
0247
0248 auto sub_id_available = _svc_ptr->connack_property(
0249 prop::subscription_identifier_available
0250 ).value_or(1);
0251
0252 if (!sub_id_available)
0253 return client::error::subscription_identifier_not_available;
0254
0255 return (min_subscription_identifier <= *sub_id &&
0256 *sub_id <= max_subscription_identifier) ?
0257 error_code {} :
0258 client::error::malformed_packet;
0259 }
0260
0261 static std::vector<reason_code> to_reason_codes(std::vector<uint8_t> codes) {
0262 std::vector<reason_code> ret;
0263 for (uint8_t code : codes) {
0264 auto rc = to_reason_code<reason_codes::category::suback>(code);
0265 if (rc)
0266 ret.push_back(*rc);
0267 }
0268 return ret;
0269 }
0270
0271 void on_malformed_packet(const std::string& reason) {
0272 auto props = disconnect_props {};
0273 props[prop::reason_string] = reason;
0274 async_disconnect(
0275 disconnect_rc_e::malformed_packet, props, _svc_ptr,
0276 asio::detached
0277 );
0278 }
0279
0280 void complete_immediate(error_code ec, uint16_t packet_id) {
0281 if (packet_id != 0)
0282 _svc_ptr->free_pid(packet_id);
0283 _handler.complete_immediate(
0284 ec, std::vector<reason_code>(_num_topics, reason_codes::empty),
0285 suback_props {}
0286 );
0287 }
0288
0289 void complete(
0290 error_code ec, uint16_t packet_id,
0291 std::vector<reason_code> reason_codes = {}, suback_props props = {}
0292 ) {
0293 if (reason_codes.empty() && _num_topics)
0294 reason_codes = std::vector<reason_code>(_num_topics, reason_codes::empty);
0295
0296 if (!_svc_ptr->subscriptions_present()) {
0297 bool has_success_rc = std::any_of(
0298 reason_codes.cbegin(), reason_codes.cend(),
0299 [](const reason_code& rc) { return !rc; }
0300 );
0301 if (has_success_rc)
0302 _svc_ptr->subscriptions_present(true);
0303 }
0304
0305 _svc_ptr->free_pid(packet_id);
0306 _handler.complete(ec, std::move(reason_codes), std::move(props));
0307 }
0308 };
0309
0310 template <typename ClientService>
0311 class initiate_async_subscribe {
0312 std::shared_ptr<ClientService> _svc_ptr;
0313 public:
0314 explicit initiate_async_subscribe(std::shared_ptr<ClientService> svc_ptr) :
0315 _svc_ptr(std::move(svc_ptr))
0316 {}
0317
0318 using executor_type = typename ClientService::executor_type;
0319 executor_type get_executor() const noexcept {
0320 return _svc_ptr->get_executor();
0321 }
0322
0323 template <typename Handler>
0324 void operator()(
0325 Handler&& handler,
0326 const std::vector<subscribe_topic>& topics, const subscribe_props& props
0327 ) {
0328 detail::subscribe_op { _svc_ptr, std::move(handler) }
0329 .perform(topics, props);
0330 }
0331 };
0332
0333 }
0334
0335 #endif