Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_DISCONNECT_OP_HPP
0009 #define BOOST_MQTT5_DISCONNECT_OP_HPP
0010 
0011 #include <boost/mqtt5/types.hpp>
0012 
0013 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0014 #include <boost/mqtt5/detail/control_packet.hpp>
0015 #include <boost/mqtt5/detail/internal_types.hpp>
0016 #include <boost/mqtt5/detail/topic_validation.hpp>
0017 #include <boost/mqtt5/detail/utf8_mqtt.hpp>
0018 
0019 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0020 
0021 #include <boost/asio/any_completion_handler.hpp>
0022 #include <boost/asio/associated_allocator.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 #include <boost/asio/associated_executor.hpp>
0025 #include <boost/asio/async_result.hpp>
0026 #include <boost/asio/consign.hpp>
0027 #include <boost/asio/deferred.hpp>
0028 #include <boost/asio/error.hpp>
0029 #include <boost/asio/experimental/parallel_group.hpp>
0030 #include <boost/asio/prepend.hpp>
0031 #include <boost/asio/steady_timer.hpp>
0032 
0033 #include <cstdint>
0034 #include <memory>
0035 
0036 namespace boost::mqtt5::detail {
0037 
0038 namespace asio = boost::asio;
0039 
0040 template <
0041     typename ClientService,
0042     typename DisconnectContext
0043 >
0044 class disconnect_op {
0045     using client_service = ClientService;
0046 
0047     struct on_disconnect {};
0048     struct on_shutdown {};
0049 
0050     std::shared_ptr<client_service> _svc_ptr;
0051     DisconnectContext _context;
0052 
0053     using handler_type = cancellable_handler<
0054         asio::any_completion_handler<void (error_code)>,
0055         typename ClientService::executor_type
0056     >;
0057     handler_type _handler;
0058 
0059 public:
0060     template <typename Handler>
0061     disconnect_op(
0062         std::shared_ptr<client_service> svc_ptr,
0063         DisconnectContext&& context, Handler&& handler
0064     ) :
0065         _svc_ptr(std::move(svc_ptr)), _context(std::move(context)),
0066         _handler(std::move(handler), _svc_ptr->get_executor())
0067     {
0068         auto slot = asio::get_associated_cancellation_slot(_handler);
0069         if (slot.is_connected())
0070             slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0071                 svc.cancel();
0072             });
0073     }
0074 
0075     disconnect_op(disconnect_op&&) = default;
0076     disconnect_op(const disconnect_op&) = delete;
0077 
0078     disconnect_op& operator=(disconnect_op&&) = default;
0079     disconnect_op& operator=(const disconnect_op&) = delete;
0080 
0081     using allocator_type = asio::associated_allocator_t<handler_type>;
0082     allocator_type get_allocator() const noexcept {
0083         return asio::get_associated_allocator(_handler);
0084     }
0085 
0086     using executor_type = typename client_service::executor_type;
0087     executor_type get_executor() const noexcept {
0088         return _svc_ptr->get_executor();
0089     }
0090 
0091     void perform() {
0092         error_code ec = validate_disconnect(_context.props);
0093         if (ec)
0094             return complete_immediate(ec);
0095 
0096         auto disconnect = control_packet<allocator_type>::of(
0097             no_pid, get_allocator(),
0098             encoders::encode_disconnect,
0099             static_cast<uint8_t>(_context.reason_code), _context.props
0100         );
0101 
0102         auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
0103                 .value_or(default_max_send_size);
0104         if (disconnect.size() > max_packet_size)
0105             // drop properties
0106             return send_disconnect(control_packet<allocator_type>::of(
0107                 no_pid, get_allocator(),
0108                 encoders::encode_disconnect,
0109                 static_cast<uint8_t>(_context.reason_code), disconnect_props {}
0110             ));
0111 
0112         send_disconnect(std::move(disconnect));
0113     }
0114 
0115     void send_disconnect(control_packet<allocator_type> disconnect) {
0116         auto wire_data = disconnect.wire_data();
0117         _svc_ptr->async_send(
0118             wire_data,
0119             no_serial, send_flag::terminal,
0120             asio::prepend(
0121                 std::move(*this),
0122                 on_disconnect {}, std::move(disconnect)
0123             )
0124         );
0125     }
0126 
0127     void operator()(
0128         on_disconnect,
0129         control_packet<allocator_type> disconnect, error_code ec
0130     ) {
0131         // The connection must be closed even
0132         // if we failed to send the DISCONNECT packet
0133         // with Reason Code of 0x80 or greater.
0134 
0135         if (
0136             ec == asio::error::operation_aborted ||
0137             ec == asio::error::no_recovery
0138         )
0139             return complete(asio::error::operation_aborted);
0140 
0141         if (ec == asio::error::try_again) {
0142             if (_context.terminal)
0143                 return send_disconnect(std::move(disconnect));
0144             return complete(error_code {});
0145         }
0146 
0147         return _svc_ptr->async_shutdown(
0148             asio::prepend(std::move(*this), on_shutdown {})
0149         );
0150     }
0151 
0152     void operator()(on_shutdown, error_code ec) {
0153         if (_context.terminal)
0154             _svc_ptr->cancel();
0155         complete(ec);
0156     }
0157 
0158 private:
0159     static error_code validate_disconnect(const disconnect_props& props) {
0160         const auto& reason_string = props[prop::reason_string];
0161         if (
0162             reason_string &&
0163             validate_mqtt_utf8(*reason_string) != validation_result::valid
0164         )
0165             return client::error::malformed_packet;
0166 
0167         const auto& user_properties = props[prop::user_property];
0168         for (const auto& user_property: user_properties)
0169             if (!is_valid_string_pair(user_property))
0170                 return client::error::malformed_packet;
0171         return error_code {};
0172     }
0173 
0174     void complete(error_code ec) {
0175         _handler.complete(ec);
0176     }
0177 
0178     void complete_immediate(error_code ec) {
0179         _handler.complete_immediate(ec);
0180     }
0181 };
0182 
0183 template <typename ClientService, typename Handler>
0184 class terminal_disconnect_op {
0185     using client_service = ClientService;
0186 
0187     static constexpr uint8_t seconds = 5;
0188 
0189     std::shared_ptr<client_service> _svc_ptr;
0190     std::unique_ptr<asio::steady_timer> _timer;
0191 
0192     using handler_type = Handler;
0193     handler_type _handler;
0194 
0195 public:
0196     terminal_disconnect_op(
0197         std::shared_ptr<client_service> svc_ptr,
0198         Handler&& handler
0199     ) :
0200         _svc_ptr(std::move(svc_ptr)),
0201         _timer(new asio::steady_timer(_svc_ptr->get_executor())),
0202         _handler(std::move(handler))
0203     {}
0204 
0205     terminal_disconnect_op(terminal_disconnect_op&&) = default;
0206     terminal_disconnect_op(const terminal_disconnect_op&) = delete;
0207 
0208     terminal_disconnect_op& operator=(terminal_disconnect_op&&) = default;
0209     terminal_disconnect_op& operator=(const terminal_disconnect_op&) = delete;
0210 
0211     using allocator_type = asio::associated_allocator_t<handler_type>;
0212     allocator_type get_allocator() const noexcept {
0213         return asio::get_associated_allocator(_handler);
0214     }
0215 
0216     using cancellation_slot_type = asio::associated_cancellation_slot_t<handler_type>;
0217     cancellation_slot_type get_cancellation_slot() const noexcept {
0218         return asio::get_associated_cancellation_slot(_handler);
0219     }
0220 
0221     using executor_type = asio::associated_executor_t<handler_type>;
0222     executor_type get_executor() const noexcept {
0223         return asio::get_associated_executor(_handler);
0224     }
0225 
0226     template <typename DisconnectContext>
0227     void perform(DisconnectContext&& context) {
0228         namespace asioex = boost::asio::experimental;
0229 
0230         auto init_disconnect = [](
0231             auto handler, disconnect_ctx ctx,
0232             std::shared_ptr<ClientService> svc_ptr
0233         ) {
0234             disconnect_op {
0235                 std::move(svc_ptr), std::move(ctx), std::move(handler)
0236             }.perform();
0237         };
0238 
0239         _timer->expires_after(std::chrono::seconds(seconds));
0240 
0241         auto timed_disconnect = asioex::make_parallel_group(
0242             asio::async_initiate<const asio::deferred_t, void (error_code)>(
0243                 init_disconnect, asio::deferred,
0244                 std::forward<DisconnectContext>(context), _svc_ptr
0245             ),
0246             _timer->async_wait(asio::deferred)
0247         );
0248 
0249         timed_disconnect.async_wait(
0250             asioex::wait_for_one(), std::move(*this)
0251         );
0252     }
0253 
0254     void operator()(
0255         std::array<std::size_t, 2> /* ord */,
0256         error_code disconnect_ec, error_code /* timer_ec */
0257     ) {
0258         std::move(_handler)(disconnect_ec);
0259     }
0260 };
0261 
0262 template <typename ClientService, bool terminal>
0263 class initiate_async_disconnect {
0264     std::shared_ptr<ClientService> _svc_ptr;
0265 public:
0266     explicit initiate_async_disconnect(std::shared_ptr<ClientService> svc_ptr) :
0267         _svc_ptr(std::move(svc_ptr))
0268     {}
0269 
0270     using executor_type = typename ClientService::executor_type;
0271     executor_type get_executor() const noexcept {
0272         return _svc_ptr->get_executor();
0273     }
0274 
0275     template <typename Handler>
0276     void operator()(
0277         Handler&& handler,
0278         disconnect_rc_e rc, const disconnect_props& props
0279     ) {
0280         auto ctx = disconnect_ctx { rc, props, terminal };
0281         if constexpr (terminal)
0282             terminal_disconnect_op { _svc_ptr, std::move(handler) }
0283                 .perform(std::move(ctx));
0284         else
0285             disconnect_op { _svc_ptr, std::move(ctx), std::move(handler) }
0286                 .perform();
0287     }
0288 };
0289 
0290 template <typename ClientService, typename CompletionToken>
0291 decltype(auto) async_disconnect(
0292     disconnect_rc_e reason_code, const disconnect_props& props,
0293     std::shared_ptr<ClientService> svc_ptr,
0294     CompletionToken&& token
0295 ) {
0296     using Signature = void (error_code);
0297     return asio::async_initiate<CompletionToken, Signature>(
0298         initiate_async_disconnect<ClientService, false>(std::move(svc_ptr)), token,
0299         reason_code, props
0300     );
0301 }
0302 
0303 template <typename ClientService, typename CompletionToken>
0304 decltype(auto) async_terminal_disconnect(
0305     disconnect_rc_e reason_code, const disconnect_props& props,
0306     std::shared_ptr<ClientService> svc_ptr,
0307     CompletionToken&& token
0308 ) {
0309     using Signature = void (error_code);
0310     return asio::async_initiate<CompletionToken, Signature>(
0311         initiate_async_disconnect<ClientService, true>(std::move(svc_ptr)), token,
0312         reason_code, props
0313     );
0314 }
0315 
0316 } // end namespace boost::mqtt5::detail
0317 
0318 #endif // !BOOST_MQTT5_DISCONNECT_HPP