Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:20

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_ASYNC_SENDER_HPP
0009 #define BOOST_MQTT5_ASYNC_SENDER_HPP
0010 
0011 #include <boost/mqtt5/detail/internal_types.hpp>
0012 
0013 #include <boost/asio/any_completion_handler.hpp>
0014 #include <boost/asio/any_io_executor.hpp>
0015 #include <boost/asio/bind_allocator.hpp>
0016 #include <boost/asio/bind_executor.hpp>
0017 #include <boost/asio/buffer.hpp>
0018 #include <boost/asio/error.hpp>
0019 #include <boost/asio/post.hpp>
0020 #include <boost/asio/prepend.hpp>
0021 #include <boost/asio/recycling_allocator.hpp>
0022 #include <boost/system/error_code.hpp>
0023 
0024 #include <algorithm>
0025 #include <cstdint>
0026 #include <utility>
0027 #include <vector>
0028 
0029 namespace boost::mqtt5::detail {
0030 
0031 namespace asio = boost::asio;
0032 
0033 class write_req {
0034     static constexpr unsigned SERIAL_BITS = sizeof(serial_num_t) * 8;
0035 
0036     asio::const_buffer _buffer;
0037     serial_num_t _serial_num;
0038     unsigned _flags;
0039 
0040     using handler_type = asio::any_completion_handler<void (error_code)>;
0041     handler_type _handler;
0042 
0043 public:
0044     write_req(
0045         asio::const_buffer buffer,
0046         serial_num_t serial_num, unsigned flags,
0047         handler_type handler
0048     ) :
0049         _buffer(buffer), _serial_num(serial_num), _flags(flags),
0050         _handler(std::move(handler))
0051     {}
0052 
0053     write_req(write_req&&) = default;
0054     write_req(const write_req&) = delete;
0055 
0056     write_req& operator=(write_req&&) = default;
0057     write_req& operator=(const write_req&) = delete;
0058 
0059     static serial_num_t next_serial_num(serial_num_t last) {
0060         return last + 1;
0061     }
0062 
0063     asio::const_buffer buffer() const {
0064         return _buffer;
0065     }
0066 
0067     void complete(error_code ec) {
0068         std::move(_handler)(ec);
0069     }
0070 
0071     void complete_post(const asio::any_io_executor& ex, error_code ec) {
0072         asio::post(
0073             ex,
0074             asio::prepend(std::move(_handler), ec)
0075         );
0076     }
0077 
0078     bool empty() const {
0079         return !_handler;
0080     }
0081 
0082     bool throttled() const {
0083         return _flags & send_flag::throttled;
0084     }
0085 
0086     bool terminal() const {
0087         return _flags & send_flag::terminal;
0088     }
0089 
0090     bool operator<(const write_req& other) const {
0091         if (prioritized() != other.prioritized())
0092             return prioritized();
0093 
0094         auto s1 = _serial_num;
0095         auto s2 = other._serial_num;
0096 
0097         if (s1 < s2)
0098             return (s2 - s1) < (1u << (SERIAL_BITS - 1));
0099 
0100         return (s1 - s2) >= (1u << (SERIAL_BITS - 1));
0101     }
0102 
0103 private:
0104     bool prioritized() const {
0105         return _flags & send_flag::prioritized;
0106     }
0107 };
0108 
0109 
0110 template <typename ClientService>
0111 class async_sender {
0112     using self_type = async_sender<ClientService>;
0113 
0114     using client_service = ClientService;
0115 
0116     using queue_allocator_type = asio::recycling_allocator<write_req>;
0117     using write_queue_t = std::vector<write_req, queue_allocator_type>;
0118 
0119     ClientService& _svc;
0120     write_queue_t _write_queue;
0121     bool _write_in_progress { false };
0122 
0123     static constexpr uint16_t MAX_LIMIT = 65535;
0124     uint16_t _limit { MAX_LIMIT };
0125     uint16_t _quota { MAX_LIMIT };
0126 
0127     serial_num_t _last_serial_num { 0 };
0128 
0129 public:
0130     explicit async_sender(ClientService& svc) : _svc(svc) {}
0131 
0132     async_sender(async_sender&&) = default;
0133     async_sender(const async_sender&) = delete;
0134 
0135     async_sender& operator=(async_sender&&) = default;
0136     async_sender& operator=(const async_sender&) = delete;
0137 
0138     using allocator_type = queue_allocator_type;
0139     allocator_type get_allocator() const noexcept {
0140         return allocator_type {};
0141     }
0142 
0143     using executor_type = typename client_service::executor_type;
0144     executor_type get_executor() const noexcept {
0145         return _svc.get_executor();
0146     }
0147 
0148     serial_num_t next_serial_num() {
0149         return _last_serial_num = write_req::next_serial_num(_last_serial_num);
0150     }
0151 
0152     template <typename CompletionToken, typename BufferType>
0153     decltype(auto) async_send(
0154         const BufferType& buffer,
0155         serial_num_t serial_num, unsigned flags,
0156         CompletionToken&& token
0157     ) {
0158         using Signature = void (error_code);
0159 
0160         auto initiation = [](
0161             auto handler, self_type& self, const BufferType& buffer,
0162             serial_num_t serial_num, unsigned flags
0163         ) {
0164             self._write_queue.emplace_back(
0165                 asio::buffer(buffer), serial_num, flags, std::move(handler)
0166             );
0167             self.do_write();
0168         };
0169 
0170         return asio::async_initiate<CompletionToken, Signature>(
0171             initiation, token, std::ref(*this),
0172             buffer, serial_num, flags
0173         );
0174     }
0175 
0176     void cancel() {
0177         auto ops = std::move(_write_queue);
0178         for (auto& op : ops)
0179             op.complete_post(_svc.get_executor(), asio::error::operation_aborted);
0180     }
0181 
0182     void resend() {
0183         if (_write_in_progress)
0184             return;
0185 
0186         // The _write_in_progress flag is set to true to prevent any write
0187         // operations executing before the _write_queue is filled with
0188         // all the packets that require resending.
0189         _write_in_progress = true;
0190 
0191         auto new_limit = _svc._stream_context.connack_property(prop::receive_maximum);
0192         _limit = new_limit.value_or(MAX_LIMIT);
0193         _quota = _limit;
0194 
0195         auto write_queue = std::move(_write_queue);
0196         _svc._replies.resend_unanswered();
0197 
0198         for (auto& op : write_queue)
0199             op.complete(asio::error::try_again);
0200 
0201         std::stable_sort(_write_queue.begin(), _write_queue.end());
0202 
0203         _write_in_progress = false;
0204         do_write();
0205     }
0206 
0207     void operator()(write_queue_t write_queue, error_code ec, size_t) {
0208         _write_in_progress = false;
0209 
0210         if (ec == asio::error::try_again) {
0211             _svc.update_session_state();
0212             _write_queue.insert(
0213                 _write_queue.begin(),
0214                 std::make_move_iterator(write_queue.begin()),
0215                 std::make_move_iterator(write_queue.end())
0216             );
0217             return resend();
0218         }
0219 
0220         if (ec == asio::error::no_recovery)
0221             _svc.cancel();
0222 
0223         // errors, if any, are propagated to ops
0224         for (auto& op : write_queue)
0225             op.complete(ec);
0226 
0227         if (
0228             ec == asio::error::operation_aborted ||
0229             ec == asio::error::no_recovery
0230         )
0231             return;
0232 
0233         do_write();
0234     }
0235 
0236     void throttled_op_done() {
0237         if (_limit == MAX_LIMIT)
0238             return;
0239 
0240         ++_quota;
0241         do_write();
0242     }
0243 
0244 private:
0245     void do_write() {
0246         if (_write_in_progress || _write_queue.empty())
0247             return;
0248 
0249         _write_in_progress = true;
0250 
0251         write_queue_t write_queue;
0252 
0253         auto terminal_req = std::find_if(
0254             _write_queue.begin(), _write_queue.end(),
0255             [](const auto& op) { return op.terminal(); }
0256         );
0257 
0258         if (terminal_req != _write_queue.end()) {
0259             write_queue.push_back(std::move(*terminal_req));
0260             _write_queue.erase(terminal_req);
0261         }
0262         else if (_limit == MAX_LIMIT) {
0263             write_queue = std::move(_write_queue);
0264         }
0265         else {
0266             for (write_req& req : _write_queue)
0267                 if (!req.throttled())
0268                     write_queue.push_back(std::move(req));
0269                 else if (_quota > 0) {
0270                     --_quota;
0271                     write_queue.push_back(std::move(req));
0272                 }
0273 
0274             if (write_queue.empty()) {
0275                 _write_in_progress = false;
0276                 return;
0277             }
0278 
0279             auto it = std::remove_if(
0280                 _write_queue.begin(), _write_queue.end(),
0281                 [](const write_req& req) { return req.empty(); }
0282             );
0283             _write_queue.erase(it, _write_queue.end());
0284         }
0285 
0286         std::vector<asio::const_buffer> buffers;
0287         buffers.reserve(write_queue.size());
0288         for (const auto& op : write_queue)
0289             buffers.push_back(op.buffer());
0290 
0291         _svc._replies.clear_fast_replies();
0292 
0293         _svc._stream.async_write(
0294             buffers,
0295             asio::prepend(std::ref(*this), std::move(write_queue))
0296         );
0297     }
0298 
0299 };
0300 
0301 } // end namespace boost::mqtt5::detail
0302 
0303 #endif // !BOOST_MQTT5_ASYNC_SENDER_HPP