Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:22

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_SHUTDOWN_OP_HPP
0009 #define BOOST_MQTT5_SHUTDOWN_OP_HPP
0010 
0011 #include <boost/mqtt5/types.hpp>
0012 
0013 #include <boost/mqtt5/detail/async_traits.hpp>
0014 #include <boost/mqtt5/detail/shutdown.hpp>
0015 
0016 #include <boost/asio/any_completion_handler.hpp>
0017 #include <boost/asio/associated_allocator.hpp>
0018 #include <boost/asio/associated_cancellation_slot.hpp>
0019 #include <boost/asio/associated_executor.hpp>
0020 #include <boost/asio/async_result.hpp>
0021 #include <boost/asio/deferred.hpp>
0022 #include <boost/asio/error.hpp>
0023 #include <boost/asio/experimental/parallel_group.hpp>
0024 #include <boost/asio/ip/tcp.hpp>
0025 #include <boost/asio/prepend.hpp>
0026 
0027 #include <array>
0028 #include <chrono>
0029 
0030 namespace boost::mqtt5::detail {
0031 
0032 template <typename>
0033 constexpr bool is_basic_socket = false;
0034 
0035 template <typename P, typename E>
0036 constexpr bool is_basic_socket<asio::basic_stream_socket<P, E>> = true;
0037 
0038 namespace asio = boost::asio;
0039 
0040 template <typename Owner>
0041 class shutdown_op {
0042     struct on_locked {};
0043     struct on_shutdown {};
0044 
0045     Owner& _owner;
0046 
0047     using handler_type = asio::any_completion_handler<void (error_code)>;
0048     handler_type _handler;
0049 
0050 public:
0051     template <typename Handler>
0052     shutdown_op(Owner& owner, Handler&& handler) :
0053         _owner(owner), _handler(std::move(handler))
0054     {}
0055 
0056     shutdown_op(shutdown_op&&) = default;
0057     shutdown_op(const shutdown_op&) = delete;
0058 
0059     shutdown_op& operator=(shutdown_op&&) = default;
0060     shutdown_op& operator=(const shutdown_op&) = delete;
0061 
0062     using allocator_type = asio::associated_allocator_t<handler_type>;
0063     allocator_type get_allocator() const noexcept {
0064         return asio::get_associated_allocator(_handler);
0065     }
0066 
0067     using cancellation_slot_type =
0068         asio::associated_cancellation_slot_t<handler_type>;
0069     cancellation_slot_type get_cancellation_slot() const noexcept {
0070         return asio::get_associated_cancellation_slot(_handler);
0071     }
0072 
0073     using executor_type = typename Owner::executor_type;
0074     executor_type get_executor() const noexcept {
0075         return _owner.get_executor();
0076     }
0077 
0078     void perform() {
0079         if constexpr (is_basic_socket<typename Owner::stream_type>) {
0080             error_code ec;
0081             _owner._stream_ptr->shutdown(asio::socket_base::shutdown_both, ec);
0082             return std::move(_handler)(error_code {});
0083         }
0084         else {
0085             if (_owner._conn_mtx.is_locked())
0086                 return std::move(_handler)(error_code{});
0087 
0088             auto s = std::move(_owner._stream_ptr);
0089             _owner.replace_next_layer(_owner.construct_next_layer());
0090             _owner.open();
0091 
0092             _owner._conn_mtx.lock(
0093                 asio::prepend(std::move(*this), on_locked {}, std::move(s))
0094             );
0095         }
0096     }
0097 
0098     void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) {
0099         if (ec == asio::error::operation_aborted)
0100             return complete(s, asio::error::operation_aborted);
0101 
0102         if (!_owner.is_open()) {
0103             _owner._conn_mtx.unlock();
0104             return complete(s, asio::error::operation_aborted);
0105         }
0106         
0107         namespace asioex = boost::asio::experimental;
0108 
0109         // wait max 5 seconds for the shutdown op to finish
0110         _owner._connect_timer.expires_after(std::chrono::seconds(5));
0111 
0112         auto init_shutdown = [](
0113             auto handler, typename Owner::stream_type& stream
0114         ) {
0115             async_shutdown(stream, std::move(handler));
0116         };
0117 
0118         auto timed_shutdown = asioex::make_parallel_group(
0119             asio::async_initiate<const asio::deferred_t, void(error_code)>(
0120                 init_shutdown, asio::deferred, std::ref(*s)
0121             ),
0122             _owner._connect_timer.async_wait(asio::deferred)
0123         );
0124 
0125         timed_shutdown.async_wait(
0126             asioex::wait_for_one(),
0127             asio::prepend(
0128                 std::move(*this), on_shutdown {},
0129                 std::move(s)
0130             )
0131         );
0132     }
0133 
0134     void operator()(
0135         on_shutdown, typename Owner::stream_ptr sptr,
0136         std::array<std::size_t, 2> /* ord */,
0137         error_code /* shutdown_ec */, error_code /* timer_ec */
0138     ) {
0139         _owner._conn_mtx.unlock();
0140 
0141         if (!_owner.is_open())
0142             return complete(sptr, asio::error::operation_aborted);
0143 
0144         // ignore shutdown error_code
0145         complete(sptr, error_code {});
0146     }
0147 
0148 private:
0149     void complete(const typename Owner::stream_ptr& sptr, error_code ec) {
0150         asio::get_associated_cancellation_slot(_handler).clear();
0151         error_code close_ec;
0152         lowest_layer(*sptr).close(close_ec);
0153         std::move(_handler)(ec);
0154     }
0155 };
0156 
0157 
0158 } // end namespace boost::mqtt5::detail
0159 
0160 #endif // !BOOST_MQTT5_SHUTDOWN_OP_HPP