File indexing completed on 2025-04-19 08:33:49
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0010
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pipeline.hpp>
0017
0018 #include <boost/mysql/detail/connection_pool_fwd.hpp>
0019
0020 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/run_with_timeout.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023 #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
0024
0025 #include <boost/asio/any_io_executor.hpp>
0026 #include <boost/asio/compose.hpp>
0027 #include <boost/asio/deferred.hpp>
0028 #include <boost/asio/steady_timer.hpp>
0029 #include <boost/intrusive/list.hpp>
0030 #include <boost/intrusive/list_hook.hpp>
0031
0032 #include <chrono>
0033 #include <utility>
0034 #include <vector>
0035
0036 namespace boost {
0037 namespace mysql {
0038 namespace detail {
0039
0040
0041
0042
0043
0044
0045 struct io_traits
0046 {
0047 using connection_type = any_connection;
0048 using timer_type = asio::steady_timer;
0049 };
0050
0051
0052 template <class IoTraits>
0053 struct conn_shared_state
0054 {
0055 intrusive::list<basic_connection_node<IoTraits>> idle_list;
0056 timer_list<typename IoTraits::timer_type> pending_requests;
0057 std::size_t num_pending_connections{0};
0058 error_code last_ec;
0059 diagnostics last_diag;
0060 };
0061
0062
0063
0064 template <class IoTraits>
0065 class basic_connection_node : public intrusive::list_base_hook<>,
0066 public sansio_connection_node<basic_connection_node<IoTraits>>
0067 {
0068 using this_type = basic_connection_node<IoTraits>;
0069 using connection_type = typename IoTraits::connection_type;
0070 using timer_type = typename IoTraits::timer_type;
0071
0072
0073 const internal_pool_params* params_;
0074 conn_shared_state<IoTraits>* shared_st_;
0075 connection_type conn_;
0076 timer_type timer_;
0077 diagnostics connect_diag_;
0078 timer_type collection_timer_;
0079
0080 const pipeline_request* reset_pipeline_req_;
0081 std::vector<stage_response> reset_pipeline_res_;
0082
0083
0084 std::atomic<collection_state> collection_state_{collection_state::none};
0085
0086
0087 friend class sansio_connection_node<basic_connection_node<IoTraits>>;
0088 void entering_idle()
0089 {
0090 shared_st_->idle_list.push_back(*this);
0091 shared_st_->pending_requests.notify_one();
0092 }
0093 void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
0094 void entering_pending() { ++shared_st_->num_pending_connections; }
0095 void exiting_pending() { --shared_st_->num_pending_connections; }
0096
0097
0098 void propagate_connect_diag(error_code ec)
0099 {
0100 shared_st_->last_ec = ec;
0101 shared_st_->last_diag = connect_diag_;
0102 }
0103
0104 struct connection_task_op
0105 {
0106 this_type& node_;
0107 next_connection_action last_act_{next_connection_action::none};
0108
0109 connection_task_op(this_type& node) noexcept : node_(node) {}
0110
0111 template <class Self>
0112 void operator()(Self& self, error_code ec = {})
0113 {
0114
0115 auto col_st = last_act_ == next_connection_action::idle_wait
0116 ? node_.collection_state_.exchange(collection_state::none)
0117 : collection_state::none;
0118
0119
0120
0121 if (last_act_ == next_connection_action::connect)
0122 node_.propagate_connect_diag(ec);
0123
0124
0125 last_act_ = node_.resume(ec, col_st);
0126
0127
0128
0129 switch (last_act_)
0130 {
0131 case next_connection_action::connect:
0132 run_with_timeout(
0133 node_.conn_
0134 .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
0135 node_.timer_,
0136 node_.params_->connect_timeout,
0137 std::move(self)
0138 );
0139 break;
0140 case next_connection_action::sleep_connect_failed:
0141 node_.timer_.expires_after(node_.params_->retry_interval);
0142 node_.timer_.async_wait(std::move(self));
0143 break;
0144 case next_connection_action::ping:
0145 run_with_timeout(
0146 node_.conn_.async_ping(asio::deferred),
0147 node_.timer_,
0148 node_.params_->ping_timeout,
0149 std::move(self)
0150 );
0151 break;
0152 case next_connection_action::reset:
0153 run_with_timeout(
0154 node_.conn_.async_run_pipeline(
0155 *node_.reset_pipeline_req_,
0156 node_.reset_pipeline_res_,
0157 asio::deferred
0158 ),
0159 node_.timer_,
0160 node_.params_->ping_timeout,
0161 std::move(self)
0162 );
0163 break;
0164 case next_connection_action::idle_wait:
0165 run_with_timeout(
0166 node_.collection_timer_.async_wait(asio::deferred),
0167 node_.timer_,
0168 node_.params_->ping_interval,
0169 std::move(self)
0170 );
0171 break;
0172 case next_connection_action::none: self.complete(error_code()); break;
0173 default: BOOST_ASSERT(false);
0174 }
0175 }
0176 };
0177
0178 public:
0179 basic_connection_node(
0180 internal_pool_params& params,
0181 boost::asio::any_io_executor ex,
0182 boost::asio::any_io_executor conn_ex,
0183 conn_shared_state<IoTraits>& shared_st,
0184 const pipeline_request* reset_pipeline_req
0185 )
0186 : params_(¶ms),
0187 shared_st_(&shared_st),
0188 conn_(std::move(conn_ex), params.make_ctor_params()),
0189 timer_(ex),
0190 collection_timer_(ex, (std::chrono::steady_clock::time_point::max)()),
0191 reset_pipeline_req_(reset_pipeline_req)
0192 {
0193 }
0194
0195 void cancel()
0196 {
0197 sansio_connection_node<this_type>::cancel();
0198 timer_.cancel();
0199 collection_timer_.cancel();
0200 }
0201
0202
0203 template <class CompletionToken>
0204 auto async_run(CompletionToken&& token
0205 ) -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
0206 {
0207 return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
0208 }
0209
0210 connection_type& connection() noexcept { return conn_; }
0211 const connection_type& connection() const noexcept { return conn_; }
0212
0213
0214 void notify_collectable() { collection_timer_.cancel(); }
0215
0216
0217 void mark_as_collectable(bool should_reset) noexcept
0218 {
0219 collection_state_.store(
0220 should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
0221 );
0222 }
0223
0224
0225 collection_state get_collection_state() const noexcept { return collection_state_; }
0226 };
0227
0228 }
0229 }
0230 }
0231
0232 #endif