File indexing completed on 2025-09-17 08:38:22
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MQTT5_WRITE_OP_HPP
0009 #define BOOST_MQTT5_WRITE_OP_HPP
0010
0011 #include <boost/mqtt5/detail/async_traits.hpp>
0012
0013 #include <boost/asio/associated_allocator.hpp>
0014 #include <boost/asio/associated_executor.hpp>
0015 #include <boost/asio/error.hpp>
0016 #include <boost/asio/post.hpp>
0017 #include <boost/asio/prepend.hpp>
0018 #include <boost/asio/write.hpp>
0019 #include <boost/system/error_code.hpp>
0020
0021 namespace boost::mqtt5::detail {
0022
0023 template <typename Owner, typename Handler>
0024 class write_op {
0025 struct on_write {};
0026 struct on_reconnect {};
0027
0028 Owner& _owner;
0029
0030 using handler_type = Handler;
0031 handler_type _handler;
0032
0033 public:
0034 write_op(Owner& owner, Handler&& handler) :
0035 _owner(owner), _handler(std::move(handler))
0036 {}
0037
0038 write_op(write_op&&) = default;
0039 write_op(const write_op&) = delete;
0040
0041 write_op& operator=(write_op&&) = default;
0042 write_op& operator=(const write_op&) = delete;
0043
0044 using allocator_type = asio::associated_allocator_t<handler_type>;
0045 allocator_type get_allocator() const noexcept {
0046 return asio::get_associated_allocator(_handler);
0047 }
0048
0049 using executor_type = asio::associated_executor_t<handler_type>;
0050 executor_type get_executor() const noexcept {
0051 return asio::get_associated_executor(_handler);
0052 }
0053
0054 template <typename BufferType>
0055 void perform(BufferType& buffer) {
0056 auto stream_ptr = _owner._stream_ptr;
0057 if (_owner.was_connected())
0058
0059 detail::async_write(
0060 *stream_ptr, buffer,
0061 asio::prepend(std::move(*this), on_write {}, stream_ptr)
0062 );
0063 else
0064 asio::post(
0065 _owner.get_executor(),
0066 asio::prepend(
0067 std::move(*this), on_write {},
0068 stream_ptr, asio::error::not_connected, 0
0069 )
0070 );
0071 }
0072
0073 void operator()(
0074 on_write, typename Owner::stream_ptr stream_ptr,
0075 error_code ec, size_t bytes_written
0076 ) {
0077 if (!_owner.is_open())
0078 return complete(asio::error::operation_aborted, 0);
0079
0080 if (!ec)
0081 return complete(ec, bytes_written);
0082
0083
0084 if (should_reconnect(ec) || ec == asio::error::operation_aborted)
0085 return _owner.async_reconnect(
0086 stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
0087 );
0088
0089 return complete(asio::error::no_recovery, 0);
0090 }
0091
0092 void operator()(on_reconnect, error_code ec) {
0093 if ((ec == asio::error::operation_aborted && _owner.is_open()) || !ec)
0094 ec = asio::error::try_again;
0095
0096 return complete(ec, 0);
0097 }
0098
0099 private:
0100 void complete(error_code ec, size_t bytes_written) {
0101 std::move(_handler)(ec, bytes_written);
0102 }
0103
0104 static bool should_reconnect(error_code ec) {
0105 using namespace asio::error;
0106
0107 return ec.value() == 1236L ||
0108 ec.value() == 121L ||
0109 ec == connection_aborted || ec == not_connected ||
0110 ec == timed_out || ec == connection_reset ||
0111 ec == broken_pipe || ec == asio::error::eof;
0112 }
0113
0114 };
0115
0116 }
0117
0118 #endif