Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:20

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
0009 #define BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
0010 
0011 #include <boost/mqtt5/detail/async_mutex.hpp>
0012 #include <boost/mqtt5/detail/async_traits.hpp>
0013 #include <boost/mqtt5/detail/log_invoke.hpp>
0014 
0015 #include <boost/mqtt5/impl/endpoints.hpp>
0016 #include <boost/mqtt5/impl/read_op.hpp>
0017 #include <boost/mqtt5/impl/reconnect_op.hpp>
0018 #include <boost/mqtt5/impl/shutdown_op.hpp>
0019 #include <boost/mqtt5/impl/write_op.hpp>
0020 
0021 #include <boost/asio/async_result.hpp>
0022 #include <boost/asio/ip/tcp.hpp>
0023 #include <boost/asio/steady_timer.hpp>
0024 #include <boost/system/error_code.hpp>
0025 
0026 #include <cstdint>
0027 #include <memory>
0028 #include <string>
0029 #include <variant> // std::monostate
0030 
0031 namespace boost::mqtt5::detail {
0032 
0033 namespace asio = boost::asio;
0034 using error_code = boost::system::error_code;
0035 
0036 template <
0037     typename StreamType,
0038     typename StreamContext = std::monostate,
0039     typename LoggerType = noop_logger
0040 >
0041 class autoconnect_stream {
0042 public:
0043     using self_type = autoconnect_stream<StreamType, StreamContext, LoggerType>;
0044     using stream_type = StreamType;
0045     using stream_context_type = StreamContext;
0046     using logger_type = LoggerType;
0047     using executor_type = typename stream_type::executor_type;
0048 private:
0049     using stream_ptr = std::shared_ptr<stream_type>;
0050 
0051     executor_type _stream_executor;
0052     async_mutex _conn_mtx;
0053     asio::steady_timer _read_timer, _connect_timer;
0054     endpoints<logger_type> _endpoints;
0055 
0056     stream_ptr _stream_ptr;
0057     stream_context_type& _stream_context;
0058 
0059     log_invoke<logger_type>& _log;
0060 
0061     template <typename Owner, typename Handler>
0062     friend class read_op;
0063 
0064     template <typename Owner, typename Handler>
0065     friend class write_op;
0066 
0067     template <typename Owner>
0068     friend class reconnect_op;
0069 
0070     template <typename Owner>
0071     friend class shutdown_op;
0072 
0073 public:
0074     autoconnect_stream(
0075         const executor_type& ex, stream_context_type& context,
0076         log_invoke<logger_type>& log
0077     ) :
0078         _stream_executor(ex),
0079         _conn_mtx(_stream_executor),
0080         _read_timer(_stream_executor), _connect_timer(_stream_executor),
0081         _endpoints(_stream_executor, _connect_timer, log),
0082         _stream_context(context),
0083         _log(log)
0084     {
0085         replace_next_layer(construct_next_layer());
0086     }
0087 
0088     autoconnect_stream(const autoconnect_stream&) = delete;
0089     autoconnect_stream& operator=(const autoconnect_stream&) = delete;
0090 
0091     using next_layer_type = stream_type;
0092     next_layer_type& next_layer() {
0093         return *_stream_ptr;
0094     }
0095 
0096     const next_layer_type& next_layer() const {
0097         return *_stream_ptr;
0098     }
0099 
0100     executor_type get_executor() const noexcept {
0101         return _stream_executor;
0102     }
0103 
0104     void brokers(std::string hosts, uint16_t default_port) {
0105         _endpoints.brokers(std::move(hosts), default_port);
0106     }
0107 
0108     void clone_endpoints(const autoconnect_stream& other) {
0109         _endpoints.clone_servers(other._endpoints);
0110     }
0111 
0112     bool is_open() const noexcept {
0113         return lowest_layer(*_stream_ptr).is_open();
0114     }
0115 
0116     void open() {
0117         open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
0118     }
0119 
0120     void cancel() {
0121         _conn_mtx.cancel();
0122         _connect_timer.cancel();
0123     }
0124 
0125     void close() {
0126         error_code ec;
0127         lowest_layer(*_stream_ptr).close(ec);
0128     }
0129 
0130     template <typename CompletionToken>
0131     void async_shutdown(CompletionToken&& token) {
0132         using Signature = void (error_code);
0133 
0134         auto initiation = [](auto handler, self_type& self) {
0135             shutdown_op { self, std::move(handler) }.perform();
0136         };
0137 
0138         return asio::async_initiate<CompletionToken, Signature>(
0139             initiation, token, std::ref(*this)
0140         );
0141     }
0142 
0143     bool was_connected() const {
0144         error_code ec;
0145         lowest_layer(*_stream_ptr).remote_endpoint(ec);
0146         return ec == boost::system::errc::success;
0147     }
0148 
0149     template <typename BufferType, typename CompletionToken>
0150     decltype(auto) async_read_some(
0151         const BufferType& buffer, duration wait_for, CompletionToken&& token
0152     ) {
0153         using Signature = void (error_code, size_t);
0154 
0155         auto initiation = [](
0156             auto handler, self_type& self,
0157             const BufferType& buffer, duration wait_for
0158         ) {
0159             read_op { self, std::move(handler) }.perform(buffer, wait_for);
0160         };
0161 
0162         return asio::async_initiate<CompletionToken, Signature>(
0163             initiation, token, std::ref(*this), buffer, wait_for
0164         );
0165     }
0166 
0167     template <typename BufferType, typename CompletionToken>
0168     decltype(auto) async_write(
0169         const BufferType& buffer, CompletionToken&& token
0170     ) {
0171         using Signature = void (error_code, size_t);
0172 
0173         auto initiation = [](
0174             auto handler, self_type& self, const BufferType& buffer
0175         ) {
0176             write_op { self, std::move(handler) }.perform(buffer);
0177         };
0178 
0179         return asio::async_initiate<CompletionToken, Signature>(
0180             initiation, token, std::ref(*this), buffer
0181         );
0182     }
0183 
0184 private:
0185 
0186     log_invoke<logger_type>& log() {
0187         return _log;
0188     }
0189 
0190     static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
0191         error_code ec;
0192         auto& layer = lowest_layer(*sptr);
0193         layer.open(protocol, ec);
0194         layer.set_option(asio::socket_base::reuse_address(true), ec);
0195         layer.set_option(asio::ip::tcp::no_delay(true), ec);
0196     }
0197 
0198     stream_ptr construct_next_layer() const {
0199         stream_ptr sptr;
0200         if constexpr (has_tls_context<StreamContext>)
0201             sptr = std::make_shared<stream_type>(
0202                 _stream_executor, _stream_context.tls_context()
0203             );
0204         else
0205             sptr = std::make_shared<stream_type>(_stream_executor);
0206 
0207         return sptr;
0208     }
0209 
0210     stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const {
0211         auto sptr = construct_next_layer();
0212         open_lowest_layer(sptr, protocol);
0213         return sptr;
0214     }
0215 
0216     void replace_next_layer(stream_ptr sptr) {
0217         // close() will cancel all outstanding async operations on
0218         // _stream_ptr; cancelling posts operation_aborted to handlers
0219         // but handlers will be executed after std::exchange below;
0220         // handlers should therefore treat (operation_aborted && is_open())
0221         // equivalent to try_again.
0222 
0223         if (_stream_ptr)
0224             close();
0225         std::exchange(_stream_ptr, std::move(sptr));
0226     }
0227 
0228     template <typename CompletionToken>
0229     decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) {
0230         using Signature = void (error_code);
0231 
0232         auto initiation = [](auto handler, self_type& self, stream_ptr s) {
0233             reconnect_op { self, std::move(handler) }.perform(std::move(s));
0234         };
0235 
0236         return asio::async_initiate<CompletionToken, Signature>(
0237             initiation, token, std::ref(*this), std::move(s)
0238         );
0239     }
0240 };
0241 
0242 
0243 } // end namespace boost::mqtt5::detail
0244 
0245 #endif // !BOOST_MQTT5_AUTOCONNECT_STREAM_HPP