File indexing completed on 2025-04-19 08:33:52
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0010
0011 #include <boost/mysql/diagnostics.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013 #include <boost/mysql/field_view.hpp>
0014
0015 #include <boost/mysql/detail/algo_params.hpp>
0016 #include <boost/mysql/detail/execution_processor/execution_processor.hpp>
0017
0018 #include <boost/mysql/impl/internal/coroutine.hpp>
0019 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0020 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0021
0022 #include <cstddef>
0023
0024 namespace boost {
0025 namespace mysql {
0026 namespace detail {
0027
0028 class read_some_rows_algo
0029 {
0030 diagnostics* diag_;
0031 execution_processor* proc_;
0032 output_ref output_;
0033
0034 struct state_t
0035 {
0036 int resume_point{0};
0037 std::size_t rows_read{0};
0038 } state_;
0039
0040 BOOST_ATTRIBUTE_NODISCARD static std::pair<error_code, std::size_t> process_some_rows(
0041 connection_state_data& st,
0042 execution_processor& proc,
0043 output_ref output,
0044 diagnostics& diag
0045 )
0046 {
0047
0048
0049 std::size_t read_rows = 0;
0050 error_code err;
0051 proc.on_row_batch_start();
0052 while (true)
0053 {
0054
0055 if (st.reader.error())
0056 return {st.reader.error(), read_rows};
0057
0058
0059 auto buff = st.reader.message();
0060
0061
0062 auto res = deserialize_row_message(buff, st.flavor, diag);
0063 if (res.type == row_message::type_t::error)
0064 {
0065 err = res.data.err;
0066 }
0067 else if (res.type == row_message::type_t::row)
0068 {
0069 output.set_offset(read_rows);
0070 err = proc.on_row(res.data.row, output, st.shared_fields);
0071 if (!err)
0072 ++read_rows;
0073 }
0074 else
0075 {
0076 st.backslash_escapes = res.data.ok_pack.backslash_escapes();
0077 err = proc.on_row_ok_packet(res.data.ok_pack);
0078 }
0079
0080 if (err)
0081 return {err, read_rows};
0082
0083
0084 if (!proc.is_reading_rows() || read_rows >= output.max_size())
0085 break;
0086
0087
0088 st.reader.prepare_read(proc.sequence_number());
0089 if (!st.reader.done())
0090 break;
0091 }
0092 proc.on_row_batch_finish();
0093 return {error_code(), read_rows};
0094 }
0095
0096 public:
0097 read_some_rows_algo(read_some_rows_algo_params params) noexcept
0098 : diag_(params.diag), proc_(params.proc), output_(params.output)
0099 {
0100 }
0101
0102 void reset() { state_ = state_t{}; }
0103
0104 const execution_processor& processor() const { return *proc_; }
0105 execution_processor& processor() { return *proc_; }
0106
0107 next_action resume(connection_state_data& st, error_code ec)
0108 {
0109 if (ec)
0110 return ec;
0111
0112 switch (state_.resume_point)
0113 {
0114 case 0:
0115
0116
0117 diag_->clear();
0118
0119
0120
0121 st.shared_fields.clear();
0122
0123
0124 if (!processor().is_reading_rows())
0125 return next_action();
0126
0127
0128
0129 BOOST_MYSQL_YIELD(state_.resume_point, 1, st.read(proc_->sequence_number(), true))
0130
0131
0132 std::tie(ec, state_.rows_read) = process_some_rows(st, *proc_, output_, *diag_);
0133 return ec;
0134 }
0135
0136 return next_action();
0137 }
0138
0139 std::size_t result(const connection_state_data&) const { return state_.rows_read; }
0140 };
0141
0142 }
0143 }
0144 }
0145
0146 #endif