Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:20

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_CLIENT_SERVICE_HPP
0009 #define BOOST_MQTT5_CLIENT_SERVICE_HPP
0010 
#include <boost/mqtt5/detail/channel_traits.hpp>
#include <boost/mqtt5/detail/internal_types.hpp>
#include <boost/mqtt5/detail/log_invoke.hpp>

#include <boost/mqtt5/impl/assemble_op.hpp>
#include <boost/mqtt5/impl/async_sender.hpp>
#include <boost/mqtt5/impl/autoconnect_stream.hpp>
#include <boost/mqtt5/impl/replies.hpp>

#include <boost/asio/async_result.hpp>
#include <boost/asio/experimental/basic_channel.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/prepend.hpp>

#include <cstddef> // size_t
#include <cstdint>
#include <functional> // std::ref
#include <limits> // std::numeric_limits
#include <memory>
#include <string>
#include <type_traits>
#include <utility> // std::move, std::forward
#include <variant> // std::monostate
0030 
0031 namespace boost::mqtt5::detail {
0032 
0033 namespace asio = boost::asio;
0034 
/// Primary template of stream_context; only declared here.
/// The trailing `Enable` parameter (defaulting to `void`) is the hook the
/// partial specializations use to select themselves via std::enable_if_t.
template <typename StreamType, typename TlsContext, typename Enable = void>
class stream_context;
0040 
0041 template <
0042     typename StreamType, typename TlsContext
0043 >
0044 class stream_context<
0045     StreamType, TlsContext,
0046     std::enable_if_t<has_tls_layer<StreamType>>
0047 > {
0048     using tls_context_type = TlsContext;
0049 
0050     mqtt_ctx _mqtt_context;
0051     std::shared_ptr<tls_context_type> _tls_context_ptr;
0052 
0053 public:
0054     explicit stream_context(TlsContext tls_context) :
0055         _tls_context_ptr(std::make_shared<tls_context_type>(std::move(tls_context)))
0056     {}
0057 
0058     stream_context(const stream_context& other) :
0059         _mqtt_context(other._mqtt_context), _tls_context_ptr(other._tls_context_ptr)
0060     {}
0061 
0062     auto& mqtt_context() {
0063         return _mqtt_context;
0064     }
0065 
0066     const auto& mqtt_context() const {
0067         return _mqtt_context;
0068     }
0069 
0070     auto& tls_context() {
0071         return *_tls_context_ptr;
0072     }
0073 
0074     auto& session_state() {
0075         return _mqtt_context.state;
0076     }
0077 
0078     const auto& session_state() const {
0079         return _mqtt_context.state;
0080     }
0081 
0082     void will(will will) {
0083         _mqtt_context.will_msg = std::move(will);
0084     }
0085 
0086     template <prop::property_type p>
0087     const auto& connack_property(
0088         std::integral_constant<prop::property_type, p> prop
0089     ) const {
0090         return _mqtt_context.ca_props[prop];
0091     }
0092 
0093     const auto& connack_properties() const {
0094         return _mqtt_context.ca_props;
0095     }
0096 
0097     template <prop::property_type p>
0098     const auto& connect_property(
0099         std::integral_constant<prop::property_type, p> prop
0100     ) const {
0101         return _mqtt_context.co_props[prop];
0102     }
0103 
0104     template <prop::property_type p>
0105     auto& connect_property(
0106         std::integral_constant<prop::property_type, p> prop
0107     ) {
0108         return _mqtt_context.co_props[prop];
0109     }
0110 
0111     void connect_properties(connect_props props) {
0112         _mqtt_context.co_props = std::move(props);
0113     }
0114 
0115     void credentials(
0116         std::string client_id,
0117         std::string username = "", std::string password = ""
0118     ) {
0119         _mqtt_context.creds = {
0120             std::move(client_id),
0121             std::move(username), std::move(password)
0122         };
0123     }
0124 
0125     template <typename Authenticator>
0126     void authenticator(Authenticator&& authenticator) {
0127         _mqtt_context.authenticator = any_authenticator(
0128             std::forward<Authenticator>(authenticator)
0129         );
0130     }
0131 };
0132 
0133 template <typename StreamType>
0134 class stream_context<
0135     StreamType, std::monostate,
0136     std::enable_if_t<!has_tls_layer<StreamType>>
0137 > {
0138     mqtt_ctx _mqtt_context;
0139 public:
0140     explicit stream_context(std::monostate) {}
0141 
0142     stream_context(const stream_context& other) :
0143         _mqtt_context(other._mqtt_context)
0144     {}
0145 
0146     auto& mqtt_context() {
0147         return _mqtt_context;
0148     }
0149 
0150     const auto& mqtt_context() const {
0151         return _mqtt_context;
0152     }
0153 
0154     auto& session_state() {
0155         return _mqtt_context.state;
0156     }
0157 
0158     const auto& session_state() const {
0159         return _mqtt_context.state;
0160     }
0161 
0162     void will(will will) {
0163         _mqtt_context.will_msg = std::move(will);
0164     }
0165 
0166     template <prop::property_type p>
0167     const auto& connack_property(
0168         std::integral_constant<prop::property_type, p> prop
0169     ) const {
0170         return _mqtt_context.ca_props[prop];
0171     }
0172 
0173     const auto& connack_properties() const {
0174         return _mqtt_context.ca_props;
0175     }
0176 
0177     template <prop::property_type p>
0178     const auto& connect_property(
0179         std::integral_constant<prop::property_type, p> prop
0180     ) const {
0181         return _mqtt_context.co_props[prop];
0182     }
0183 
0184     template <prop::property_type p>
0185     auto& connect_property(
0186         std::integral_constant<prop::property_type, p> prop
0187     ) {
0188         return _mqtt_context.co_props[prop];
0189     }
0190 
0191     void connect_properties(connect_props props) {
0192         _mqtt_context.co_props = std::move(props);
0193     }
0194 
0195     void credentials(
0196         std::string client_id,
0197         std::string username = "", std::string password = ""
0198     ) {
0199         _mqtt_context.creds = {
0200             std::move(client_id),
0201             std::move(username), std::move(password)
0202         };
0203     }
0204 
0205     template <typename Authenticator>
0206     void authenticator(Authenticator&& authenticator) {
0207         _mqtt_context.authenticator = any_authenticator(
0208             std::forward<Authenticator>(authenticator)
0209         );
0210     }
0211 };
0212 
0213 template <
0214     typename StreamType,
0215     typename TlsContext = std::monostate,
0216     typename LoggerType = noop_logger
0217 >
0218 class client_service {
0219     using self_type = client_service<StreamType, TlsContext, LoggerType>;
0220     using stream_context_type = stream_context<StreamType, TlsContext>;
0221     using stream_type = autoconnect_stream<
0222         StreamType, stream_context_type, LoggerType
0223     >;
0224 public:
0225     using executor_type = typename stream_type::executor_type;
0226 private:
0227     using tls_context_type = TlsContext;
0228     using logger_type = LoggerType;
0229     using receive_channel = asio::experimental::basic_channel<
0230         executor_type,
0231         channel_traits<>,
0232         void (error_code, std::string, std::string, publish_props)
0233     >;
0234 
0235     template <typename ClientService, typename Handler>
0236     friend class run_op;
0237 
0238     template <typename ClientService>
0239     friend class async_sender;
0240 
0241     template <typename ClientService, typename Handler>
0242     friend class assemble_op;
0243 
0244     template <typename ClientService, typename Handler>
0245     friend class ping_op;
0246 
0247     template <typename ClientService, typename Handler>
0248     friend class sentry_op;
0249 
0250     template <typename ClientService>
0251     friend class re_auth_op;
0252 
0253     executor_type _executor;
0254 
0255     log_invoke<logger_type> _log;
0256 
0257     stream_context_type _stream_context;
0258     stream_type _stream;
0259 
0260     packet_id_allocator _pid_allocator;
0261     replies _replies;
0262     async_sender<client_service> _async_sender;
0263 
0264     std::string _read_buff;
0265     data_span _active_span;
0266 
0267     receive_channel _rec_channel;
0268 
0269     asio::steady_timer _ping_timer;
0270     asio::steady_timer _sentry_timer;
0271 
0272     client_service(const client_service& other) :
0273         _executor(other._executor),
0274         _log(other._log),
0275         _stream_context(other._stream_context),
0276         _stream(_executor, _stream_context, _log),
0277         _replies(_executor),
0278         _async_sender(*this),
0279         _active_span(_read_buff.cend(), _read_buff.cend()),
0280         _rec_channel(_executor, (std::numeric_limits<size_t>::max)()),
0281         _ping_timer(_executor),
0282         _sentry_timer(_executor)
0283     {
0284         _stream.clone_endpoints(other._stream);
0285     }
0286 
0287 public:
0288 
0289     explicit client_service(
0290         const executor_type& ex,
0291         tls_context_type tls_context = {}, logger_type logger = {}
0292     ) :
0293         _executor(ex),
0294         _log(std::move(logger)),
0295         _stream_context(std::move(tls_context)),
0296         _stream(ex, _stream_context, _log),
0297         _replies(ex),
0298         _async_sender(*this),
0299         _active_span(_read_buff.cend(), _read_buff.cend()),
0300         _rec_channel(ex, (std::numeric_limits<size_t>::max)()),
0301         _ping_timer(ex),
0302         _sentry_timer(ex)
0303     {}
0304 
0305     executor_type get_executor() const noexcept {
0306         return _executor;
0307     }
0308 
0309     auto dup() const {
0310         return std::shared_ptr<client_service>(new client_service(*this));
0311     }
0312 
0313     template <
0314         typename Ctx = TlsContext,
0315         std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
0316     >
0317     decltype(auto) tls_context() {
0318         return _stream_context.tls_context();
0319     }
0320 
0321     void will(will will) {
0322         if (!is_open())
0323             _stream_context.will(std::move(will));
0324     }
0325 
0326     void credentials(
0327         std::string client_id,
0328         std::string username = "", std::string password = ""
0329     ) {
0330         if (!is_open())
0331             _stream_context.credentials(
0332                 std::move(client_id),
0333                 std::move(username), std::move(password)
0334             );
0335     }
0336 
0337     void brokers(std::string hosts, uint16_t default_port) {
0338         if (!is_open())
0339             _stream.brokers(std::move(hosts), default_port);
0340     }
0341 
0342     template <
0343         typename Authenticator,
0344         std::enable_if_t<is_authenticator<Authenticator>, bool> = true
0345     >
0346     void authenticator(Authenticator&& authenticator) {
0347         if (!is_open())
0348             _stream_context.authenticator(
0349                 std::forward<Authenticator>(authenticator)
0350             );
0351     }
0352 
0353     uint16_t negotiated_keep_alive() const {
0354         return connack_property(prop::server_keep_alive)
0355             .value_or(_stream_context.mqtt_context().keep_alive);
0356     }
0357 
0358     void keep_alive(uint16_t seconds) {
0359         if (!is_open())
0360             _stream_context.mqtt_context().keep_alive = seconds;
0361     }
0362 
0363     template <prop::property_type p>
0364     const auto& connect_property(
0365         std::integral_constant<prop::property_type, p> prop
0366     ) const {
0367         return _stream_context.connect_property(prop);
0368     }
0369 
0370     template <prop::property_type p>
0371     void connect_property(
0372         std::integral_constant<prop::property_type, p> prop,
0373         prop::value_type_t<p> value
0374     ){
0375         if (!is_open())
0376             _stream_context.connect_property(prop) = value;
0377     }
0378 
0379     void connect_properties(connect_props props) {
0380         if (!is_open())
0381             _stream_context.connect_properties(std::move(props));
0382     }
0383 
0384     template <prop::property_type p>
0385     const auto& connack_property(
0386         std::integral_constant<prop::property_type, p> prop
0387     ) const {
0388         return _stream_context.connack_property(prop);
0389     }
0390 
0391     const auto& connack_properties() const {
0392         return _stream_context.connack_properties();
0393     }
0394 
0395     void open_stream() {
0396         _stream.open();
0397     }
0398 
0399     bool is_open() const {
0400         return _stream.is_open();
0401     }
0402 
0403     template <typename CompletionToken>
0404     decltype(auto) async_shutdown(CompletionToken&& token) {
0405         return _stream.async_shutdown(std::forward<CompletionToken>(token));
0406     }
0407 
0408     void cancel() {
0409         if (!_stream.is_open()) return;
0410 
0411         _ping_timer.cancel();
0412         _sentry_timer.cancel();
0413 
0414         _rec_channel.close();
0415         _replies.cancel_unanswered();
0416         _async_sender.cancel();
0417         _stream.cancel();
0418         _stream.close();
0419     }
0420 
0421     log_invoke<LoggerType>& log() {
0422         return _log;
0423     }
0424 
0425     uint16_t allocate_pid() {
0426         return _pid_allocator.allocate();
0427     }
0428 
0429     void free_pid(uint16_t pid, bool was_throttled = false) {
0430         _pid_allocator.free(pid);
0431         if (was_throttled)
0432             _async_sender.throttled_op_done();
0433     }
0434 
0435     serial_num_t next_serial_num() {
0436         return _async_sender.next_serial_num();
0437     }
0438 
0439     bool subscriptions_present() const {
0440         return _stream_context.session_state().subscriptions_present();
0441     }
0442 
0443     void subscriptions_present(bool present) {
0444         _stream_context.session_state().subscriptions_present(present);
0445     }
0446 
0447     void update_session_state() {
0448         auto& session_state = _stream_context.session_state();
0449 
0450         if (!session_state.session_present()) {
0451             _replies.clear_pending_pubrels();
0452             session_state.session_present(true);
0453 
0454             if (session_state.subscriptions_present()) {
0455                 channel_store_error(client::error::session_expired);
0456                 session_state.subscriptions_present(false);
0457             }
0458         }
0459 
0460         _ping_timer.cancel();
0461     }
0462 
0463     bool channel_store(decoders::publish_message message) {
0464         auto& [topic, packet_id, flags, props, payload] = message;
0465         return _rec_channel.try_send(
0466             error_code {}, std::move(topic),
0467             std::move(payload), std::move(props)
0468         );
0469     }
0470 
0471     bool channel_store_error(error_code ec) {
0472         return _rec_channel.try_send(
0473             ec, std::string {}, std::string {}, publish_props {}
0474         );
0475     }
0476 
0477     template <typename BufferType, typename CompletionToken>
0478     decltype(auto) async_send(
0479         const BufferType& buffer,
0480         serial_num_t serial_num, unsigned flags,
0481         CompletionToken&& token
0482     ) {
0483         return _async_sender.async_send(
0484             buffer, serial_num, flags, std::forward<CompletionToken>(token)
0485         );
0486     }
0487 
0488     template <typename CompletionToken>
0489     decltype(auto) async_assemble(CompletionToken&& token) {
0490         using Signature = void (error_code, uint8_t, byte_citer, byte_citer);
0491 
0492         auto initiation = [] (
0493             auto handler, self_type& self,
0494             std::string& read_buff, data_span& active_span
0495         ) {
0496             assemble_op {
0497                 self, std::move(handler), read_buff, active_span
0498             }.perform(asio::transfer_at_least(0));
0499         };
0500 
0501         return asio::async_initiate<CompletionToken, Signature> (
0502             initiation, token, std::ref(*this),
0503             std::ref(_read_buff), std::ref(_active_span)
0504         );
0505     }
0506 
0507     template <typename CompletionToken>
0508     decltype(auto) async_wait_reply(
0509         control_code_e code, uint16_t packet_id, CompletionToken&& token
0510     ) {
0511         return _replies.async_wait_reply(
0512             code, packet_id, std::forward<CompletionToken>(token)
0513         );
0514     }
0515 
0516     template <typename CompletionToken>
0517     decltype(auto) async_channel_receive(CompletionToken&& token) {
0518         return _rec_channel.async_receive(std::forward<CompletionToken>(token));
0519     }
0520 
0521 };
0522 
0523 } // namespace boost::mqtt5::detail
0524 
0525 #endif // !BOOST_MQTT5_CLIENT_SERVICE_HPP