File indexing completed on 2025-04-19 08:33:52
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
0010
0011 #include <boost/mysql/client_errc.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013
0014 #include <boost/mysql/impl/internal/coroutine.hpp>
0015 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0016 #include <boost/mysql/impl/internal/protocol/frame_header.hpp>
0017 #include <boost/mysql/impl/internal/sansio/read_buffer.hpp>
0018
0019 #include <boost/assert.hpp>
0020 #include <boost/config.hpp>
0021
0022 #include <cstddef>
0023 #include <cstdint>
0024
0025 namespace boost {
0026 namespace mysql {
0027 namespace detail {
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 class message_reader
0038 {
0039 public:
0040 message_reader(
0041 std::size_t initial_buffer_size,
0042 std::size_t max_buffer_size = static_cast<std::size_t>(-1),
0043 std::size_t max_frame_size = max_packet_size
0044 )
0045 : buffer_(initial_buffer_size, max_buffer_size), max_frame_size_(max_frame_size)
0046 {
0047 }
0048
0049 void reset()
0050 {
0051 buffer_.reset();
0052 state_ = parse_state();
0053 }
0054
0055 std::size_t max_buffer_size() const { return buffer_.max_size(); }
0056
0057
0058
0059
0060 void prepare_read(std::uint8_t& sequence_number, bool keep_state = false)
0061 {
0062 if (!keep_state || done())
0063 state_ = parse_state(sequence_number);
0064 else
0065 state_.sequence_number = &sequence_number;
0066 resume(0);
0067 }
0068
0069
0070 bool done() const { return state_.resume_point == -1; }
0071
0072
0073 error_code error() const
0074 {
0075 BOOST_ASSERT(done());
0076 return state_.ec;
0077 }
0078
0079
0080
0081 span<const std::uint8_t> message() const
0082 {
0083 BOOST_ASSERT(done());
0084 BOOST_ASSERT(!error());
0085 return buffer_.current_message();
0086 }
0087
0088
0089 span<std::uint8_t> buffer() { return buffer_.free_area(); }
0090
0091
0092
0093 BOOST_ATTRIBUTE_NODISCARD
0094 error_code prepare_buffer()
0095 {
0096 buffer_.remove_reserved();
0097 auto ec = buffer_.grow_to_fit(state_.required_size);
0098 if (ec)
0099 return ec;
0100 state_.required_size = 0;
0101 return error_code();
0102 }
0103
0104
0105
0106 void resume(std::size_t bytes_read)
0107 {
0108 frame_header header{};
0109 buffer_.move_to_pending(bytes_read);
0110
0111 switch (state_.resume_point)
0112 {
0113 case 0:
0114
0115
0116 buffer_.move_to_reserved(buffer_.current_message_size());
0117
0118 while (true)
0119 {
0120
0121 set_required_size(frame_header_size);
0122 while (buffer_.pending_size() < frame_header_size)
0123 BOOST_MYSQL_YIELD_VOID(state_.resume_point, 1)
0124
0125
0126 buffer_.move_to_current_message(frame_header_size);
0127
0128
0129 header = deserialize_frame_header(span<const std::uint8_t, frame_header_size>(
0130 buffer_.pending_first() - frame_header_size,
0131 frame_header_size
0132 ));
0133
0134
0135 if (*state_.sequence_number != header.sequence_number)
0136 {
0137 state_.ec = client_errc::sequence_number_mismatch;
0138 state_.resume_point = -1;
0139 return;
0140 }
0141 ++*state_.sequence_number;
0142
0143
0144 state_.body_bytes = header.size;
0145 state_.more_frames_follow = (state_.body_bytes == max_frame_size_);
0146
0147
0148 if (state_.is_first_frame)
0149 {
0150
0151
0152 buffer_.move_to_reserved(frame_header_size);
0153 }
0154 else
0155 {
0156 buffer_.remove_current_message_last(frame_header_size);
0157 }
0158 state_.is_first_frame = false;
0159
0160
0161 set_required_size(state_.body_bytes);
0162 while (buffer_.pending_size() < state_.body_bytes)
0163 BOOST_MYSQL_YIELD_VOID(state_.resume_point, 2)
0164
0165 buffer_.move_to_current_message(state_.body_bytes);
0166
0167
0168 if (!state_.more_frames_follow)
0169 {
0170 state_.resume_point = -1;
0171 return;
0172 }
0173 }
0174 }
0175 }
0176
0177
0178 const read_buffer& internal_buffer() const { return buffer_; }
0179
0180 private:
0181 read_buffer buffer_;
0182 std::size_t max_frame_size_;
0183
0184 struct parse_state
0185 {
0186 int resume_point{0};
0187 std::uint8_t* sequence_number{};
0188 bool is_first_frame{true};
0189 std::size_t body_bytes{0};
0190 bool more_frames_follow{false};
0191 std::size_t required_size{0};
0192 error_code ec;
0193
0194 parse_state() = default;
0195 parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {}
0196 } state_;
0197
0198 void set_required_size(std::size_t required_bytes)
0199 {
0200 if (required_bytes > buffer_.pending_size())
0201 state_.required_size = required_bytes - buffer_.pending_size();
0202 else
0203 state_.required_size = 0;
0204 }
0205 };
0206
0207 }
0208 }
0209 }
0210
0211 #endif