File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_RUN_OP_HPP
0009 #define BOOST_MQTT5_RUN_OP_HPP
0010
0011 #include <boost/mqtt5/detail/cancellable_handler.hpp>
0012 #include <boost/mqtt5/detail/control_packet.hpp>
0013 #include <boost/mqtt5/detail/internal_types.hpp>
0014
0015 #include <boost/mqtt5/impl/ping_op.hpp>
0016 #include <boost/mqtt5/impl/read_message_op.hpp>
0017 #include <boost/mqtt5/impl/sentry_op.hpp>
0018
0019 #include <boost/asio/associated_allocator.hpp>
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/associated_executor.hpp>
0022 #include <boost/asio/experimental/parallel_group.hpp>
0023
0024 #include <memory>
0025
0026 namespace boost::mqtt5::detail {
0027
0028 namespace asio = boost::asio;
0029
0030 template <typename ClientService, typename Handler>
0031 class run_op {
0032 using client_service = ClientService;
0033
0034 std::shared_ptr<client_service> _svc_ptr;
0035
0036 using handler_type = cancellable_handler<
0037 Handler,
0038 typename client_service::executor_type
0039 >;
0040 handler_type _handler;
0041
0042 public:
0043 run_op(
0044 std::shared_ptr<client_service> svc_ptr,
0045 Handler&& handler
0046 ) :
0047 _svc_ptr(std::move(svc_ptr)),
0048 _handler(std::move(handler), _svc_ptr->get_executor())
0049 {
0050 auto slot = asio::get_associated_cancellation_slot(_handler);
0051 if (slot.is_connected())
0052 slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
0053 svc.cancel();
0054 });
0055 }
0056
0057 run_op(run_op&&) = default;
0058 run_op(const run_op&) = delete;
0059
0060 run_op& operator=(run_op&&) = default;
0061 run_op& operator=(const run_op&) = delete;
0062
0063 using allocator_type = asio::associated_allocator_t<handler_type>;
0064 allocator_type get_allocator() const noexcept {
0065 return asio::get_associated_allocator(_handler);
0066 }
0067
0068 using executor_type = typename client_service::executor_type;
0069 executor_type get_executor() const noexcept {
0070 return _svc_ptr->get_executor();
0071 }
0072
0073 void perform() {
0074 namespace asioex = boost::asio::experimental;
0075
0076 _svc_ptr->_stream.open();
0077 _svc_ptr->_rec_channel.reset();
0078
0079 auto init_read_message_op = [](
0080 auto handler, std::shared_ptr<client_service> svc_ptr
0081 ) {
0082 return read_message_op { std::move(svc_ptr), std::move(handler) }
0083 .perform();
0084 };
0085
0086 auto init_ping_op = [](
0087 auto handler, std::shared_ptr<client_service> svc_ptr
0088 ) {
0089 return ping_op { std::move(svc_ptr), std::move(handler) }
0090 .perform();
0091 };
0092
0093 auto init_senty_op = [](
0094 auto handler, std::shared_ptr<client_service> svc_ptr
0095 ) {
0096 return sentry_op { std::move(svc_ptr), std::move(handler) }
0097 .perform();
0098 };
0099
0100 asioex::make_parallel_group(
0101 asio::async_initiate<const asio::deferred_t, void ()>(
0102 init_read_message_op, asio::deferred, _svc_ptr
0103 ),
0104 asio::async_initiate<const asio::deferred_t, void ()>(
0105 init_ping_op, asio::deferred, _svc_ptr
0106 ),
0107 asio::async_initiate<const asio::deferred_t, void ()>(
0108 init_senty_op, asio::deferred, _svc_ptr
0109 )
0110 ).async_wait(asioex::wait_for_all(), std::move(*this));
0111 }
0112
0113 void operator()(std::array<std::size_t, 3> ) {
0114 _handler.complete(make_error_code(asio::error::operation_aborted));
0115 }
0116 };
0117
0118 template <typename ClientService>
0119 class initiate_async_run {
0120 std::shared_ptr<ClientService> _svc_ptr;
0121 public:
0122 explicit initiate_async_run(std::shared_ptr<ClientService> svc_ptr) :
0123 _svc_ptr(std::move(svc_ptr))
0124 {}
0125
0126 using executor_type = typename ClientService::executor_type;
0127 executor_type get_executor() const noexcept {
0128 return _svc_ptr->get_executor();
0129 }
0130
0131 template <typename Handler>
0132 void operator()(Handler&& handler) {
0133 run_op<ClientService, Handler> {
0134 _svc_ptr, std::move(handler)
0135 }.perform();
0136 }
0137 };
0138
0139
0140 }
0141
0142 #endif