Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-19 08:33:52

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0010 
0011 #include <boost/mysql/diagnostics.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013 #include <boost/mysql/field_view.hpp>
0014 
0015 #include <boost/mysql/detail/algo_params.hpp>
0016 #include <boost/mysql/detail/execution_processor/execution_processor.hpp>
0017 
0018 #include <boost/mysql/impl/internal/coroutine.hpp>
0019 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0020 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0021 
0022 #include <cstddef>
0023 
0024 namespace boost {
0025 namespace mysql {
0026 namespace detail {
0027 
0028 class read_some_rows_algo
0029 {
0030     diagnostics* diag_;
0031     execution_processor* proc_;
0032     output_ref output_;
0033 
0034     struct state_t
0035     {
0036         int resume_point{0};
0037         std::size_t rows_read{0};
0038     } state_;
0039 
0040     BOOST_ATTRIBUTE_NODISCARD static std::pair<error_code, std::size_t> process_some_rows(
0041         connection_state_data& st,
0042         execution_processor& proc,
0043         output_ref output,
0044         diagnostics& diag
0045     )
0046     {
0047         // Process all read messages until they run out, an error happens
0048         // or an EOF is received
0049         std::size_t read_rows = 0;
0050         error_code err;
0051         proc.on_row_batch_start();
0052         while (true)
0053         {
0054             // Check for errors (like seqnum mismatches)
0055             if (st.reader.error())
0056                 return {st.reader.error(), read_rows};
0057 
0058             // Get the row message
0059             auto buff = st.reader.message();
0060 
0061             // Deserialize it
0062             auto res = deserialize_row_message(buff, st.flavor, diag);
0063             if (res.type == row_message::type_t::error)
0064             {
0065                 err = res.data.err;
0066             }
0067             else if (res.type == row_message::type_t::row)
0068             {
0069                 output.set_offset(read_rows);
0070                 err = proc.on_row(res.data.row, output, st.shared_fields);
0071                 if (!err)
0072                     ++read_rows;
0073             }
0074             else
0075             {
0076                 st.backslash_escapes = res.data.ok_pack.backslash_escapes();
0077                 err = proc.on_row_ok_packet(res.data.ok_pack);
0078             }
0079 
0080             if (err)
0081                 return {err, read_rows};
0082 
0083             // TODO: can we make this better?
0084             if (!proc.is_reading_rows() || read_rows >= output.max_size())
0085                 break;
0086 
0087             // Attempt to parse the next message
0088             st.reader.prepare_read(proc.sequence_number());
0089             if (!st.reader.done())
0090                 break;
0091         }
0092         proc.on_row_batch_finish();
0093         return {error_code(), read_rows};
0094     }
0095 
0096 public:
0097     read_some_rows_algo(read_some_rows_algo_params params) noexcept
0098         : diag_(params.diag), proc_(params.proc), output_(params.output)
0099     {
0100     }
0101 
0102     void reset() { state_ = state_t{}; }
0103 
0104     const execution_processor& processor() const { return *proc_; }
0105     execution_processor& processor() { return *proc_; }
0106 
0107     next_action resume(connection_state_data& st, error_code ec)
0108     {
0109         if (ec)
0110             return ec;
0111 
0112         switch (state_.resume_point)
0113         {
0114         case 0:
0115 
0116             // Clear diagnostics
0117             diag_->clear();
0118 
0119             // Clear any previous use of shared fields.
0120             // Required for the dynamic version to work.
0121             st.shared_fields.clear();
0122 
0123             // If we are not reading rows, return
0124             if (!processor().is_reading_rows())
0125                 return next_action();
0126 
0127             // Read at least one message. Keep parsing state, in case a previous message
0128             // was parsed partially
0129             BOOST_MYSQL_YIELD(state_.resume_point, 1, st.read(proc_->sequence_number(), true))
0130 
0131             // Process messages
0132             std::tie(ec, state_.rows_read) = process_some_rows(st, *proc_, output_, *diag_);
0133             return ec;
0134         }
0135 
0136         return next_action();
0137     }
0138 
0139     std::size_t result(const connection_state_data&) const { return state_.rows_read; }
0140 };
0141 
0142 }  // namespace detail
0143 }  // namespace mysql
0144 }  // namespace boost
0145 
0146 #endif