File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_REPLIES_HPP
0009 #define BOOST_MQTT5_REPLIES_HPP
0010
0011 #include <boost/mqtt5/types.hpp>
0012
0013 #include <boost/mqtt5/detail/control_packet.hpp>
0014 #include <boost/mqtt5/detail/internal_types.hpp>
0015
0016 #include <boost/asio/any_completion_handler.hpp>
0017 #include <boost/asio/any_io_executor.hpp>
0018 #include <boost/asio/async_result.hpp>
0019 #include <boost/asio/consign.hpp>
0020 #include <boost/asio/dispatch.hpp>
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/asio/prepend.hpp>
0024
0025 #include <algorithm>
0026 #include <chrono>
0027 #include <cstdint>
0028 #include <memory>
0029 #include <string>
0030 #include <vector>
0031
0032 namespace boost::mqtt5::detail {
0033
0034 namespace asio = boost::asio;
0035
0036 class replies {
0037 public:
0038 using executor_type = asio::any_io_executor;
0039 private:
0040 using Signature = void (error_code, byte_citer, byte_citer);
0041
0042 static constexpr auto max_reply_time = std::chrono::seconds(20);
0043
0044 class reply_handler {
0045 asio::any_completion_handler<Signature> _handler;
0046 control_code_e _code;
0047 uint16_t _packet_id;
0048 std::chrono::time_point<std::chrono::system_clock> _ts;
0049 public:
0050 template <typename H>
0051 reply_handler(control_code_e code, uint16_t pid, H&& handler) :
0052 _handler(std::forward<H>(handler)), _code(code), _packet_id(pid),
0053 _ts(std::chrono::system_clock::now())
0054 {}
0055
0056 reply_handler(reply_handler&&) = default;
0057 reply_handler(const reply_handler&) = delete;
0058
0059 reply_handler& operator=(reply_handler&&) = default;
0060 reply_handler& operator=(const reply_handler&) = delete;
0061
0062 void complete(
0063 error_code ec,
0064 byte_citer first = byte_citer {}, byte_citer last = byte_citer {}
0065 ) {
0066 std::move(_handler)(ec, first, last);
0067 }
0068
0069 void complete_post(const executor_type& ex, error_code ec) {
0070 asio::post(
0071 ex,
0072 asio::prepend(
0073 std::move(_handler), ec, byte_citer {}, byte_citer {}
0074 )
0075 );
0076 }
0077
0078 uint16_t packet_id() const noexcept {
0079 return _packet_id;
0080 }
0081
0082 control_code_e code() const noexcept {
0083 return _code;
0084 }
0085
0086 auto time() const noexcept {
0087 return _ts;
0088 }
0089 };
0090
0091 executor_type _ex;
0092
0093 using handlers = std::vector<reply_handler>;
0094 handlers _handlers;
0095
0096 struct fast_reply {
0097 control_code_e code;
0098 uint16_t packet_id;
0099 std::unique_ptr<std::string> packet;
0100 };
0101 using fast_replies = std::vector<fast_reply>;
0102 fast_replies _fast_replies;
0103
0104 public:
0105 template <typename Executor>
0106 explicit replies(Executor ex) : _ex(std::move(ex)) {}
0107
0108 replies(replies&&) = default;
0109 replies(const replies&) = delete;
0110
0111 replies& operator=(replies&&) = default;
0112 replies& operator=(const replies&) = delete;
0113
0114 template <typename CompletionToken>
0115 decltype(auto) async_wait_reply(
0116 control_code_e code, uint16_t packet_id, CompletionToken&& token
0117 ) {
0118 auto dup_handler_ptr = find_handler(code, packet_id);
0119 if (dup_handler_ptr != _handlers.end()) {
0120 dup_handler_ptr->complete_post(_ex, asio::error::operation_aborted);
0121 _handlers.erase(dup_handler_ptr);
0122 }
0123
0124 auto freply = find_fast_reply(code, packet_id);
0125
0126 if (freply == _fast_replies.end()) {
0127 auto initiation = [](
0128 auto handler, replies& self,
0129 control_code_e code, uint16_t packet_id
0130 ) {
0131 self._handlers.emplace_back(
0132 code, packet_id, std::move(handler)
0133 );
0134 };
0135 return asio::async_initiate<CompletionToken, Signature>(
0136 initiation, token, std::ref(*this), code, packet_id
0137 );
0138 }
0139
0140 auto fdata = std::move(*freply);
0141 _fast_replies.erase(freply);
0142
0143 auto initiation = [](
0144 auto handler, std::unique_ptr<std::string> packet,
0145 const executor_type& ex
0146 ) {
0147 byte_citer first = packet->cbegin();
0148 byte_citer last = packet->cend();
0149
0150 asio::post(
0151 ex,
0152 asio::consign(
0153 asio::prepend(
0154 std::move(handler), error_code {}, first, last
0155 ),
0156 std::move(packet)
0157 )
0158 );
0159 };
0160
0161 return asio::async_initiate<CompletionToken, Signature>(
0162 initiation, token, std::move(fdata.packet), _ex
0163 );
0164 }
0165
0166 void dispatch(
0167 error_code ec, control_code_e code, uint16_t packet_id,
0168 byte_citer first, byte_citer last
0169 ) {
0170 auto handler_ptr = find_handler(code, packet_id);
0171
0172 if (handler_ptr == _handlers.end()) {
0173 _fast_replies.push_back({
0174 code, packet_id,
0175 std::make_unique<std::string>(first, last)
0176 });
0177 return;
0178 }
0179
0180 auto handler = std::move(*handler_ptr);
0181 _handlers.erase(handler_ptr);
0182 handler.complete(ec, first, last);
0183 }
0184
0185 void resend_unanswered() {
0186 auto ua = std::move(_handlers);
0187 for (auto& h : ua)
0188 h.complete(asio::error::try_again);
0189 }
0190
0191 void cancel_unanswered() {
0192 auto ua = std::move(_handlers);
0193 for (auto& h : ua)
0194 h.complete_post(_ex, asio::error::operation_aborted);
0195 }
0196
0197 bool any_expired() {
0198 auto now = std::chrono::system_clock::now();
0199 return std::any_of(
0200 _handlers.begin(), _handlers.end(),
0201 [now](const auto& h) {
0202 return now - h.time() > max_reply_time;
0203 }
0204 );
0205 }
0206
0207 void clear_fast_replies() {
0208 _fast_replies.clear();
0209 }
0210
0211 void clear_pending_pubrels() {
0212 for (auto it = _handlers.begin(); it != _handlers.end();) {
0213 if (it->code() == control_code_e::pubrel) {
0214 it->complete(asio::error::operation_aborted);
0215 it = _handlers.erase(it);
0216 }
0217 else
0218 ++it;
0219 }
0220 }
0221
0222 private:
0223 handlers::iterator find_handler(control_code_e code, uint16_t packet_id) {
0224 return std::find_if(
0225 _handlers.begin(), _handlers.end(),
0226 [code, packet_id](const auto& h) {
0227 return h.code() == code && h.packet_id() == packet_id;
0228 }
0229 );
0230 }
0231
0232 fast_replies::iterator find_fast_reply(
0233 control_code_e code, uint16_t packet_id
0234 ) {
0235 return std::find_if(
0236 _fast_replies.begin(), _fast_replies.end(),
0237 [code, packet_id](const auto& f) {
0238 return f.code == code && f.packet_id == packet_id;
0239 }
0240 );
0241 }
0242
0243 };
0244
0245 }
0246
0247 #endif