File indexing completed on 2025-09-17 08:38:20
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_ASYNC_SENDER_HPP
0009 #define BOOST_MQTT5_ASYNC_SENDER_HPP
0010
0011 #include <boost/mqtt5/detail/internal_types.hpp>
0012
0013 #include <boost/asio/any_completion_handler.hpp>
0014 #include <boost/asio/any_io_executor.hpp>
0015 #include <boost/asio/bind_allocator.hpp>
0016 #include <boost/asio/bind_executor.hpp>
0017 #include <boost/asio/buffer.hpp>
0018 #include <boost/asio/error.hpp>
0019 #include <boost/asio/post.hpp>
0020 #include <boost/asio/prepend.hpp>
0021 #include <boost/asio/recycling_allocator.hpp>
0022 #include <boost/system/error_code.hpp>
0023
0024 #include <algorithm>
0025 #include <cstdint>
0026 #include <utility>
0027 #include <vector>
0028
0029 namespace boost::mqtt5::detail {
0030
0031 namespace asio = boost::asio;
0032
0033 class write_req {
0034 static constexpr unsigned SERIAL_BITS = sizeof(serial_num_t) * 8;
0035
0036 asio::const_buffer _buffer;
0037 serial_num_t _serial_num;
0038 unsigned _flags;
0039
0040 using handler_type = asio::any_completion_handler<void (error_code)>;
0041 handler_type _handler;
0042
0043 public:
0044 write_req(
0045 asio::const_buffer buffer,
0046 serial_num_t serial_num, unsigned flags,
0047 handler_type handler
0048 ) :
0049 _buffer(buffer), _serial_num(serial_num), _flags(flags),
0050 _handler(std::move(handler))
0051 {}
0052
0053 write_req(write_req&&) = default;
0054 write_req(const write_req&) = delete;
0055
0056 write_req& operator=(write_req&&) = default;
0057 write_req& operator=(const write_req&) = delete;
0058
0059 static serial_num_t next_serial_num(serial_num_t last) {
0060 return last + 1;
0061 }
0062
0063 asio::const_buffer buffer() const {
0064 return _buffer;
0065 }
0066
0067 void complete(error_code ec) {
0068 std::move(_handler)(ec);
0069 }
0070
0071 void complete_post(const asio::any_io_executor& ex, error_code ec) {
0072 asio::post(
0073 ex,
0074 asio::prepend(std::move(_handler), ec)
0075 );
0076 }
0077
0078 bool empty() const {
0079 return !_handler;
0080 }
0081
0082 bool throttled() const {
0083 return _flags & send_flag::throttled;
0084 }
0085
0086 bool terminal() const {
0087 return _flags & send_flag::terminal;
0088 }
0089
0090 bool operator<(const write_req& other) const {
0091 if (prioritized() != other.prioritized())
0092 return prioritized();
0093
0094 auto s1 = _serial_num;
0095 auto s2 = other._serial_num;
0096
0097 if (s1 < s2)
0098 return (s2 - s1) < (1u << (SERIAL_BITS - 1));
0099
0100 return (s1 - s2) >= (1u << (SERIAL_BITS - 1));
0101 }
0102
0103 private:
0104 bool prioritized() const {
0105 return _flags & send_flag::prioritized;
0106 }
0107 };
0108
0109
0110 template <typename ClientService>
0111 class async_sender {
0112 using self_type = async_sender<ClientService>;
0113
0114 using client_service = ClientService;
0115
0116 using queue_allocator_type = asio::recycling_allocator<write_req>;
0117 using write_queue_t = std::vector<write_req, queue_allocator_type>;
0118
0119 ClientService& _svc;
0120 write_queue_t _write_queue;
0121 bool _write_in_progress { false };
0122
0123 static constexpr uint16_t MAX_LIMIT = 65535;
0124 uint16_t _limit { MAX_LIMIT };
0125 uint16_t _quota { MAX_LIMIT };
0126
0127 serial_num_t _last_serial_num { 0 };
0128
0129 public:
0130 explicit async_sender(ClientService& svc) : _svc(svc) {}
0131
0132 async_sender(async_sender&&) = default;
0133 async_sender(const async_sender&) = delete;
0134
0135 async_sender& operator=(async_sender&&) = default;
0136 async_sender& operator=(const async_sender&) = delete;
0137
0138 using allocator_type = queue_allocator_type;
0139 allocator_type get_allocator() const noexcept {
0140 return allocator_type {};
0141 }
0142
0143 using executor_type = typename client_service::executor_type;
0144 executor_type get_executor() const noexcept {
0145 return _svc.get_executor();
0146 }
0147
0148 serial_num_t next_serial_num() {
0149 return _last_serial_num = write_req::next_serial_num(_last_serial_num);
0150 }
0151
0152 template <typename CompletionToken, typename BufferType>
0153 decltype(auto) async_send(
0154 const BufferType& buffer,
0155 serial_num_t serial_num, unsigned flags,
0156 CompletionToken&& token
0157 ) {
0158 using Signature = void (error_code);
0159
0160 auto initiation = [](
0161 auto handler, self_type& self, const BufferType& buffer,
0162 serial_num_t serial_num, unsigned flags
0163 ) {
0164 self._write_queue.emplace_back(
0165 asio::buffer(buffer), serial_num, flags, std::move(handler)
0166 );
0167 self.do_write();
0168 };
0169
0170 return asio::async_initiate<CompletionToken, Signature>(
0171 initiation, token, std::ref(*this),
0172 buffer, serial_num, flags
0173 );
0174 }
0175
0176 void cancel() {
0177 auto ops = std::move(_write_queue);
0178 for (auto& op : ops)
0179 op.complete_post(_svc.get_executor(), asio::error::operation_aborted);
0180 }
0181
0182 void resend() {
0183 if (_write_in_progress)
0184 return;
0185
0186
0187
0188
0189 _write_in_progress = true;
0190
0191 auto new_limit = _svc._stream_context.connack_property(prop::receive_maximum);
0192 _limit = new_limit.value_or(MAX_LIMIT);
0193 _quota = _limit;
0194
0195 auto write_queue = std::move(_write_queue);
0196 _svc._replies.resend_unanswered();
0197
0198 for (auto& op : write_queue)
0199 op.complete(asio::error::try_again);
0200
0201 std::stable_sort(_write_queue.begin(), _write_queue.end());
0202
0203 _write_in_progress = false;
0204 do_write();
0205 }
0206
0207 void operator()(write_queue_t write_queue, error_code ec, size_t) {
0208 _write_in_progress = false;
0209
0210 if (ec == asio::error::try_again) {
0211 _svc.update_session_state();
0212 _write_queue.insert(
0213 _write_queue.begin(),
0214 std::make_move_iterator(write_queue.begin()),
0215 std::make_move_iterator(write_queue.end())
0216 );
0217 return resend();
0218 }
0219
0220 if (ec == asio::error::no_recovery)
0221 _svc.cancel();
0222
0223
0224 for (auto& op : write_queue)
0225 op.complete(ec);
0226
0227 if (
0228 ec == asio::error::operation_aborted ||
0229 ec == asio::error::no_recovery
0230 )
0231 return;
0232
0233 do_write();
0234 }
0235
0236 void throttled_op_done() {
0237 if (_limit == MAX_LIMIT)
0238 return;
0239
0240 ++_quota;
0241 do_write();
0242 }
0243
0244 private:
0245 void do_write() {
0246 if (_write_in_progress || _write_queue.empty())
0247 return;
0248
0249 _write_in_progress = true;
0250
0251 write_queue_t write_queue;
0252
0253 auto terminal_req = std::find_if(
0254 _write_queue.begin(), _write_queue.end(),
0255 [](const auto& op) { return op.terminal(); }
0256 );
0257
0258 if (terminal_req != _write_queue.end()) {
0259 write_queue.push_back(std::move(*terminal_req));
0260 _write_queue.erase(terminal_req);
0261 }
0262 else if (_limit == MAX_LIMIT) {
0263 write_queue = std::move(_write_queue);
0264 }
0265 else {
0266 for (write_req& req : _write_queue)
0267 if (!req.throttled())
0268 write_queue.push_back(std::move(req));
0269 else if (_quota > 0) {
0270 --_quota;
0271 write_queue.push_back(std::move(req));
0272 }
0273
0274 if (write_queue.empty()) {
0275 _write_in_progress = false;
0276 return;
0277 }
0278
0279 auto it = std::remove_if(
0280 _write_queue.begin(), _write_queue.end(),
0281 [](const write_req& req) { return req.empty(); }
0282 );
0283 _write_queue.erase(it, _write_queue.end());
0284 }
0285
0286 std::vector<asio::const_buffer> buffers;
0287 buffers.reserve(write_queue.size());
0288 for (const auto& op : write_queue)
0289 buffers.push_back(op.buffer());
0290
0291 _svc._replies.clear_fast_replies();
0292
0293 _svc._stream.async_write(
0294 buffers,
0295 asio::prepend(std::ref(*this), std::move(write_queue))
0296 );
0297 }
0298
0299 };
0300
0301 }
0302
0303 #endif