Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-19 08:33:49

0001 //
0002 // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0010 
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pipeline.hpp>
0017 
0018 #include <boost/mysql/detail/connection_pool_fwd.hpp>
0019 
0020 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023 #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
0024 
0025 #include <boost/asio/any_io_executor.hpp>
0026 #include <boost/asio/compose.hpp>
0027 #include <boost/asio/deferred.hpp>
0028 #include <boost/asio/steady_timer.hpp>
0029 #include <boost/intrusive/list.hpp>
0030 #include <boost/intrusive/list_hook.hpp>
0031 
0032 #include <chrono>
0033 #include <utility>
0034 #include <vector>
0035 
0036 namespace boost {
0037 namespace mysql {
0038 namespace detail {
0039 
0040 // Traits to use by default for nodes. Templating on traits provides
0041 // a way to mock dependencies in tests. Production code only uses
0042 // instantiations that use io_traits.
0043 // Having this as a traits type (as opposed to individual template params)
0044 // allows us to forward-declare io_traits without having to include steady_timer
0045 struct io_traits
0046 {
         // Type of the connection object stored in each node (basic_connection_node::conn_)
0047     using connection_type = any_connection;
         // Type of the timers stored in each node and of the timers tracked by
         // conn_shared_state::pending_requests
0048     using timer_type = asio::steady_timer;
0049 };
0050 
0051 // State shared between connection tasks.
     // Nodes hold a non-owning pointer to one of these objects (basic_connection_node::shared_st_)
     // and update it through the hooks and helpers of basic_connection_node.
0052 template <class IoTraits>
0053 struct conn_shared_state
0054 {
         // Nodes that are currently idle. A node appends itself in entering_idle()
         // and erases itself in exiting_idle().
0055     intrusive::list<basic_connection_node<IoTraits>> idle_list;
         // notify_one() is called on this list every time a node becomes idle.
         // Presumably it tracks requests waiting for a connection - confirm against timer_list.
0056     timer_list<typename IoTraits::timer_type> pending_requests;
         // Incremented by entering_pending() and decremented by exiting_pending()
0057     std::size_t num_pending_connections{0};
         // Error code passed to the completion of the most recent connect attempt of any node
         // (written by propagate_connect_diag; it is stored even if the attempt succeeded)
0058     error_code last_ec;
         // Diagnostics of that same connect attempt (copy of the node's connect_diag_)
0059     diagnostics last_diag;
0060 };
0061 
0062 // The templated type is never exposed to the user. We template
0063 // so tests can inject mocks.
     //
     // A node bundles one connection with the timers and bookkeeping needed to run
     // its lifecycle task (connect, retry sleep, ping, reset, idle wait; see
     // connection_task_op). It derives from intrusive::list_base_hook<> so it can be
     // linked into conn_shared_state::idle_list, and from sansio_connection_node
     // (CRTP), which is befriended so it can call the private hooks below.
0064 template <class IoTraits>
0065 class basic_connection_node : public intrusive::list_base_hook<>,
0066                               public sansio_connection_node<basic_connection_node<IoTraits>>
0067 {
0068     using this_type = basic_connection_node<IoTraits>;
0069     using connection_type = typename IoTraits::connection_type;
0070     using timer_type = typename IoTraits::timer_type;
0071 
0072     // Not thread-safe, must be manipulated within the pool's executor
         // Pool configuration. Non-owning: the constructor stores the address of its argument
0073     const internal_pool_params* params_;
         // State shared with the other nodes. Non-owning, same as params_
0074     conn_shared_state<IoTraits>* shared_st_;
         // The connection managed by this node
0075     connection_type conn_;
         // Timer handed to run_with_timeout for every timed step, and used directly
         // for the sleep between failed connect attempts
0076     timer_type timer_;
         // Diagnostics output of async_connect; copied to the shared state by propagate_connect_diag
0077     diagnostics connect_diag_;
0078     timer_type collection_timer_;  // Notifications about collections. A separate timer makes potential race
0079                                    // conditions not harmful
         // Request run by the reset step. Non-owning pointer supplied to the constructor
0080     const pipeline_request* reset_pipeline_req_;
         // Response storage passed to async_run_pipeline by the reset step
0081     std::vector<stage_response> reset_pipeline_res_;
0082 
0083     // Thread-safe
         // Written by mark_as_collectable(); read and reset to none by the task after an idle wait.
         // NOTE(review): <atomic> is not among the includes visible at the top of this file;
         // presumably it is reached transitively - consider including it directly.
0084     std::atomic<collection_state> collection_state_{collection_state::none};
0085 
0086     // Hooks for sansio_connection_node
0087     friend class sansio_connection_node<basic_connection_node<IoTraits>>;
         // Links this node into the shared idle list and notifies one pending request
0088     void entering_idle()
0089     {
0090         shared_st_->idle_list.push_back(*this);
0091         shared_st_->pending_requests.notify_one();
0092     }
         // Unlinks this node from the shared idle list
0093     void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
         // Keep the shared count of pending connections up to date
0094     void entering_pending() { ++shared_st_->num_pending_connections; }
0095     void exiting_pending() { --shared_st_->num_pending_connections; }
0096 
0097     // Helpers
         // Publishes the outcome of a connect attempt (error code plus a copy of
         // connect_diag_) to the shared state
0098     void propagate_connect_diag(error_code ec)
0099     {
0100         shared_st_->last_ec = ec;
0101         shared_st_->last_diag = connect_diag_;
0102     }
0103 
         // Implementation object passed to asio::async_compose by async_run.
         // Every invocation feeds the result of the previous step into resume()
         // (not defined in this class; presumably inherited from sansio_connection_node)
         // and then starts the asynchronous step that resume() asks for.
0104     struct connection_task_op
0105     {
             // Node being driven. Held by reference, so the node must outlive the operation
0106         this_type& node_;
             // Action started by the previous invocation (none before the first one)
0107         next_connection_action last_act_{next_connection_action::none};
0108 
0109         connection_task_op(this_type& node) noexcept : node_(node) {}
0110 
             // self: the composed operation handle; it is moved into the next step,
             //       which makes this function run again when that step completes.
             // ec:   result of the step that just finished. Defaulted so the operation
             //       can also be invoked with no error argument (the initial invocation
             //       made by async_compose).
0111         template <class Self>
0112         void operator()(Self& self, error_code ec = {})
0113         {
0114             // A collection status may be generated by idle_wait actions
                 // The exchange also resets the stored state to none, so each request is consumed once
0115             auto col_st = last_act_ == next_connection_action::idle_wait
0116                               ? node_.collection_state_.exchange(collection_state::none)
0117                               : collection_state::none;
0118 
0119             // Connect actions should set the shared diagnostics, so these
0120             // get reported to the user
0121             if (last_act_ == next_connection_action::connect)
0122                 node_.propagate_connect_diag(ec);
0123 
0124             // Invoke the sans-io algorithm
0125             last_act_ = node_.resume(ec, col_st);
0126 
0127             // Apply the next action. run_with_timeout makes sure that all handlers
0128             // are dispatched using the timer's executor (that is, the pool executor)
0129             switch (last_act_)
0130             {
0131             case next_connection_action::connect:
                     // Connect using the pool's connect_config, bounded by connect_timeout
0132                 run_with_timeout(
0133                     node_.conn_
0134                         .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
0135                     node_.timer_,
0136                     node_.params_->connect_timeout,
0137                     std::move(self)
0138                 );
0139                 break;
0140             case next_connection_action::sleep_connect_failed:
                     // Plain timer wait of retry_interval before the algorithm is resumed again
0141                 node_.timer_.expires_after(node_.params_->retry_interval);
0142                 node_.timer_.async_wait(std::move(self));
0143                 break;
0144             case next_connection_action::ping:
0145                 run_with_timeout(
0146                     node_.conn_.async_ping(asio::deferred),
0147                     node_.timer_,
0148                     node_.params_->ping_timeout,
0149                     std::move(self)
0150                 );
0151                 break;
0152             case next_connection_action::reset:
                     // Runs the reset pipeline supplied at construction.
                     // NOTE(review): this step is bounded by ping_timeout; no dedicated reset
                     // timeout is used here - confirm this is intended.
0153                 run_with_timeout(
0154                     node_.conn_.async_run_pipeline(
0155                         *node_.reset_pipeline_req_,
0156                         node_.reset_pipeline_res_,
0157                         asio::deferred
0158                     ),
0159                     node_.timer_,
0160                     node_.params_->ping_timeout,
0161                     std::move(self)
0162                 );
0163                 break;
0164             case next_connection_action::idle_wait:
                     // collection_timer_ is constructed with an expiry of time_point::max, so this
                     // wait only finishes when the timer is cancelled (notify_collectable, cancel)
                     // or, presumably, when run_with_timeout gives up after ping_interval.
0165                 run_with_timeout(
0166                     node_.collection_timer_.async_wait(asio::deferred),
0167                     node_.timer_,
0168                     node_.params_->ping_interval,
0169                     std::move(self)
0170                 );
0171                 break;
                 // The algorithm has nothing else to do: finish the composed operation with success
0172             case next_connection_action::none: self.complete(error_code()); break;
                 // NOTE(review): this branch neither completes nor re-arms the operation, so with
                 // assertions disabled the task would appear to stop silently - presumably unreachable.
0173             default: BOOST_ASSERT(false);
0174             }
0175         }
0176     };
0177 
0178 public:
         // params:             pool configuration; its address is stored, so it must outlive the node
         // ex:                 executor used by both timers
         // conn_ex:            executor used to construct the connection
         // shared_st:          state shared between nodes; its address is stored, so it must outlive the node
         // reset_pipeline_req: request run by the reset step; stored as a non-owning pointer
0179     basic_connection_node(
0180         internal_pool_params& params,
0181         boost::asio::any_io_executor ex,
0182         boost::asio::any_io_executor conn_ex,
0183         conn_shared_state<IoTraits>& shared_st,
0184         const pipeline_request* reset_pipeline_req
0185     )
0186         : params_(&params),
0187           shared_st_(&shared_st),
0188           conn_(std::move(conn_ex), params.make_ctor_params()),
0189           timer_(ex),
               // Expiry at time_point::max: this timer is only used to be cancelled.
               // The extra parentheses around max presumably protect against a function-like max macro.
0190           collection_timer_(ex, (std::chrono::steady_clock::time_point::max)()),
0191           reset_pipeline_req_(reset_pipeline_req)
0192     {
0193     }
0194 
         // Calls the base class cancel() and then cancels both timers, which ends any
         // timer wait currently in progress
0195     void cancel()
0196     {
0197         sansio_connection_node<this_type>::cancel();
0198         timer_.cancel();
0199         collection_timer_.cancel();
0200     }
0201 
0202     // This initiation must be invoked within the pool's executor
         // Starts the node's task as a composed operation with signature void(error_code).
         // The only completion issued in this file passes a default-constructed error_code.
0203     template <class CompletionToken>
0204     auto async_run(CompletionToken&& token
0205     ) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
0206     {
0207         return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
0208     }
0209 
         // Access to the managed connection
0210     connection_type& connection() noexcept { return conn_; }
0211     const connection_type& connection() const noexcept { return conn_; }
0212 
0213     // Not thread-safe, must be called within the pool's executor
         // Cancels collection_timer_, ending an idle wait in progress so the task can
         // look at collection_state_
0214     void notify_collectable() { collection_timer_.cancel(); }
0215 
0216     // Thread-safe. May safely be called from any thread.
         // Records that the node should be collected, with or without a reset first.
         // Only stores the atomic flag; it does not cancel any timer by itself.
0217     void mark_as_collectable(bool should_reset) noexcept
0218     {
0219         collection_state_.store(
0220             should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
0221         );
0222     }
0223 
0224     // Exposed for testing
         // Atomically loads the current collection state
0225     collection_state get_collection_state() const noexcept { return collection_state_; }
0226 };
0227 
0228 }  // namespace detail
0229 }  // namespace mysql
0230 }  // namespace boost
0231 
0232 #endif