File indexing completed on 2025-09-17 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0010
0011 #include <boost/mysql/character_set.hpp>
0012 #include <boost/mysql/diagnostics.hpp>
0013 #include <boost/mysql/error_code.hpp>
0014 #include <boost/mysql/is_fatal_error.hpp>
0015 #include <boost/mysql/pipeline.hpp>
0016
0017 #include <boost/mysql/detail/access.hpp>
0018 #include <boost/mysql/detail/algo_params.hpp>
0019 #include <boost/mysql/detail/next_action.hpp>
0020 #include <boost/mysql/detail/pipeline.hpp>
0021
0022 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0023 #include <boost/mysql/impl/internal/sansio/execute.hpp>
0024 #include <boost/mysql/impl/internal/sansio/ping.hpp>
0025 #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
0026 #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
0027 #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
0028
0029 #include <boost/assert.hpp>
0030 #include <boost/core/span.hpp>
0031
0032 #include <cstddef>
0033 #include <vector>
0034
0035 namespace boost {
0036 namespace mysql {
0037 namespace detail {
0038
0039 class run_pipeline_algo  // Resumable algorithm that runs a pipeline request (details below)
0040 {
     // Overview, as implemented in this class:
     //  - resume() first yields a write action for request_buffer_, then processes the
     //    stages in stages_ in order, driving one read algorithm per stage.
     //  - Per-stage outcomes are stored in *response_ when it is non-null.
     //  - The value finally returned by resume() is built from pipeline_ec_.
     //  - The class performs no I/O itself; it hands next_action values to its caller.
     // NOTE(review): BOOST_MYSQL_YIELD is defined outside this file. It presumably records
     // the resume point, returns the action and continues right after the macro on the next
     // call to resume() -- confirm against its definition.

     // Storage for the read algorithm of the stage currently being processed.
     // The member to use is selected by the stage's kind: setup_current_stage() assigns it
     // and resume_read_algo() calls resume() on that same member.
0041 union any_read_algo
0042 {
0043 std::nullptr_t nothing;  // used for the stage kind that has no read algorithm (close_statement)
0044 read_execute_response_algo execute;
0045 read_prepare_statement_response_algo prepare_statement;
0046 read_reset_connection_response_algo reset_connection;
0047 read_ping_response_algo ping;
0048 read_set_character_set_response_algo set_character_set;
0049 // A freshly constructed union holds `nothing`
0050 any_read_algo() noexcept : nothing{} {}
0051 };
0052 // Inputs, copied from run_pipeline_algo_params by the constructor
0053 span<const std::uint8_t> request_buffer_;  // bytes handed to the write action in resume()
0054 span<const pipeline_request_stage> stages_;  // one descriptor per stage (kind, seqnum, stage_specific)
0055 std::vector<stage_response>* response_;  // may be null; then no per-stage result/error is stored
0056 // Execution state
0057 int resume_point_{0};  // selects where resume() continues (see switch in resume())
0058 std::size_t current_stage_index_{0};  // index into stages_ (and *response_) of the stage in progress
0059 error_code pipeline_ec_;  // overall result, returned at the end of resume()
0060 bool has_hatal_error_{};  // (sic, "fatal") if true, remaining stages are failed with pipeline_ec_
0061 any_read_algo read_response_algo_;  // read algorithm for the current stage
0062 diagnostics temp_diag_;  // diagnostics of the current stage; cleared in setup_current_stage()
0063 // Sizes *response_ to one item per stage and initializes each item. No-op when response_ is null.
0064 void setup_response()
0065 {
0066 if (response_)
0067 {
0068 // One response item per request stage
0069 response_->resize(stages_.size());
0070 // Initialize every item according to the kind of its stage
0071
0072 for (std::size_t i = 0u; i < stages_.size(); ++i)
0073 {
0074 // Execute stages get emplace_results(); every other kind gets emplace_error()
0075 // (presumably this also discards content left over from a previous use -- TODO confirm)
0076 auto& impl = access::get_impl((*response_)[i]);
0077 if (stages_[i].kind == pipeline_stage_kind::execute)
0078 impl.emplace_results();
0079 else
0080 impl.emplace_error();
0081 }
0082 }
0083 }
0084 // Prepares temp_diag_ and read_response_algo_ for the stage at current_stage_index_.
0085 void setup_current_stage(const connection_state_data& st)
0086 {
0087 // Every stage starts with empty diagnostics
0088 temp_diag_.clear();
0089 // Select and initialize the union member that matches the stage kind.
0090 // The stage descriptor is copied by value here.
0091 auto stage = stages_[current_stage_index_];
0092 switch (stage.kind)
0093 {
0094 case pipeline_stage_kind::execute:
0095 {
0096 BOOST_ASSERT(response_ != nullptr);  // execute stages need a response item to read into
0097 auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
0098 processor.reset(stage.stage_specific.enc, st.meta_mode);
0099 processor.sequence_number() = stage.seqnum;
0100 read_response_algo_.execute = {&processor};  // the algorithm keeps a pointer to the processor
0101 break;
0102 }
0103 case pipeline_stage_kind::prepare_statement:
0104 read_response_algo_.prepare_statement = {stage.seqnum};
0105 break;
0106 case pipeline_stage_kind::close_statement:
0107 // No read algorithm for this kind: resume_read_algo() returns a default next_action for it
0108 read_response_algo_.nothing = nullptr;
0109 break;
0110 case pipeline_stage_kind::set_character_set:
0111 read_response_algo_.set_character_set = {stage.stage_specific.charset, stage.seqnum};
0112 break;
0113 case pipeline_stage_kind::reset_connection:
0114 read_response_algo_.reset_connection = {stage.seqnum};
0115 break;
0116 case pipeline_stage_kind::ping: read_response_algo_.ping = {stage.seqnum}; break;
0117 default: BOOST_ASSERT(false);  // unexpected stage kind
0118 }
0119 }
0120 // Records ec and diag as the error of the current stage. Does nothing when response_ is null.
0121 void set_stage_error(error_code ec, diagnostics&& diag)
0122 {
0123 if (response_)
0124 {
0125 access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
0126 }
0127 }
0128 // Processes the outcome of the current stage.
     //   st:          connection state, passed to the prepare_statement algorithm's result()
     //   output_diag: receives the pipeline-wide diagnostics when this stage sets pipeline_ec_
     //   stage_ec:    error the stage's read algorithm finished with (empty on success)
0129 void on_stage_finished(const connection_state_data& st, diagnostics& output_diag, error_code stage_ec)
0130 {
0131 if (stage_ec)
0132 {
0133 // A fatal error (as reported by is_fatal_error) overrides any previous pipeline-wide
0134 // error and makes resume() fail all the following stages without reading them.
0135 if (is_fatal_error(stage_ec))
0136 {
0137 pipeline_ec_ = stage_ec;
0138 output_diag = temp_diag_;
0139 has_hatal_error_ = true;
0140 }
0141 else if (!pipeline_ec_)
0142 {
0143 // Non-fatal: only the first error encountered becomes the pipeline-wide result
0144 pipeline_ec_ = stage_ec;
0145 output_diag = temp_diag_;
0146 }
0147 // Store the error in the stage's response. temp_diag_ is moved from here, which is why
0148 // output_diag was assigned (by copy) above. set_stage_error() repeats this null check.
0149 if (response_ != nullptr)
0150 {
0151 set_stage_error(stage_ec, std::move(temp_diag_));
0152 }
0153 }
0154 else
0155 {
0156 if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
0157 {
0158 // Successful prepare_statement: store the algorithm's result in the stage's response
0159 BOOST_ASSERT(response_ != nullptr);
0160 access::get_impl((*response_)[current_stage_index_])
0161 .set_result(read_response_algo_.prepare_statement.result(st));
0162 }
0163 }
0164 }
0165 // Forwards to resume() of the union member selected by the current stage's kind.
0166 next_action resume_read_algo(connection_state_data& st, error_code ec)
0167 {
0168 switch (stages_[current_stage_index_].kind)
0169 {
0170 case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, temp_diag_, ec);
0171 case pipeline_stage_kind::prepare_statement:
0172 return read_response_algo_.prepare_statement.resume(st, temp_diag_, ec);
0173 case pipeline_stage_kind::reset_connection:
0174 return read_response_algo_.reset_connection.resume(st, temp_diag_, ec);
0175 case pipeline_stage_kind::set_character_set:
0176 return read_response_algo_.set_character_set.resume(st, temp_diag_, ec);
0177 case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, temp_diag_, ec);
0178 case pipeline_stage_kind::close_statement: return next_action();  // nothing to read for this kind
0179 default: BOOST_ASSERT(false); return next_action();
0180 }
0181 }
0182
0183 public:
     // Captures the request buffer, the stage descriptors and the response pointer from params.
0184 run_pipeline_algo(run_pipeline_algo_params params) noexcept
0185 : request_buffer_(params.request_buffer), stages_(params.request_stages), response_(params.response)
0186 {
0187 }
0188 // Starts or continues the algorithm and returns the next action for the caller.
     //   st:   connection state, forwarded to the per-stage read algorithms
     //   diag: pipeline-wide diagnostics (written by on_stage_finished)
     //   ec:   error code fed into the algorithm on this call
     //         NOTE(review): presumably the result of the previously returned action -- confirm
     // The final return value is built from pipeline_ec_.
0189 next_action resume(connection_state_data& st, diagnostics& diag, error_code ec)
0190 {
0191 next_action act;
0192 // resume_point_ selects where execution continues; only `case 0` is written out explicitly here
0193 switch (resume_point_)
0194 {
0195 case 0:
0196 // Size and initialize the response items (if a response was requested)
0197
0198 setup_response();
0199 // An empty pipeline needs no I/O: leave the switch and return pipeline_ec_,
0200 // which has not been assigned at this point
0201 if (stages_.empty())
0202 break;
0203 // Call st.check_status_ready() and finish immediately with its error, if any
0204
0205 ec = st.check_status_ready();
0206 if (ec)
0207 return ec;
0208 // Yield a write action for the whole request buffer
0209 // NOTE(review): the meaning of the `false` argument is not visible in this file
0210 BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
0211 // A non-empty ec at this point is treated as fatal for the whole pipeline:
0212 // every stage in the loop below is failed with it
0213 if (ec)
0214 {
0215 pipeline_ec_ = ec;
0216 has_hatal_error_ = true;
0217 }
0218 // Process the stages in order. The loop counter is a data member, so it keeps its
0219 // value between calls to resume()
0220 for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
0221 {
0222 // After a fatal error, just mark the stage as failed (with a copy of diag) and move on
0223 if (has_hatal_error_)
0224 {
0225 set_stage_error(pipeline_ec_, diagnostics(diag));
0226 continue;
0227 }
0228 // Reset the stage diagnostics and set up the read algorithm for this stage
0229
0230 setup_current_stage(st);
0231 // Drive the stage's read algorithm until it reports is_done(), yielding every
0232 // intermediate action. The first call is made with a cleared ec
0233 ec.clear();
0234 while (!(act = resume_read_algo(st, ec)).is_done())
0235 BOOST_MYSQL_YIELD(resume_point_, 2, act)
0236 // Record the stage's outcome; act.error() is what the read algorithm finished with
0237
0238 on_stage_finished(st, diag, act.error());
0239 }
0240 }
0241 // Overall result: whatever the write step or on_stage_finished() stored in pipeline_ec_
0242 return pipeline_ec_;
0243 }
0244 };
0245
0246 }
0247 }
0248 }
0249
0250 #endif