Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:22

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_REPLIES_HPP
0009 #define BOOST_MQTT5_REPLIES_HPP
0010 
0011 #include <boost/mqtt5/types.hpp>
0012 
0013 #include <boost/mqtt5/detail/control_packet.hpp>
0014 #include <boost/mqtt5/detail/internal_types.hpp>
0015 
0016 #include <boost/asio/any_completion_handler.hpp>
0017 #include <boost/asio/any_io_executor.hpp>
0018 #include <boost/asio/async_result.hpp>
0019 #include <boost/asio/consign.hpp>
0020 #include <boost/asio/dispatch.hpp>
0021 #include <boost/asio/error.hpp>
0022 #include <boost/asio/post.hpp>
0023 #include <boost/asio/prepend.hpp>
0024 
0025 #include <algorithm>
0026 #include <chrono>
0027 #include <cstdint>
0028 #include <memory>
0029 #include <string>
0030 #include <vector>
0031 
0032 namespace boost::mqtt5::detail {
0033 
0034 namespace asio = boost::asio;
0035 
0036 class replies {
0037 public:
0038     using executor_type = asio::any_io_executor;
0039 private:
0040     using Signature = void (error_code, byte_citer, byte_citer);
0041 
0042     static constexpr auto max_reply_time = std::chrono::seconds(20);
0043 
0044     class reply_handler {
0045         asio::any_completion_handler<Signature> _handler;
0046         control_code_e _code;
0047         uint16_t _packet_id;
0048         std::chrono::time_point<std::chrono::system_clock> _ts;
0049     public:
0050         template <typename H>
0051         reply_handler(control_code_e code, uint16_t pid, H&& handler) :
0052             _handler(std::forward<H>(handler)), _code(code), _packet_id(pid),
0053             _ts(std::chrono::system_clock::now())
0054         {}
0055 
0056         reply_handler(reply_handler&&) = default;
0057         reply_handler(const reply_handler&) = delete;
0058 
0059         reply_handler& operator=(reply_handler&&) = default;
0060         reply_handler& operator=(const reply_handler&) = delete;
0061 
0062         void complete(
0063             error_code ec,
0064             byte_citer first = byte_citer {}, byte_citer last = byte_citer {}
0065         ) {
0066             std::move(_handler)(ec, first, last);
0067         }
0068 
0069         void complete_post(const executor_type& ex, error_code ec) {
0070             asio::post(
0071                 ex,
0072                 asio::prepend(
0073                     std::move(_handler), ec, byte_citer {}, byte_citer {}
0074                 )
0075             );
0076         }
0077 
0078         uint16_t packet_id() const noexcept {
0079             return _packet_id;
0080         }
0081 
0082         control_code_e code() const noexcept {
0083             return _code;
0084         }
0085 
0086         auto time() const noexcept {
0087             return _ts;
0088         }
0089     };
0090 
0091     executor_type _ex;
0092 
0093     using handlers = std::vector<reply_handler>;
0094     handlers _handlers;
0095 
0096     struct fast_reply {
0097         control_code_e code;
0098         uint16_t packet_id;
0099         std::unique_ptr<std::string> packet;
0100     };
0101     using fast_replies = std::vector<fast_reply>;
0102     fast_replies _fast_replies;
0103 
0104 public:
0105     template <typename Executor>
0106     explicit replies(Executor ex) : _ex(std::move(ex)) {}
0107 
0108     replies(replies&&) = default;
0109     replies(const replies&) = delete;
0110 
0111     replies& operator=(replies&&) = default;
0112     replies& operator=(const replies&) = delete;
0113 
0114     template <typename CompletionToken>
0115     decltype(auto) async_wait_reply(
0116         control_code_e code, uint16_t packet_id, CompletionToken&& token
0117     ) {
0118         auto dup_handler_ptr = find_handler(code, packet_id);
0119         if (dup_handler_ptr != _handlers.end()) {
0120             dup_handler_ptr->complete_post(_ex, asio::error::operation_aborted);
0121             _handlers.erase(dup_handler_ptr);
0122         }
0123 
0124         auto freply = find_fast_reply(code, packet_id);
0125 
0126         if (freply == _fast_replies.end()) {
0127             auto initiation = [](
0128                 auto handler, replies& self,
0129                 control_code_e code, uint16_t packet_id
0130             ) {
0131                 self._handlers.emplace_back(
0132                     code, packet_id, std::move(handler)
0133                 );
0134             };
0135             return asio::async_initiate<CompletionToken, Signature>(
0136                 initiation, token, std::ref(*this), code, packet_id
0137             );
0138         }
0139 
0140         auto fdata = std::move(*freply);
0141         _fast_replies.erase(freply);
0142 
0143         auto initiation = [](
0144             auto handler, std::unique_ptr<std::string> packet,
0145             const executor_type& ex
0146         ) {
0147             byte_citer first = packet->cbegin();
0148             byte_citer last = packet->cend();
0149 
0150             asio::post(
0151                 ex,
0152                 asio::consign(
0153                     asio::prepend(
0154                         std::move(handler), error_code {}, first, last
0155                     ),
0156                     std::move(packet)
0157                 )
0158             );
0159         };
0160 
0161         return asio::async_initiate<CompletionToken, Signature>(
0162             initiation, token, std::move(fdata.packet), _ex
0163         );
0164     }
0165 
0166     void dispatch(
0167         error_code ec, control_code_e code, uint16_t packet_id,
0168         byte_citer first, byte_citer last
0169     ) {
0170         auto handler_ptr = find_handler(code, packet_id);
0171 
0172         if (handler_ptr == _handlers.end()) {
0173             _fast_replies.push_back({
0174                 code, packet_id,
0175                 std::make_unique<std::string>(first, last)
0176             });
0177             return;
0178         }
0179 
0180         auto handler = std::move(*handler_ptr);
0181         _handlers.erase(handler_ptr);
0182         handler.complete(ec, first, last);
0183     }
0184 
0185     void resend_unanswered() {
0186         auto ua = std::move(_handlers);
0187         for (auto& h : ua)
0188             h.complete(asio::error::try_again);
0189     }
0190 
0191     void cancel_unanswered() {
0192         auto ua = std::move(_handlers);
0193         for (auto& h : ua)
0194             h.complete_post(_ex, asio::error::operation_aborted);
0195     }
0196 
0197     bool any_expired() {
0198         auto now = std::chrono::system_clock::now();
0199         return std::any_of(
0200             _handlers.begin(), _handlers.end(),
0201             [now](const auto& h) {
0202                 return now - h.time() > max_reply_time;
0203             }
0204         );
0205     }
0206 
0207     void clear_fast_replies() {
0208         _fast_replies.clear();
0209     }
0210 
0211     void clear_pending_pubrels() {
0212         for (auto it = _handlers.begin(); it != _handlers.end();) {
0213             if (it->code() == control_code_e::pubrel) {
0214                 it->complete(asio::error::operation_aborted);
0215                 it = _handlers.erase(it);
0216             }
0217             else
0218                 ++it;
0219         }
0220     }
0221 
0222 private:
0223     handlers::iterator find_handler(control_code_e code, uint16_t packet_id) {
0224         return std::find_if(
0225             _handlers.begin(), _handlers.end(),
0226             [code, packet_id](const auto& h) {
0227                 return h.code() == code && h.packet_id() == packet_id;
0228             }
0229         );
0230     }
0231 
0232     fast_replies::iterator find_fast_reply(
0233         control_code_e code, uint16_t packet_id
0234     ) {
0235         return std::find_if(
0236             _fast_replies.begin(), _fast_replies.end(),
0237             [code, packet_id](const auto& f) {
0238                 return f.code == code && f.packet_id == packet_id;
0239             }
0240         );
0241     }
0242 
0243 };
0244 
0245 } // end namespace boost::mqtt5::detail
0246 
0247 #endif // !BOOST_MQTT5_REPLIES_HPP