Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-19 08:33:52

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
0010 
0011 #include <boost/mysql/client_errc.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013 
0014 #include <boost/mysql/impl/internal/coroutine.hpp>
0015 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0016 #include <boost/mysql/impl/internal/protocol/frame_header.hpp>
0017 #include <boost/mysql/impl/internal/sansio/read_buffer.hpp>
0018 
0019 #include <boost/assert.hpp>
0020 #include <boost/config.hpp>
0021 
0022 #include <cstddef>
0023 #include <cstdint>
0024 
0025 namespace boost {
0026 namespace mysql {
0027 namespace detail {
0028 
0029 // Flow:
0030 //   Prepare a read operation with prepare_read()
0031 //   In a loop, until done():
0032 //      prepare_buffer() to resize the buffer to an appropriate size
0033 //      Read bytes against buffer()
0034 //      Call resume with the number of bytes read
0035 // Or call prepare_read() and check done() to attempt to get a cached message
0036 //    (further prepare_read calls should use keep_state=true)
0037 class message_reader
0038 {
0039 public:
0040     message_reader(
0041         std::size_t initial_buffer_size,
0042         std::size_t max_buffer_size = static_cast<std::size_t>(-1),
0043         std::size_t max_frame_size = max_packet_size
0044     )
0045         : buffer_(initial_buffer_size, max_buffer_size), max_frame_size_(max_frame_size)
0046     {
0047     }
0048 
0049     void reset()
0050     {
0051         buffer_.reset();
0052         state_ = parse_state();
0053     }
0054 
0055     std::size_t max_buffer_size() const { return buffer_.max_size(); }
0056 
0057     // Prepares a read operation. sequence_number should be kept alive until
0058     // the next read is prepared or no more calls to resume() are expected.
0059     // If keep_state=true, and the op is not complete, parsing state is preserved
0060     void prepare_read(std::uint8_t& sequence_number, bool keep_state = false)
0061     {
0062         if (!keep_state || done())
0063             state_ = parse_state(sequence_number);
0064         else
0065             state_.sequence_number = &sequence_number;
0066         resume(0);
0067     }
0068 
0069     // Is parsing the current message done?
0070     bool done() const { return state_.resume_point == -1; }
0071 
0072     // Returns any errors generated during parsing. Requires this->done()
0073     error_code error() const
0074     {
0075         BOOST_ASSERT(done());
0076         return state_.ec;
0077     }
0078 
0079     // Returns the last parsed message. Valid until prepare_buffer()
0080     // is next called. Requires done() && !error()
0081     span<const std::uint8_t> message() const
0082     {
0083         BOOST_ASSERT(done());
0084         BOOST_ASSERT(!error());
0085         return buffer_.current_message();
0086     }
0087 
0088     // Returns buffer space suitable to read bytes to
0089     span<std::uint8_t> buffer() { return buffer_.free_area(); }
0090 
0091     // Removes old messages stored in the buffer, and resizes it, if required, to accomodate
0092     // the message currently being parsed.
0093     BOOST_ATTRIBUTE_NODISCARD
0094     error_code prepare_buffer()
0095     {
0096         buffer_.remove_reserved();
0097         auto ec = buffer_.grow_to_fit(state_.required_size);
0098         if (ec)
0099             return ec;
0100         state_.required_size = 0;
0101         return error_code();
0102     }
0103 
0104     // The main operation. Call it after reading bytes against buffer(),
0105     // with the number of bytes read
0106     void resume(std::size_t bytes_read)
0107     {
0108         frame_header header{};
0109         buffer_.move_to_pending(bytes_read);
0110 
0111         switch (state_.resume_point)
0112         {
0113         case 0:
0114 
0115             // Move the previously parsed message to the reserved area, if any
0116             buffer_.move_to_reserved(buffer_.current_message_size());
0117 
0118             while (true)
0119             {
0120                 // Read the header
0121                 set_required_size(frame_header_size);
0122                 while (buffer_.pending_size() < frame_header_size)
0123                     BOOST_MYSQL_YIELD_VOID(state_.resume_point, 1)
0124 
0125                 // Mark the header as belonging to the current message
0126                 buffer_.move_to_current_message(frame_header_size);
0127 
0128                 // Deserialize the header
0129                 header = deserialize_frame_header(span<const std::uint8_t, frame_header_size>(
0130                     buffer_.pending_first() - frame_header_size,
0131                     frame_header_size
0132                 ));
0133 
0134                 // Process the sequence number
0135                 if (*state_.sequence_number != header.sequence_number)
0136                 {
0137                     state_.ec = client_errc::sequence_number_mismatch;
0138                     state_.resume_point = -1;
0139                     return;
0140                 }
0141                 ++*state_.sequence_number;
0142 
0143                 // Process the packet size
0144                 state_.body_bytes = header.size;
0145                 state_.more_frames_follow = (state_.body_bytes == max_frame_size_);
0146 
0147                 // We are done with the header
0148                 if (state_.is_first_frame)
0149                 {
0150                     // If it's the 1st frame, we can just move the header bytes to the reserved
0151                     // area, avoiding a big memmove
0152                     buffer_.move_to_reserved(frame_header_size);
0153                 }
0154                 else
0155                 {
0156                     buffer_.remove_current_message_last(frame_header_size);
0157                 }
0158                 state_.is_first_frame = false;
0159 
0160                 // Read the body
0161                 set_required_size(state_.body_bytes);
0162                 while (buffer_.pending_size() < state_.body_bytes)
0163                     BOOST_MYSQL_YIELD_VOID(state_.resume_point, 2)
0164 
0165                 buffer_.move_to_current_message(state_.body_bytes);
0166 
0167                 // Check if we're done
0168                 if (!state_.more_frames_follow)
0169                 {
0170                     state_.resume_point = -1;
0171                     return;
0172                 }
0173             }
0174         }
0175     }
0176 
0177     // Exposed for testing
0178     const read_buffer& internal_buffer() const { return buffer_; }
0179 
0180 private:
0181     read_buffer buffer_;
0182     std::size_t max_frame_size_;
0183 
0184     struct parse_state
0185     {
0186         int resume_point{0};
0187         std::uint8_t* sequence_number{};
0188         bool is_first_frame{true};
0189         std::size_t body_bytes{0};
0190         bool more_frames_follow{false};
0191         std::size_t required_size{0};
0192         error_code ec;
0193 
0194         parse_state() = default;
0195         parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {}
0196     } state_;
0197 
0198     void set_required_size(std::size_t required_bytes)
0199     {
0200         if (required_bytes > buffer_.pending_size())
0201             state_.required_size = required_bytes - buffer_.pending_size();
0202         else
0203             state_.required_size = 0;
0204     }
0205 };
0206 
0207 }  // namespace detail
0208 }  // namespace mysql
0209 }  // namespace boost
0210 
0211 #endif