File indexing completed on 2025-04-19 08:33:52
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0010
0011 #include <boost/mysql/character_set.hpp>
0012 #include <boost/mysql/diagnostics.hpp>
0013 #include <boost/mysql/error_code.hpp>
0014 #include <boost/mysql/is_fatal_error.hpp>
0015 #include <boost/mysql/pipeline.hpp>
0016
0017 #include <boost/mysql/detail/access.hpp>
0018 #include <boost/mysql/detail/algo_params.hpp>
0019 #include <boost/mysql/detail/next_action.hpp>
0020 #include <boost/mysql/detail/pipeline.hpp>
0021
0022 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0023 #include <boost/mysql/impl/internal/sansio/execute.hpp>
0024 #include <boost/mysql/impl/internal/sansio/ping.hpp>
0025 #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
0026 #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
0027 #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
0028
0029 #include <boost/assert.hpp>
0030 #include <boost/core/span.hpp>
0031
0032 #include <cstddef>
0033 #include <vector>
0034
0035 namespace boost {
0036 namespace mysql {
0037 namespace detail {
0038
0039 class run_pipeline_algo
0040 {
0041 union any_read_algo
0042 {
0043 std::nullptr_t nothing;
0044 read_execute_response_algo execute;
0045 read_prepare_statement_response_algo prepare_statement;
0046 read_reset_connection_response_algo reset_connection;
0047 read_ping_response_algo ping;
0048 read_set_character_set_response_algo set_character_set;
0049
0050 any_read_algo() noexcept : nothing{} {}
0051 };
0052
0053 diagnostics* diag_;
0054 span<const std::uint8_t> request_buffer_;
0055 span<const pipeline_request_stage> stages_;
0056 std::vector<stage_response>* response_;
0057
0058 int resume_point_{0};
0059 std::size_t current_stage_index_{0};
0060 error_code pipeline_ec_;
0061 bool has_hatal_error_{};
0062 any_read_algo read_response_algo_;
0063 diagnostics temp_diag_;
0064
0065 void setup_response()
0066 {
0067 if (response_)
0068 {
0069
0070 response_->resize(stages_.size());
0071
0072
0073 for (std::size_t i = 0u; i < stages_.size(); ++i)
0074 {
0075
0076
0077 auto& impl = access::get_impl((*response_)[i]);
0078 if (stages_[i].kind == pipeline_stage_kind::execute)
0079 impl.emplace_results();
0080 else
0081 impl.emplace_error();
0082 }
0083 }
0084 }
0085
0086 void setup_current_stage(const connection_state_data& st)
0087 {
0088
0089 temp_diag_.clear();
0090
0091
0092 auto stage = stages_[current_stage_index_];
0093 switch (stage.kind)
0094 {
0095 case pipeline_stage_kind::execute:
0096 {
0097 BOOST_ASSERT(response_ != nullptr);
0098 auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
0099 processor.reset(stage.stage_specific.enc, st.meta_mode);
0100 processor.sequence_number() = stage.seqnum;
0101 read_response_algo_.execute = {&temp_diag_, &processor};
0102 break;
0103 }
0104 case pipeline_stage_kind::prepare_statement:
0105 read_response_algo_.prepare_statement = {&temp_diag_, stage.seqnum};
0106 break;
0107 case pipeline_stage_kind::close_statement:
0108
0109 read_response_algo_.nothing = nullptr;
0110 break;
0111 case pipeline_stage_kind::set_character_set:
0112 read_response_algo_.set_character_set = {&temp_diag_, stage.stage_specific.charset, stage.seqnum};
0113 break;
0114 case pipeline_stage_kind::reset_connection:
0115 read_response_algo_.reset_connection = {&temp_diag_, stage.seqnum};
0116 break;
0117 case pipeline_stage_kind::ping: read_response_algo_.ping = {&temp_diag_, stage.seqnum}; break;
0118 default: BOOST_ASSERT(false);
0119 }
0120 }
0121
0122 void set_stage_error(error_code ec, diagnostics&& diag)
0123 {
0124 if (response_)
0125 {
0126 access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
0127 }
0128 }
0129
0130 void on_stage_finished(const connection_state_data& st, error_code stage_ec)
0131 {
0132 if (stage_ec)
0133 {
0134
0135
0136 if (is_fatal_error(stage_ec))
0137 {
0138 pipeline_ec_ = stage_ec;
0139 *diag_ = temp_diag_;
0140 has_hatal_error_ = true;
0141 }
0142 else if (!pipeline_ec_)
0143 {
0144
0145 pipeline_ec_ = stage_ec;
0146 *diag_ = temp_diag_;
0147 }
0148
0149
0150 if (response_ != nullptr)
0151 {
0152 set_stage_error(stage_ec, std::move(temp_diag_));
0153 }
0154 }
0155 else
0156 {
0157 if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
0158 {
0159
0160 BOOST_ASSERT(response_ != nullptr);
0161 access::get_impl((*response_)[current_stage_index_])
0162 .set_result(read_response_algo_.prepare_statement.result(st));
0163 }
0164 }
0165 }
0166
0167 next_action resume_read_algo(connection_state_data& st, error_code ec)
0168 {
0169 switch (stages_[current_stage_index_].kind)
0170 {
0171 case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, ec);
0172 case pipeline_stage_kind::prepare_statement:
0173 return read_response_algo_.prepare_statement.resume(st, ec);
0174 case pipeline_stage_kind::reset_connection:
0175 return read_response_algo_.reset_connection.resume(st, ec);
0176 case pipeline_stage_kind::set_character_set:
0177 return read_response_algo_.set_character_set.resume(st, ec);
0178 case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, ec);
0179 case pipeline_stage_kind::close_statement: return next_action();
0180 default: BOOST_ASSERT(false); return next_action();
0181 }
0182 }
0183
0184 public:
0185 run_pipeline_algo(run_pipeline_algo_params params) noexcept
0186 : diag_(params.diag),
0187 request_buffer_(params.request_buffer),
0188 stages_(params.request_stages),
0189 response_(params.response)
0190 {
0191 }
0192
0193 next_action resume(connection_state_data& st, error_code ec)
0194 {
0195 next_action act;
0196
0197 switch (resume_point_)
0198 {
0199 case 0:
0200
0201
0202 diag_->clear();
0203 setup_response();
0204
0205
0206 if (stages_.empty())
0207 break;
0208
0209
0210 BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
0211
0212
0213 if (ec)
0214 {
0215 pipeline_ec_ = ec;
0216 has_hatal_error_ = true;
0217 }
0218
0219
0220 for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
0221 {
0222
0223 if (has_hatal_error_)
0224 {
0225 set_stage_error(pipeline_ec_, diagnostics(*diag_));
0226 continue;
0227 }
0228
0229
0230 setup_current_stage(st);
0231
0232
0233 ec.clear();
0234 while (!(act = resume_read_algo(st, ec)).is_done())
0235 BOOST_MYSQL_YIELD(resume_point_, 2, act)
0236
0237
0238 on_stage_finished(st, act.error());
0239 }
0240 }
0241
0242 return pipeline_ec_;
0243 }
0244 };
0245
0246 }
0247 }
0248 }
0249
0250 #endif