File indexing completed on 2025-09-16 08:41:06
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0010
0011 #include <boost/mysql/diagnostics.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013 #include <boost/mysql/field_view.hpp>
0014
0015 #include <boost/mysql/detail/algo_params.hpp>
0016 #include <boost/mysql/detail/execution_processor/execution_processor.hpp>
0017 #include <boost/mysql/detail/next_action.hpp>
0018
0019 #include <boost/mysql/impl/internal/coroutine.hpp>
0020 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0021 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0022
0023 #include <cstddef>
0024
0025 namespace boost {
0026 namespace mysql {
0027 namespace detail {
0028
0029 class read_some_rows_algo
0030 {
0031 execution_processor* proc_;
0032 output_ref output_;
0033 bool is_top_level_;
0034
0035 struct state_t
0036 {
0037 int resume_point{0};
0038 std::size_t rows_read{0};
0039 } state_;
0040
0041 BOOST_ATTRIBUTE_NODISCARD static std::pair<error_code, std::size_t> process_some_rows(
0042 connection_state_data& st,
0043 execution_processor& proc,
0044 output_ref output,
0045 diagnostics& diag
0046 )
0047 {
0048
0049
0050 std::size_t read_rows = 0;
0051 error_code err;
0052 proc.on_row_batch_start();
0053 while (true)
0054 {
0055
0056 if (st.reader.error())
0057 return {st.reader.error(), read_rows};
0058
0059
0060 auto buff = st.reader.message();
0061
0062
0063 auto res = deserialize_row_message(buff, st.flavor, diag);
0064 if (res.type == row_message::type_t::error)
0065 {
0066 err = res.data.err;
0067 }
0068 else if (res.type == row_message::type_t::row)
0069 {
0070 output.set_offset(read_rows);
0071 err = proc.on_row(res.data.row, output, st.shared_fields);
0072 if (!err)
0073 ++read_rows;
0074 }
0075 else
0076 {
0077 st.backslash_escapes = res.data.ok_pack.backslash_escapes();
0078 err = proc.on_row_ok_packet(res.data.ok_pack);
0079 }
0080
0081 if (err)
0082 return {err, read_rows};
0083
0084
0085 if (!proc.is_reading_rows() || read_rows >= output.max_size())
0086 break;
0087
0088
0089 st.reader.prepare_read(proc.sequence_number());
0090 if (!st.reader.done())
0091 break;
0092 }
0093 proc.on_row_batch_finish();
0094 return {error_code(), read_rows};
0095 }
0096
0097
0098
0099 void maybe_set_status_ready(connection_state_data& st) const
0100 {
0101 if (is_top_level_)
0102 st.status = connection_status::ready;
0103 }
0104
0105 public:
0106 read_some_rows_algo(read_some_rows_algo_params params, bool is_top_level = true) noexcept
0107 : proc_(params.proc), output_(params.output), is_top_level_(is_top_level)
0108 {
0109 }
0110
0111 void reset() { state_ = state_t{}; }
0112
0113 const execution_processor& processor() const { return *proc_; }
0114 execution_processor& processor() { return *proc_; }
0115
0116 next_action resume(connection_state_data& st, diagnostics& diag, error_code ec)
0117 {
0118 switch (state_.resume_point)
0119 {
0120 case 0:
0121
0122
0123
0124 st.shared_fields.clear();
0125
0126
0127 if (!processor().is_reading_rows())
0128 return next_action();
0129
0130
0131 if (is_top_level_)
0132 {
0133 ec = st.check_status_multi_function();
0134 if (ec)
0135 return ec;
0136 }
0137
0138
0139
0140 BOOST_MYSQL_YIELD(state_.resume_point, 1, st.read(proc_->sequence_number(), true))
0141 if (ec)
0142 {
0143
0144 maybe_set_status_ready(st);
0145 return ec;
0146 }
0147
0148
0149 std::tie(ec, state_.rows_read) = process_some_rows(st, *proc_, output_, diag);
0150 if (ec)
0151 {
0152
0153 maybe_set_status_ready(st);
0154 return ec;
0155 }
0156
0157
0158 if (proc_->is_complete() && is_top_level_)
0159 st.status = connection_status::ready;
0160 }
0161
0162 return next_action();
0163 }
0164
0165 std::size_t result(const connection_state_data&) const { return state_.rows_read; }
0166 };
0167
0168 }
0169 }
0170 }
0171
0172 #endif