Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-19 08:33:52

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0010 
0011 #include <boost/mysql/character_set.hpp>
0012 #include <boost/mysql/diagnostics.hpp>
0013 #include <boost/mysql/error_code.hpp>
0014 #include <boost/mysql/is_fatal_error.hpp>
0015 #include <boost/mysql/pipeline.hpp>
0016 
0017 #include <boost/mysql/detail/access.hpp>
0018 #include <boost/mysql/detail/algo_params.hpp>
0019 #include <boost/mysql/detail/next_action.hpp>
0020 #include <boost/mysql/detail/pipeline.hpp>
0021 
0022 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0023 #include <boost/mysql/impl/internal/sansio/execute.hpp>
0024 #include <boost/mysql/impl/internal/sansio/ping.hpp>
0025 #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
0026 #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
0027 #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
0028 
0029 #include <boost/assert.hpp>
0030 #include <boost/core/span.hpp>
0031 
0032 #include <cstddef>
0033 #include <vector>
0034 
0035 namespace boost {
0036 namespace mysql {
0037 namespace detail {
0038 
0039 class run_pipeline_algo
0040 {
0041     union any_read_algo
0042     {
0043         std::nullptr_t nothing;
0044         read_execute_response_algo execute;
0045         read_prepare_statement_response_algo prepare_statement;
0046         read_reset_connection_response_algo reset_connection;
0047         read_ping_response_algo ping;
0048         read_set_character_set_response_algo set_character_set;
0049 
0050         any_read_algo() noexcept : nothing{} {}
0051     };
0052 
0053     diagnostics* diag_;
0054     span<const std::uint8_t> request_buffer_;
0055     span<const pipeline_request_stage> stages_;
0056     std::vector<stage_response>* response_;
0057 
0058     int resume_point_{0};
0059     std::size_t current_stage_index_{0};
0060     error_code pipeline_ec_;  // Result of the entire operation
0061     bool has_hatal_error_{};  // If true, fail further stages with pipeline_ec_
0062     any_read_algo read_response_algo_;
0063     diagnostics temp_diag_;
0064 
0065     void setup_response()
0066     {
0067         if (response_)
0068         {
0069             // Create as many response items as request stages
0070             response_->resize(stages_.size());
0071 
0072             // Setup them
0073             for (std::size_t i = 0u; i < stages_.size(); ++i)
0074             {
0075                 // Execution stages need to be initialized to results objects.
0076                 // Otherwise, clear any previous content
0077                 auto& impl = access::get_impl((*response_)[i]);
0078                 if (stages_[i].kind == pipeline_stage_kind::execute)
0079                     impl.emplace_results();
0080                 else
0081                     impl.emplace_error();
0082             }
0083         }
0084     }
0085 
0086     void setup_current_stage(const connection_state_data& st)
0087     {
0088         // Reset previous data
0089         temp_diag_.clear();
0090 
0091         // Setup read algo
0092         auto stage = stages_[current_stage_index_];
0093         switch (stage.kind)
0094         {
0095         case pipeline_stage_kind::execute:
0096         {
0097             BOOST_ASSERT(response_ != nullptr);  // we don't support execution ignoring the response
0098             auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
0099             processor.reset(stage.stage_specific.enc, st.meta_mode);
0100             processor.sequence_number() = stage.seqnum;
0101             read_response_algo_.execute = {&temp_diag_, &processor};
0102             break;
0103         }
0104         case pipeline_stage_kind::prepare_statement:
0105             read_response_algo_.prepare_statement = {&temp_diag_, stage.seqnum};
0106             break;
0107         case pipeline_stage_kind::close_statement:
0108             // Close statement doesn't have a response
0109             read_response_algo_.nothing = nullptr;
0110             break;
0111         case pipeline_stage_kind::set_character_set:
0112             read_response_algo_.set_character_set = {&temp_diag_, stage.stage_specific.charset, stage.seqnum};
0113             break;
0114         case pipeline_stage_kind::reset_connection:
0115             read_response_algo_.reset_connection = {&temp_diag_, stage.seqnum};
0116             break;
0117         case pipeline_stage_kind::ping: read_response_algo_.ping = {&temp_diag_, stage.seqnum}; break;
0118         default: BOOST_ASSERT(false);
0119         }
0120     }
0121 
0122     void set_stage_error(error_code ec, diagnostics&& diag)
0123     {
0124         if (response_)
0125         {
0126             access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
0127         }
0128     }
0129 
0130     void on_stage_finished(const connection_state_data& st, error_code stage_ec)
0131     {
0132         if (stage_ec)
0133         {
0134             // If the error was fatal, fail successive stages.
0135             // This error is the result of the operation
0136             if (is_fatal_error(stage_ec))
0137             {
0138                 pipeline_ec_ = stage_ec;
0139                 *diag_ = temp_diag_;
0140                 has_hatal_error_ = true;
0141             }
0142             else if (!pipeline_ec_)
0143             {
0144                 // In the absence of fatal errors, the first error we encounter is the result of the operation
0145                 pipeline_ec_ = stage_ec;
0146                 *diag_ = temp_diag_;
0147             }
0148 
0149             // Propagate the error
0150             if (response_ != nullptr)
0151             {
0152                 set_stage_error(stage_ec, std::move(temp_diag_));
0153             }
0154         }
0155         else
0156         {
0157             if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
0158             {
0159                 // Propagate results. We don't support prepare statements ignoring the response
0160                 BOOST_ASSERT(response_ != nullptr);
0161                 access::get_impl((*response_)[current_stage_index_])
0162                     .set_result(read_response_algo_.prepare_statement.result(st));
0163             }
0164         }
0165     }
0166 
0167     next_action resume_read_algo(connection_state_data& st, error_code ec)
0168     {
0169         switch (stages_[current_stage_index_].kind)
0170         {
0171         case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, ec);
0172         case pipeline_stage_kind::prepare_statement:
0173             return read_response_algo_.prepare_statement.resume(st, ec);
0174         case pipeline_stage_kind::reset_connection:
0175             return read_response_algo_.reset_connection.resume(st, ec);
0176         case pipeline_stage_kind::set_character_set:
0177             return read_response_algo_.set_character_set.resume(st, ec);
0178         case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, ec);
0179         case pipeline_stage_kind::close_statement: return next_action();  // has no response
0180         default: BOOST_ASSERT(false); return next_action();
0181         }
0182     }
0183 
0184 public:
0185     run_pipeline_algo(run_pipeline_algo_params params) noexcept
0186         : diag_(params.diag),
0187           request_buffer_(params.request_buffer),
0188           stages_(params.request_stages),
0189           response_(params.response)
0190     {
0191     }
0192 
0193     next_action resume(connection_state_data& st, error_code ec)
0194     {
0195         next_action act;
0196 
0197         switch (resume_point_)
0198         {
0199         case 0:
0200 
0201             // Clear previous state
0202             diag_->clear();
0203             setup_response();
0204 
0205             // If the request is empty, don't do anything
0206             if (stages_.empty())
0207                 break;
0208 
0209             // Write the request. use_ssl is attached by top_level_algo
0210             BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
0211 
0212             // If writing the request failed, fail all the stages with the given error code
0213             if (ec)
0214             {
0215                 pipeline_ec_ = ec;
0216                 has_hatal_error_ = true;
0217             }
0218 
0219             // For each stage
0220             for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
0221             {
0222                 // If there was a fatal error, just set the error and move forward
0223                 if (has_hatal_error_)
0224                 {
0225                     set_stage_error(pipeline_ec_, diagnostics(*diag_));
0226                     continue;
0227                 }
0228 
0229                 // Setup the stage
0230                 setup_current_stage(st);
0231 
0232                 // Run it until completion
0233                 ec.clear();
0234                 while (!(act = resume_read_algo(st, ec)).is_done())
0235                     BOOST_MYSQL_YIELD(resume_point_, 2, act)
0236 
0237                 // Process the stage's result
0238                 on_stage_finished(st, act.error());
0239             }
0240         }
0241 
0242         return pipeline_ec_;
0243     }
0244 };
0245 
0246 }  // namespace detail
0247 }  // namespace mysql
0248 }  // namespace boost
0249 
0250 #endif