Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-16 08:41:06

0001 //
0002 // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_READ_SOME_ROWS_HPP
0010 
0011 #include <boost/mysql/diagnostics.hpp>
0012 #include <boost/mysql/error_code.hpp>
0013 #include <boost/mysql/field_view.hpp>
0014 
0015 #include <boost/mysql/detail/algo_params.hpp>
0016 #include <boost/mysql/detail/execution_processor/execution_processor.hpp>
0017 #include <boost/mysql/detail/next_action.hpp>
0018 
0019 #include <boost/mysql/impl/internal/coroutine.hpp>
0020 #include <boost/mysql/impl/internal/protocol/deserialization.hpp>
0021 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0022 
0023 #include <cstddef>
0024 
0025 namespace boost {
0026 namespace mysql {
0027 namespace detail {
0028 
0029 class read_some_rows_algo
0030 {
0031     execution_processor* proc_;
0032     output_ref output_;
0033     bool is_top_level_;
0034 
0035     struct state_t
0036     {
0037         int resume_point{0};
0038         std::size_t rows_read{0};
0039     } state_;
0040 
0041     BOOST_ATTRIBUTE_NODISCARD static std::pair<error_code, std::size_t> process_some_rows(
0042         connection_state_data& st,
0043         execution_processor& proc,
0044         output_ref output,
0045         diagnostics& diag
0046     )
0047     {
0048         // Process all read messages until they run out, an error happens
0049         // or an EOF is received
0050         std::size_t read_rows = 0;
0051         error_code err;
0052         proc.on_row_batch_start();
0053         while (true)
0054         {
0055             // Check for errors (like seqnum mismatches)
0056             if (st.reader.error())
0057                 return {st.reader.error(), read_rows};
0058 
0059             // Get the row message
0060             auto buff = st.reader.message();
0061 
0062             // Deserialize it
0063             auto res = deserialize_row_message(buff, st.flavor, diag);
0064             if (res.type == row_message::type_t::error)
0065             {
0066                 err = res.data.err;
0067             }
0068             else if (res.type == row_message::type_t::row)
0069             {
0070                 output.set_offset(read_rows);
0071                 err = proc.on_row(res.data.row, output, st.shared_fields);
0072                 if (!err)
0073                     ++read_rows;
0074             }
0075             else
0076             {
0077                 st.backslash_escapes = res.data.ok_pack.backslash_escapes();
0078                 err = proc.on_row_ok_packet(res.data.ok_pack);
0079             }
0080 
0081             if (err)
0082                 return {err, read_rows};
0083 
0084             // TODO: can we make this better?
0085             if (!proc.is_reading_rows() || read_rows >= output.max_size())
0086                 break;
0087 
0088             // Attempt to parse the next message
0089             st.reader.prepare_read(proc.sequence_number());
0090             if (!st.reader.done())
0091                 break;
0092         }
0093         proc.on_row_batch_finish();
0094         return {error_code(), read_rows};
0095     }
0096 
0097     // Status changes are only performed if we're the top-level algorithm.
0098     // After an error, multi-function operations are considered finished
0099     void maybe_set_status_ready(connection_state_data& st) const
0100     {
0101         if (is_top_level_)
0102             st.status = connection_status::ready;
0103     }
0104 
0105 public:
0106     read_some_rows_algo(read_some_rows_algo_params params, bool is_top_level = true) noexcept
0107         : proc_(params.proc), output_(params.output), is_top_level_(is_top_level)
0108     {
0109     }
0110 
0111     void reset() { state_ = state_t{}; }
0112 
0113     const execution_processor& processor() const { return *proc_; }
0114     execution_processor& processor() { return *proc_; }
0115 
0116     next_action resume(connection_state_data& st, diagnostics& diag, error_code ec)
0117     {
0118         switch (state_.resume_point)
0119         {
0120         case 0:
0121 
0122             // Clear any previous use of shared fields.
0123             // Required for the dynamic version to work.
0124             st.shared_fields.clear();
0125 
0126             // If we are not reading rows, return (for compatibility, we don't error here)
0127             if (!processor().is_reading_rows())
0128                 return next_action();
0129 
0130             // Check connection status. The check is only correct if we're the top-level algorithm
0131             if (is_top_level_)
0132             {
0133                 ec = st.check_status_multi_function();
0134                 if (ec)
0135                     return ec;
0136             }
0137 
0138             // Read at least one message. Keep parsing state, in case a previous message
0139             // was parsed partially
0140             BOOST_MYSQL_YIELD(state_.resume_point, 1, st.read(proc_->sequence_number(), true))
0141             if (ec)
0142             {
0143                 // If there was an error reading the message, we're no longer in a multi-function operation
0144                 maybe_set_status_ready(st);
0145                 return ec;
0146             }
0147 
0148             // Process messages
0149             std::tie(ec, state_.rows_read) = process_some_rows(st, *proc_, output_, diag);
0150             if (ec)
0151             {
0152                 // If there was an error parsing the message, we're no longer in a multi-function operation
0153                 maybe_set_status_ready(st);
0154                 return ec;
0155             }
0156 
0157             // If we received the final OK packet, we're no longer in a multi-function operation
0158             if (proc_->is_complete() && is_top_level_)
0159                 st.status = connection_status::ready;
0160         }
0161 
0162         return next_action();
0163     }
0164 
0165     std::size_t result(const connection_state_data&) const { return state_.rows_read; }
0166 };
0167 
0168 }  // namespace detail
0169 }  // namespace mysql
0170 }  // namespace boost
0171 
0172 #endif