Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_ENDPOINTS_HPP
0009 #define BOOST_MQTT5_ENDPOINTS_HPP
0010 
0011 #include <boost/mqtt5/types.hpp>
0012 
0013 #include <boost/mqtt5/detail/log_invoke.hpp>
0014 
0015 #include <boost/asio/append.hpp>
0016 #include <boost/asio/associated_allocator.hpp>
0017 #include <boost/asio/associated_cancellation_slot.hpp>
0018 #include <boost/asio/associated_executor.hpp>
0019 #include <boost/asio/async_result.hpp>
0020 #include <boost/asio/deferred.hpp>
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/experimental/parallel_group.hpp>
0023 #include <boost/asio/ip/tcp.hpp>
0024 #include <boost/asio/post.hpp>
0025 #include <boost/asio/prepend.hpp>
0026 #include <boost/spirit/home/x3.hpp>
0027 
0028 #include <array>
0029 #include <chrono>
0030 #include <string>
0031 
0032 namespace boost::mqtt5::detail {
0033 
0034 namespace asio = boost::asio; // Short alias for Boost.Asio used by the definitions in this namespace.
0035 
0036 using epoints = asio::ip::tcp::resolver::results_type; // Result type of the TCP resolver; passed to completion handlers below.
0037 
0038 template <typename Owner, typename Handler>
0039 class resolve_op {
0040     struct on_resolve {};
0041 
0042     Owner& _owner;
0043 
0044     using handler_type = Handler;
0045     handler_type _handler;
0046 
0047 public:
0048     resolve_op(Owner& owner, Handler&& handler) :
0049         _owner(owner), _handler(std::move(handler))
0050     {}
0051 
0052     resolve_op(resolve_op&&) = default;
0053     resolve_op(const resolve_op&) = delete;
0054 
0055     resolve_op& operator=(resolve_op&&) = default;
0056     resolve_op& operator=(const resolve_op&) = delete;
0057 
0058     using allocator_type = asio::associated_allocator_t<handler_type>;
0059     allocator_type get_allocator() const noexcept {
0060         return asio::get_associated_allocator(_handler);
0061     }
0062 
0063     using cancellation_slot_type =
0064         asio::associated_cancellation_slot_t<handler_type>;
0065     cancellation_slot_type get_cancellation_slot() const noexcept {
0066         return asio::get_associated_cancellation_slot(_handler);
0067     }
0068 
0069     using executor_type = asio::associated_executor_t<handler_type>;
0070     executor_type get_executor() const noexcept {
0071         return asio::get_associated_executor(_handler);
0072     }
0073 
0074     void perform() {
0075         namespace asioex = boost::asio::experimental;
0076 
0077         if (_owner._servers.empty())
0078             return complete_post(asio::error::host_not_found, {}, {});
0079 
0080         _owner._current_host++;
0081 
0082         if (_owner._current_host + 1 > static_cast<int>(_owner._servers.size())) {
0083             _owner._current_host = -1;
0084             return complete_post(asio::error::try_again, {}, {});
0085         }
0086 
0087         authority_path ap = _owner._servers[_owner._current_host];
0088 
0089         _owner._connect_timer.expires_after(std::chrono::seconds(5));
0090 
0091         auto timed_resolve = asioex::make_parallel_group(
0092             _owner._resolver.async_resolve(ap.host, ap.port, asio::deferred),
0093             _owner._connect_timer.async_wait(asio::deferred)
0094         );
0095 
0096         timed_resolve.async_wait(
0097             asioex::wait_for_one(),
0098             asio::append(
0099                 asio::prepend(std::move(*this), on_resolve {}),
0100                 std::move(ap)
0101             )
0102         );
0103     }
0104 
0105     void operator()(
0106         on_resolve, std::array<std::size_t, 2> ord,
0107         error_code resolve_ec, epoints epts,
0108         error_code timer_ec, authority_path ap
0109     ) {
0110         if (
0111             (ord[0] == 0 && resolve_ec == asio::error::operation_aborted) ||
0112             (ord[0] == 1 && timer_ec == asio::error::operation_aborted)
0113         )
0114             return complete(asio::error::operation_aborted, {}, {});
0115 
0116         resolve_ec = timer_ec ? resolve_ec : asio::error::timed_out;
0117         _owner._log.at_resolve(resolve_ec, ap.host, ap.port, epts);
0118         if (!resolve_ec)
0119             return complete(error_code {}, std::move(epts), std::move(ap));
0120 
0121         perform();
0122     }
0123 
0124 private:
0125     void complete(error_code ec, epoints eps, authority_path ap) {
0126         std::move(_handler)(ec, std::move(eps), std::move(ap));
0127     }
0128 
0129     void complete_post(error_code ec, epoints eps, authority_path ap) {
0130         asio::post(
0131             _owner.get_executor(),
0132             asio::prepend(
0133                 std::move(_handler), ec,
0134                 std::move(eps), std::move(ap)
0135             )
0136         );
0137 
0138     }
0139 };
0140 
0141 
0142 template <typename LoggerType>
0143 class endpoints {
0144     using logger_type = LoggerType;
0145 
0146     asio::ip::tcp::resolver _resolver;
0147     asio::steady_timer& _connect_timer;
0148 
0149     std::vector<authority_path> _servers;
0150 
0151     int _current_host { -1 };
0152 
0153     log_invoke<logger_type>& _log;
0154 
0155     template <typename Owner, typename Handler>
0156     friend class resolve_op;
0157 
0158     template <typename T>
0159     static constexpr auto to_(T& arg) {
0160         return [&](auto& ctx) { arg = boost::spirit::x3::_attr(ctx); };
0161     }
0162 
0163     template <typename T, typename Parser>
0164     static constexpr auto as_(Parser&& p){
0165         return boost::spirit::x3::rule<struct _, T>{} = std::forward<Parser>(p);
0166     }
0167 
0168 public:
0169     template <typename Executor>
0170     endpoints(
0171         Executor ex, asio::steady_timer& timer,
0172         log_invoke<logger_type>& log
0173     ) :
0174         _resolver(std::move(ex)), _connect_timer(timer),
0175         _log(log)
0176     {}
0177 
0178     endpoints(const endpoints&) = delete;
0179     endpoints& operator=(const endpoints&) = delete;
0180 
0181     void clone_servers(const endpoints& other) {
0182         _servers = other._servers;
0183     }
0184 
0185     using executor_type = asio::ip::tcp::resolver::executor_type;
0186     // NOTE: asio::ip::basic_resolver returns executor by value
0187     executor_type get_executor() noexcept {
0188         return _resolver.get_executor();
0189     }
0190 
0191     template <typename CompletionToken>
0192     decltype(auto) async_next_endpoint(CompletionToken&& token) {
0193         using Signature = void (error_code, epoints, authority_path);
0194 
0195         auto initiation = [](auto handler, endpoints& self) {
0196             resolve_op { self, std::move(handler) }.perform();
0197         };
0198 
0199         return asio::async_initiate<CompletionToken, Signature>(
0200             initiation, token, std::ref(*this)
0201         );
0202     }
0203 
0204     void brokers(std::string hosts, uint16_t default_port) {
0205         namespace x3 = boost::spirit::x3;
0206 
0207         _servers.clear();
0208 
0209         std::string host, port, path;
0210 
0211         // loosely based on RFC 3986
0212         auto unreserved_ = x3::char_("-a-zA-Z_0-9._~");
0213         auto digit_ = x3::char_("0-9");
0214         auto separator_ = x3::char_(',');
0215 
0216         auto host_ = as_<std::string>(+unreserved_)[to_(host)];
0217         auto port_ = as_<std::string>(':' >> +digit_)[to_(port)];
0218         auto path_ = as_<std::string>(x3::char_('/') >> *unreserved_)[to_(path)];
0219         auto uri_ = *x3::omit[x3::space] >> (host_ >> *port_ >> *path_) >>
0220             (*x3::omit[x3::space] >> x3::omit[separator_ | x3::eoi]);
0221 
0222         for (auto b = hosts.begin(); b != hosts.end(); ) {
0223             host.clear(); port.clear(); path.clear();
0224             if (phrase_parse(b, hosts.end(), uri_, x3::eps(false))) {
0225                 _servers.push_back({
0226                     std::move(host),
0227                     port.empty()
0228                         ? std::to_string(default_port)
0229                         : std::move(port),
0230                     std::move(path)
0231                 });
0232             }
0233             else b = hosts.end();
0234         }
0235     }
0236 
0237 };
0238 
0239 
0240 } // end namespace boost::mqtt5::detail
0241 
0242 #endif // !BOOST_MQTT5_ENDPOINTS_HPP