File indexing completed on 2025-09-17 08:38:19
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_ASYNC_MUTEX_HPP
0009 #define BOOST_MQTT5_ASYNC_MUTEX_HPP
0010
0011 #include <boost/mqtt5/detail/async_traits.hpp>
0012
0013 #include <boost/asio/any_completion_handler.hpp>
0014 #include <boost/asio/any_io_executor.hpp>
0015 #include <boost/asio/associated_allocator.hpp>
0016 #include <boost/asio/associated_cancellation_slot.hpp>
0017 #include <boost/asio/async_result.hpp>
0018 #include <boost/asio/bind_cancellation_slot.hpp>
0019 #include <boost/asio/execution.hpp>
0020 #include <boost/asio/require.hpp>
0021 #include <boost/system/error_code.hpp>
0022
0023 #include <deque>
0024
0025 namespace boost::mqtt5::detail {
0026
0027 namespace asio = boost::asio;
0028 using error_code = boost::system::error_code;
0029
0030 class async_mutex {
0031 public:
0032 using executor_type = asio::any_io_executor;
0033 private:
0034 using queued_op_t = asio::any_completion_handler<
0035 void (error_code)
0036 >;
0037 using queue_t = std::deque<queued_op_t>;
0038
0039
0040
0041
0042 template <typename Handler, typename Executor>
0043 class tracked_op {
0044 tracking_type<Handler, Executor> _executor;
0045 Handler _handler;
0046 public:
0047 tracked_op(Handler&& h, const Executor& ex) :
0048 _executor(tracking_executor(h, ex)), _handler(std::move(h))
0049 {}
0050
0051 tracked_op(tracked_op&&) = default;
0052 tracked_op(const tracked_op&) = delete;
0053
0054 tracked_op& operator=(tracked_op&&) = default;
0055 tracked_op& operator=(const tracked_op&) = delete;
0056
0057 using allocator_type = asio::associated_allocator_t<Handler>;
0058 allocator_type get_allocator() const noexcept {
0059 return asio::get_associated_allocator(_handler);
0060 }
0061
0062 using cancellation_slot_type =
0063 asio::associated_cancellation_slot_t<Handler>;
0064 cancellation_slot_type get_cancellation_slot() const noexcept {
0065 return asio::get_associated_cancellation_slot(_handler);
0066 }
0067
0068 using executor_type = tracking_type<Handler, Executor>;
0069 executor_type get_executor() const noexcept {
0070 return _executor;
0071 }
0072
0073 void operator()(error_code ec) {
0074 std::move(_handler)(ec);
0075 }
0076 };
0077
0078
0079
0080
0081
0082
0083 class cancel_waiting_op {
0084 queue_t::iterator _ihandler;
0085 public:
0086 explicit cancel_waiting_op(queue_t::iterator ih) : _ihandler(ih) {}
0087
0088 void operator()(asio::cancellation_type_t type) {
0089 if (type == asio::cancellation_type_t::none)
0090 return;
0091 if (*_ihandler) {
0092 auto h = std::move(*_ihandler);
0093 auto ex = asio::get_associated_executor(h);
0094 asio::require(ex, asio::execution::blocking.possibly)
0095 .execute([h = std::move(h)]() mutable {
0096 std::move(h)(asio::error::operation_aborted);
0097 });
0098 }
0099 }
0100 };
0101
0102 bool _locked { false };
0103 queue_t _waiting;
0104 executor_type _ex;
0105
0106 public:
0107 template <typename Executor>
0108 explicit async_mutex(Executor&& ex) : _ex(std::forward<Executor>(ex)) {}
0109
0110 async_mutex(const async_mutex&) = delete;
0111 async_mutex& operator=(const async_mutex&) = delete;
0112
0113 ~async_mutex() {
0114 cancel();
0115 }
0116
0117 const executor_type& get_executor() const noexcept {
0118 return _ex;
0119 }
0120
0121 bool is_locked() const noexcept {
0122 return _locked;
0123 }
0124
0125
0126
0127
0128 template <typename CompletionToken>
0129 decltype(auto) lock(CompletionToken&& token) noexcept {
0130 using Signature = void (error_code);
0131
0132 auto initiation = [] (auto handler, async_mutex& self) {
0133 self.execute_or_queue(std::move(handler));
0134 };
0135
0136 return asio::async_initiate<CompletionToken, Signature>(
0137 initiation, token, std::ref(*this)
0138 );
0139 }
0140
0141
0142
0143
0144 void unlock() {
0145 while (!_waiting.empty()) {
0146 auto op = std::move(_waiting.front());
0147 _waiting.pop_front();
0148 if (!op) continue;
0149 op.get_cancellation_slot().clear();
0150 execute_op(std::move(op));
0151 return;
0152 }
0153 _locked = false;
0154 }
0155
0156
0157 void cancel() {
0158 while (!_waiting.empty()) {
0159 auto op = std::move(_waiting.front());
0160 _waiting.pop_front();
0161 if (!op) continue;
0162 op.get_cancellation_slot().clear();
0163 asio::require(_ex, asio::execution::blocking.never)
0164 .execute([ex = _ex, op = std::move(op)]() mutable {
0165 auto opex = asio::get_associated_executor(op, ex);
0166 opex.execute(
0167 [op = std::move(op)]() mutable {
0168 op(asio::error::operation_aborted);
0169 }
0170 );
0171 });
0172 }
0173 }
0174
0175 private:
0176
0177
0178
0179
0180 void execute_op(queued_op_t op) {
0181 asio::require(_ex, asio::execution::blocking.never)
0182 .execute([ex = _ex, op = std::move(op)]() mutable {
0183 auto opex = asio::get_associated_executor(op, ex);
0184 opex.execute(
0185 [op = std::move(op)]() mutable {
0186 op(error_code {});
0187 }
0188 );
0189 });
0190 }
0191
0192
0193
0194
0195
0196 template <typename Handler>
0197 void execute_or_queue(Handler&& handler) noexcept {
0198 tracked_op h { std::move(handler), _ex };
0199 if (_locked) {
0200 _waiting.emplace_back(std::move(h));
0201 auto slot = _waiting.back().get_cancellation_slot();
0202 if (slot.is_connected())
0203 slot.template emplace<cancel_waiting_op>(
0204 _waiting.end() - 1
0205 );
0206 }
0207 else {
0208 _locked = true;
0209 execute_op(queued_op_t { std::move(h) });
0210 }
0211 }
0212 };
0213
0214 }
0215
0216 #endif