Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:22

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_RECONNECT_OP_HPP
0009 #define BOOST_MQTT5_RECONNECT_OP_HPP
0010 
#include <boost/mqtt5/types.hpp>

#include <boost/mqtt5/detail/async_traits.hpp>

#include <boost/mqtt5/impl/connect_op.hpp>

#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/random/linear_congruential.hpp>
#include <boost/random/uniform_smallint.hpp>

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <functional>
#include <memory>
#include <string>
#include <utility>
0035 
0036 namespace boost::mqtt5::detail {
0037 
0038 class exponential_backoff {
0039     int _curr_exp { 0 };
0040 
0041     static constexpr int _base_mulptilier = 1000;
0042     static constexpr int _max_exp = 4;
0043 
0044     // sizeof(_generator) = 8
0045     boost::random::rand48 _generator { uint32_t(std::time(0)) };
0046     boost::random::uniform_smallint<> _distribution { -500, 500 };
0047 public:
0048     exponential_backoff() = default;
0049 
0050     duration generate() {
0051         int exponent = _curr_exp < _max_exp ? _curr_exp++ : _max_exp;
0052         int base = 1 << exponent;
0053         return std::chrono::milliseconds(
0054             base * _base_mulptilier + _distribution(_generator) /* noise */
0055         );
0056     }
0057 };
0058 
0059 namespace asio = boost::asio;
0060 
0061 template <typename Owner>
0062 class reconnect_op {
0063     struct on_locked {};
0064     struct on_next_endpoint {};
0065     struct on_connect {};
0066     struct on_backoff {};
0067 
0068     Owner& _owner;
0069 
0070     using handler_type = asio::any_completion_handler<void (error_code)>;
0071     handler_type _handler;
0072 
0073     std::unique_ptr<std::string> _buffer_ptr;
0074 
0075     exponential_backoff _generator;
0076 
0077     using endpoint = asio::ip::tcp::endpoint;
0078     using epoints = asio::ip::tcp::resolver::results_type;
0079 
0080 public:
0081     template <typename Handler>
0082     reconnect_op(Owner& owner, Handler&& handler) :
0083         _owner(owner), _handler(std::move(handler))
0084     {}
0085 
0086     reconnect_op(reconnect_op&&) = default;
0087     reconnect_op(const reconnect_op&) = delete;
0088 
0089     reconnect_op& operator=(reconnect_op&&) = default;
0090     reconnect_op& operator=(const reconnect_op&) = delete;
0091 
0092     using allocator_type = asio::associated_allocator_t<handler_type>;
0093     allocator_type get_allocator() const noexcept {
0094         return asio::get_associated_allocator(_handler);
0095     }
0096 
0097     using cancellation_slot_type =
0098         asio::associated_cancellation_slot_t<handler_type>;
0099     cancellation_slot_type get_cancellation_slot() const noexcept {
0100         return asio::get_associated_cancellation_slot(_handler);
0101     }
0102 
0103     using executor_type = asio::associated_executor_t<handler_type>;
0104     executor_type get_executor() const noexcept {
0105         return asio::get_associated_executor(_handler);
0106     }
0107 
0108     void perform(typename Owner::stream_ptr s) {
0109         _owner._conn_mtx.lock(
0110             asio::prepend(std::move(*this), on_locked {}, s)
0111         );
0112     }
0113 
0114     void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) {
0115         if (ec == asio::error::operation_aborted)
0116             // cancelled without acquiring the lock (by calling client.cancel())
0117             return std::move(_handler)(ec);
0118         
0119         if (!_owner.is_open())
0120             return complete(asio::error::operation_aborted);
0121 
0122         if (s != _owner._stream_ptr)
0123             return complete(asio::error::try_again);
0124 
0125         do_reconnect();
0126     }
0127 
0128     void do_reconnect() {
0129         _owner._endpoints.async_next_endpoint(
0130             asio::prepend(std::move(*this), on_next_endpoint {})
0131         );
0132     }
0133 
0134     void backoff_and_reconnect() {
0135         _owner._connect_timer.expires_after(_generator.generate());
0136         _owner._connect_timer.async_wait(
0137             asio::prepend(std::move(*this), on_backoff {})
0138         );
0139     }
0140 
0141     void operator()(on_backoff, error_code ec) {
0142         if (ec == asio::error::operation_aborted || !_owner.is_open())
0143             return complete(asio::error::operation_aborted);
0144 
0145         do_reconnect();
0146     }
0147 
0148     void operator()(
0149         on_next_endpoint, error_code ec,
0150         epoints eps, authority_path ap
0151     ) {
0152         // the three error codes below are the only possible codes
0153         // that may be returned from async_next_endpont
0154 
0155         if (ec == asio::error::operation_aborted || !_owner.is_open())
0156             return complete(asio::error::operation_aborted);
0157 
0158         if (ec == asio::error::try_again)
0159             return backoff_and_reconnect();
0160 
0161         if (ec == asio::error::host_not_found)
0162             return complete(asio::error::no_recovery);
0163 
0164         connect(eps.cbegin(), std::move(ap));
0165     }
0166 
0167     void connect(epoints::const_iterator eps, authority_path ap) {
0168         namespace asioex = boost::asio::experimental;
0169 
0170         const auto& ep = eps->endpoint();
0171         auto sptr = _owner.construct_and_open_next_layer(ep.protocol());
0172 
0173         if constexpr (has_tls_context<typename Owner::stream_context_type>)
0174             setup_tls_sni(
0175                 ap, _owner._stream_context.tls_context(), *sptr
0176             );
0177 
0178         // wait max 5 seconds for the connect (handshake) op to finish
0179         _owner._connect_timer.expires_after(std::chrono::seconds(5));
0180 
0181         auto init_connect = [](
0182             auto handler, typename Owner::stream_type& stream,
0183             mqtt_ctx& context, log_invoke<typename Owner::logger_type>& log,
0184             endpoint ep, authority_path ap
0185         ) {
0186             connect_op { stream, context, log, std::move(handler) }
0187                 .perform(ep, std::move(ap));
0188         };
0189 
0190         auto timed_connect = asioex::make_parallel_group(
0191             asio::async_initiate<const asio::deferred_t, void(error_code)>(
0192                 init_connect, asio::deferred, std::ref(*sptr),
0193                 std::ref(_owner._stream_context.mqtt_context()),
0194                 std::ref(_owner.log()),
0195                 ep, ap
0196             ),
0197             _owner._connect_timer.async_wait(asio::deferred)
0198         );
0199 
0200         timed_connect.async_wait(
0201             asioex::wait_for_one(),
0202             asio::prepend(
0203                 std::move(*this), on_connect {},
0204                 std::move(sptr), std::move(eps), std::move(ap)
0205             )
0206         );
0207     }
0208 
0209     void operator()(
0210         on_connect,
0211         typename Owner::stream_ptr sptr, epoints::const_iterator eps, authority_path ap,
0212         std::array<std::size_t, 2> ord,
0213         error_code connect_ec, error_code timer_ec
0214     ) {
0215         // connect_ec may be any of:
0216         //  1) async_connect error codes
0217         //  2) async_handshake (TLS) error codes
0218         //  3) async_handshake (WebSocket) error codes
0219         //  4) async_write error codes
0220         //  5) async_read error codes
0221         //  5) client::error::malformed_packet
0222         if (
0223             (ord[0] == 0 && connect_ec == asio::error::operation_aborted) ||
0224             (ord[0] == 1 && timer_ec == asio::error::operation_aborted) ||
0225             !_owner.is_open()
0226         )
0227             return complete(asio::error::operation_aborted);
0228 
0229         // retry for operation timed out and any other error_code or client::error::malformed_packet
0230         if (ord[0] == 1 || connect_ec) {
0231             // if the hostname resolved into more endpoints, try the next one
0232             if (++eps != epoints::const_iterator())
0233                 return connect(std::move(eps), std::move(ap));
0234             // try next server
0235             return do_reconnect();
0236         }
0237 
0238         _owner.replace_next_layer(std::move(sptr));
0239         complete(error_code {});
0240     }
0241 
0242 private:
0243     void complete(error_code ec) {
0244         _owner._conn_mtx.unlock();
0245         std::move(_handler)(ec);
0246     }
0247 };
0248 
0249 
0250 } // end namespace boost::mqtt5::detail
0251 
0252 #endif // !BOOST_MQTT5_RECONNECT_OP_HPP