File indexing completed on 2025-04-17 08:38:58
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0010
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pool_params.hpp>
0017
0018 #include <boost/mysql/detail/config.hpp>
0019
0020 #include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/timer_list.hpp>
0023 #include <boost/mysql/impl/internal/connection_pool/wait_group.hpp>
0024 #include <boost/mysql/impl/internal/coroutine.hpp>
0025
0026 #include <boost/asio/any_completion_handler.hpp>
0027 #include <boost/asio/any_io_executor.hpp>
0028 #include <boost/asio/bind_executor.hpp>
0029 #include <boost/asio/compose.hpp>
0030 #include <boost/asio/deferred.hpp>
0031 #include <boost/asio/dispatch.hpp>
0032 #include <boost/asio/error.hpp>
0033 #include <boost/asio/post.hpp>
0034 #include <boost/core/ignore_unused.hpp>
0035
0036 #include <chrono>
0037 #include <cstddef>
0038 #include <list>
0039 #include <memory>
0040
0041 namespace boost {
0042 namespace mysql {
0043 namespace detail {
0044
0045 inline pipeline_request make_reset_pipeline()
0046 {
0047 pipeline_request req;
0048 req.add_reset_connection().add_set_character_set(utf8mb4_charset);
0049 return req;
0050 }
0051
0052
0053
0054 template <class IoTraits, class ConnectionWrapper>
0055 class basic_pool_impl : public std::enable_shared_from_this<basic_pool_impl<IoTraits, ConnectionWrapper>>
0056 {
0057 using this_type = basic_pool_impl<IoTraits, ConnectionWrapper>;
0058 using node_type = basic_connection_node<IoTraits>;
0059 using timer_type = typename IoTraits::timer_type;
0060 using timer_block_type = timer_block<timer_type>;
0061 using shared_state_type = conn_shared_state<IoTraits>;
0062
0063 enum class state_t
0064 {
0065 initial,
0066 running,
0067 cancelled,
0068 };
0069
0070 state_t state_{state_t::initial};
0071 internal_pool_params params_;
0072 asio::any_io_executor ex_;
0073 asio::any_io_executor conn_ex_;
0074 std::list<node_type> all_conns_;
0075 shared_state_type shared_st_;
0076 wait_group wait_gp_;
0077 timer_type cancel_timer_;
0078 const pipeline_request reset_pipeline_req_{make_reset_pipeline()};
0079
0080 std::shared_ptr<this_type> shared_from_this_wrapper()
0081 {
0082
0083 return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
0084 }
0085
0086 void create_connection()
0087 {
0088 all_conns_.emplace_back(params_, ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
0089 wait_gp_.run_task(all_conns_.back().async_run(asio::deferred));
0090 }
0091
0092 error_code get_diagnostics(diagnostics* diag) const
0093 {
0094 if (state_ == state_t::cancelled)
0095 {
0096 return client_errc::cancelled;
0097 }
0098 else if (shared_st_.last_ec)
0099 {
0100 if (diag)
0101 *diag = shared_st_.last_diag;
0102 return shared_st_.last_ec;
0103 }
0104 else
0105 {
0106 return client_errc::timeout;
0107 }
0108 }
0109
0110 struct run_op
0111 {
0112 int resume_point_{0};
0113 std::shared_ptr<this_type> obj_;
0114
0115 run_op(std::shared_ptr<this_type> obj) noexcept : obj_(std::move(obj)) {}
0116
0117 template <class Self>
0118 void operator()(Self& self, error_code ec = {})
0119 {
0120
0121 boost::ignore_unused(ec);
0122 switch (resume_point_)
0123 {
0124 case 0:
0125
0126
0127 BOOST_MYSQL_YIELD(resume_point_, 1, asio::dispatch(obj_->ex_, std::move(self)))
0128
0129
0130 BOOST_ASSERT(obj_->state_ == state_t::initial);
0131 obj_->state_ = state_t::running;
0132
0133
0134 for (std::size_t i = 0; i < obj_->params_.initial_size; ++i)
0135 obj_->create_connection();
0136
0137
0138 BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
0139
0140
0141
0142
0143 BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
0144
0145
0146 obj_->state_ = state_t::cancelled;
0147 for (auto& conn : obj_->all_conns_)
0148 conn.cancel();
0149 obj_->shared_st_.pending_requests.notify_all();
0150
0151
0152 BOOST_MYSQL_YIELD(resume_point_, 4, obj_->wait_gp_.async_wait(std::move(self)))
0153
0154
0155 obj_.reset();
0156 self.complete(error_code());
0157 }
0158 }
0159 };
0160
0161 struct get_connection_op
0162 {
0163 int resume_point_{0};
0164 std::shared_ptr<this_type> obj_;
0165 std::chrono::steady_clock::time_point timeout_;
0166 diagnostics* diag_;
0167 std::unique_ptr<timer_block_type> timer_;
0168 error_code stored_ec_;
0169
0170 get_connection_op(
0171 std::shared_ptr<this_type> obj,
0172 std::chrono::steady_clock::time_point timeout,
0173 diagnostics* diag
0174 ) noexcept
0175 : obj_(std::move(obj)), timeout_(timeout), diag_(diag)
0176 {
0177 }
0178
0179 template <class Self>
0180 void do_complete(Self& self, error_code ec, ConnectionWrapper conn)
0181 {
0182
0183 timer_.reset();
0184 obj_.reset();
0185 self.complete(ec, std::move(conn));
0186 }
0187
0188 template <class Self>
0189 void complete_success(Self& self, node_type& node)
0190 {
0191 node.mark_as_in_use();
0192 do_complete(self, error_code(), ConnectionWrapper(node, std::move(obj_)));
0193 }
0194
0195 template <class Self>
0196 void operator()(Self& self, error_code ec = {})
0197 {
0198 switch (resume_point_)
0199 {
0200 case 0:
0201
0202
0203 if (diag_)
0204 diag_->clear();
0205
0206
0207 BOOST_MYSQL_YIELD(resume_point_, 1, asio::post(obj_->ex_, std::move(self)))
0208
0209
0210
0211 while (true)
0212 {
0213
0214 if (obj_->state_ != state_t::running)
0215 {
0216 do_complete(
0217 self,
0218 obj_->state_ == state_t::initial ? client_errc::pool_not_running
0219 : client_errc::cancelled,
0220 ConnectionWrapper()
0221 );
0222 return;
0223 }
0224
0225
0226 if (!obj_->shared_st_.idle_list.empty())
0227 {
0228
0229 complete_success(self, obj_->shared_st_.idle_list.front());
0230 return;
0231 }
0232
0233
0234
0235
0236
0237 if (obj_->all_conns_.size() < obj_->params_.max_size &&
0238 obj_->shared_st_.num_pending_connections == 0u)
0239 {
0240 obj_->create_connection();
0241 }
0242
0243
0244 if (!timer_)
0245 {
0246 timer_.reset(new timer_block_type(obj_->ex_));
0247 obj_->shared_st_.pending_requests.push_back(*timer_);
0248 }
0249
0250
0251 timer_->timer.expires_at(timeout_);
0252 BOOST_MYSQL_YIELD(resume_point_, 2, timer_->timer.async_wait(std::move(self)))
0253 stored_ec_ = ec;
0254
0255
0256
0257
0258 BOOST_MYSQL_YIELD(resume_point_, 3, asio::dispatch(obj_->ex_, std::move(self)))
0259
0260 if (!stored_ec_)
0261 {
0262
0263 do_complete(self, obj_->get_diagnostics(diag_), ConnectionWrapper());
0264 return;
0265 }
0266 }
0267 }
0268 }
0269 };
0270
0271 public:
0272 basic_pool_impl(pool_executor_params&& ex_params, pool_params&& params)
0273 : params_(make_internal_pool_params(std::move(params))),
0274 ex_(std::move(ex_params.pool_executor)),
0275 conn_ex_(std::move(ex_params.connection_executor)),
0276 wait_gp_(ex_),
0277 cancel_timer_(ex_, (std::chrono::steady_clock::time_point::max)())
0278 {
0279 }
0280
0281 using executor_type = asio::any_io_executor;
0282
0283 executor_type get_executor() { return ex_; }
0284
0285 template <class CompletionToken>
0286 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code))
0287 async_run(CompletionToken&& token)
0288 {
0289 return asio::async_compose<CompletionToken, void(error_code)>(
0290 run_op(shared_from_this_wrapper()),
0291 token,
0292 ex_
0293 );
0294 }
0295
0296
0297 void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
0298
0299 template <class CompletionToken>
0300 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
0301 async_get_connection(
0302 std::chrono::steady_clock::time_point timeout,
0303 diagnostics* diag,
0304 CompletionToken&& token
0305 )
0306 {
0307 return asio::async_compose<CompletionToken, void(error_code, ConnectionWrapper)>(
0308 get_connection_op(shared_from_this_wrapper(), timeout, diag),
0309 token,
0310 ex_
0311 );
0312 }
0313
0314 template <class CompletionToken>
0315 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(error_code, ConnectionWrapper))
0316 async_get_connection(
0317 std::chrono::steady_clock::duration timeout,
0318 diagnostics* diag,
0319 CompletionToken&& token
0320 )
0321 {
0322 return async_get_connection(
0323 timeout.count() > 0 ? std::chrono::steady_clock::now() + timeout
0324 : (std::chrono::steady_clock::time_point::max)(),
0325 diag,
0326 std::forward<CompletionToken>(token)
0327 );
0328 }
0329
0330
0331 std::list<node_type>& nodes() noexcept { return all_conns_; }
0332 shared_state_type& shared_state() noexcept { return shared_st_; }
0333 internal_pool_params& params() noexcept { return params_; }
0334 asio::any_io_executor connection_ex() noexcept { return conn_ex_; }
0335 const pipeline_request& reset_pipeline_request() const { return reset_pipeline_req_; }
0336 };
0337
0338 }
0339 }
0340 }
0341
0342 #endif