Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:39:18

0001 //
0002 // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
0010 
0011 #include <boost/mysql/character_set.hpp>
0012 #include <boost/mysql/diagnostics.hpp>
0013 #include <boost/mysql/error_code.hpp>
0014 #include <boost/mysql/is_fatal_error.hpp>
0015 #include <boost/mysql/pipeline.hpp>
0016 
0017 #include <boost/mysql/detail/access.hpp>
0018 #include <boost/mysql/detail/algo_params.hpp>
0019 #include <boost/mysql/detail/next_action.hpp>
0020 #include <boost/mysql/detail/pipeline.hpp>
0021 
0022 #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
0023 #include <boost/mysql/impl/internal/sansio/execute.hpp>
0024 #include <boost/mysql/impl/internal/sansio/ping.hpp>
0025 #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
0026 #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
0027 #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
0028 
0029 #include <boost/assert.hpp>
0030 #include <boost/core/span.hpp>
0031 
0032 #include <cstddef>
0033 #include <vector>
0034 
0035 namespace boost {
0036 namespace mysql {
0037 namespace detail {
0038 
0039 class run_pipeline_algo
0040 {
0041     union any_read_algo
0042     {
0043         std::nullptr_t nothing;
0044         read_execute_response_algo execute;
0045         read_prepare_statement_response_algo prepare_statement;
0046         read_reset_connection_response_algo reset_connection;
0047         read_ping_response_algo ping;
0048         read_set_character_set_response_algo set_character_set;
0049 
0050         any_read_algo() noexcept : nothing{} {}
0051     };
0052 
0053     span<const std::uint8_t> request_buffer_;
0054     span<const pipeline_request_stage> stages_;
0055     std::vector<stage_response>* response_;
0056 
0057     int resume_point_{0};
0058     std::size_t current_stage_index_{0};
0059     error_code pipeline_ec_;  // Result of the entire operation
0060     bool has_hatal_error_{};  // If true, fail further stages with pipeline_ec_
0061     any_read_algo read_response_algo_;
0062     diagnostics temp_diag_;
0063 
0064     void setup_response()
0065     {
0066         if (response_)
0067         {
0068             // Create as many response items as request stages
0069             response_->resize(stages_.size());
0070 
0071             // Setup them
0072             for (std::size_t i = 0u; i < stages_.size(); ++i)
0073             {
0074                 // Execution stages need to be initialized to results objects.
0075                 // Otherwise, clear any previous content
0076                 auto& impl = access::get_impl((*response_)[i]);
0077                 if (stages_[i].kind == pipeline_stage_kind::execute)
0078                     impl.emplace_results();
0079                 else
0080                     impl.emplace_error();
0081             }
0082         }
0083     }
0084 
0085     void setup_current_stage(const connection_state_data& st)
0086     {
0087         // Reset previous data
0088         temp_diag_.clear();
0089 
0090         // Setup read algo
0091         auto stage = stages_[current_stage_index_];
0092         switch (stage.kind)
0093         {
0094         case pipeline_stage_kind::execute:
0095         {
0096             BOOST_ASSERT(response_ != nullptr);  // we don't support execution ignoring the response
0097             auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
0098             processor.reset(stage.stage_specific.enc, st.meta_mode);
0099             processor.sequence_number() = stage.seqnum;
0100             read_response_algo_.execute = {&processor};
0101             break;
0102         }
0103         case pipeline_stage_kind::prepare_statement:
0104             read_response_algo_.prepare_statement = {stage.seqnum};
0105             break;
0106         case pipeline_stage_kind::close_statement:
0107             // Close statement doesn't have a response
0108             read_response_algo_.nothing = nullptr;
0109             break;
0110         case pipeline_stage_kind::set_character_set:
0111             read_response_algo_.set_character_set = {stage.stage_specific.charset, stage.seqnum};
0112             break;
0113         case pipeline_stage_kind::reset_connection:
0114             read_response_algo_.reset_connection = {stage.seqnum};
0115             break;
0116         case pipeline_stage_kind::ping: read_response_algo_.ping = {stage.seqnum}; break;
0117         default: BOOST_ASSERT(false);  // LCOV_EXCL_LINE
0118         }
0119     }
0120 
0121     void set_stage_error(error_code ec, diagnostics&& diag)
0122     {
0123         if (response_)
0124         {
0125             access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
0126         }
0127     }
0128 
0129     void on_stage_finished(const connection_state_data& st, diagnostics& output_diag, error_code stage_ec)
0130     {
0131         if (stage_ec)
0132         {
0133             // If the error was fatal, fail successive stages.
0134             // This error is the result of the operation
0135             if (is_fatal_error(stage_ec))
0136             {
0137                 pipeline_ec_ = stage_ec;
0138                 output_diag = temp_diag_;
0139                 has_hatal_error_ = true;
0140             }
0141             else if (!pipeline_ec_)
0142             {
0143                 // In the absence of fatal errors, the first error we encounter is the result of the operation
0144                 pipeline_ec_ = stage_ec;
0145                 output_diag = temp_diag_;
0146             }
0147 
0148             // Propagate the error
0149             if (response_ != nullptr)
0150             {
0151                 set_stage_error(stage_ec, std::move(temp_diag_));
0152             }
0153         }
0154         else
0155         {
0156             if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
0157             {
0158                 // Propagate results. We don't support prepare statements ignoring the response
0159                 BOOST_ASSERT(response_ != nullptr);
0160                 access::get_impl((*response_)[current_stage_index_])
0161                     .set_result(read_response_algo_.prepare_statement.result(st));
0162             }
0163         }
0164     }
0165 
0166     next_action resume_read_algo(connection_state_data& st, error_code ec)
0167     {
0168         switch (stages_[current_stage_index_].kind)
0169         {
0170         case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, temp_diag_, ec);
0171         case pipeline_stage_kind::prepare_statement:
0172             return read_response_algo_.prepare_statement.resume(st, temp_diag_, ec);
0173         case pipeline_stage_kind::reset_connection:
0174             return read_response_algo_.reset_connection.resume(st, temp_diag_, ec);
0175         case pipeline_stage_kind::set_character_set:
0176             return read_response_algo_.set_character_set.resume(st, temp_diag_, ec);
0177         case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, temp_diag_, ec);
0178         case pipeline_stage_kind::close_statement: return next_action();  // has no response
0179         default: BOOST_ASSERT(false); return next_action();               // LCOV_EXCL_LINE
0180         }
0181     }
0182 
0183 public:
0184     run_pipeline_algo(run_pipeline_algo_params params) noexcept
0185         : request_buffer_(params.request_buffer), stages_(params.request_stages), response_(params.response)
0186     {
0187     }
0188 
0189     next_action resume(connection_state_data& st, diagnostics& diag, error_code ec)
0190     {
0191         next_action act;
0192 
0193         switch (resume_point_)
0194         {
0195         case 0:
0196 
0197             // Clear previous state
0198             setup_response();
0199 
0200             // If the request is empty, don't do anything
0201             if (stages_.empty())
0202                 break;
0203 
0204             // Check status
0205             ec = st.check_status_ready();
0206             if (ec)
0207                 return ec;
0208 
0209             // Write the request. use_ssl is attached by top_level_algo
0210             BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
0211 
0212             // If writing the request failed, fail all the stages with the given error code
0213             if (ec)
0214             {
0215                 pipeline_ec_ = ec;
0216                 has_hatal_error_ = true;
0217             }
0218 
0219             // For each stage
0220             for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
0221             {
0222                 // If there was a fatal error, just set the error and move forward
0223                 if (has_hatal_error_)
0224                 {
0225                     set_stage_error(pipeline_ec_, diagnostics(diag));
0226                     continue;
0227                 }
0228 
0229                 // Setup the stage
0230                 setup_current_stage(st);
0231 
0232                 // Run it until completion
0233                 ec.clear();
0234                 while (!(act = resume_read_algo(st, ec)).is_done())
0235                     BOOST_MYSQL_YIELD(resume_point_, 2, act)
0236 
0237                 // Process the stage's result
0238                 on_stage_finished(st, diag, act.error());
0239             }
0240         }
0241 
0242         return pipeline_ec_;
0243     }
0244 };
0245 
0246 }  // namespace detail
0247 }  // namespace mysql
0248 }  // namespace boost
0249 
0250 #endif