File indexing completed on 2025-09-17 08:38:20
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_CLIENT_SERVICE_HPP
0009 #define BOOST_MQTT5_CLIENT_SERVICE_HPP
0010
0011 #include <boost/mqtt5/detail/channel_traits.hpp>
0012 #include <boost/mqtt5/detail/internal_types.hpp>
0013 #include <boost/mqtt5/detail/log_invoke.hpp>
0014
0015 #include <boost/mqtt5/impl/assemble_op.hpp>
0016 #include <boost/mqtt5/impl/async_sender.hpp>
0017 #include <boost/mqtt5/impl/autoconnect_stream.hpp>
0018 #include <boost/mqtt5/impl/replies.hpp>
0019
0020 #include <boost/asio/async_result.hpp>
0021 #include <boost/asio/experimental/basic_channel.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/asio/prepend.hpp>
0024
0025 #include <cstdint>
0026 #include <memory>
0027 #include <string>
0028 #include <type_traits>
0029 #include <variant> // std::monostate
0030
0031 namespace boost::mqtt5::detail {
0032
0033 namespace asio = boost::asio;
0034
0035 template <
0036 typename StreamType, typename TlsContext,
0037 typename Enable = void
0038 >
0039 class stream_context;
0040
0041 template <
0042 typename StreamType, typename TlsContext
0043 >
0044 class stream_context<
0045 StreamType, TlsContext,
0046 std::enable_if_t<has_tls_layer<StreamType>>
0047 > {
0048 using tls_context_type = TlsContext;
0049
0050 mqtt_ctx _mqtt_context;
0051 std::shared_ptr<tls_context_type> _tls_context_ptr;
0052
0053 public:
0054 explicit stream_context(TlsContext tls_context) :
0055 _tls_context_ptr(std::make_shared<tls_context_type>(std::move(tls_context)))
0056 {}
0057
0058 stream_context(const stream_context& other) :
0059 _mqtt_context(other._mqtt_context), _tls_context_ptr(other._tls_context_ptr)
0060 {}
0061
0062 auto& mqtt_context() {
0063 return _mqtt_context;
0064 }
0065
0066 const auto& mqtt_context() const {
0067 return _mqtt_context;
0068 }
0069
0070 auto& tls_context() {
0071 return *_tls_context_ptr;
0072 }
0073
0074 auto& session_state() {
0075 return _mqtt_context.state;
0076 }
0077
0078 const auto& session_state() const {
0079 return _mqtt_context.state;
0080 }
0081
0082 void will(will will) {
0083 _mqtt_context.will_msg = std::move(will);
0084 }
0085
0086 template <prop::property_type p>
0087 const auto& connack_property(
0088 std::integral_constant<prop::property_type, p> prop
0089 ) const {
0090 return _mqtt_context.ca_props[prop];
0091 }
0092
0093 const auto& connack_properties() const {
0094 return _mqtt_context.ca_props;
0095 }
0096
0097 template <prop::property_type p>
0098 const auto& connect_property(
0099 std::integral_constant<prop::property_type, p> prop
0100 ) const {
0101 return _mqtt_context.co_props[prop];
0102 }
0103
0104 template <prop::property_type p>
0105 auto& connect_property(
0106 std::integral_constant<prop::property_type, p> prop
0107 ) {
0108 return _mqtt_context.co_props[prop];
0109 }
0110
0111 void connect_properties(connect_props props) {
0112 _mqtt_context.co_props = std::move(props);
0113 }
0114
0115 void credentials(
0116 std::string client_id,
0117 std::string username = "", std::string password = ""
0118 ) {
0119 _mqtt_context.creds = {
0120 std::move(client_id),
0121 std::move(username), std::move(password)
0122 };
0123 }
0124
0125 template <typename Authenticator>
0126 void authenticator(Authenticator&& authenticator) {
0127 _mqtt_context.authenticator = any_authenticator(
0128 std::forward<Authenticator>(authenticator)
0129 );
0130 }
0131 };
0132
0133 template <typename StreamType>
0134 class stream_context<
0135 StreamType, std::monostate,
0136 std::enable_if_t<!has_tls_layer<StreamType>>
0137 > {
0138 mqtt_ctx _mqtt_context;
0139 public:
0140 explicit stream_context(std::monostate) {}
0141
0142 stream_context(const stream_context& other) :
0143 _mqtt_context(other._mqtt_context)
0144 {}
0145
0146 auto& mqtt_context() {
0147 return _mqtt_context;
0148 }
0149
0150 const auto& mqtt_context() const {
0151 return _mqtt_context;
0152 }
0153
0154 auto& session_state() {
0155 return _mqtt_context.state;
0156 }
0157
0158 const auto& session_state() const {
0159 return _mqtt_context.state;
0160 }
0161
0162 void will(will will) {
0163 _mqtt_context.will_msg = std::move(will);
0164 }
0165
0166 template <prop::property_type p>
0167 const auto& connack_property(
0168 std::integral_constant<prop::property_type, p> prop
0169 ) const {
0170 return _mqtt_context.ca_props[prop];
0171 }
0172
0173 const auto& connack_properties() const {
0174 return _mqtt_context.ca_props;
0175 }
0176
0177 template <prop::property_type p>
0178 const auto& connect_property(
0179 std::integral_constant<prop::property_type, p> prop
0180 ) const {
0181 return _mqtt_context.co_props[prop];
0182 }
0183
0184 template <prop::property_type p>
0185 auto& connect_property(
0186 std::integral_constant<prop::property_type, p> prop
0187 ) {
0188 return _mqtt_context.co_props[prop];
0189 }
0190
0191 void connect_properties(connect_props props) {
0192 _mqtt_context.co_props = std::move(props);
0193 }
0194
0195 void credentials(
0196 std::string client_id,
0197 std::string username = "", std::string password = ""
0198 ) {
0199 _mqtt_context.creds = {
0200 std::move(client_id),
0201 std::move(username), std::move(password)
0202 };
0203 }
0204
0205 template <typename Authenticator>
0206 void authenticator(Authenticator&& authenticator) {
0207 _mqtt_context.authenticator = any_authenticator(
0208 std::forward<Authenticator>(authenticator)
0209 );
0210 }
0211 };
0212
0213 template <
0214 typename StreamType,
0215 typename TlsContext = std::monostate,
0216 typename LoggerType = noop_logger
0217 >
0218 class client_service {
0219 using self_type = client_service<StreamType, TlsContext, LoggerType>;
0220 using stream_context_type = stream_context<StreamType, TlsContext>;
0221 using stream_type = autoconnect_stream<
0222 StreamType, stream_context_type, LoggerType
0223 >;
0224 public:
0225 using executor_type = typename stream_type::executor_type;
0226 private:
0227 using tls_context_type = TlsContext;
0228 using logger_type = LoggerType;
0229 using receive_channel = asio::experimental::basic_channel<
0230 executor_type,
0231 channel_traits<>,
0232 void (error_code, std::string, std::string, publish_props)
0233 >;
0234
0235 template <typename ClientService, typename Handler>
0236 friend class run_op;
0237
0238 template <typename ClientService>
0239 friend class async_sender;
0240
0241 template <typename ClientService, typename Handler>
0242 friend class assemble_op;
0243
0244 template <typename ClientService, typename Handler>
0245 friend class ping_op;
0246
0247 template <typename ClientService, typename Handler>
0248 friend class sentry_op;
0249
0250 template <typename ClientService>
0251 friend class re_auth_op;
0252
0253 executor_type _executor;
0254
0255 log_invoke<logger_type> _log;
0256
0257 stream_context_type _stream_context;
0258 stream_type _stream;
0259
0260 packet_id_allocator _pid_allocator;
0261 replies _replies;
0262 async_sender<client_service> _async_sender;
0263
0264 std::string _read_buff;
0265 data_span _active_span;
0266
0267 receive_channel _rec_channel;
0268
0269 asio::steady_timer _ping_timer;
0270 asio::steady_timer _sentry_timer;
0271
0272 client_service(const client_service& other) :
0273 _executor(other._executor),
0274 _log(other._log),
0275 _stream_context(other._stream_context),
0276 _stream(_executor, _stream_context, _log),
0277 _replies(_executor),
0278 _async_sender(*this),
0279 _active_span(_read_buff.cend(), _read_buff.cend()),
0280 _rec_channel(_executor, (std::numeric_limits<size_t>::max)()),
0281 _ping_timer(_executor),
0282 _sentry_timer(_executor)
0283 {
0284 _stream.clone_endpoints(other._stream);
0285 }
0286
0287 public:
0288
0289 explicit client_service(
0290 const executor_type& ex,
0291 tls_context_type tls_context = {}, logger_type logger = {}
0292 ) :
0293 _executor(ex),
0294 _log(std::move(logger)),
0295 _stream_context(std::move(tls_context)),
0296 _stream(ex, _stream_context, _log),
0297 _replies(ex),
0298 _async_sender(*this),
0299 _active_span(_read_buff.cend(), _read_buff.cend()),
0300 _rec_channel(ex, (std::numeric_limits<size_t>::max)()),
0301 _ping_timer(ex),
0302 _sentry_timer(ex)
0303 {}
0304
0305 executor_type get_executor() const noexcept {
0306 return _executor;
0307 }
0308
0309 auto dup() const {
0310 return std::shared_ptr<client_service>(new client_service(*this));
0311 }
0312
0313 template <
0314 typename Ctx = TlsContext,
0315 std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
0316 >
0317 decltype(auto) tls_context() {
0318 return _stream_context.tls_context();
0319 }
0320
0321 void will(will will) {
0322 if (!is_open())
0323 _stream_context.will(std::move(will));
0324 }
0325
0326 void credentials(
0327 std::string client_id,
0328 std::string username = "", std::string password = ""
0329 ) {
0330 if (!is_open())
0331 _stream_context.credentials(
0332 std::move(client_id),
0333 std::move(username), std::move(password)
0334 );
0335 }
0336
0337 void brokers(std::string hosts, uint16_t default_port) {
0338 if (!is_open())
0339 _stream.brokers(std::move(hosts), default_port);
0340 }
0341
0342 template <
0343 typename Authenticator,
0344 std::enable_if_t<is_authenticator<Authenticator>, bool> = true
0345 >
0346 void authenticator(Authenticator&& authenticator) {
0347 if (!is_open())
0348 _stream_context.authenticator(
0349 std::forward<Authenticator>(authenticator)
0350 );
0351 }
0352
0353 uint16_t negotiated_keep_alive() const {
0354 return connack_property(prop::server_keep_alive)
0355 .value_or(_stream_context.mqtt_context().keep_alive);
0356 }
0357
0358 void keep_alive(uint16_t seconds) {
0359 if (!is_open())
0360 _stream_context.mqtt_context().keep_alive = seconds;
0361 }
0362
0363 template <prop::property_type p>
0364 const auto& connect_property(
0365 std::integral_constant<prop::property_type, p> prop
0366 ) const {
0367 return _stream_context.connect_property(prop);
0368 }
0369
0370 template <prop::property_type p>
0371 void connect_property(
0372 std::integral_constant<prop::property_type, p> prop,
0373 prop::value_type_t<p> value
0374 ){
0375 if (!is_open())
0376 _stream_context.connect_property(prop) = value;
0377 }
0378
0379 void connect_properties(connect_props props) {
0380 if (!is_open())
0381 _stream_context.connect_properties(std::move(props));
0382 }
0383
0384 template <prop::property_type p>
0385 const auto& connack_property(
0386 std::integral_constant<prop::property_type, p> prop
0387 ) const {
0388 return _stream_context.connack_property(prop);
0389 }
0390
0391 const auto& connack_properties() const {
0392 return _stream_context.connack_properties();
0393 }
0394
0395 void open_stream() {
0396 _stream.open();
0397 }
0398
0399 bool is_open() const {
0400 return _stream.is_open();
0401 }
0402
0403 template <typename CompletionToken>
0404 decltype(auto) async_shutdown(CompletionToken&& token) {
0405 return _stream.async_shutdown(std::forward<CompletionToken>(token));
0406 }
0407
0408 void cancel() {
0409 if (!_stream.is_open()) return;
0410
0411 _ping_timer.cancel();
0412 _sentry_timer.cancel();
0413
0414 _rec_channel.close();
0415 _replies.cancel_unanswered();
0416 _async_sender.cancel();
0417 _stream.cancel();
0418 _stream.close();
0419 }
0420
0421 log_invoke<LoggerType>& log() {
0422 return _log;
0423 }
0424
0425 uint16_t allocate_pid() {
0426 return _pid_allocator.allocate();
0427 }
0428
0429 void free_pid(uint16_t pid, bool was_throttled = false) {
0430 _pid_allocator.free(pid);
0431 if (was_throttled)
0432 _async_sender.throttled_op_done();
0433 }
0434
0435 serial_num_t next_serial_num() {
0436 return _async_sender.next_serial_num();
0437 }
0438
0439 bool subscriptions_present() const {
0440 return _stream_context.session_state().subscriptions_present();
0441 }
0442
0443 void subscriptions_present(bool present) {
0444 _stream_context.session_state().subscriptions_present(present);
0445 }
0446
0447 void update_session_state() {
0448 auto& session_state = _stream_context.session_state();
0449
0450 if (!session_state.session_present()) {
0451 _replies.clear_pending_pubrels();
0452 session_state.session_present(true);
0453
0454 if (session_state.subscriptions_present()) {
0455 channel_store_error(client::error::session_expired);
0456 session_state.subscriptions_present(false);
0457 }
0458 }
0459
0460 _ping_timer.cancel();
0461 }
0462
0463 bool channel_store(decoders::publish_message message) {
0464 auto& [topic, packet_id, flags, props, payload] = message;
0465 return _rec_channel.try_send(
0466 error_code {}, std::move(topic),
0467 std::move(payload), std::move(props)
0468 );
0469 }
0470
0471 bool channel_store_error(error_code ec) {
0472 return _rec_channel.try_send(
0473 ec, std::string {}, std::string {}, publish_props {}
0474 );
0475 }
0476
0477 template <typename BufferType, typename CompletionToken>
0478 decltype(auto) async_send(
0479 const BufferType& buffer,
0480 serial_num_t serial_num, unsigned flags,
0481 CompletionToken&& token
0482 ) {
0483 return _async_sender.async_send(
0484 buffer, serial_num, flags, std::forward<CompletionToken>(token)
0485 );
0486 }
0487
0488 template <typename CompletionToken>
0489 decltype(auto) async_assemble(CompletionToken&& token) {
0490 using Signature = void (error_code, uint8_t, byte_citer, byte_citer);
0491
0492 auto initiation = [] (
0493 auto handler, self_type& self,
0494 std::string& read_buff, data_span& active_span
0495 ) {
0496 assemble_op {
0497 self, std::move(handler), read_buff, active_span
0498 }.perform(asio::transfer_at_least(0));
0499 };
0500
0501 return asio::async_initiate<CompletionToken, Signature> (
0502 initiation, token, std::ref(*this),
0503 std::ref(_read_buff), std::ref(_active_span)
0504 );
0505 }
0506
0507 template <typename CompletionToken>
0508 decltype(auto) async_wait_reply(
0509 control_code_e code, uint16_t packet_id, CompletionToken&& token
0510 ) {
0511 return _replies.async_wait_reply(
0512 code, packet_id, std::forward<CompletionToken>(token)
0513 );
0514 }
0515
0516 template <typename CompletionToken>
0517 decltype(auto) async_channel_receive(CompletionToken&& token) {
0518 return _rec_channel.async_receive(std::forward<CompletionToken>(token));
0519 }
0520
0521 };
0522
0523 }
0524
0525 #endif