Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-17 08:38:58

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0010 
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pool_params.hpp>
0017 
0018 #include <boost/mysql/detail/config.hpp>
0019 
0020 #include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
0023 #include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
0024 #include <boost/mysql/impl/internal/coroutine.hpp>
0025 
0026 #include <boost/asio/any_completion_handler.hpp>
0027 #include <boost/asio/any_io_executor.hpp>
0028 #include <boost/asio/bind_executor.hpp>
0029 #include <boost/asio/compose.hpp>
0030 #include <boost/asio/deferred.hpp>
0031 #include <boost/asio/dispatch.hpp>
0032 #include <boost/asio/error.hpp>
0033 #include <boost/asio/post.hpp>
0034 #include <boost/core/ignore_unused.hpp>
0035 
0036 #include <chrono>
0037 #include <cstddef>
0038 #include <list>
0039 #include <memory>
0040 
0041 namespace boost {
0042 namespace mysql {
0043 namespace detail {
0044 
0045 inline pipeline_request make_reset_pipeline()
0046 {
         // Two-stage request: first reset the connection, then set its
         // character set to utf8mb4_charset. The pool keeps the result in a
         // const member and passes a pointer to it to every connection node.
0047     pipeline_request reset_req;
0048     reset_req.add_reset_connection();
         reset_req.add_set_character_set(utf8mb4_charset);
0049     return reset_req;
0050 }
0051 
0052 // Templating on ConnectionWrapper is useful for mocking in tests.
0053 // Production code always uses ConnectionWrapper = pooled_connection.
     //
     // Holds the state of a connection pool: the list of connection nodes, the
     // state shared with those nodes, a wait group tracking the per-connection
     // tasks and a timer used as the cancel notification. The two composed
     // operations (run_op and get_connection_op) keep a shared_ptr to the pool
     // while they are in progress, which is why the class derives from
     // enable_shared_from_this.
0054 template <class IoTraits, class ConnectionWrapper>
0055 class basic_pool_impl : public std::enable_shared_from_this<basic_pool_impl<IoTraits, ConnectionWrapper>>
0056 {
0057     using this_type = basic_pool_impl<IoTraits, ConnectionWrapper>;
0058     using node_type = basic_connection_node<IoTraits>;
0059     using timer_type = typename IoTraits::timer_type;
0060     using timer_block_type = timer_block<timer_type>;
0061     using shared_state_type = conn_shared_state<IoTraits>;
0062 
         // Lifecycle of the pool, as driven by run_op.
0063     enum class state_t
0064     {
0065         initial,    // Value state_ starts with; run_op has not marked the pool as running yet
0066         running,    // Set by run_op right before it creates the initial connections
0067         cancelled,  // Set by run_op once the wait on cancel_timer_ has completed
0068     };
0069 
0070     state_t state_{state_t::initial};
0071     internal_pool_params params_;    // Built from the user-supplied pool_params
0072     asio::any_io_executor ex_;       // Pool executor: both operations dispatch/post to it
0073     asio::any_io_executor conn_ex_;  // Executor handed to every connection node
0074     std::list<node_type> all_conns_;  // Every node created so far; only ever appended to in this class
0075     shared_state_type shared_st_;     // Passed to every node; holds idle_list, pending_requests, last_ec...
0076     wait_group wait_gp_;              // Tracks the per-connection tasks started by create_connection
0077     timer_type cancel_timer_;         // Constructed with expiry time_point::max; see cancel_unsafe
0078     const pipeline_request reset_pipeline_req_{make_reset_pipeline()};  // Nodes receive a pointer to this
0079 
         // Returns a shared_ptr owning *this.
0080     std::shared_ptr<this_type> shared_from_this_wrapper()
0081     {
0082         // Some compilers get confused without this explicit cast
0083         return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
0084     }
0085 
         // Appends a new node to all_conns_ and launches its async_run task
         // through the wait group, so run_op can later wait for it to exit.
0086     void create_connection()
0087     {
0088         all_conns_.emplace_back(params_, ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
0089         wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
0090     }
0091 
         // Selects the error code reported by get_connection_op when its wait
         // reaches the deadline without obtaining a connection:
         //   - client_errc::cancelled if the pool has been cancelled,
         //   - otherwise shared_st_.last_ec if it is set (copying
         //     shared_st_.last_diag into *diag when diag is non-null),
         //   - otherwise client_errc::timeout.
         // diag may be null; *diag is only written in the second case.
0092     error_code get_diagnostics(diagnostics* diag) const
0093     {
0094         if (state_ == state_t::cancelled)
0095         {
0096             return client_errc::cancelled;
0097         }
0098         else if (shared_st_.last_ec)
0099         {
0100             if (diag)
0101                 *diag = shared_st_.last_diag;
0102             return shared_st_.last_ec;
0103         }
0104         else
0105         {
0106             return client_errc::timeout;
0107         }
0108     }
0109 
         // Composed operation implementing async_run. Written as a stackless
         // coroutine: resume_point_ records which BOOST_MYSQL_YIELD to continue
         // from the next time operator() is invoked, so statement order matters.
0110     struct run_op
0111     {
0112         int resume_point_{0};
0113         std::shared_ptr<this_type> obj_;  // Keeps the pool alive until the operation completes
0114 
0115         run_op(std::shared_ptr<this_type> obj) noexcept : obj_(std::move(obj)) {}
0116 
             // ec is the result of the step that has just finished. It is
             // deliberately ignored: every step proceeds to the next one.
0117         template <class Self>
0118         void operator()(Self& self, error_code ec = {})
0119         {
0120             // TODO: per-operation cancellation here doesn't do the right thing
0121             boost::ignore_unused(ec);
0122             switch (resume_point_)
0123             {
0124             case 0:
0125 
0126                 // Ensure we run within the pool executor (possibly a strand)
0127                 BOOST_MYSQL_YIELD(resume_point_, 1, asio::dispatch(obj_->ex_, std::move(self)))
0128 
0129                 // Check that we're not running and set the state adequately
0130                 BOOST_ASSERT(obj_->state_ == state_t::initial);
0131                 obj_->state_ = state_t::running;
0132 
0133                 // Create the initial connections
0134                 for (std::size_t i = 0; i < obj_->params_.initial_size; ++i)
0135                     obj_->create_connection();
0136 
0137                 // Wait for the cancel notification to arrive.
                     // The timer's expiry starts at time_point::max and is moved
                     // to time_point::min by cancel_unsafe.
0138                 BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
0139 
0140                 // If the token passed to async_run had a bound executor,
0141                 // the handler will be invoked within that executor.
0142                 // Dispatch so we run within the pool's executor.
0143                 BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
0144 
0145                 // Deliver the cancel notification to all other tasks
                     // (every connection node, plus every pending
                     // get_connection_op waiting on shared_st_.pending_requests).
0146                 obj_->state_ = state_t::cancelled;
0147                 for (auto& conn : obj_->all_conns_)
0148                     conn.cancel();
0149                 obj_->shared_st_.pending_requests.notify_all();
0150 
0151                 // Wait for all connection tasks to exit
0152                 BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
0153 
0154                 // Done
                     // Drop our reference to the pool before invoking the final
                     // handler; async_run always completes with a success code.
0155                 obj_.reset();
0156                 self.complete(error_code());
0157             }
0158         }
0159     };
0160 
         // Composed operation implementing async_get_connection. Like run_op,
         // it is a stackless coroutine keyed on resume_point_. All state that
         // must survive a yield is kept in data members.
0161     struct get_connection_op
0162     {
0163         int resume_point_{0};
0164         std::shared_ptr<this_type> obj_;  // Keeps the pool alive; moved into the wrapper on success
0165         std::chrono::steady_clock::time_point timeout_;  // Absolute deadline for the whole operation
0166         diagnostics* diag_;  // May be null; cleared on start, filled by get_diagnostics on timeout
0167         std::unique_ptr<timer_block_type> timer_;  // Allocated lazily, only if we need to wait
0168         error_code stored_ec_;  // Result of the timer wait, saved across the dispatch at resume point 3
0169 
0170         get_connection_op(
0171             std::shared_ptr<this_type> obj,
0172             std::chrono::steady_clock::time_point timeout,
0173             diagnostics* diag
0174         ) noexcept
0175             : obj_(std::move(obj)), timeout_(timeout), diag_(diag)
0176         {
0177         }
0178 
             // Releases the resources held by the operation and then invokes
             // the final handler with (ec, conn).
0179         template <class Self>
0180         void do_complete(Self& self, error_code ec, ConnectionWrapper conn)
0181         {
0182             // Resetting the timer will remove it from the list thanks to the auto-unlink feature
0183             timer_.reset();
0184             obj_.reset();
0185             self.complete(ec, std::move(conn));
0186         }
0187 
             // Marks node as in use and completes successfully with a wrapper
             // built from the node and our pool reference (obj_ is moved into
             // the wrapper, so it is already empty when do_complete resets it).
0188         template <class Self>
0189         void complete_success(Self& self, node_type& node)
0190         {
0191             node.mark_as_in_use();
0192             do_complete(self, error_code(), ConnectionWrapper(node, std::move(obj_)));
0193         }
0194 
             // ec is the result of the step that has just finished. It is only
             // inspected after the timer wait (resume point 2).
0195         template <class Self>
0196         void operator()(Self& self, error_code ec = {})
0197         {
0198             switch (resume_point_)
0199             {
0200             case 0:
0201 
0202                 // Clear diagnostics
0203                 if (diag_)
0204                     diag_->clear();
0205 
0206                 // Ensure we run within the pool's executor (or the handler's) (possibly a strand)
0207                 BOOST_MYSQL_YIELD(resume_point_, 1, asio::post(obj_->ex_, std::move(self)))
0208 
0209                 // This loop guards us against possible race conditions
0210                 // between waiting on the pending request timer and getting the connection
0211                 while (true)
0212                 {
0213                     // If we're not running yet, or were cancelled, just return
0214                     if (obj_->state_ != state_t::running)
0215                     {
0216                         do_complete(
0217                             self,
0218                             obj_->state_ == state_t::initial ? client_errc::pool_not_running
0219                                                              : client_errc::cancelled,
0220                             ConnectionWrapper()
0221                         );
0222                         return;
0223                     }
0224 
0225                     // Try to get a connection without blocking
0226                     if (!obj_->shared_st_.idle_list.empty())
0227                     {
0228                         // There was a connection. Done.
0229                         complete_success(self, obj_->shared_st_.idle_list.front());
0230                         return;
0231                     }
0232 
0233                     // No luck. If there is room for more connections, create one.
0234                     // Don't create new connections if we have other connections pending
0235                     // (i.e. being connected, reset... ) - otherwise pool size increases for
0236                     // no reason when there is no connectivity.
0237                     if (obj_->all_conns_.size() < obj_->params_.max_size &&
0238                         obj_->shared_st_.num_pending_connections == 0u)
0239                     {
0240                         obj_->create_connection();
0241                     }
0242 
0243                     // Allocate a timer to perform waits.
                         // Done at most once per operation: later iterations
                         // reuse the same block, which stays linked in
                         // pending_requests until do_complete destroys it.
0244                     if (!timer_)
0245                     {
0246                         timer_.reset(new timer_block_type(obj_->ex_));
0247                         obj_->shared_st_.pending_requests.push_back(*timer_);
0248                     }
0249 
0250                     // Wait to be notified, or until a timeout happens
                         // The timer is re-armed with the same absolute deadline
                         // on every iteration.
0251                     timer_->timer.expires_at(timeout_);
0252                     BOOST_MYSQL_YIELD(resume_point_, 2, timer_->timer.async_wait(std::move(self)))
0253                     stored_ec_ = ec;
0254 
0255                     // If the token passed to async_get_connection had a bound executor,
0256                     // the handler will be invoked within that executor.
0257                     // Dispatch so we run within the pool's executor.
0258                     BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
0259 
                         // A wait that finished without error means the deadline
                         // was reached. A wait that finished with an error
                         // (presumably because a notification cancelled the
                         // timer - confirm against timer_list.hpp) falls through
                         // and re-runs the checks at the top of the loop.
0260                     if (!stored_ec_)
0261                     {
0262                         // We've got a timeout. Try to give as much info as possible
0263                         do_complete(self, obj_->get_diagnostics(diag_), ConnectionWrapper());
0264                         return;
0265                     }
0266                 }
0267             }
0268         }
0269     };
0270 
0271 public:
         // Takes ownership of the executors in ex_params and of params. The
         // wait group and the cancel timer are bound to the pool executor, and
         // the cancel timer's expiry is set to time_point::max.
0272     basic_pool_impl(pool_executor_params&& ex_params, pool_params&& params)
0273         : params_(make_internal_pool_params(std::move(params))),
0274           ex_(std::move(ex_params.pool_executor)),
0275           conn_ex_(std::move(ex_params.connection_executor)),
0276           wait_gp_(ex_),
0277           cancel_timer_(ex_, (std::chrono::steady_clock::time_point::max)())
0278     {
0279     }
0280 
0281     using executor_type = asio::any_io_executor;
0282 
         // Returns a copy of the pool executor.
0283     executor_type get_executor() { return ex_; }
0284 
         // Starts the pool: creates params_.initial_size connections, then
         // waits until the pool is cancelled and all connection tasks have
         // exited. Always completes with a default-constructed error_code.
         // The pool must be in the initial state (checked with BOOST_ASSERT)
         // and must be owned by a shared_ptr, since the operation calls
         // shared_from_this.
0285     template <class CompletionToken>
0286     BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
0287     async_run(CompletionToken&& token)
0288     {
0289         return asio::async_compose<CompletionToken, void(error_code)>(
0290             run_op(shared_from_this_wrapper()),
0291             token,
0292             ex_
0293         );
0294     }
0295 
0296     // Not thread-safe
         // Moves the cancel timer's expiry to time_point::min. run_op is waiting
         // on this timer, and treats the completion of that wait as the cancel
         // notification.
0297     void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
0298 
         // Obtains a connection, waiting until the absolute deadline `timeout`
         // at most. Completes with:
         //   - success and a wrapper around an idle node, or
         //   - client_errc::pool_not_running / client_errc::cancelled and a
         //     default-constructed wrapper if the pool is not in the running
         //     state, or
         //   - the code chosen by get_diagnostics and a default-constructed
         //     wrapper if the deadline is reached.
         // diag may be null. If it is not, it is cleared on start and must
         // remain valid until the operation completes.
0299     template <class CompletionToken>
0300     BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
0301     async_get_connection(
0302         std::chrono::steady_clock::time_point timeout,
0303         diagnostics* diag,
0304         CompletionToken&& token
0305     )
0306     {
0307         return asio::async_compose<CompletionToken, void(error_code, ConnectionWrapper)>(
0308             get_connection_op(shared_from_this_wrapper(), timeout, diag),
0309             token,
0310             ex_
0311         );
0312     }
0313 
         // Same as the overload above, but takes a relative timeout. A timeout
         // whose count() is not positive is translated to time_point::max;
         // otherwise the deadline is steady_clock::now() + timeout.
         // NOTE(review): now() + timeout can overflow for very large durations
         // (e.g. duration::max()) - confirm that callers bound the value.
0314     template <class CompletionToken>
0315     BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
0316     async_get_connection(
0317         std::chrono::steady_clock::duration timeout,
0318         diagnostics* diag,
0319         CompletionToken&& token
0320     )
0321     {
0322         return async_get_connection(
0323             timeout.count() > 0 ? std::chrono::steady_clock::now() + timeout
0324                                 : (std::chrono::steady_clock::time_point::max)(),
0325             diag,
0326             std::forward<CompletionToken>(token)
0327         );
0328     }
0329 
0330     // Exposed for testing
         // These hand out references to (or copies of) internal members and
         // perform no synchronization of their own.
0331     std::list<node_type>& nodes() noexcept { return all_conns_; }
0332     shared_state_type& shared_state() noexcept { return shared_st_; }
0333     internal_pool_params& params() noexcept { return params_; }
0334     asio::any_io_executor connection_ex() noexcept { return conn_ex_; }
0335     const pipeline_request& reset_pipeline_request() const { return reset_pipeline_req_; }
0336 };
0337 
0338 }  // namespace detail
0339 }  // namespace mysql
0340 }  // namespace boost
0341 
0342 #endif