File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_RECONNECT_OP_HPP
0009 #define BOOST_MQTT5_RECONNECT_OP_HPP
0010
#include <boost/mqtt5/types.hpp>

#include <boost/mqtt5/detail/async_traits.hpp>

#include <boost/mqtt5/impl/connect_op.hpp>

#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/random/linear_congruential.hpp>
#include <boost/random/uniform_smallint.hpp>

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <functional>
#include <memory>
#include <string>
#include <utility>
0035
0036 namespace boost::mqtt5::detail {
0037
0038 class exponential_backoff {
0039 int _curr_exp { 0 };
0040
0041 static constexpr int _base_mulptilier = 1000;
0042 static constexpr int _max_exp = 4;
0043
0044
0045 boost::random::rand48 _generator { uint32_t(std::time(0)) };
0046 boost::random::uniform_smallint<> _distribution { -500, 500 };
0047 public:
0048 exponential_backoff() = default;
0049
0050 duration generate() {
0051 int exponent = _curr_exp < _max_exp ? _curr_exp++ : _max_exp;
0052 int base = 1 << exponent;
0053 return std::chrono::milliseconds(
0054 base * _base_mulptilier + _distribution(_generator)
0055 );
0056 }
0057 };
0058
0059 namespace asio = boost::asio;
0060
0061 template <typename Owner>
0062 class reconnect_op {
0063 struct on_locked {};
0064 struct on_next_endpoint {};
0065 struct on_connect {};
0066 struct on_backoff {};
0067
0068 Owner& _owner;
0069
0070 using handler_type = asio::any_completion_handler<void (error_code)>;
0071 handler_type _handler;
0072
0073 std::unique_ptr<std::string> _buffer_ptr;
0074
0075 exponential_backoff _generator;
0076
0077 using endpoint = asio::ip::tcp::endpoint;
0078 using epoints = asio::ip::tcp::resolver::results_type;
0079
0080 public:
0081 template <typename Handler>
0082 reconnect_op(Owner& owner, Handler&& handler) :
0083 _owner(owner), _handler(std::move(handler))
0084 {}
0085
0086 reconnect_op(reconnect_op&&) = default;
0087 reconnect_op(const reconnect_op&) = delete;
0088
0089 reconnect_op& operator=(reconnect_op&&) = default;
0090 reconnect_op& operator=(const reconnect_op&) = delete;
0091
0092 using allocator_type = asio::associated_allocator_t<handler_type>;
0093 allocator_type get_allocator() const noexcept {
0094 return asio::get_associated_allocator(_handler);
0095 }
0096
0097 using cancellation_slot_type =
0098 asio::associated_cancellation_slot_t<handler_type>;
0099 cancellation_slot_type get_cancellation_slot() const noexcept {
0100 return asio::get_associated_cancellation_slot(_handler);
0101 }
0102
0103 using executor_type = asio::associated_executor_t<handler_type>;
0104 executor_type get_executor() const noexcept {
0105 return asio::get_associated_executor(_handler);
0106 }
0107
0108 void perform(typename Owner::stream_ptr s) {
0109 _owner._conn_mtx.lock(
0110 asio::prepend(std::move(*this), on_locked {}, s)
0111 );
0112 }
0113
0114 void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) {
0115 if (ec == asio::error::operation_aborted)
0116
0117 return std::move(_handler)(ec);
0118
0119 if (!_owner.is_open())
0120 return complete(asio::error::operation_aborted);
0121
0122 if (s != _owner._stream_ptr)
0123 return complete(asio::error::try_again);
0124
0125 do_reconnect();
0126 }
0127
0128 void do_reconnect() {
0129 _owner._endpoints.async_next_endpoint(
0130 asio::prepend(std::move(*this), on_next_endpoint {})
0131 );
0132 }
0133
0134 void backoff_and_reconnect() {
0135 _owner._connect_timer.expires_after(_generator.generate());
0136 _owner._connect_timer.async_wait(
0137 asio::prepend(std::move(*this), on_backoff {})
0138 );
0139 }
0140
0141 void operator()(on_backoff, error_code ec) {
0142 if (ec == asio::error::operation_aborted || !_owner.is_open())
0143 return complete(asio::error::operation_aborted);
0144
0145 do_reconnect();
0146 }
0147
0148 void operator()(
0149 on_next_endpoint, error_code ec,
0150 epoints eps, authority_path ap
0151 ) {
0152
0153
0154
0155 if (ec == asio::error::operation_aborted || !_owner.is_open())
0156 return complete(asio::error::operation_aborted);
0157
0158 if (ec == asio::error::try_again)
0159 return backoff_and_reconnect();
0160
0161 if (ec == asio::error::host_not_found)
0162 return complete(asio::error::no_recovery);
0163
0164 connect(eps.cbegin(), std::move(ap));
0165 }
0166
0167 void connect(epoints::const_iterator eps, authority_path ap) {
0168 namespace asioex = boost::asio::experimental;
0169
0170 const auto& ep = eps->endpoint();
0171 auto sptr = _owner.construct_and_open_next_layer(ep.protocol());
0172
0173 if constexpr (has_tls_context<typename Owner::stream_context_type>)
0174 setup_tls_sni(
0175 ap, _owner._stream_context.tls_context(), *sptr
0176 );
0177
0178
0179 _owner._connect_timer.expires_after(std::chrono::seconds(5));
0180
0181 auto init_connect = [](
0182 auto handler, typename Owner::stream_type& stream,
0183 mqtt_ctx& context, log_invoke<typename Owner::logger_type>& log,
0184 endpoint ep, authority_path ap
0185 ) {
0186 connect_op { stream, context, log, std::move(handler) }
0187 .perform(ep, std::move(ap));
0188 };
0189
0190 auto timed_connect = asioex::make_parallel_group(
0191 asio::async_initiate<const asio::deferred_t, void(error_code)>(
0192 init_connect, asio::deferred, std::ref(*sptr),
0193 std::ref(_owner._stream_context.mqtt_context()),
0194 std::ref(_owner.log()),
0195 ep, ap
0196 ),
0197 _owner._connect_timer.async_wait(asio::deferred)
0198 );
0199
0200 timed_connect.async_wait(
0201 asioex::wait_for_one(),
0202 asio::prepend(
0203 std::move(*this), on_connect {},
0204 std::move(sptr), std::move(eps), std::move(ap)
0205 )
0206 );
0207 }
0208
0209 void operator()(
0210 on_connect,
0211 typename Owner::stream_ptr sptr, epoints::const_iterator eps, authority_path ap,
0212 std::array<std::size_t, 2> ord,
0213 error_code connect_ec, error_code timer_ec
0214 ) {
0215
0216
0217
0218
0219
0220
0221
0222 if (
0223 (ord[0] == 0 && connect_ec == asio::error::operation_aborted) ||
0224 (ord[0] == 1 && timer_ec == asio::error::operation_aborted) ||
0225 !_owner.is_open()
0226 )
0227 return complete(asio::error::operation_aborted);
0228
0229
0230 if (ord[0] == 1 || connect_ec) {
0231
0232 if (++eps != epoints::const_iterator())
0233 return connect(std::move(eps), std::move(ap));
0234
0235 return do_reconnect();
0236 }
0237
0238 _owner.replace_next_layer(std::move(sptr));
0239 complete(error_code {});
0240 }
0241
0242 private:
0243 void complete(error_code ec) {
0244 _owner._conn_mtx.unlock();
0245 std::move(_handler)(ec);
0246 }
0247 };
0248
0249
0250 }
0251
0252 #endif