Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:21

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_CONNECT_OP_HPP
0009 #define BOOST_MQTT5_CONNECT_OP_HPP
0010 
0011 #include <boost/mqtt5/error.hpp>
0012 #include <boost/mqtt5/reason_codes.hpp>
0013 
0014 #include <boost/mqtt5/detail/async_traits.hpp>
0015 #include <boost/mqtt5/detail/control_packet.hpp>
0016 #include <boost/mqtt5/detail/internal_types.hpp>
0017 #include <boost/mqtt5/detail/log_invoke.hpp>
0018 #include <boost/mqtt5/detail/shutdown.hpp>
0019 
0020 #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
0021 #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
0022 
0023 #include <boost/asio/any_completion_handler.hpp>
0024 #include <boost/asio/append.hpp>
0025 #include <boost/asio/associated_allocator.hpp>
0026 #include <boost/asio/associated_cancellation_slot.hpp>
0027 #include <boost/asio/associated_executor.hpp>
0028 #include <boost/asio/cancellation_state.hpp>
0029 #include <boost/asio/completion_condition.hpp>
0030 #include <boost/asio/consign.hpp>
0031 #include <boost/asio/dispatch.hpp>
0032 #include <boost/asio/error.hpp>
0033 #include <boost/asio/ip/tcp.hpp>
0034 #include <boost/asio/prepend.hpp>
0035 #include <boost/asio/read.hpp>
0036 #include <boost/asio/write.hpp>
0037 
0038 #include <cstdint>
0039 #include <memory>
0040 #include <string>
0041 
0042 namespace boost::mqtt5::detail {
0043 
0044 template <typename Stream, typename LoggerType>
0045 class connect_op {
0046     static constexpr size_t min_packet_sz = 5;
0047 
0048     struct on_connect {};
0049     struct on_tls_handshake {};
0050     struct on_ws_handshake {};
0051     struct on_send_connect {};
0052     struct on_fixed_header {};
0053     struct on_read_packet {};
0054     struct on_init_auth_data {};
0055     struct on_auth_data {};
0056     struct on_send_auth {};
0057     struct on_complete_auth {};
0058     struct on_shutdown {};
0059 
0060     Stream& _stream;
0061     mqtt_ctx& _ctx;
0062     log_invoke<LoggerType>& _log;
0063 
0064     using handler_type = asio::any_completion_handler<void (error_code)>;
0065     handler_type _handler;
0066 
0067     std::unique_ptr<std::string> _buffer_ptr;
0068     asio::cancellation_state _cancellation_state;
0069 
0070     using endpoint = asio::ip::tcp::endpoint;
0071 
0072 public:
0073     template <typename Handler>
0074     connect_op(
0075         Stream& stream, mqtt_ctx& ctx,
0076         log_invoke<LoggerType>& log,
0077         Handler&& handler
0078     ) :
0079         _stream(stream), _ctx(ctx), _log(log),
0080         _handler(std::forward<Handler>(handler)),
0081         _cancellation_state(
0082             asio::get_associated_cancellation_slot(_handler),
0083             asio::enable_total_cancellation {},
0084             asio::enable_total_cancellation {}
0085         )
0086     {}
0087 
0088     connect_op(connect_op&&) = default;
0089     connect_op(const connect_op&) = delete;
0090 
0091     connect_op& operator=(connect_op&&) = default;
0092     connect_op& operator=(const connect_op&) = delete;
0093 
0094     using allocator_type = asio::associated_allocator_t<handler_type>;
0095     allocator_type get_allocator() const noexcept {
0096         return asio::get_associated_allocator(_handler);
0097     }
0098 
0099     using cancellation_slot_type = asio::cancellation_slot;
0100     cancellation_slot_type get_cancellation_slot() const noexcept {
0101         return _cancellation_state.slot();
0102     }
0103 
0104     using executor_type = asio::associated_executor_t<handler_type>;
0105     executor_type get_executor() const noexcept {
0106         return asio::get_associated_executor(_handler);
0107     }
0108 
0109     void perform(const endpoint& ep, authority_path ap) {
0110         lowest_layer(_stream).async_connect(
0111             ep,
0112             asio::append(
0113                 asio::prepend(std::move(*this), on_connect {}),
0114                 ep, std::move(ap)
0115             )
0116         );
0117     }
0118 
0119     void operator()(
0120         on_connect, error_code ec, endpoint ep, authority_path ap
0121     ) {
0122         if (is_cancelled())
0123             return complete(asio::error::operation_aborted);
0124 
0125         _log.at_tcp_connect(ec, ep);
0126         if (ec)
0127             return complete(ec);
0128 
0129         do_tls_handshake(std::move(ep), std::move(ap));
0130     }
0131 
0132     void do_tls_handshake(endpoint ep, authority_path ap) {
0133         if constexpr (has_tls_handshake<Stream>) {
0134             _stream.async_handshake(
0135                 tls_handshake_type<Stream>::client,
0136                 asio::append(
0137                     asio::prepend(std::move(*this), on_tls_handshake {}),
0138                     std::move(ep), std::move(ap)
0139                 )
0140             );
0141         }
0142         else if constexpr (
0143             has_tls_handshake<next_layer_type<Stream>>
0144         ) {
0145             _stream.next_layer().async_handshake(
0146                 tls_handshake_type<next_layer_type<Stream>>::client,
0147                 asio::append(
0148                     asio::prepend(std::move(*this), on_tls_handshake {}),
0149                     std::move(ep), std::move(ap)
0150                 )
0151             );
0152         }
0153         else
0154             do_ws_handshake(std::move(ep), std::move(ap));
0155     }
0156 
0157     void operator()(
0158         on_tls_handshake, error_code ec, endpoint ep, authority_path ap
0159     ) {
0160         if (is_cancelled())
0161             return complete(asio::error::operation_aborted);
0162 
0163         _log.at_tls_handshake(ec, ep);
0164         if (ec)
0165             return complete(ec);
0166 
0167         do_ws_handshake(std::move(ep), std::move(ap));
0168     }
0169 
0170     void do_ws_handshake(endpoint ep, authority_path ap) {
0171         if constexpr (has_ws_handshake<Stream>)
0172             // If you get a compilation error here,
0173             // it might be because of a missing <boost/mqtt5/websocket.hpp> include
0174             ws_handshake_traits<Stream>::async_handshake(
0175                 _stream, std::move(ap),
0176                 asio::append(
0177                     asio::prepend(std::move(*this), on_ws_handshake {}), ep
0178                 )
0179             );
0180         else
0181             (*this)(on_ws_handshake {}, error_code {}, ep);
0182     }
0183 
0184     void operator()(on_ws_handshake, error_code ec, endpoint ep) {
0185         if (is_cancelled())
0186             return complete(asio::error::operation_aborted);
0187 
0188         if constexpr (has_ws_handshake<Stream>)
0189             _log.at_ws_handshake(ec, ep);
0190 
0191         if (ec)
0192             return complete(ec);
0193 
0194         auto auth_method = _ctx.authenticator.method();
0195         if (!auth_method.empty()) {
0196             _ctx.co_props[prop::authentication_method] = auth_method;
0197             return _ctx.authenticator.async_auth(
0198                 auth_step_e::client_initial, "",
0199                 asio::prepend(std::move(*this), on_init_auth_data {})
0200             );
0201         }
0202 
0203         send_connect();
0204     }
0205 
0206     void operator()(on_init_auth_data, error_code ec, std::string data) {
0207         if (is_cancelled())
0208             return complete(asio::error::operation_aborted);
0209 
0210         if (ec)
0211             return do_shutdown(asio::error::try_again);
0212 
0213         _ctx.co_props[prop::authentication_data] = std::move(data);
0214         send_connect();
0215     }
0216 
0217     void send_connect() {
0218         auto packet = control_packet<allocator_type>::of(
0219             no_pid, get_allocator(),
0220             encoders::encode_connect,
0221             _ctx.creds.client_id,
0222             _ctx.creds.username, _ctx.creds.password,
0223             _ctx.keep_alive, false, _ctx.co_props, _ctx.will_msg
0224         );
0225 
0226         auto wire_data = packet.wire_data();
0227 
0228         detail::async_write(
0229             _stream, asio::buffer(wire_data),
0230             asio::consign(
0231                 asio::prepend(std::move(*this), on_send_connect {}),
0232                 std::move(packet)
0233             )
0234         );
0235     }
0236 
0237     void operator()(on_send_connect, error_code ec, size_t) {
0238         if (is_cancelled())
0239             return complete(asio::error::operation_aborted);
0240 
0241         if (ec)
0242             return do_shutdown(ec);
0243 
0244         _buffer_ptr = std::make_unique<std::string>(min_packet_sz, char(0));
0245 
0246         auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
0247         asio::async_read(
0248             _stream, buff, asio::transfer_all(),
0249             asio::prepend(std::move(*this), on_fixed_header {})
0250         );
0251     }
0252 
0253     void operator()(
0254         on_fixed_header, error_code ec, size_t num_read
0255     ) {
0256         if (is_cancelled())
0257             return complete(asio::error::operation_aborted);
0258 
0259         if (ec)
0260             return do_shutdown(ec);
0261 
0262         auto code = control_code_e((*_buffer_ptr)[0] & 0b11110000);
0263 
0264         if (code != control_code_e::auth && code != control_code_e::connack)
0265             return do_shutdown(asio::error::try_again);
0266 
0267         auto varlen_ptr = _buffer_ptr->cbegin() + 1;
0268         auto varlen = decoders::type_parse(
0269             varlen_ptr, _buffer_ptr->cend(), decoders::basic::varint_
0270         );
0271 
0272         if (!varlen)
0273             return do_shutdown(asio::error::try_again);
0274 
0275         auto varlen_sz = std::distance(_buffer_ptr->cbegin() + 1, varlen_ptr);
0276         auto remain_len = *varlen -
0277             std::distance(varlen_ptr, _buffer_ptr->cbegin() + num_read);
0278 
0279         if (num_read + remain_len > _buffer_ptr->size())
0280             _buffer_ptr->resize(num_read + remain_len);
0281 
0282         auto buff = asio::buffer(_buffer_ptr->data() + num_read, remain_len);
0283         auto first = _buffer_ptr->cbegin() + varlen_sz + 1;
0284         auto last = first + *varlen;
0285 
0286         asio::async_read(
0287             _stream, buff, asio::transfer_all(),
0288             asio::prepend(
0289                 asio::append(std::move(*this), code, first, last),
0290                 on_read_packet {}
0291             )
0292         );
0293     }
0294 
0295     void operator()(
0296         on_read_packet, error_code ec, size_t, control_code_e code,
0297         byte_citer first, byte_citer last
0298     ) {
0299         if (is_cancelled())
0300             return complete(asio::error::operation_aborted);
0301 
0302         if (ec)
0303             return do_shutdown(ec);
0304 
0305         if (code == control_code_e::connack)
0306             return on_connack(first, last);
0307 
0308         if (!_ctx.co_props[prop::authentication_method].has_value())
0309             return do_shutdown(client::error::malformed_packet);
0310 
0311         on_auth(first, last);
0312     }
0313 
0314     void on_connack(byte_citer first, byte_citer last) {
0315         auto packet_length = static_cast<uint32_t>(std::distance(first, last));
0316         auto rv = decoders::decode_connack(packet_length, first);
0317         if (!rv.has_value())
0318             return do_shutdown(client::error::malformed_packet);
0319         const auto& [session_present, reason_code, ca_props] = *rv;
0320 
0321         _ctx.ca_props = ca_props;
0322         _ctx.state.session_present(session_present);
0323 
0324         //  Unexpected result handling:
0325         //  - If we don't have a Session State, and we get session_present = true,
0326         //      we must close the network connection (and restart with a clean start)
0327         //  - If we have a Session State, and we get session_present = false,
0328         //      we must discard our Session State
0329 
0330         auto rc = to_reason_code<reason_codes::category::connack>(reason_code);
0331         if (!rc.has_value()) // reason code not allowed in CONNACK
0332             return do_shutdown(client::error::malformed_packet);
0333 
0334         _log.at_connack(*rc, session_present, ca_props);
0335         if (*rc)
0336             return do_shutdown(asio::error::try_again);
0337 
0338         if (_ctx.co_props[prop::authentication_method].has_value())
0339             return _ctx.authenticator.async_auth(
0340                 auth_step_e::server_final,
0341                 ca_props[prop::authentication_data].value_or(""),
0342                 asio::prepend(std::move(*this), on_complete_auth {})
0343             );
0344 
0345         complete(error_code {});
0346     }
0347 
0348     void on_auth(byte_citer first, byte_citer last) {
0349         auto packet_length = static_cast<uint32_t>(std::distance(first, last));
0350         auto rv = decoders::decode_auth(packet_length, first);
0351         if (!rv.has_value())
0352             return do_shutdown(client::error::malformed_packet);
0353         const auto& [reason_code, auth_props] = *rv;
0354 
0355         auto rc = to_reason_code<reason_codes::category::auth>(reason_code);
0356         if (
0357             !rc.has_value() ||
0358             auth_props[prop::authentication_method]
0359                 != _ctx.co_props[prop::authentication_method]
0360         )
0361             return do_shutdown(client::error::malformed_packet);
0362 
0363         _ctx.authenticator.async_auth(
0364             auth_step_e::server_challenge,
0365             auth_props[prop::authentication_data].value_or(""),
0366             asio::prepend(std::move(*this), on_auth_data {})
0367         );
0368     }
0369 
0370     void operator()(on_auth_data, error_code ec, std::string data) {
0371         if (is_cancelled())
0372             return complete(asio::error::operation_aborted);
0373 
0374         if (ec)
0375             return do_shutdown(asio::error::try_again);
0376 
0377         auth_props props;
0378         props[prop::authentication_method] =
0379             _ctx.co_props[prop::authentication_method];
0380         props[prop::authentication_data] = std::move(data);
0381 
0382         auto packet = control_packet<allocator_type>::of(
0383             no_pid, get_allocator(),
0384             encoders::encode_auth,
0385             reason_codes::continue_authentication.value(), props
0386         );
0387 
0388         auto wire_data = packet.wire_data();
0389 
0390         detail::async_write(
0391             _stream, asio::buffer(wire_data),
0392             asio::consign(
0393                 asio::prepend(std::move(*this), on_send_auth {}),
0394                 std::move(packet)
0395             )
0396         );
0397     }
0398 
0399     void operator()(on_send_auth, error_code ec, size_t) {
0400         if (is_cancelled())
0401             return complete(asio::error::operation_aborted);
0402 
0403         if (ec)
0404             return do_shutdown(ec);
0405 
0406         auto buff = asio::buffer(_buffer_ptr->data(), min_packet_sz);
0407         asio::async_read(
0408             _stream, buff, asio::transfer_all(),
0409             asio::prepend(std::move(*this), on_fixed_header {})
0410         );
0411     }
0412 
0413     void operator()(on_complete_auth, error_code ec, std::string) {
0414         if (is_cancelled())
0415             return complete(asio::error::operation_aborted);
0416 
0417         if (ec)
0418             return do_shutdown(asio::error::try_again);
0419 
0420         complete(error_code {});
0421     }
0422 
0423     void do_shutdown(error_code connect_ec) {
0424         auto init_shutdown = [&stream = _stream](auto handler) {
0425             async_shutdown(stream, std::move(handler));
0426         };
0427         auto token = asio::prepend(std::move(*this), on_shutdown{}, connect_ec);
0428 
0429         return asio::async_initiate<decltype(token), void(error_code)>(
0430             init_shutdown, token
0431         );
0432     }
0433 
0434     void operator()(on_shutdown, error_code connect_ec, error_code) {
0435         // ignore shutdown error_code
0436         complete(connect_ec);
0437     }
0438 
0439 private:
0440     bool is_cancelled() const {
0441         return _cancellation_state.cancelled() != asio::cancellation_type::none;
0442     }
0443 
0444     void complete(error_code ec) {
0445         asio::get_associated_cancellation_slot(_handler).clear();
0446         std::move(_handler)(ec);
0447     }
0448 };
0449 
0450 
0451 } // end namespace boost::mqtt5::detail
0452 
0453 #endif // !BOOST_MQTT5_CONNECT_OP_HPP