Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-15 08:42:55

0001 //
0002 // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0010 
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pipeline.hpp>
0017 
0018 #include <boost/mysql/detail/access.hpp>
0019 #include <boost/mysql/detail/connection_pool_fwd.hpp>
0020 
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023 
0024 #include <boost/asio/any_io_executor.hpp>
0025 #include <boost/asio/basic_waitable_timer.hpp>
0026 #include <boost/asio/cancel_after.hpp>
0027 #include <boost/asio/compose.hpp>
0028 #include <boost/asio/deferred.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/intrusive/list.hpp>
0031 #include <boost/intrusive/list_hook.hpp>
0032 
0033 #include <chrono>
0034 #include <utility>
0035 #include <vector>
0036 
0037 namespace boost {
0038 namespace mysql {
0039 namespace detail {
0040 
0041 // State shared between connection tasks
0042 template <class ConnectionType, class ClockType>
0043 struct conn_shared_state
0044 {
0045     // The list of connections that are currently idle. Non-owning.
0046     intrusive::list<basic_connection_node<ConnectionType, ClockType>> idle_list;
0047 
0048     // Timer acting as a condition variable to wait for idle connections
0049     asio::basic_waitable_timer<ClockType> idle_connections_cv;
0050 
0051     // The number of pending connections (currently getting ready).
0052     // Required to compute how many connections we should create at any given point in time.
0053     std::size_t num_pending_connections{0};
0054 
0055     // The number of async_get_connection ops that are waiting for a connection to become available.
0056     // Required to compute how many connections we should create at any given point in time.
0057     std::size_t num_pending_requests{0};
0058 
0059     // Info about the last connection attempt. Already processed, suitable to be used
0060     // as the result of an async_get_connection op
0061     diagnostics last_connect_diag;
0062 
0063     // The number of running connections, to track when they exit
0064     std::size_t num_running_connections{0};
0065 
0066     // Timer acting as a condition variable to wait for all connections to exit
0067     asio::basic_waitable_timer<ClockType> conns_finished_cv;
0068 
0069     conn_shared_state(asio::any_io_executor ex)
0070         : idle_connections_cv(ex, (ClockType::time_point::max)()),
0071           conns_finished_cv(std::move(ex), (ClockType::time_point::max)())
0072     {
0073     }
0074 
0075     void on_connection_start() { ++num_running_connections; }
0076 
0077     void on_connection_finish()
0078     {
0079         if (--num_running_connections == 0u)
0080             conns_finished_cv.expires_at((ClockType::time_point::min)());
0081     }
0082 };
0083 
0084 // The templated type is never exposed to the user. We template
0085 // so tests can inject mocks.
0086 template <class ConnectionType, class ClockType>
0087 class basic_connection_node : public intrusive::list_base_hook<>,
0088                               public sansio_connection_node<basic_connection_node<ConnectionType, ClockType>>
0089 {
0090     using this_type = basic_connection_node<ConnectionType, ClockType>;
0091     using timer_type = asio::basic_waitable_timer<ClockType>;
0092 
0093     // Not thread-safe
0094     const internal_pool_params* params_;
0095     conn_shared_state<ConnectionType, ClockType>* shared_st_;
0096     ConnectionType conn_;
0097     timer_type timer_;
0098     diagnostics connect_diag_;
0099     timer_type collection_timer_;  // Notifications about collections. A separate timer makes potential race
0100                                    // conditions not harmful
0101     const pipeline_request* reset_pipeline_req_;
0102     std::vector<stage_response> reset_pipeline_res_;
0103 
0104     // Thread-safe
0105     std::atomic<collection_state> collection_state_{collection_state::none};
0106 
0107     // Hooks for sansio_connection_node
0108     friend class sansio_connection_node<this_type>;
0109     void entering_idle()
0110     {
0111         shared_st_->idle_list.push_back(*this);
0112         shared_st_->idle_connections_cv.cancel_one();
0113     }
0114     void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
0115     void entering_pending() { ++shared_st_->num_pending_connections; }
0116     void exiting_pending() { --shared_st_->num_pending_connections; }
0117 
0118     // Helpers
0119     void propagate_connect_diag(error_code ec)
0120     {
0121         shared_st_->last_connect_diag = create_connect_diagnostics(ec, connect_diag_);
0122     }
0123 
0124     template <class Op, class Self>
0125     void run_with_timeout(Op&& op, std::chrono::steady_clock::duration timeout, Self& self)
0126     {
0127         if (timeout.count() > 0)
0128         {
0129             std::forward<Op>(op)(asio::cancel_after(timer_, timeout, std::move(self)));
0130         }
0131         else
0132         {
0133             std::forward<Op>(op)(std::move(self));
0134         }
0135     }
0136 
0137     struct connection_task_op
0138     {
0139         this_type& node_;
0140         next_connection_action last_act_{next_connection_action::none};
0141 
0142         connection_task_op(this_type& node) noexcept : node_(node) {}
0143 
0144         template <class Self>
0145         void operator()(Self& self)
0146         {
0147             // Called when the op starts
0148             node_.shared_st_->on_connection_start();
0149             (*this)(self, error_code());
0150         }
0151 
0152         template <class Self>
0153         void operator()(Self& self, error_code ec)
0154         {
0155             // A collection status may be generated by idle_wait actions
0156             auto col_st = last_act_ == next_connection_action::idle_wait
0157                               ? node_.collection_state_.exchange(collection_state::none)
0158                               : collection_state::none;
0159 
0160             // Connect actions should set the shared diagnostics, so these
0161             // get reported to the user
0162             if (last_act_ == next_connection_action::connect)
0163                 node_.propagate_connect_diag(ec);
0164 
0165             // Invoke the sans-io algorithm
0166             last_act_ = node_.resume(ec, col_st);
0167 
0168             // Apply the next action
0169             switch (last_act_)
0170             {
0171             case next_connection_action::connect:
0172                 node_.run_with_timeout(
0173                     node_.conn_
0174                         .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
0175                     node_.params_->connect_timeout,
0176                     self
0177                 );
0178                 break;
0179             case next_connection_action::sleep_connect_failed:
0180                 node_.timer_.expires_after(node_.params_->retry_interval);
0181                 node_.timer_.async_wait(std::move(self));
0182                 break;
0183             case next_connection_action::ping:
0184                 node_.run_with_timeout(
0185                     node_.conn_.async_ping(asio::deferred),
0186                     node_.params_->ping_timeout,
0187                     self
0188                 );
0189                 break;
0190             case next_connection_action::reset:
0191                 node_.run_with_timeout(
0192                     node_.conn_.async_run_pipeline(
0193                         *node_.reset_pipeline_req_,
0194                         node_.reset_pipeline_res_,
0195                         asio::deferred
0196                     ),
0197                     node_.params_->ping_timeout,
0198                     self
0199                 );
0200                 break;
0201             case next_connection_action::idle_wait:
0202                 node_.run_with_timeout(
0203                     node_.collection_timer_.async_wait(asio::deferred),
0204                     node_.params_->ping_interval,
0205                     self
0206                 );
0207                 break;
0208             case next_connection_action::none:
0209                 node_.shared_st_->on_connection_finish();
0210                 self.complete(error_code());
0211                 break;
0212             default: BOOST_ASSERT(false);  // LCOV_EXCL_LINE
0213             }
0214         }
0215     };
0216 
0217 public:
0218     basic_connection_node(
0219         internal_pool_params& params,
0220         boost::asio::any_io_executor pool_ex,
0221         boost::asio::any_io_executor conn_ex,
0222         conn_shared_state<ConnectionType, ClockType>& shared_st,
0223         const pipeline_request* reset_pipeline_req
0224     )
0225         : params_(&params),
0226           shared_st_(&shared_st),
0227           conn_(std::move(conn_ex), params.make_ctor_params()),
0228           timer_(pool_ex),
0229           collection_timer_(pool_ex, (std::chrono::steady_clock::time_point::max)()),
0230           reset_pipeline_req_(reset_pipeline_req)
0231     {
0232     }
0233 
0234     // Not thread-safe
0235     void cancel()
0236     {
0237         sansio_connection_node<this_type>::cancel();
0238         timer_.cancel();
0239         collection_timer_.cancel();
0240     }
0241 
0242     // Not thread-safe
0243     template <class CompletionToken>
0244     auto async_run(CompletionToken&& token)
0245         -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
0246     {
0247         return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
0248     }
0249 
0250     // Not thread-safe
0251     void notify_collectable() { collection_timer_.cancel(); }
0252 
0253     // Thread-safe
0254     void mark_as_collectable(bool should_reset) noexcept
0255     {
0256         collection_state_.store(
0257             should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
0258         );
0259     }
0260 
0261     // Getter, used by pooled_connection
0262     ConnectionType& connection() noexcept { return conn_; }
0263 
0264     // Exposed for testing
0265     collection_state get_collection_state() const noexcept { return collection_state_; }
0266 };
0267 
0268 }  // namespace detail
0269 }  // namespace mysql
0270 }  // namespace boost
0271 
0272 #endif