File indexing completed on 2025-09-17 08:38:20
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
0009 #define BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
0010
0011 #include <boost/mqtt5/detail/async_mutex.hpp>
0012 #include <boost/mqtt5/detail/async_traits.hpp>
0013 #include <boost/mqtt5/detail/log_invoke.hpp>
0014
0015 #include <boost/mqtt5/impl/endpoints.hpp>
0016 #include <boost/mqtt5/impl/read_op.hpp>
0017 #include <boost/mqtt5/impl/reconnect_op.hpp>
0018 #include <boost/mqtt5/impl/shutdown_op.hpp>
0019 #include <boost/mqtt5/impl/write_op.hpp>
0020
0021 #include <boost/asio/async_result.hpp>
0022 #include <boost/asio/ip/tcp.hpp>
0023 #include <boost/asio/steady_timer.hpp>
0024 #include <boost/system/error_code.hpp>
0025
0026 #include <cstdint>
0027 #include <memory>
0028 #include <string>
0029 #include <variant> // std::monostate
0030
0031 namespace boost::mqtt5::detail {
0032
0033 namespace asio = boost::asio;
0034 using error_code = boost::system::error_code;
0035
0036 template <
0037 typename StreamType,
0038 typename StreamContext = std::monostate,
0039 typename LoggerType = noop_logger
0040 >
0041 class autoconnect_stream {
0042 public:
0043 using self_type = autoconnect_stream<StreamType, StreamContext, LoggerType>;
0044 using stream_type = StreamType;
0045 using stream_context_type = StreamContext;
0046 using logger_type = LoggerType;
0047 using executor_type = typename stream_type::executor_type;
0048 private:
0049 using stream_ptr = std::shared_ptr<stream_type>;
0050
0051 executor_type _stream_executor;
0052 async_mutex _conn_mtx;
0053 asio::steady_timer _read_timer, _connect_timer;
0054 endpoints<logger_type> _endpoints;
0055
0056 stream_ptr _stream_ptr;
0057 stream_context_type& _stream_context;
0058
0059 log_invoke<logger_type>& _log;
0060
0061 template <typename Owner, typename Handler>
0062 friend class read_op;
0063
0064 template <typename Owner, typename Handler>
0065 friend class write_op;
0066
0067 template <typename Owner>
0068 friend class reconnect_op;
0069
0070 template <typename Owner>
0071 friend class shutdown_op;
0072
0073 public:
0074 autoconnect_stream(
0075 const executor_type& ex, stream_context_type& context,
0076 log_invoke<logger_type>& log
0077 ) :
0078 _stream_executor(ex),
0079 _conn_mtx(_stream_executor),
0080 _read_timer(_stream_executor), _connect_timer(_stream_executor),
0081 _endpoints(_stream_executor, _connect_timer, log),
0082 _stream_context(context),
0083 _log(log)
0084 {
0085 replace_next_layer(construct_next_layer());
0086 }
0087
0088 autoconnect_stream(const autoconnect_stream&) = delete;
0089 autoconnect_stream& operator=(const autoconnect_stream&) = delete;
0090
0091 using next_layer_type = stream_type;
0092 next_layer_type& next_layer() {
0093 return *_stream_ptr;
0094 }
0095
0096 const next_layer_type& next_layer() const {
0097 return *_stream_ptr;
0098 }
0099
0100 executor_type get_executor() const noexcept {
0101 return _stream_executor;
0102 }
0103
0104 void brokers(std::string hosts, uint16_t default_port) {
0105 _endpoints.brokers(std::move(hosts), default_port);
0106 }
0107
0108 void clone_endpoints(const autoconnect_stream& other) {
0109 _endpoints.clone_servers(other._endpoints);
0110 }
0111
0112 bool is_open() const noexcept {
0113 return lowest_layer(*_stream_ptr).is_open();
0114 }
0115
0116 void open() {
0117 open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
0118 }
0119
0120 void cancel() {
0121 _conn_mtx.cancel();
0122 _connect_timer.cancel();
0123 }
0124
0125 void close() {
0126 error_code ec;
0127 lowest_layer(*_stream_ptr).close(ec);
0128 }
0129
0130 template <typename CompletionToken>
0131 void async_shutdown(CompletionToken&& token) {
0132 using Signature = void (error_code);
0133
0134 auto initiation = [](auto handler, self_type& self) {
0135 shutdown_op { self, std::move(handler) }.perform();
0136 };
0137
0138 return asio::async_initiate<CompletionToken, Signature>(
0139 initiation, token, std::ref(*this)
0140 );
0141 }
0142
0143 bool was_connected() const {
0144 error_code ec;
0145 lowest_layer(*_stream_ptr).remote_endpoint(ec);
0146 return ec == boost::system::errc::success;
0147 }
0148
0149 template <typename BufferType, typename CompletionToken>
0150 decltype(auto) async_read_some(
0151 const BufferType& buffer, duration wait_for, CompletionToken&& token
0152 ) {
0153 using Signature = void (error_code, size_t);
0154
0155 auto initiation = [](
0156 auto handler, self_type& self,
0157 const BufferType& buffer, duration wait_for
0158 ) {
0159 read_op { self, std::move(handler) }.perform(buffer, wait_for);
0160 };
0161
0162 return asio::async_initiate<CompletionToken, Signature>(
0163 initiation, token, std::ref(*this), buffer, wait_for
0164 );
0165 }
0166
0167 template <typename BufferType, typename CompletionToken>
0168 decltype(auto) async_write(
0169 const BufferType& buffer, CompletionToken&& token
0170 ) {
0171 using Signature = void (error_code, size_t);
0172
0173 auto initiation = [](
0174 auto handler, self_type& self, const BufferType& buffer
0175 ) {
0176 write_op { self, std::move(handler) }.perform(buffer);
0177 };
0178
0179 return asio::async_initiate<CompletionToken, Signature>(
0180 initiation, token, std::ref(*this), buffer
0181 );
0182 }
0183
0184 private:
0185
0186 log_invoke<logger_type>& log() {
0187 return _log;
0188 }
0189
0190 static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
0191 error_code ec;
0192 auto& layer = lowest_layer(*sptr);
0193 layer.open(protocol, ec);
0194 layer.set_option(asio::socket_base::reuse_address(true), ec);
0195 layer.set_option(asio::ip::tcp::no_delay(true), ec);
0196 }
0197
0198 stream_ptr construct_next_layer() const {
0199 stream_ptr sptr;
0200 if constexpr (has_tls_context<StreamContext>)
0201 sptr = std::make_shared<stream_type>(
0202 _stream_executor, _stream_context.tls_context()
0203 );
0204 else
0205 sptr = std::make_shared<stream_type>(_stream_executor);
0206
0207 return sptr;
0208 }
0209
0210 stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const {
0211 auto sptr = construct_next_layer();
0212 open_lowest_layer(sptr, protocol);
0213 return sptr;
0214 }
0215
0216 void replace_next_layer(stream_ptr sptr) {
0217
0218
0219
0220
0221
0222
0223 if (_stream_ptr)
0224 close();
0225 std::exchange(_stream_ptr, std::move(sptr));
0226 }
0227
0228 template <typename CompletionToken>
0229 decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) {
0230 using Signature = void (error_code);
0231
0232 auto initiation = [](auto handler, self_type& self, stream_ptr s) {
0233 reconnect_op { self, std::move(handler) }.perform(std::move(s));
0234 };
0235
0236 return asio::async_initiate<CompletionToken, Signature>(
0237 initiation, token, std::ref(*this), std::move(s)
0238 );
0239 }
0240 };
0241
0242
0243 }
0244
0245 #endif