File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_DISCONNECT_OP_HPP
0009 #define BOOST_MQTT5_DISCONNECT_OP_HPP
0010
0011 #include <boost/mqtt5/types.hpp>
0012
0013 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0014 #include <boost/mqtt5/detail/control_packet.hpp>
0015 #include <boost/mqtt5/detail/internal_types.hpp>
0016 #include <boost/mqtt5/detail/topic_validation.hpp>
0017 #include <boost/mqtt5/detail/utf8_mqtt.hpp>
0018
0019 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0020
0021 #include <boost/asio/any_completion_handler.hpp>
0022 #include <boost/asio/associated_allocator.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 #include <boost/asio/associated_executor.hpp>
0025 #include <boost/asio/async_result.hpp>
0026 #include <boost/asio/consign.hpp>
0027 #include <boost/asio/deferred.hpp>
0028 #include <boost/asio/error.hpp>
0029 #include <boost/asio/experimental/parallel_group.hpp>
0030 #include <boost/asio/prepend.hpp>
0031 #include <boost/asio/steady_timer.hpp>
0032
0033 #include <cstdint>
0034 #include <memory>
0035
0036 namespace boost::mqtt5::detail {
0037
0038 namespace asio = boost::asio;
0039
0040 template <
0041 typename ClientService,
0042 typename DisconnectContext
0043 >
0044 class disconnect_op {
0045 using client_service = ClientService;
0046
0047 struct on_disconnect {};
0048 struct on_shutdown {};
0049
0050 std::shared_ptr<client_service> _svc_ptr;
0051 DisconnectContext _context;
0052
0053 using handler_type = cancellable_handler<
0054 asio::any_completion_handler<void (error_code)>,
0055 typename ClientService::executor_type
0056 >;
0057 handler_type _handler;
0058
0059 public:
0060 template <typename Handler>
0061 disconnect_op(
0062 std::shared_ptr<client_service> svc_ptr,
0063 DisconnectContext&& context, Handler&& handler
0064 ) :
0065 _svc_ptr(std::move(svc_ptr)), _context(std::move(context)),
0066 _handler(std::move(handler), _svc_ptr->get_executor())
0067 {
0068 auto slot = asio::get_associated_cancellation_slot(_handler);
0069 if (slot.is_connected())
0070 slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0071 svc.cancel();
0072 });
0073 }
0074
0075 disconnect_op(disconnect_op&&) = default;
0076 disconnect_op(const disconnect_op&) = delete;
0077
0078 disconnect_op& operator=(disconnect_op&&) = default;
0079 disconnect_op& operator=(const disconnect_op&) = delete;
0080
0081 using allocator_type = asio::associated_allocator_t<handler_type>;
0082 allocator_type get_allocator() const noexcept {
0083 return asio::get_associated_allocator(_handler);
0084 }
0085
0086 using executor_type = typename client_service::executor_type;
0087 executor_type get_executor() const noexcept {
0088 return _svc_ptr->get_executor();
0089 }
0090
0091 void perform() {
0092 error_code ec = validate_disconnect(_context.props);
0093 if (ec)
0094 return complete_immediate(ec);
0095
0096 auto disconnect = control_packet<allocator_type>::of(
0097 no_pid, get_allocator(),
0098 encoders::encode_disconnect,
0099 static_cast<uint8_t>(_context.reason_code), _context.props
0100 );
0101
0102 auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0103 .value_or(default_max_send_size);
0104 if (disconnect.size() > max_packet_size)
0105
0106 return send_disconnect(control_packet<allocator_type>::of(
0107 no_pid, get_allocator(),
0108 encoders::encode_disconnect,
0109 static_cast<uint8_t>(_context.reason_code), disconnect_props {}
0110 ));
0111
0112 send_disconnect(std::move(disconnect));
0113 }
0114
0115 void send_disconnect(control_packet<allocator_type> disconnect) {
0116 auto wire_data = disconnect.wire_data();
0117 _svc_ptr->async_send(
0118 wire_data,
0119 no_serial, send_flag::terminal,
0120 asio::prepend(
0121 std::move(*this),
0122 on_disconnect {}, std::move(disconnect)
0123 )
0124 );
0125 }
0126
0127 void operator()(
0128 on_disconnect,
0129 control_packet<allocator_type> disconnect, error_code ec
0130 ) {
0131
0132
0133
0134
0135 if (
0136 ec == asio::error::operation_aborted ||
0137 ec == asio::error::no_recovery
0138 )
0139 return complete(asio::error::operation_aborted);
0140
0141 if (ec == asio::error::try_again) {
0142 if (_context.terminal)
0143 return send_disconnect(std::move(disconnect));
0144 return complete(error_code {});
0145 }
0146
0147 return _svc_ptr->async_shutdown(
0148 asio::prepend(std::move(*this), on_shutdown {})
0149 );
0150 }
0151
0152 void operator()(on_shutdown, error_code ec) {
0153 if (_context.terminal)
0154 _svc_ptr->cancel();
0155 complete(ec);
0156 }
0157
0158 private:
0159 static error_code validate_disconnect(const disconnect_props& props) {
0160 const auto& reason_string = props[prop::reason_string];
0161 if (
0162 reason_string &&
0163 validate_mqtt_utf8(*reason_string) != validation_result::valid
0164 )
0165 return client::error::malformed_packet;
0166
0167 const auto& user_properties = props[prop::user_property];
0168 for (const auto& user_property: user_properties)
0169 if (!is_valid_string_pair(user_property))
0170 return client::error::malformed_packet;
0171 return error_code {};
0172 }
0173
0174 void complete(error_code ec) {
0175 _handler.complete(ec);
0176 }
0177
0178 void complete_immediate(error_code ec) {
0179 _handler.complete_immediate(ec);
0180 }
0181 };
0182
0183 template <typename ClientService, typename Handler>
0184 class terminal_disconnect_op {
0185 using client_service = ClientService;
0186
0187 static constexpr uint8_t seconds = 5;
0188
0189 std::shared_ptr<client_service> _svc_ptr;
0190 std::unique_ptr<asio::steady_timer> _timer;
0191
0192 using handler_type = Handler;
0193 handler_type _handler;
0194
0195 public:
0196 terminal_disconnect_op(
0197 std::shared_ptr<client_service> svc_ptr,
0198 Handler&& handler
0199 ) :
0200 _svc_ptr(std::move(svc_ptr)),
0201 _timer(new asio::steady_timer(_svc_ptr->get_executor())),
0202 _handler(std::move(handler))
0203 {}
0204
0205 terminal_disconnect_op(terminal_disconnect_op&&) = default;
0206 terminal_disconnect_op(const terminal_disconnect_op&) = delete;
0207
0208 terminal_disconnect_op& operator=(terminal_disconnect_op&&) = default;
0209 terminal_disconnect_op& operator=(const terminal_disconnect_op&) = delete;
0210
0211 using allocator_type = asio::associated_allocator_t<handler_type>;
0212 allocator_type get_allocator() const noexcept {
0213 return asio::get_associated_allocator(_handler);
0214 }
0215
0216 using cancellation_slot_type = asio::associated_cancellation_slot_t<handler_type>;
0217 cancellation_slot_type get_cancellation_slot() const noexcept {
0218 return asio::get_associated_cancellation_slot(_handler);
0219 }
0220
0221 using executor_type = asio::associated_executor_t<handler_type>;
0222 executor_type get_executor() const noexcept {
0223 return asio::get_associated_executor(_handler);
0224 }
0225
0226 template <typename DisconnectContext>
0227 void perform(DisconnectContext&& context) {
0228 namespace asioex = boost::asio::experimental;
0229
0230 auto init_disconnect = [](
0231 auto handler, disconnect_ctx ctx,
0232 std::shared_ptr<ClientService> svc_ptr
0233 ) {
0234 disconnect_op {
0235 std::move(svc_ptr), std::move(ctx), std::move(handler)
0236 }.perform();
0237 };
0238
0239 _timer->expires_after(std::chrono::seconds(seconds));
0240
0241 auto timed_disconnect = asioex::make_parallel_group(
0242 asio::async_initiate<const asio::deferred_t, void (error_code)>(
0243 init_disconnect, asio::deferred,
0244 std::forward<DisconnectContext>(context), _svc_ptr
0245 ),
0246 _timer->async_wait(asio::deferred)
0247 );
0248
0249 timed_disconnect.async_wait(
0250 asioex::wait_for_one(), std::move(*this)
0251 );
0252 }
0253
0254 void operator()(
0255 std::array<std::size_t, 2> ,
0256 error_code disconnect_ec, error_code
0257 ) {
0258 std::move(_handler)(disconnect_ec);
0259 }
0260 };
0261
0262 template <typename ClientService, bool terminal>
0263 class initiate_async_disconnect {
0264 std::shared_ptr<ClientService> _svc_ptr;
0265 public:
0266 explicit initiate_async_disconnect(std::shared_ptr<ClientService> svc_ptr) :
0267 _svc_ptr(std::move(svc_ptr))
0268 {}
0269
0270 using executor_type = typename ClientService::executor_type;
0271 executor_type get_executor() const noexcept {
0272 return _svc_ptr->get_executor();
0273 }
0274
0275 template <typename Handler>
0276 void operator()(
0277 Handler&& handler,
0278 disconnect_rc_e rc, const disconnect_props& props
0279 ) {
0280 auto ctx = disconnect_ctx { rc, props, terminal };
0281 if constexpr (terminal)
0282 terminal_disconnect_op { _svc_ptr, std::move(handler) }
0283 .perform(std::move(ctx));
0284 else
0285 disconnect_op { _svc_ptr, std::move(ctx), std::move(handler) }
0286 .perform();
0287 }
0288 };
0289
0290 template <typename ClientService, typename CompletionToken>
0291 decltype(auto) async_disconnect(
0292 disconnect_rc_e reason_code, const disconnect_props& props,
0293 std::shared_ptr<ClientService> svc_ptr,
0294 CompletionToken&& token
0295 ) {
0296 using Signature = void (error_code);
0297 return asio::async_initiate<CompletionToken, Signature>(
0298 initiate_async_disconnect<ClientService, false>(std::move(svc_ptr)), token,
0299 reason_code, props
0300 );
0301 }
0302
0303 template <typename ClientService, typename CompletionToken>
0304 decltype(auto) async_terminal_disconnect(
0305 disconnect_rc_e reason_code, const disconnect_props& props,
0306 std::shared_ptr<ClientService> svc_ptr,
0307 CompletionToken&& token
0308 ) {
0309 using Signature = void (error_code);
0310 return asio::async_initiate<CompletionToken, Signature>(
0311 initiate_async_disconnect<ClientService, true>(std::move(svc_ptr)), token,
0312 reason_code, props
0313 );
0314 }
0315
0316 }
0317
0318 #endif