Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:38:22

0001 //
0002 // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
0003 //
0004 // Distributed under the Boost Software License, Version 1.0.
0005 // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MQTT5_WRITE_OP_HPP
0009 #define BOOST_MQTT5_WRITE_OP_HPP
0010 
0011 #include <boost/mqtt5/detail/async_traits.hpp>
0012 
0013 #include <boost/asio/associated_allocator.hpp>
0014 #include <boost/asio/associated_executor.hpp>
0015 #include <boost/asio/error.hpp>
0016 #include <boost/asio/post.hpp>
0017 #include <boost/asio/prepend.hpp>
0018 #include <boost/asio/write.hpp>
0019 #include <boost/system/error_code.hpp>
0020 
0021 namespace boost::mqtt5::detail {
0022 
0023 template <typename Owner, typename Handler>
0024 class write_op {
0025     struct on_write {};
0026     struct on_reconnect {};
0027 
0028     Owner& _owner;
0029 
0030     using handler_type = Handler;
0031     handler_type _handler;
0032 
0033 public:
0034     write_op(Owner& owner, Handler&& handler) :
0035         _owner(owner), _handler(std::move(handler))
0036     {}
0037 
0038     write_op(write_op&&) = default;
0039     write_op(const write_op&) = delete;
0040 
0041     write_op& operator=(write_op&&) = default;
0042     write_op& operator=(const write_op&) = delete;
0043 
0044     using allocator_type = asio::associated_allocator_t<handler_type>;
0045     allocator_type get_allocator() const noexcept {
0046         return asio::get_associated_allocator(_handler);
0047     }
0048 
0049     using executor_type = asio::associated_executor_t<handler_type>;
0050     executor_type get_executor() const noexcept {
0051         return asio::get_associated_executor(_handler);
0052     }
0053 
0054     template <typename BufferType>
0055     void perform(BufferType& buffer) {
0056         auto stream_ptr = _owner._stream_ptr;
0057         if (_owner.was_connected())
0058             // note: write operation should not be time-limited
0059             detail::async_write(
0060                 *stream_ptr, buffer,
0061                 asio::prepend(std::move(*this), on_write {}, stream_ptr)
0062             );
0063         else
0064             asio::post(
0065                 _owner.get_executor(),
0066                 asio::prepend(
0067                     std::move(*this), on_write {},
0068                     stream_ptr, asio::error::not_connected, 0
0069                 )
0070             );
0071     }
0072 
0073     void operator()(
0074         on_write, typename Owner::stream_ptr stream_ptr,
0075         error_code ec, size_t bytes_written
0076     ) {
0077         if (!_owner.is_open())
0078             return complete(asio::error::operation_aborted, 0);
0079 
0080         if (!ec)
0081             return complete(ec, bytes_written);
0082 
0083         // websocket returns operation_aborted if disconnected
0084         if (should_reconnect(ec) || ec == asio::error::operation_aborted)
0085             return _owner.async_reconnect(
0086                 stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
0087             );
0088 
0089         return complete(asio::error::no_recovery, 0);
0090     }
0091 
0092     void operator()(on_reconnect, error_code ec) {
0093         if ((ec == asio::error::operation_aborted && _owner.is_open()) || !ec)
0094             ec = asio::error::try_again;
0095 
0096         return complete(ec, 0);
0097     }
0098 
0099 private:
0100     void complete(error_code ec, size_t bytes_written) {
0101         std::move(_handler)(ec, bytes_written);
0102     }
0103 
0104     static bool should_reconnect(error_code ec) {
0105         using namespace asio::error;
0106         // note: Win ERROR_SEM_TIMEOUT == Posix ENOLINK (Reserved)
0107         return ec.value() == 1236L || /* Win ERROR_CONNECTION_ABORTED */
0108             ec.value() == 121L || /* Win ERROR_SEM_TIMEOUT */
0109             ec == connection_aborted || ec == not_connected ||
0110             ec == timed_out || ec == connection_reset ||
0111             ec == broken_pipe || ec == asio::error::eof;
0112     }
0113 
0114 };
0115 
0116 } // end namespace boost::mqtt5::detail
0117 
0118 #endif // !BOOST_MQTT5_WRITE_OP_HPP