File indexing completed on 2025-09-15 08:42:55
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_NODE_HPP
0010
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pipeline.hpp>
0017
0018 #include <boost/mysql/detail/access.hpp>
0019 #include <boost/mysql/detail/connection_pool_fwd.hpp>
0020
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023
0024 #include <boost/asio/any_io_executor.hpp>
0025 #include <boost/asio/basic_waitable_timer.hpp>
0026 #include <boost/asio/cancel_after.hpp>
0027 #include <boost/asio/compose.hpp>
0028 #include <boost/asio/deferred.hpp>
0029 #include <boost/asio/error.hpp>
0030 #include <boost/intrusive/list.hpp>
0031 #include <boost/intrusive/list_hook.hpp>
0032
0033 #include <chrono>
0034 #include <utility>
0035 #include <vector>
0036
0037 namespace boost {
0038 namespace mysql {
0039 namespace detail {
0040
0041
0042 template <class ConnectionType, class ClockType>
0043 struct conn_shared_state
0044 {
0045
0046 intrusive::list<basic_connection_node<ConnectionType, ClockType>> idle_list;
0047
0048
0049 asio::basic_waitable_timer<ClockType> idle_connections_cv;
0050
0051
0052
0053 std::size_t num_pending_connections{0};
0054
0055
0056
0057 std::size_t num_pending_requests{0};
0058
0059
0060
0061 diagnostics last_connect_diag;
0062
0063
0064 std::size_t num_running_connections{0};
0065
0066
0067 asio::basic_waitable_timer<ClockType> conns_finished_cv;
0068
0069 conn_shared_state(asio::any_io_executor ex)
0070 : idle_connections_cv(ex, (ClockType::time_point::max)()),
0071 conns_finished_cv(std::move(ex), (ClockType::time_point::max)())
0072 {
0073 }
0074
0075 void on_connection_start() { ++num_running_connections; }
0076
0077 void on_connection_finish()
0078 {
0079 if (--num_running_connections == 0u)
0080 conns_finished_cv.expires_at((ClockType::time_point::min)());
0081 }
0082 };
0083
0084
0085
0086 template <class ConnectionType, class ClockType>
0087 class basic_connection_node : public intrusive::list_base_hook<>,
0088 public sansio_connection_node<basic_connection_node<ConnectionType, ClockType>>
0089 {
0090 using this_type = basic_connection_node<ConnectionType, ClockType>;
0091 using timer_type = asio::basic_waitable_timer<ClockType>;
0092
0093
0094 const internal_pool_params* params_;
0095 conn_shared_state<ConnectionType, ClockType>* shared_st_;
0096 ConnectionType conn_;
0097 timer_type timer_;
0098 diagnostics connect_diag_;
0099 timer_type collection_timer_;
0100
0101 const pipeline_request* reset_pipeline_req_;
0102 std::vector<stage_response> reset_pipeline_res_;
0103
0104
0105 std::atomic<collection_state> collection_state_{collection_state::none};
0106
0107
0108 friend class sansio_connection_node<this_type>;
0109 void entering_idle()
0110 {
0111 shared_st_->idle_list.push_back(*this);
0112 shared_st_->idle_connections_cv.cancel_one();
0113 }
0114 void exiting_idle() { shared_st_->idle_list.erase(shared_st_->idle_list.iterator_to(*this)); }
0115 void entering_pending() { ++shared_st_->num_pending_connections; }
0116 void exiting_pending() { --shared_st_->num_pending_connections; }
0117
0118
0119 void propagate_connect_diag(error_code ec)
0120 {
0121 shared_st_->last_connect_diag = create_connect_diagnostics(ec, connect_diag_);
0122 }
0123
0124 template <class Op, class Self>
0125 void run_with_timeout(Op&& op, std::chrono::steady_clock::duration timeout, Self& self)
0126 {
0127 if (timeout.count() > 0)
0128 {
0129 std::forward<Op>(op)(asio::cancel_after(timer_, timeout, std::move(self)));
0130 }
0131 else
0132 {
0133 std::forward<Op>(op)(std::move(self));
0134 }
0135 }
0136
0137 struct connection_task_op
0138 {
0139 this_type& node_;
0140 next_connection_action last_act_{next_connection_action::none};
0141
0142 connection_task_op(this_type& node) noexcept : node_(node) {}
0143
0144 template <class Self>
0145 void operator()(Self& self)
0146 {
0147
0148 node_.shared_st_->on_connection_start();
0149 (*this)(self, error_code());
0150 }
0151
0152 template <class Self>
0153 void operator()(Self& self, error_code ec)
0154 {
0155
0156 auto col_st = last_act_ == next_connection_action::idle_wait
0157 ? node_.collection_state_.exchange(collection_state::none)
0158 : collection_state::none;
0159
0160
0161
0162 if (last_act_ == next_connection_action::connect)
0163 node_.propagate_connect_diag(ec);
0164
0165
0166 last_act_ = node_.resume(ec, col_st);
0167
0168
0169 switch (last_act_)
0170 {
0171 case next_connection_action::connect:
0172 node_.run_with_timeout(
0173 node_.conn_
0174 .async_connect(node_.params_->connect_config, node_.connect_diag_, asio::deferred),
0175 node_.params_->connect_timeout,
0176 self
0177 );
0178 break;
0179 case next_connection_action::sleep_connect_failed:
0180 node_.timer_.expires_after(node_.params_->retry_interval);
0181 node_.timer_.async_wait(std::move(self));
0182 break;
0183 case next_connection_action::ping:
0184 node_.run_with_timeout(
0185 node_.conn_.async_ping(asio::deferred),
0186 node_.params_->ping_timeout,
0187 self
0188 );
0189 break;
0190 case next_connection_action::reset:
0191 node_.run_with_timeout(
0192 node_.conn_.async_run_pipeline(
0193 *node_.reset_pipeline_req_,
0194 node_.reset_pipeline_res_,
0195 asio::deferred
0196 ),
0197 node_.params_->ping_timeout,
0198 self
0199 );
0200 break;
0201 case next_connection_action::idle_wait:
0202 node_.run_with_timeout(
0203 node_.collection_timer_.async_wait(asio::deferred),
0204 node_.params_->ping_interval,
0205 self
0206 );
0207 break;
0208 case next_connection_action::none:
0209 node_.shared_st_->on_connection_finish();
0210 self.complete(error_code());
0211 break;
0212 default: BOOST_ASSERT(false);
0213 }
0214 }
0215 };
0216
0217 public:
0218 basic_connection_node(
0219 internal_pool_params& params,
0220 boost::asio::any_io_executor pool_ex,
0221 boost::asio::any_io_executor conn_ex,
0222 conn_shared_state<ConnectionType, ClockType>& shared_st,
0223 const pipeline_request* reset_pipeline_req
0224 )
0225 : params_(¶ms),
0226 shared_st_(&shared_st),
0227 conn_(std::move(conn_ex), params.make_ctor_params()),
0228 timer_(pool_ex),
0229 collection_timer_(pool_ex, (std::chrono::steady_clock::time_point::max)()),
0230 reset_pipeline_req_(reset_pipeline_req)
0231 {
0232 }
0233
0234
0235 void cancel()
0236 {
0237 sansio_connection_node<this_type>::cancel();
0238 timer_.cancel();
0239 collection_timer_.cancel();
0240 }
0241
0242
0243 template <class CompletionToken>
0244 auto async_run(CompletionToken&& token)
0245 -> decltype(asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token))
0246 {
0247 return asio::async_compose<CompletionToken, void(error_code)>(connection_task_op{*this}, token);
0248 }
0249
0250
0251 void notify_collectable() { collection_timer_.cancel(); }
0252
0253
0254 void mark_as_collectable(bool should_reset) noexcept
0255 {
0256 collection_state_.store(
0257 should_reset ? collection_state::needs_collect_with_reset : collection_state::needs_collect
0258 );
0259 }
0260
0261
0262 ConnectionType& connection() noexcept { return conn_; }
0263
0264
0265 collection_state get_collection_state() const noexcept { return collection_state_; }
0266 };
0267
0268 }
0269 }
0270 }
0271
0272 #endif