Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:19

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_ASYNC_MUTEX_HPP
0009 #define BOOST_MQTT5_ASYNC_MUTEX_HPP
0010 
0011 #include <boost/mqtt5/detail/async_traits.hpp>
0012 
0013 #include <boost/asio/any_completion_handler.hpp>
0014 #include <boost/asio/any_io_executor.hpp>
0015 #include <boost/asio/associated_allocator.hpp>
0016 #include <boost/asio/associated_cancellation_slot.hpp>
0017 #include <boost/asio/async_result.hpp>
0018 #include <boost/asio/bind_cancellation_slot.hpp>
0019 #include <boost/asio/execution.hpp>
0020 #include <boost/asio/require.hpp>
0021 #include <boost/system/error_code.hpp>
0022 
0023 #include <deque>
0024 
0025 namespace boost::mqtt5::detail {
0026 
0027 namespace asio = boost::asio;
0028 using error_code = boost::system::error_code;
0029 
0030 class async_mutex {
0031 public:
0032     using executor_type = asio::any_io_executor;
0033 private:
0034     using queued_op_t = asio::any_completion_handler<
0035         void (error_code)
0036     >;
0037     using queue_t = std::deque<queued_op_t>;
0038 
0039     // Handler with assigned tracking executor.
0040     // Objects of this type are type-erased by any_completion_handler
0041     // and stored in the waiting queue.
0042     template <typename Handler, typename Executor>
0043     class tracked_op {
0044         tracking_type<Handler, Executor> _executor;
0045         Handler _handler;
0046     public:
0047         tracked_op(Handler&& h, const Executor& ex) :
0048             _executor(tracking_executor(h, ex)), _handler(std::move(h))
0049         {}
0050 
0051         tracked_op(tracked_op&&) = default;
0052         tracked_op(const tracked_op&) = delete;
0053 
0054         tracked_op& operator=(tracked_op&&) = default;
0055         tracked_op& operator=(const tracked_op&) = delete;
0056 
0057         using allocator_type = asio::associated_allocator_t<Handler>;
0058         allocator_type get_allocator() const noexcept {
0059             return asio::get_associated_allocator(_handler);
0060         }
0061 
0062         using cancellation_slot_type =
0063             asio::associated_cancellation_slot_t<Handler>;
0064         cancellation_slot_type get_cancellation_slot() const noexcept {
0065             return asio::get_associated_cancellation_slot(_handler);
0066         }
0067 
0068         using executor_type = tracking_type<Handler, Executor>;
0069         executor_type get_executor() const noexcept {
0070             return _executor;
0071         }
0072 
0073         void operator()(error_code ec) {
0074             std::move(_handler)(ec);
0075         }
0076     };
0077 
0078     // Per-operation cancellation helper.
0079     // It is safe to emit the cancellation signal from any thread
0080     // provided there are no other concurrent calls to the async_mutex.
0081     // The helper stores queue iterator to operation since the iterator
0082     // would not be invalidated by other queue operations.
0083     class cancel_waiting_op {
0084         queue_t::iterator _ihandler;
0085     public:
0086         explicit cancel_waiting_op(queue_t::iterator ih) : _ihandler(ih) {}
0087 
0088         void operator()(asio::cancellation_type_t type) {
0089             if (type == asio::cancellation_type_t::none)
0090                 return;
0091             if (*_ihandler) {
0092                 auto h = std::move(*_ihandler);
0093                 auto ex = asio::get_associated_executor(h);
0094                 asio::require(ex, asio::execution::blocking.possibly)
0095                     .execute([h = std::move(h)]() mutable {
0096                         std::move(h)(asio::error::operation_aborted);
0097                     });
0098             }
0099         }
0100     };
0101 
0102     bool _locked { false };
0103     queue_t _waiting;
0104     executor_type _ex;
0105 
0106 public:
0107     template <typename Executor>
0108     explicit async_mutex(Executor&& ex) : _ex(std::forward<Executor>(ex)) {}
0109 
0110     async_mutex(const async_mutex&) = delete;
0111     async_mutex& operator=(const async_mutex&) = delete;
0112 
0113     ~async_mutex() {
0114         cancel();
0115     }
0116 
0117     const executor_type& get_executor() const noexcept {
0118         return _ex;
0119     }
0120 
0121     bool is_locked() const noexcept {
0122         return _locked;
0123     }
0124 
0125     // Schedules mutex for lock operation and return immediately.
0126     // Calls given completion handler when mutex is locked.
0127     // It's the responsibility of the completion handler to unlock the mutex.
0128     template <typename CompletionToken>
0129     decltype(auto) lock(CompletionToken&& token) noexcept {
0130         using Signature = void (error_code);
0131 
0132         auto initiation = [] (auto handler, async_mutex& self) {
0133             self.execute_or_queue(std::move(handler));
0134         };
0135 
0136         return asio::async_initiate<CompletionToken, Signature>(
0137             initiation, token, std::ref(*this)
0138         );
0139     }
0140 
0141     // Unlocks the mutex. The mutex must be in locked state.
0142     // Next queued operation, if any, will be executed in a manner
0143     // equivalent to asio::post.
0144     void unlock() {
0145         while (!_waiting.empty()) {
0146             auto op = std::move(_waiting.front());
0147             _waiting.pop_front();
0148             if (!op) continue;
0149             op.get_cancellation_slot().clear();
0150             execute_op(std::move(op));
0151             return;
0152         }
0153         _locked = false;
0154     }
0155 
0156     // Cancels all outstanding operations waiting on the mutex.
0157     void cancel() {
0158         while (!_waiting.empty()) {
0159             auto op = std::move(_waiting.front());
0160             _waiting.pop_front();
0161             if (!op) continue;
0162             op.get_cancellation_slot().clear();
0163             asio::require(_ex, asio::execution::blocking.never)
0164                 .execute([ex = _ex, op = std::move(op)]() mutable {
0165                     auto opex = asio::get_associated_executor(op, ex);
0166                     opex.execute(
0167                         [op = std::move(op)]() mutable {
0168                             op(asio::error::operation_aborted);
0169                         }
0170                     );
0171                 });
0172         }
0173     }
0174 
0175 private:
0176 
0177     // Schedule operation to `opex` executor using `_ex` executor.
0178     // The operation is equivalent to asio::post(_ex, op) but
0179     // for some reason this form of execution is much faster.
0180     void execute_op(queued_op_t op) {
0181         asio::require(_ex, asio::execution::blocking.never)
0182             .execute([ex = _ex, op = std::move(op)]() mutable {
0183                 auto opex = asio::get_associated_executor(op, ex);
0184                 opex.execute(
0185                     [op = std::move(op)]() mutable {
0186                         op(error_code {});
0187                     }
0188                 );
0189             });
0190     }
0191 
0192     // Executes operation immediately if mutex is not locked
0193     // or queues it for later execution otherwise. In both cases
0194     // the operation will be executed in a manner equivalent
0195     // to asio::post to avoid recursion.
0196     template <typename Handler>
0197     void execute_or_queue(Handler&& handler) noexcept {
0198         tracked_op h { std::move(handler), _ex };
0199         if (_locked) {
0200             _waiting.emplace_back(std::move(h));
0201             auto slot = _waiting.back().get_cancellation_slot();
0202             if (slot.is_connected())
0203                 slot.template emplace<cancel_waiting_op>(
0204                     _waiting.end() - 1
0205                 );
0206         }
0207         else {
0208             _locked = true;
0209             execute_op(queued_op_t { std::move(h) });
0210         }
0211     }
0212 };
0213 
0214 } // end namespace boost::mqtt5::detail
0215 
0216 #endif // !BOOST_MQTT5_ASYNC_MUTEX_HPP