File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_ENDPOINTS_HPP
0009 #define BOOST_MQTT5_ENDPOINTS_HPP
0010
0011 #include <boost/mqtt5/types.hpp>
0012
0013 #include <boost/mqtt5/detail/log_invoke.hpp>
0014
0015 #include <boost/asio/append.hpp>
0016 #include <boost/asio/associated_allocator.hpp>
0017 #include <boost/asio/associated_cancellation_slot.hpp>
0018 #include <boost/asio/associated_executor.hpp>
0019 #include <boost/asio/async_result.hpp>
0020 #include <boost/asio/deferred.hpp>
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/experimental/parallel_group.hpp>
0023 #include <boost/asio/ip/tcp.hpp>
0024 #include <boost/asio/post.hpp>
0025 #include <boost/asio/prepend.hpp>
0026 #include <boost/spirit/home/x3.hpp>
0027
0028 #include <array>
0029 #include <chrono>
0030 #include <string>
0031
0032 namespace boost::mqtt5::detail {
0033
0034 namespace asio = boost::asio;
0035
0036 using epoints = asio::ip::tcp::resolver::results_type;
0037
0038 template <typename Owner, typename Handler>
0039 class resolve_op {
0040 struct on_resolve {};
0041
0042 Owner& _owner;
0043
0044 using handler_type = Handler;
0045 handler_type _handler;
0046
0047 public:
0048 resolve_op(Owner& owner, Handler&& handler) :
0049 _owner(owner), _handler(std::move(handler))
0050 {}
0051
0052 resolve_op(resolve_op&&) = default;
0053 resolve_op(const resolve_op&) = delete;
0054
0055 resolve_op& operator=(resolve_op&&) = default;
0056 resolve_op& operator=(const resolve_op&) = delete;
0057
0058 using allocator_type = asio::associated_allocator_t<handler_type>;
0059 allocator_type get_allocator() const noexcept {
0060 return asio::get_associated_allocator(_handler);
0061 }
0062
0063 using cancellation_slot_type =
0064 asio::associated_cancellation_slot_t<handler_type>;
0065 cancellation_slot_type get_cancellation_slot() const noexcept {
0066 return asio::get_associated_cancellation_slot(_handler);
0067 }
0068
0069 using executor_type = asio::associated_executor_t<handler_type>;
0070 executor_type get_executor() const noexcept {
0071 return asio::get_associated_executor(_handler);
0072 }
0073
0074 void perform() {
0075 namespace asioex = boost::asio::experimental;
0076
0077 if (_owner._servers.empty())
0078 return complete_post(asio::error::host_not_found, {}, {});
0079
0080 _owner._current_host++;
0081
0082 if (_owner._current_host + 1 > static_cast<int>(_owner._servers.size())) {
0083 _owner._current_host = -1;
0084 return complete_post(asio::error::try_again, {}, {});
0085 }
0086
0087 authority_path ap = _owner._servers[_owner._current_host];
0088
0089 _owner._connect_timer.expires_after(std::chrono::seconds(5));
0090
0091 auto timed_resolve = asioex::make_parallel_group(
0092 _owner._resolver.async_resolve(ap.host, ap.port, asio::deferred),
0093 _owner._connect_timer.async_wait(asio::deferred)
0094 );
0095
0096 timed_resolve.async_wait(
0097 asioex::wait_for_one(),
0098 asio::append(
0099 asio::prepend(std::move(*this), on_resolve {}),
0100 std::move(ap)
0101 )
0102 );
0103 }
0104
0105 void operator()(
0106 on_resolve, std::array<std::size_t, 2> ord,
0107 error_code resolve_ec, epoints epts,
0108 error_code timer_ec, authority_path ap
0109 ) {
0110 if (
0111 (ord[0] == 0 && resolve_ec == asio::error::operation_aborted) ||
0112 (ord[0] == 1 && timer_ec == asio::error::operation_aborted)
0113 )
0114 return complete(asio::error::operation_aborted, {}, {});
0115
0116 resolve_ec = timer_ec ? resolve_ec : asio::error::timed_out;
0117 _owner._log.at_resolve(resolve_ec, ap.host, ap.port, epts);
0118 if (!resolve_ec)
0119 return complete(error_code {}, std::move(epts), std::move(ap));
0120
0121 perform();
0122 }
0123
0124 private:
0125 void complete(error_code ec, epoints eps, authority_path ap) {
0126 std::move(_handler)(ec, std::move(eps), std::move(ap));
0127 }
0128
0129 void complete_post(error_code ec, epoints eps, authority_path ap) {
0130 asio::post(
0131 _owner.get_executor(),
0132 asio::prepend(
0133 std::move(_handler), ec,
0134 std::move(eps), std::move(ap)
0135 )
0136 );
0137
0138 }
0139 };
0140
0141
0142 template <typename LoggerType>
0143 class endpoints {
0144 using logger_type = LoggerType;
0145
0146 asio::ip::tcp::resolver _resolver;
0147 asio::steady_timer& _connect_timer;
0148
0149 std::vector<authority_path> _servers;
0150
0151 int _current_host { -1 };
0152
0153 log_invoke<logger_type>& _log;
0154
0155 template <typename Owner, typename Handler>
0156 friend class resolve_op;
0157
0158 template <typename T>
0159 static constexpr auto to_(T& arg) {
0160 return [&](auto& ctx) { arg = boost::spirit::x3::_attr(ctx); };
0161 }
0162
0163 template <typename T, typename Parser>
0164 static constexpr auto as_(Parser&& p){
0165 return boost::spirit::x3::rule<struct _, T>{} = std::forward<Parser>(p);
0166 }
0167
0168 public:
0169 template <typename Executor>
0170 endpoints(
0171 Executor ex, asio::steady_timer& timer,
0172 log_invoke<logger_type>& log
0173 ) :
0174 _resolver(std::move(ex)), _connect_timer(timer),
0175 _log(log)
0176 {}
0177
0178 endpoints(const endpoints&) = delete;
0179 endpoints& operator=(const endpoints&) = delete;
0180
0181 void clone_servers(const endpoints& other) {
0182 _servers = other._servers;
0183 }
0184
0185 using executor_type = asio::ip::tcp::resolver::executor_type;
0186
0187 executor_type get_executor() noexcept {
0188 return _resolver.get_executor();
0189 }
0190
0191 template <typename CompletionToken>
0192 decltype(auto) async_next_endpoint(CompletionToken&& token) {
0193 using Signature = void (error_code, epoints, authority_path);
0194
0195 auto initiation = [](auto handler, endpoints& self) {
0196 resolve_op { self, std::move(handler) }.perform();
0197 };
0198
0199 return asio::async_initiate<CompletionToken, Signature>(
0200 initiation, token, std::ref(*this)
0201 );
0202 }
0203
0204 void brokers(std::string hosts, uint16_t default_port) {
0205 namespace x3 = boost::spirit::x3;
0206
0207 _servers.clear();
0208
0209 std::string host, port, path;
0210
0211
0212 auto unreserved_ = x3::char_("-a-zA-Z_0-9._~");
0213 auto digit_ = x3::char_("0-9");
0214 auto separator_ = x3::char_(',');
0215
0216 auto host_ = as_<std::string>(+unreserved_)[to_(host)];
0217 auto port_ = as_<std::string>(':' >> +digit_)[to_(port)];
0218 auto path_ = as_<std::string>(x3::char_('/') >> *unreserved_)[to_(path)];
0219 auto uri_ = *x3::omit[x3::space] >> (host_ >> *port_ >> *path_) >>
0220 (*x3::omit[x3::space] >> x3::omit[separator_ | x3::eoi]);
0221
0222 for (auto b = hosts.begin(); b != hosts.end(); ) {
0223 host.clear(); port.clear(); path.clear();
0224 if (phrase_parse(b, hosts.end(), uri_, x3::eps(false))) {
0225 _servers.push_back({
0226 std::move(host),
0227 port.empty()
0228 ? std::to_string(default_port)
0229 : std::move(port),
0230 std::move(path)
0231 });
0232 }
0233 else b = hosts.end();
0234 }
0235 }
0236
0237 };
0238
0239
0240 }
0241
0242 #endif