Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:39:15

0001 //
0002 // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0010 
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pool_params.hpp>
0017 
0018 #include <boost/mysql/detail/config.hpp>
0019 
0020 #include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023 #include <boost/mysql/impl/internal/coroutine.hpp>
0024 
0025 #include <boost/asio/any_completion_handler.hpp>
0026 #include <boost/asio/any_io_executor.hpp>
0027 #include <boost/asio/associated_cancellation_slot.hpp>
0028 #include <boost/asio/basic_waitable_timer.hpp>
0029 #include <boost/asio/bind_cancellation_slot.hpp>
0030 #include <boost/asio/bind_executor.hpp>
0031 #include <boost/asio/cancellation_signal.hpp>
0032 #include <boost/asio/cancellation_type.hpp>
0033 #include <boost/asio/compose.hpp>
0034 #include <boost/asio/detached.hpp>
0035 #include <boost/asio/dispatch.hpp>
0036 #include <boost/asio/error.hpp>
0037 #include <boost/asio/immediate.hpp>
0038 #include <boost/asio/post.hpp>
0039 #include <boost/asio/strand.hpp>
0040 
0041 #include <chrono>
0042 #include <cstddef>
0043 #include <list>
0044 #include <memory>
0045 #include <utility>
0046 
0047 namespace boost {
0048 namespace mysql {
0049 namespace detail {
0050 
0051 inline pipeline_request make_reset_pipeline()
0052 {
         // Builds a pipeline request with two stages: a reset-connection stage
         // followed by a stage that sets the character set to utf8mb4_charset.
0053     pipeline_request req;
0054     req.add_reset_connection().add_set_character_set(utf8mb4_charset);
0055     return req;
0056 }
0057 
0058 // Templating on ConnectionWrapper is useful for mocking in tests.
0059 // Production code always uses ConnectionWrapper = pooled_connection.
0060 template <class ConnectionType, class ClockType, class ConnectionWrapper>
0061 class basic_pool_impl
0062     : public std::enable_shared_from_this<basic_pool_impl<ConnectionType, ClockType, ConnectionWrapper>>
0063 {
0064     using this_type = basic_pool_impl<ConnectionType, ClockType, ConnectionWrapper>;
0065     using node_type = basic_connection_node<ConnectionType, ClockType>;
         // Waitable timer that measures time with the pool's clock type
0066     using timer_type = asio::basic_waitable_timer<ClockType>;
0067     using shared_state_type = conn_shared_state<ConnectionType, ClockType>;
0068 
         // Pool lifecycle. run_op moves the pool from initial to running when it starts,
         // and to cancelled once its wait on cancel_timer_ has finished.
0069     enum class state_t
0070     {
0071         initial,
0072         running,
0073         cancelled,
0074     };
0075 
0076     // The passed pool executor, as is
0077     asio::any_io_executor original_pool_ex_;
0078 
0079     // If thread_safe, a strand wrapping original_pool_ex_, otherwise original_pool_ex_
0080     asio::any_io_executor pool_ex_;
0081 
0082     // Executor to be used by connections (params.connection_executor if set, otherwise original_pool_ex_)
0083     asio::any_io_executor conn_ex_;
0084 
0085     // Rest of the parameters
0086     internal_pool_params params_;
0087 
0088     // State
0089     state_t state_{state_t::initial};
         // Every connection node created by create_connection()
0090     std::list<node_type> all_conns_;
         // State handed by reference to every node. This file uses its idle_list,
         // idle_connections_cv, conns_finished_cv, pending counters and last_connect_diag.
0091     shared_state_type shared_st_;
         // run_op waits on this timer; cancel_unsafe() changes its expiry to request shutdown
0092     timer_type cancel_timer_;
         // Built once; a pointer to it is passed to every node in create_connection()
0093     const pipeline_request reset_pipeline_req_{make_reset_pipeline()};
0094 
0095     std::shared_ptr<this_type> shared_from_this_wrapper()
0096     {
             // Returns a shared_ptr sharing ownership of this pool, used to keep the
             // pool alive inside async operations and dispatched handlers.
0097         // Some compilers get confused without this explicit cast
0098         return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
0099     }
0100 
0101     // Create and run one connection.
         // Appends a new node to all_conns_ and launches its run task with a detached
         // completion token, so nobody waits on that task from here.
0102     void create_connection()
0103     {
0104         // Connection tasks always run in the pool's executor
0105         all_conns_.emplace_back(params_, pool_ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
0106         all_conns_.back().async_run(asio::bind_executor(pool_ex_, asio::detached));
0107     }
0108 
0109     // Create and run connections as required by the current config and state.
         // The amount is decided by num_connections_to_create from the configured
         // initial/max sizes, the current number of nodes and the pending counters.
0110     void create_connections()
0111     {
0112         // Calculate how many we should create
0113         std::size_t n = num_connections_to_create(
0114             params_.initial_size,
0115             params_.max_size,
0116             all_conns_.size(),
0117             shared_st_.num_pending_connections,
0118             shared_st_.num_pending_requests
0119         );
0120 
0121         // Create them. The helper is expected to never push the pool past max_size
0122         BOOST_ASSERT((all_conns_.size() + n) <= params_.max_size);
0123         for (std::size_t i = 0; i < n; ++i)
0124             create_connection();
0125     }
0126 
0127     // An async_get_connection request is about to wait for an available connection.
         // Must be paired with exit_request_pending() once the wait is over.
0128     void enter_request_pending()
0129     {
0130         // Record that we're pending
0131         ++shared_st_.num_pending_requests;
0132 
0133         // Create new connections, if required.
0134         // Don't create any connections if we're not yet running,
0135         // since this would leave connections running after run exits
0136         if (state_ == state_t::running)
0137             create_connections();
0138     }
0139 
0140     // An async_get_connection request finished waiting.
         // Undoes the counter increment performed by enter_request_pending().
0141     void exit_request_pending()
0142     {
0143         // Record that we're no longer pending
0144         BOOST_ASSERT(shared_st_.num_pending_requests > 0u);
0145         --shared_st_.num_pending_requests;
0146     }
0147 
0148     node_type* try_get_connection()
0149     {
             // Returns the node at the front of the idle list after marking it as in use,
             // or nullptr when no connection is idle. Never waits.
             // NOTE(review): presumably mark_as_in_use() also unlinks the node from
             // idle_list - confirm in connection_node.hpp.
0150         if (!shared_st_.idle_list.empty())
0151         {
0152             node_type& res = shared_st_.idle_list.front();
0153             res.mark_as_in_use();
0154             return &res;
0155         }
0156         else
0157         {
0158             return nullptr;
0159         }
0160     }
0161 
0162     template <class OpSelf>
0163     void enter_strand(OpSelf& self)
0164     {
             // Moves the composed op into a handler bound to pool_ex_ and dispatches it, so the
             // op is resumed through pool_ex_ (a strand in thread-safe mode).
             // self is moved-from after this call: the caller must not touch it again.
0165         asio::dispatch(asio::bind_executor(pool_ex_, std::move(self)));
0166     }
0167 
0168     template <class OpSelf>
0169     void exit_strand(OpSelf& self)
0170     {
             // Posts the composed op to get_executor(), i.e. the executor the pool was
             // constructed with, rather than pool_ex_.
             // self is moved-from after this call: the caller must not touch it again.
0171         asio::post(get_executor(), std::move(self));
0172     }
0173 
0174     template <class OpSelf>
0175     void wait_for_connections(OpSelf& self)
0176     {
             // Suspends the composed op on shared_st_.idle_connections_cv.
             // In thread-safe mode the op is bound to pool_ex_ so it resumes through the strand.
             // self is moved-from in both branches.
0177         // Having this encapsulated helps prevent subtle use-after-move errors
0178         if (params_.thread_safe)
0179         {
0180             shared_st_.idle_connections_cv.async_wait(asio::bind_executor(pool_ex_, std::move(self)));
0181         }
0182         else
0183         {
0184             shared_st_.idle_connections_cv.async_wait(std::move(self));
0185         }
0186     }
0187 
0188     struct run_op
0189     {
             // Composed operation behind async_run, written as a hand-rolled coroutine:
             // resume_point_ selects where operator() continues on each invocation.
             // NOTE(review): BOOST_MYSQL_YIELD is defined in coroutine.hpp; presumably it stores
             // the label in resume_point_, evaluates the expression and returns - confirm.
0190         int resume_point_{0};
             // Keeps the pool alive for as long as the operation is in progress
0191         std::shared_ptr<this_type> obj_;
             // Slot extracted from the caller's completion token by async_run
0192         asio::cancellation_slot cancel_slot_;
0193 
0194         run_op(std::shared_ptr<this_type> obj, asio::cancellation_slot slot) noexcept
0195             : obj_(std::move(obj)), cancel_slot_(slot)
0196         {
0197         }
0198 
             // Handler installed in cancel_slot_. Forwards supported cancellation types
             // to the pool's cancel(). Holds a raw pointer: obj_ owns the pool, and the
             // slot is cleared before obj_ is released at the end of operator().
0199         struct cancel_handler
0200         {
0201             this_type* self;
0202 
0203             void operator()(asio::cancellation_type_t type) const
0204             {
0205                 if (run_supports_cancel_type(type))
0206                     self->cancel();
0207             }
0208         };
0209 
0210         template <class Self>
0211         void operator()(Self& self, error_code = {})
0212         {
                 // The error_code passed on each resumption is ignored (unnamed parameter)
0213             switch (resume_point_)
0214             {
0215             case 0:
0216                 // Emplace a cancellation handler, if required
0217                 if (cancel_slot_.is_connected())
0218                 {
0219                     cancel_slot_.template emplace<cancel_handler>(cancel_handler{obj_.get()});
0220                 }
0221 
0222                 // Ensure we run within the strand
0223                 if (obj_->params_.thread_safe)
0224                 {
0225                     BOOST_MYSQL_YIELD(resume_point_, 1, obj_->enter_strand(self))
0226                 }
0227 
0228                 // Check that we're not running and set the state adequately
0229                 BOOST_ASSERT(obj_->state_ == state_t::initial);
0230                 obj_->state_ = state_t::running;
0231 
0232                 // Create the initial connections
0233                 obj_->create_connections();
0234 
0235                 // Wait for the cancel notification to arrive.
                     // cancel_unsafe() is what changes the timer's expiry to end this wait.
0236                 BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
0237 
0238                 // Ensure we run within the strand
0239                 if (obj_->params_.thread_safe)
0240                 {
0241                     BOOST_MYSQL_YIELD(resume_point_, 3, obj_->enter_strand(self))
0242                 }
0243 
0244                 // Deliver the cancel notification to all other tasks
0245                 obj_->state_ = state_t::cancelled;
0246                 for (auto& conn : obj_->all_conns_)
0247                     conn.cancel();
                     // Presumably wakes up requests parked in get_connection_op, which then
                     // observe state_t::cancelled - TODO confirm against conn_shared_state
0248                 obj_->shared_st_.idle_connections_cv.expires_at((ClockType::time_point::min)());
0249 
0250                 // Wait for all connection tasks to exit
0251                 BOOST_MYSQL_YIELD(
0252                     resume_point_,
0253                     4,
0254                     obj_->shared_st_.conns_finished_cv.async_wait(std::move(self))
0255                 )
0256 
0257                 // Done. Remove our handler from the slot before dropping the pool reference
0258                 cancel_slot_.clear();
0259                 obj_.reset();
0260                 self.complete(error_code());
0261             }
0262         }
0263     };
0264 
0265     struct get_connection_op
0266     {
             // Composed operation behind async_get_connection, written as a hand-rolled
             // coroutine driven by resume_point. Completes with (error_code, ConnectionWrapper).
             // NOTE(review): BOOST_MYSQL_YIELD is defined in coroutine.hpp; presumably it stores
             // the label in resume_point, evaluates the expression and returns - confirm.
 
0267         // Operation arguments
0268         std::shared_ptr<this_type> obj;
             // Optional output diagnostics; may be null
0269         diagnostics* diag;
0270 
0271         // The proxy signal. Used in thread-safe mode. Present here only for
0272         // lifetime management
0273         std::shared_ptr<asio::cancellation_signal> sig;
0274 
0275         // The original cancellation slot. Needed for proper cleanup after we proxy
0276         // the signal in thread-safe mode
0277         asio::cancellation_slot parent_slot;
0278 
0279         // State
0280         int resume_point{0};
             // Final result: an error, or (on success) the node to hand out
0281         error_code result_ec;
0282         node_type* result_conn{};
             // True once the op has suspended on wait_for_connections at least once
0283         bool has_waited{false};
0284 
0285         get_connection_op(
0286             std::shared_ptr<this_type> obj,
0287             diagnostics* diag,
0288             std::shared_ptr<asio::cancellation_signal> sig,
0289             asio::cancellation_slot parent_slot
0290         ) noexcept
0291             : obj(std::move(obj)), diag(diag), sig(std::move(sig)), parent_slot(parent_slot)
0292         {
0293         }
0294 
0295         bool thread_safe() const { return obj->params_.thread_safe; }
0296 
             // Builds the result, releases cancellation resources and invokes the final handler.
             // On success, ownership of the pool pointer is moved into the returned wrapper.
0297         template <class Self>
0298         void do_complete(Self& self)
0299         {
0300             auto wr = result_ec ? ConnectionWrapper() : ConnectionWrapper(*result_conn, std::move(obj));
                 // Remove the proxy handler from the caller's slot and drop the proxy signal
                 // before calling the handler
0301             parent_slot.clear();
0302             sig.reset();
0303             self.complete(result_ec, std::move(wr));
0304         }
0305 
0306         template <class Self>
0307         void operator()(Self& self, error_code = {})
0308         {
                 // The error_code passed on each resumption is ignored (unnamed parameter);
                 // the loop below re-checks pool state and cancellation instead
0309             switch (resume_point)
0310             {
0311             case 0:
0312                 // This op supports total cancellation. Must be explicitly enabled,
0313                 // as composed ops only support terminal cancellation by default.
0314                 self.reset_cancellation_state(asio::enable_total_cancellation());
0315 
0316                 // Clear diagnostics
0317                 if (diag)
0318                     diag->clear();
0319 
0320                 // Enter the strand
0321                 if (thread_safe())
0322                 {
0323                     BOOST_MYSQL_YIELD(resume_point, 1, obj->enter_strand(self))
0324                 }
0325 
0326                 // This loop guards us against possible race conditions
0327                 // between waiting on the pending request timer and getting the
0328                 // connection
0329                 while (true)
0330                 {
0331                     if (obj->state_ == state_t::cancelled)
0332                     {
0333                         // The pool was cancelled
0334                         result_ec = client_errc::pool_cancelled;
0335                         break;
0336                     }
0337                     else if (get_connection_supports_cancel_type(self.cancelled()))
0338                     {
0339                         // The operation was cancelled. Try to provide diagnostics
0340                         if (obj->state_ == state_t::initial)
0341                         {
0342                             // The operation failed because the pool is not running
0343                             result_ec = client_errc::pool_not_running;
0344                         }
0345                         else
0346                         {
                                 // Pool is running but no connection became idle in time:
                                 // report the diagnostics stored in last_connect_diag
0347                             result_ec = client_errc::no_connection_available;
0348                             if (diag)
0349                                 *diag = obj->shared_st_.last_connect_diag;
0350                         }
0351                         break;
0352                     }
0353 
0354                     // Try to get a connection
0355                     if ((result_conn = obj->try_get_connection()) != nullptr)
0356                     {
0357                         // There was a connection
0358                         break;
0359                     }
0360 
0361                     // No luck. Record that we're waiting for a connection.
0362                     obj->enter_request_pending();
0363 
0364                     // Wait to be notified, or until a cancellation happens
0365                     BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self))
0366 
0367                     // Record that we're no longer pending
0368                     obj->exit_request_pending();
0369 
0370                     // Remember that we have waited, so completions are dispatched
0371                     // correctly
0372                     has_waited = true;
0373                 }
0374 
0375                 // Perform any required dispatching before completing
0376                 if (thread_safe())
0377                 {
0378                     // Exit the strand
0379                     BOOST_MYSQL_YIELD(resume_point, 3, obj->exit_strand(self))
0380                 }
0381                 else if (!has_waited)
0382                 {
0383                     // This is an immediate completion
                         // (the op never suspended), so route it through asio::async_immediate
0384                     BOOST_MYSQL_YIELD(
0385                         resume_point,
0386                         4,
0387                         asio::async_immediate(self.get_io_executor(), std::move(self))
0388                     )
0389                 }
0390 
0391                 // Done
0392                 do_complete(self);
0393             }
0394         }
0395     };
0396 
0397     // Cancel handler to use for get_connection in thread-safe mode.
0398     // This imitates what Asio does for composed ops.
         // It is installed in the caller's cancellation slot by async_get_connection and
         // re-emits supported cancellation types on the proxy signal from within the strand.
0399     struct get_connection_cancel_handler
0400     {
0401         // Pointer to the proxy cancellation signal
0402         // Lifetime managed by the get_connection composed op
0403         std::weak_ptr<asio::cancellation_signal> sig;
0404 
0405         // Pointer to the pool object
             // Weak, so this handler does not extend the pool's lifetime
0406         std::weak_ptr<this_type> obj;
0407 
0408         get_connection_cancel_handler(
0409             std::weak_ptr<asio::cancellation_signal> sig,
0410             std::weak_ptr<this_type> obj
0411         ) noexcept
0412             : sig(std::move(sig)), obj(std::move(obj))
0413         {
0414         }
0415 
0416         void operator()(asio::cancellation_type_t type)
0417         {
0418             if (get_connection_supports_cancel_type(type))
0419             {
0420                 // Try to get the pool object back
                     // If the pool is already gone there is nothing left to cancel
0421                 std::shared_ptr<this_type> obj_shared = obj.lock();
0422                 if (obj_shared)
0423                 {
0424                     // Dispatch to the strand. We don't need to keep a reference to the
0425                     // pool because even if it was destroyed before running the handler,
0426                     // the strand would be alive.
0427                     auto sig_copy = sig;
0428                     asio::dispatch(asio::bind_executor(obj_shared->strand(), [sig_copy, type]() {
0429                         // If the operation has already completed, the weak ptr will be
0430                         // invalid
0431                         auto sig_shared = sig_copy.lock();
0432                         if (sig_shared)
0433                         {
0434                             sig_shared->emit(type);
0435                         }
0436                     }));
0437                 }
0438             }
0439         }
0440     };
0441 
0442     // Not thread-safe. Requests pool shutdown by moving cancel_timer_'s expiry to the
         // earliest time point, which ends the wait performed by run_op.
         // The time point is taken from ClockType, the clock timer_type is instantiated
         // with, so this also compiles when ClockType::time_point is not
         // std::chrono::steady_clock::time_point.
0443     void cancel_unsafe() { cancel_timer_.expires_at((ClockType::time_point::min)()); }
0444 
0445 public:
         // Constructs the pool.
         //   ex:     executor the pool was given; returned as-is by get_executor().
         //   params: user configuration. It is consumed (moved from) by this constructor.
         // Members are initialized in declaration order, so params.thread_safe and
         // params.connection_executor are read before params is moved into params_.
0446     basic_pool_impl(asio::any_io_executor ex, pool_params&& params)
0447         : original_pool_ex_(std::move(ex)),
0448           pool_ex_(params.thread_safe ? asio::make_strand(original_pool_ex_) : original_pool_ex_),
0449           conn_ex_(params.connection_executor ? std::move(params.connection_executor) : original_pool_ex_),
0450           params_(make_internal_pool_params(std::move(params))),
0451           shared_st_(pool_ex_),
               // Armed at the latest time point of the timer's own clock (ClockType), so the
               // wait in run_op only ends once cancel_unsafe() changes the expiry
0452           cancel_timer_(pool_ex_, (ClockType::time_point::max)())
0453     {
0454     }
0455 
0456     asio::strand<asio::any_io_executor> strand()
0457     {
             // Returns a copy of the strand stored in pool_ex_.
             // Only valid in thread-safe mode (asserted below): that is the only case in which
             // the constructor builds pool_ex_ with asio::make_strand, so that target<>()
             // points at a strand that can be dereferenced.
0458         BOOST_ASSERT(params_.thread_safe);
0459         return *pool_ex_.template target<asio::strand<asio::any_io_executor>>();
0460     }
0461 
0462     using executor_type = asio::any_io_executor;
         // Returns the executor the pool was constructed with, not the internal strand
0463     executor_type get_executor() { return original_pool_ex_; }
0464 
0465     void async_run(asio::any_completion_handler<void(error_code)> handler)
0466     {
             // Starts the pool's run task (run_op). handler is invoked with an error_code
             // once the task finishes. The op holds a shared_ptr to the pool while running.
             // Cancellation is handled manually: the handler's slot is detached from the
             // token and handed to run_op, which installs its own cancel handler in it.
 
0467         // Completely disable composed cancellation handling, as it's not what we
0468         // want
0469         auto slot = asio::get_associated_cancellation_slot(handler);
0470         auto token_without_slot = asio::bind_cancellation_slot(asio::cancellation_slot(), std::move(handler));
0471 
0472         // Initiate passing the original token's slot manually
0473         asio::async_compose<decltype(token_without_slot), void(error_code)>(
0474             run_op(shared_from_this_wrapper(), slot),
0475             token_without_slot,
0476             pool_ex_
0477         );
0478     }
0479 
0480     void async_get_connection(
0481         diagnostics* diag,
0482         asio::any_completion_handler<void(error_code, ConnectionWrapper)> handler
0483     )
0484     {
             // Starts a get_connection_op.
             //   diag:    optional (may be null) output diagnostics, cleared when the op starts.
             //   handler: invoked with (error_code, ConnectionWrapper) on completion.
             // In thread-safe mode, cancellation signals coming from the handler's slot are
             // proxied through a separate signal that is emitted from within the strand.
 
0485         // The slot to pass for cleanup
             // Stays default-constructed unless we're in thread-safe mode
0486         asio::cancellation_slot parent_slot;
0487 
0488         // The signal pointer. Will only be created if required
0489         std::shared_ptr<asio::cancellation_signal> sig;
0490 
0491         // In thread-safe mode, and if we have a connected slot, create a proxy
0492         // signal that dispatches to the strand
0493         if (params_.thread_safe)
0494         {
0495             parent_slot = asio::get_associated_cancellation_slot(handler);
0496             if (parent_slot.is_connected())
0497             {
0498                 // Create a signal. In rare cases, the memory acquired here may outlive
0499                 // the async operation (e.g. the completion handler runs after the
0500                 // signal is emitted and before the strand dispatch runs). This means we
0501                 // can't use the handler's allocator.
0502                 sig = std::make_shared<asio::cancellation_signal>();
0503 
0504                 // Emplace the handler
                     // It only keeps weak references to the signal and to the pool
0505                 parent_slot.template emplace<get_connection_cancel_handler>(sig, shared_from_this_wrapper());
0506 
0507                 // Bind the handler to the slot
                     // From now on the composed op observes the proxy signal's slot
0508                 handler = asio::bind_cancellation_slot(sig->slot(), std::move(handler));
0509             }
0510         }
0511 
0512         // Start
0513         using handler_type = asio::any_completion_handler<void(error_code, ConnectionWrapper)>;
0514         asio::async_compose<handler_type, void(error_code, ConnectionWrapper)>(
0515             get_connection_op(shared_from_this_wrapper(), diag, std::move(sig), parent_slot),
0516             handler,
0517             pool_ex_
0518         );
0519     }
0520 
0521     void cancel()
0522     {
             // Requests pool shutdown. In thread-safe mode the actual work (cancel_unsafe)
             // is dispatched to the strand; otherwise it runs inline in the calling thread.
0523         if (params_.thread_safe)
0524         {
0525             // A handler to be passed to dispatch. Binds the executor
0526             // and keeps the pool alive
0527             struct dispatch_handler
0528             {
0529                 std::shared_ptr<this_type> pool_ptr;
0530 
                     // Associated executor: asio::dispatch runs the handler through the strand
0531                 using executor_type = asio::any_io_executor;
0532                 executor_type get_executor() const noexcept { return pool_ptr->strand(); }
0533 
0534                 void operator()() const { pool_ptr->cancel_unsafe(); }
0535             };
0536 
0537             asio::dispatch(dispatch_handler{shared_from_this_wrapper()});
0538         }
0539         else
0540         {
0541             cancel_unsafe();
0542         }
0543     }
0544 
0545     void return_connection(node_type& node, bool should_reset) noexcept
0546     {
             // Gives a connection back to the pool.
             //   node:         the node being returned.
             //   should_reset: forwarded to node.mark_as_collectable().
             // Never throws: any failure while dispatching the notification is swallowed.
 
0547         // This is safe to be called from any thread
0548         node.mark_as_collectable(should_reset);
0549 
0550         // The notification isn't thread-safe
0551         if (params_.thread_safe)
0552         {
0553             // A handler to be passed to dispatch. Binds the executor
0554             // and keeps the pool alive
0555             struct dispatch_handler
0556             {
0557                 std::shared_ptr<this_type> pool_ptr;
0558                 node_type* node_ptr;
0559 
                     // Associated executor: asio::dispatch runs the handler through the strand
0560                 using executor_type = asio::any_io_executor;
0561                 executor_type get_executor() const noexcept { return pool_ptr->strand(); }
0562 
0563                 void operator()() const { node_ptr->notify_collectable(); }
0564             };
0565 
0566             // If, for any reason, this notification fails, the connection will
0567             // be collected when the next ping is due.
0568             try
0569             {
0570                 asio::dispatch(dispatch_handler{shared_from_this_wrapper(), &node});
0571             }
0572             catch (...)
0573             {
                     // Deliberately ignored: this function is noexcept (see comment above)
0574             }
0575         }
0576         else
0577         {
0578             node.notify_collectable();
0579         }
0580     }
0581 
0582     // Exposed for testing.
         // Returns true if v contains a cancellation type that async_run reacts to
         // (partial or terminal).
0583     static bool run_supports_cancel_type(asio::cancellation_type_t v)
0584     {
0585         // run doesn't support total, as the pool state is always modified
0586         return !!(v & (asio::cancellation_type_t::partial | asio::cancellation_type_t::terminal));
0587     }
0588 
0589     static bool get_connection_supports_cancel_type(asio::cancellation_type_t v)
0590     {
             // Returns true if v contains any of the partial, total or terminal bits.
             // Returns false when none of them is set (e.g. no cancellation requested).
0591         // get_connection supports all cancel types
0592         return !!(
0593             v & (asio::cancellation_type_t::partial | asio::cancellation_type_t::total |
0594                  asio::cancellation_type_t::terminal)
0595         );
0596     }
0597 
0598     std::list<node_type>& nodes() noexcept { return all_conns_; }  // all nodes created by the pool
         // State shared between the pool and its connection nodes
0599     shared_state_type& shared_state() noexcept { return shared_st_; }
         // Internal (post-processed) pool parameters
0600     internal_pool_params& params() noexcept { return params_; }
         // Copy of the executor that connections are created with
0601     asio::any_io_executor connection_ex() noexcept { return conn_ex_; }
         // The pipeline request built by make_reset_pipeline() at construction
0602     const pipeline_request& reset_pipeline_request() const { return reset_pipeline_req_; }
0603 };
0604 
0605 }  // namespace detail
0606 }  // namespace mysql
0607 }  // namespace boost
0608 
0609 #endif