File indexing completed on 2025-09-17 08:38:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_CONNECT_OP_HPP
0009 #define BOOST_MQTT5_CONNECT_OP_HPP
0010
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013
0014 #include <boost/mqtt5/detail/async_traits.hpp>
0015 #include <boost/mqtt5/detail/control_packet.hpp>
0016 #include <boost/mqtt5/detail/internal_types.hpp>
0017 #include <boost/mqtt5/detail/log_invoke.hpp>
0018 #include <boost/mqtt5/detail/shutdown.hpp>
0019
0020 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0021 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0022
0023 #include <boost/asio/any_completion_handler.hpp>
0024 #include <boost/asio/append.hpp>
0025 #include <boost/asio/associated_allocator.hpp>
0026 #include <boost/asio/associated_cancellation_slot.hpp>
0027 #include <boost/asio/associated_executor.hpp>
0028 #include <boost/asio/cancellation_state.hpp>
0029 #include <boost/asio/completion_condition.hpp>
0030 #include <boost/asio/consign.hpp>
0031 #include <boost/asio/dispatch.hpp>
0032 #include <boost/asio/error.hpp>
0033 #include <boost/asio/ip/tcp.hpp>
0034 #include <boost/asio/prepend.hpp>
0035 #include <boost/asio/read.hpp>
0036 #include <boost/asio/write.hpp>
0037
0038 #include <cstdint>
0039 #include <memory>
0040 #include <string>
0041
0042 namespace boost::mqtt5::detail {
0043
0044 template <typename Stream, typename LoggerType>
0045 class connect_op {
0046 static constexpr size_t min_packet_sz = 5;
0047
0048 struct on_connect {};
0049 struct on_tls_handshake {};
0050 struct on_ws_handshake {};
0051 struct on_send_connect {};
0052 struct on_fixed_header {};
0053 struct on_read_packet {};
0054 struct on_init_auth_data {};
0055 struct on_auth_data {};
0056 struct on_send_auth {};
0057 struct on_complete_auth {};
0058 struct on_shutdown {};
0059
0060 Stream& _stream;
0061 mqtt_ctx& _ctx;
0062 log_invoke<LoggerType>& _log;
0063
0064 using handler_type = asio::any_completion_handler<void (error_code)>;
0065 handler_type _handler;
0066
0067 std::unique_ptr<std::string> _buffer_ptr;
0068 asio::cancellation_state _cancellation_state;
0069
0070 using endpoint = asio::ip::tcp::endpoint;
0071
0072 public:
0073 template <typename Handler>
0074 connect_op(
0075 Stream& stream, mqtt_ctx& ctx,
0076 log_invoke<LoggerType>& log,
0077 Handler&& handler
0078 ) :
0079 _stream(stream), _ctx(ctx), _log(log),
0080 _handler(std::forward<Handler>(handler)),
0081 _cancellation_state(
0082 asio::get_associated_cancellation_slot(_handler),
0083 asio::enable_total_cancellation {},
0084 asio::enable_total_cancellation {}
0085 )
0086 {}
0087
0088 connect_op(connect_op&&) = default;
0089 connect_op(const connect_op&) = delete;
0090
0091 connect_op& operator=(connect_op&&) = default;
0092 connect_op& operator=(const connect_op&) = delete;
0093
0094 using allocator_type = asio::associated_allocator_t<handler_type>;
0095 allocator_type get_allocator() const noexcept {
0096 return asio::get_associated_allocator(_handler);
0097 }
0098
0099 using cancellation_slot_type = asio::cancellation_slot;
0100 cancellation_slot_type get_cancellation_slot() const noexcept {
0101 return _cancellation_state.slot();
0102 }
0103
0104 using executor_type = asio::associated_executor_t<handler_type>;
0105 executor_type get_executor() const noexcept {
0106 return asio::get_associated_executor(_handler);
0107 }
0108
0109 void perform(const endpoint& ep, authority_path ap) {
0110 lowest_layer(_stream).async_connect(
0111 ep,
0112 asio::append(
0113 asio::prepend(std::move(*this), on_connect {}),
0114 ep, std::move(ap)
0115 )
0116 );
0117 }
0118
0119 void operator()(
0120 on_connect, error_code ec, endpoint ep, authority_path ap
0121 ) {
0122 if (is_cancelled())
0123 return complete(asio::error::operation_aborted);
0124
0125 _log.at_tcp_connect(ec, ep);
0126 if (ec)
0127 return complete(ec);
0128
0129 do_tls_handshake(std::move(ep), std::move(ap));
0130 }
0131
0132 void do_tls_handshake(endpoint ep, authority_path ap) {
0133 if constexpr (has_tls_handshake<Stream>) {
0134 _stream.async_handshake(
0135 tls_handshake_type<Stream>::client,
0136 asio::append(
0137 asio::prepend(std::move(*this), on_tls_handshake {}),
0138 std::move(ep), std::move(ap)
0139 )
0140 );
0141 }
0142 else if constexpr (
0143 has_tls_handshake<next_layer_type<Stream>>
0144 ) {
0145 _stream.next_layer().async_handshake(
0146 tls_handshake_type<next_layer_type<Stream>>::client,
0147 asio::append(
0148 asio::prepend(std::move(*this), on_tls_handshake {}),
0149 std::move(ep), std::move(ap)
0150 )
0151 );
0152 }
0153 else
0154 do_ws_handshake(std::move(ep), std::move(ap));
0155 }
0156
0157 void operator()(
0158 on_tls_handshake, error_code ec, endpoint ep, authority_path ap
0159 ) {
0160 if (is_cancelled())
0161 return complete(asio::error::operation_aborted);
0162
0163 _log.at_tls_handshake(ec, ep);
0164 if (ec)
0165 return complete(ec);
0166
0167 do_ws_handshake(std::move(ep), std::move(ap));
0168 }
0169
0170 void do_ws_handshake(endpoint ep, authority_path ap) {
0171 if constexpr (has_ws_handshake<Stream>)
0172
0173
0174 ws_handshake_traits<Stream>::async_handshake(
0175 _stream, std::move(ap),
0176 asio::append(
0177 asio::prepend(std::move(*this), on_ws_handshake {}), ep
0178 )
0179 );
0180 else
0181 (*this)(on_ws_handshake {}, error_code {}, ep);
0182 }
0183
0184 void operator()(on_ws_handshake, error_code ec, endpoint ep) {
0185 if (is_cancelled())
0186 return complete(asio::error::operation_aborted);
0187
0188 if constexpr (has_ws_handshake<Stream>)
0189 _log.at_ws_handshake(ec, ep);
0190
0191 if (ec)
0192 return complete(ec);
0193
0194 auto auth_method = _ctx.authenticator.method();
0195 if (!auth_method.empty()) {
0196 _ctx.co_props[prop::authentication_method] = auth_method;
0197 return _ctx.authenticator.async_auth(
0198 auth_step_e::client_initial, "",
0199 asio::prepend(std::move(*this), on_init_auth_data {})
0200 );
0201 }
0202
0203 send_connect();
0204 }
0205
0206 void operator()(on_init_auth_data, error_code ec, std::string data) {
0207 if (is_cancelled())
0208 return complete(asio::error::operation_aborted);
0209
0210 if (ec)
0211 return do_shutdown(asio::error::try_again);
0212
0213 _ctx.co_props[prop::authentication_data] = std::move(data);
0214 send_connect();
0215 }
0216
0217 void send_connect() {
0218 auto packet = control_packet<allocator_type>::of(
0219 no_pid, get_allocator(),
0220 encoders::encode_connect,
0221 _ctx.creds.client_id,
0222 _ctx.creds.username, _ctx.creds.password,
0223 _ctx.keep_alive, false, _ctx.co_props, _ctx.will_msg
0224 );
0225
0226 auto wire_data = packet.wire_data();
0227
0228 detail::async_write(
0229 _stream, asio::buffer(wire_data),
0230 asio::consign(
0231 asio::prepend(std::move(*this), on_send_connect {}),
0232 std::move(packet)
0233 )
0234 );
0235 }
0236
0237 void operator()(on_send_connect, error_code ec, size_t) {
0238 if (is_cancelled())
0239 return complete(asio::error::operation_aborted);
0240
0241 if (ec)
0242 return do_shutdown(ec);
0243
0244 _buffer_ptr = std::make_unique<std::string>(min_packet_sz, char(0));
0245
0246 auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
0247 asio::async_read(
0248 _stream, buff, asio::transfer_all(),
0249 asio::prepend(std::move(*this), on_fixed_header {})
0250 );
0251 }
0252
0253 void operator()(
0254 on_fixed_header, error_code ec, size_t num_read
0255 ) {
0256 if (is_cancelled())
0257 return complete(asio::error::operation_aborted);
0258
0259 if (ec)
0260 return do_shutdown(ec);
0261
0262 auto code = control_code_e((*_buffer_ptr)[0] & 0b11110000);
0263
0264 if (code != control_code_e::auth && code != control_code_e::connack)
0265 return do_shutdown(asio::error::try_again);
0266
0267 auto varlen_ptr = _buffer_ptr->cbegin() + 1;
0268 auto varlen = decoders::type_parse(
0269 varlen_ptr, _buffer_ptr->cend(), decoders::basic::varint_
0270 );
0271
0272 if (!varlen)
0273 return do_shutdown(asio::error::try_again);
0274
0275 auto varlen_sz = std::distance(_buffer_ptr->cbegin() + 1, varlen_ptr);
0276 auto remain_len = *varlen -
0277 std::distance(varlen_ptr, _buffer_ptr->cbegin() + num_read);
0278
0279 if (num_read + remain_len > _buffer_ptr->size())
0280 _buffer_ptr->resize(num_read + remain_len);
0281
0282 auto buff = asio::buffer(_buffer_ptr->data() + num_read, remain_len);
0283 auto first = _buffer_ptr->cbegin() + varlen_sz + 1;
0284 auto last = first + *varlen;
0285
0286 asio::async_read(
0287 _stream, buff, asio::transfer_all(),
0288 asio::prepend(
0289 asio::append(std::move(*this), code, first, last),
0290 on_read_packet {}
0291 )
0292 );
0293 }
0294
0295 void operator()(
0296 on_read_packet, error_code ec, size_t, control_code_e code,
0297 byte_citer first, byte_citer last
0298 ) {
0299 if (is_cancelled())
0300 return complete(asio::error::operation_aborted);
0301
0302 if (ec)
0303 return do_shutdown(ec);
0304
0305 if (code == control_code_e::connack)
0306 return on_connack(first, last);
0307
0308 if (!_ctx.co_props[prop::authentication_method].has_value())
0309 return do_shutdown(client::error::malformed_packet);
0310
0311 on_auth(first, last);
0312 }
0313
0314 void on_connack(byte_citer first, byte_citer last) {
0315 auto packet_length = static_cast<uint32_t>(std::distance(first, last));
0316 auto rv = decoders::decode_connack(packet_length, first);
0317 if (!rv.has_value())
0318 return do_shutdown(client::error::malformed_packet);
0319 const auto& [session_present, reason_code, ca_props] = *rv;
0320
0321 _ctx.ca_props = ca_props;
0322 _ctx.state.session_present(session_present);
0323
0324
0325
0326
0327
0328
0329
0330 auto rc = to_reason_code<reason_codes::category::connack>(reason_code);
0331 if (!rc.has_value())
0332 return do_shutdown(client::error::malformed_packet);
0333
0334 _log.at_connack(*rc, session_present, ca_props);
0335 if (*rc)
0336 return do_shutdown(asio::error::try_again);
0337
0338 if (_ctx.co_props[prop::authentication_method].has_value())
0339 return _ctx.authenticator.async_auth(
0340 auth_step_e::server_final,
0341 ca_props[prop::authentication_data].value_or(""),
0342 asio::prepend(std::move(*this), on_complete_auth {})
0343 );
0344
0345 complete(error_code {});
0346 }
0347
0348 void on_auth(byte_citer first, byte_citer last) {
0349 auto packet_length = static_cast<uint32_t>(std::distance(first, last));
0350 auto rv = decoders::decode_auth(packet_length, first);
0351 if (!rv.has_value())
0352 return do_shutdown(client::error::malformed_packet);
0353 const auto& [reason_code, auth_props] = *rv;
0354
0355 auto rc = to_reason_code<reason_codes::category::auth>(reason_code);
0356 if (
0357 !rc.has_value() ||
0358 auth_props[prop::authentication_method]
0359 != _ctx.co_props[prop::authentication_method]
0360 )
0361 return do_shutdown(client::error::malformed_packet);
0362
0363 _ctx.authenticator.async_auth(
0364 auth_step_e::server_challenge,
0365 auth_props[prop::authentication_data].value_or(""),
0366 asio::prepend(std::move(*this), on_auth_data {})
0367 );
0368 }
0369
0370 void operator()(on_auth_data, error_code ec, std::string data) {
0371 if (is_cancelled())
0372 return complete(asio::error::operation_aborted);
0373
0374 if (ec)
0375 return do_shutdown(asio::error::try_again);
0376
0377 auth_props props;
0378 props[prop::authentication_method] =
0379 _ctx.co_props[prop::authentication_method];
0380 props[prop::authentication_data] = std::move(data);
0381
0382 auto packet = control_packet<allocator_type>::of(
0383 no_pid, get_allocator(),
0384 encoders::encode_auth,
0385 reason_codes::continue_authentication.value(), props
0386 );
0387
0388 auto wire_data = packet.wire_data();
0389
0390 detail::async_write(
0391 _stream, asio::buffer(wire_data),
0392 asio::consign(
0393 asio::prepend(std::move(*this), on_send_auth {}),
0394 std::move(packet)
0395 )
0396 );
0397 }
0398
0399 void operator()(on_send_auth, error_code ec, size_t) {
0400 if (is_cancelled())
0401 return complete(asio::error::operation_aborted);
0402
0403 if (ec)
0404 return do_shutdown(ec);
0405
0406 auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
0407 asio::async_read(
0408 _stream, buff, asio::transfer_all(),
0409 asio::prepend(std::move(*this), on_fixed_header {})
0410 );
0411 }
0412
0413 void operator()(on_complete_auth, error_code ec, std::string) {
0414 if (is_cancelled())
0415 return complete(asio::error::operation_aborted);
0416
0417 if (ec)
0418 return do_shutdown(asio::error::try_again);
0419
0420 complete(error_code {});
0421 }
0422
0423 void do_shutdown(error_code connect_ec) {
0424 auto init_shutdown = [&stream = _stream](auto handler) {
0425 async_shutdown(stream, std::move(handler));
0426 };
0427 auto token = asio::prepend(std::move(*this), on_shutdown{}, connect_ec);
0428
0429 return asio::async_initiate<decltype(token), void(error_code)>(
0430 init_shutdown, token
0431 );
0432 }
0433
0434 void operator()(on_shutdown, error_code connect_ec, error_code) {
0435
0436 complete(connect_ec);
0437 }
0438
0439 private:
0440 bool is_cancelled() const {
0441 return _cancellation_state.cancelled() != asio::cancellation_type::none;
0442 }
0443
0444 void complete(error_code ec) {
0445 asio::get_associated_cancellation_slot(_handler).clear();
0446 std::move(_handler)(ec);
0447 }
0448 };
0449
0450
0451 }
0452
0453 #endif