Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:22

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_READ_OP_HPP
0009 #define BOOST_MQTT5_READ_OP_HPP
0010 
0011 #include <boost/mqtt5/detail/internal_types.hpp>
0012 
0013 #include <boost/asio/associated_allocator.hpp>
0014 #include <boost/asio/associated_executor.hpp>
0015 #include <boost/asio/deferred.hpp>
0016 #include <boost/asio/error.hpp>
0017 #include <boost/asio/experimental/parallel_group.hpp>
0018 #include <boost/asio/post.hpp>
0019 #include <boost/asio/prepend.hpp>
0020 
0021 #include <array>
0022 
0023 namespace boost::mqtt5::detail {
0024 
0025 namespace asio = boost::asio;
0026 namespace asioex = boost::asio::experimental;
0027 
0028 template <typename Owner, typename Handler>
0029 class read_op {
0030     struct on_read {};
0031     struct on_reconnect {};
0032 
0033     Owner& _owner;
0034 
0035     using handler_type = Handler;
0036     handler_type _handler;
0037 
0038 public:
0039     read_op(Owner& owner, Handler&& handler) :
0040         _owner(owner), _handler(std::move(handler))
0041     {}
0042 
0043     read_op(read_op&&) = default;
0044     read_op(const read_op&) = delete;
0045 
0046     read_op& operator=(read_op&&) = default;
0047     read_op& operator=(const read_op&) = delete;
0048 
0049     using allocator_type = asio::associated_allocator_t<handler_type>;
0050     allocator_type get_allocator() const noexcept {
0051         return asio::get_associated_allocator(_handler);
0052     }
0053 
0054     using executor_type = asio::associated_executor_t<handler_type>;
0055     executor_type get_executor() const noexcept {
0056         return asio::get_associated_executor(_handler);
0057     }
0058 
0059     template <typename BufferType>
0060     void perform(
0061         const BufferType& buffer, duration wait_for
0062     ) {
0063         auto stream_ptr = _owner._stream_ptr;
0064 
0065         if (_owner.was_connected()) {
0066             _owner._read_timer.expires_after(wait_for);
0067 
0068             auto timed_read = asioex::make_parallel_group(
0069                 stream_ptr->async_read_some(buffer, asio::deferred),
0070                 _owner._read_timer.async_wait(asio::deferred)
0071             );
0072 
0073             timed_read.async_wait(
0074                 asioex::wait_for_one(),
0075                 asio::prepend(std::move(*this), on_read {}, stream_ptr)
0076             );
0077         }
0078         else
0079             asio::post(
0080                 _owner.get_executor(),
0081                 asio::prepend(
0082                     std::move(*this), on_read {}, stream_ptr,
0083                     std::array<size_t, 2> { 0, 1 },
0084                     asio::error::not_connected, 0, error_code {}
0085                 )
0086             );
0087     }
0088 
0089     void operator()(
0090         on_read, typename Owner::stream_ptr stream_ptr,
0091         std::array<std::size_t, 2> ord, error_code read_ec, size_t bytes_read,
0092         error_code
0093     ) {
0094         if (!_owner.is_open())
0095             return complete(asio::error::operation_aborted, bytes_read);
0096 
0097         error_code ec = ord[0] == 1 ? asio::error::timed_out : read_ec;
0098         bytes_read = ord[0] == 0 ? bytes_read : 0;
0099 
0100         if (!ec)
0101             return complete(ec, bytes_read);
0102 
0103         // websocket returns operation_aborted if disconnected
0104         if (should_reconnect(ec) || ec == asio::error::operation_aborted)
0105             return _owner.async_reconnect(
0106                 stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
0107             );
0108 
0109         return complete(asio::error::no_recovery, bytes_read);
0110     }
0111 
0112     void operator()(on_reconnect, error_code ec) {
0113         if ((ec == asio::error::operation_aborted && _owner.is_open()) || !ec)
0114             ec = asio::error::try_again;
0115 
0116         return complete(ec, 0);
0117     }
0118 
0119 private:
0120     void complete(error_code ec, size_t bytes_read) {
0121         std::move(_handler)(ec, bytes_read);
0122     }
0123 
0124     static bool should_reconnect(error_code ec) {
0125         using namespace asio::error;
0126         // note: Win ERROR_SEM_TIMEOUT == Posix ENOLINK (Reserved)
0127         return ec.value() == 1236L || /* Win ERROR_CONNECTION_ABORTED */
0128             ec.value() == 121L || /* Win ERROR_SEM_TIMEOUT */
0129             ec == connection_aborted || ec == not_connected ||
0130             ec == timed_out || ec == connection_reset ||
0131             ec == broken_pipe || ec == asio::error::eof;
0132     }
0133 
0134 };
0135 
0136 
0137 } // end namespace boost::mqtt5::detail
0138 
0139 #endif // !BOOST_MQTT5_READ_OP_HPP