File indexing completed on 2025-09-17 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0009 #define BOOST_MYSQL_IMPL_INTERNAL_CONNECTION_POOL_CONNECTION_POOL_IMPL_HPP
0010
0011 #include <boost/mysql/any_connection.hpp>
0012 #include <boost/mysql/character_set.hpp>
0013 #include <boost/mysql/client_errc.hpp>
0014 #include <boost/mysql/diagnostics.hpp>
0015 #include <boost/mysql/error_code.hpp>
0016 #include <boost/mysql/pool_params.hpp>
0017
0018 #include <boost/mysql/detail/config.hpp>
0019
0020 #include <boost/mysql/impl/internal/connection_pool/connection_node.hpp>
0021 #include <boost/mysql/impl/internal/connection_pool/internal_pool_params.hpp>
0022 #include <boost/mysql/impl/internal/connection_pool/sansio_connection_node.hpp>
0023 #include <boost/mysql/impl/internal/coroutine.hpp>
0024
0025 #include <boost/asio/any_completion_handler.hpp>
0026 #include <boost/asio/any_io_executor.hpp>
0027 #include <boost/asio/associated_cancellation_slot.hpp>
0028 #include <boost/asio/basic_waitable_timer.hpp>
0029 #include <boost/asio/bind_cancellation_slot.hpp>
0030 #include <boost/asio/bind_executor.hpp>
0031 #include <boost/asio/cancellation_signal.hpp>
0032 #include <boost/asio/cancellation_type.hpp>
0033 #include <boost/asio/compose.hpp>
0034 #include <boost/asio/detached.hpp>
0035 #include <boost/asio/dispatch.hpp>
0036 #include <boost/asio/error.hpp>
0037 #include <boost/asio/immediate.hpp>
0038 #include <boost/asio/post.hpp>
0039 #include <boost/asio/strand.hpp>
0040
0041 #include <chrono>
0042 #include <cstddef>
0043 #include <list>
0044 #include <memory>
0045 #include <utility>
0046
0047 namespace boost {
0048 namespace mysql {
0049 namespace detail {
0050
0051 inline pipeline_request make_reset_pipeline()
0052 {
0053 pipeline_request req;
0054 req.add_reset_connection().add_set_character_set(utf8mb4_charset);
0055 return req;
0056 }
0057
0058
0059
0060 template <class ConnectionType, class ClockType, class ConnectionWrapper>
0061 class basic_pool_impl
0062 : public std::enable_shared_from_this<basic_pool_impl<ConnectionType, ClockType, ConnectionWrapper>>
0063 {
0064 using this_type = basic_pool_impl<ConnectionType, ClockType, ConnectionWrapper>;
0065 using node_type = basic_connection_node<ConnectionType, ClockType>;
0066 using timer_type = asio::basic_waitable_timer<ClockType>;
0067 using shared_state_type = conn_shared_state<ConnectionType, ClockType>;
0068
0069 enum class state_t
0070 {
0071 initial,
0072 running,
0073 cancelled,
0074 };
0075
0076
0077 asio::any_io_executor original_pool_ex_;
0078
0079
0080 asio::any_io_executor pool_ex_;
0081
0082
0083 asio::any_io_executor conn_ex_;
0084
0085
0086 internal_pool_params params_;
0087
0088
0089 state_t state_{state_t::initial};
0090 std::list<node_type> all_conns_;
0091 shared_state_type shared_st_;
0092 timer_type cancel_timer_;
0093 const pipeline_request reset_pipeline_req_{make_reset_pipeline()};
0094
0095 std::shared_ptr<this_type> shared_from_this_wrapper()
0096 {
0097
0098 return static_cast<std::enable_shared_from_this<this_type>*>(this)->shared_from_this();
0099 }
0100
0101
0102 void create_connection()
0103 {
0104
0105 all_conns_.emplace_back(params_, pool_ex_, conn_ex_, shared_st_, &reset_pipeline_req_);
0106 all_conns_.back().async_run(asio::bind_executor(pool_ex_, asio::detached));
0107 }
0108
0109
0110 void create_connections()
0111 {
0112
0113 std::size_t n = num_connections_to_create(
0114 params_.initial_size,
0115 params_.max_size,
0116 all_conns_.size(),
0117 shared_st_.num_pending_connections,
0118 shared_st_.num_pending_requests
0119 );
0120
0121
0122 BOOST_ASSERT((all_conns_.size() + n) <= params_.max_size);
0123 for (std::size_t i = 0; i < n; ++i)
0124 create_connection();
0125 }
0126
0127
0128 void enter_request_pending()
0129 {
0130
0131 ++shared_st_.num_pending_requests;
0132
0133
0134
0135
0136 if (state_ == state_t::running)
0137 create_connections();
0138 }
0139
0140
0141 void exit_request_pending()
0142 {
0143
0144 BOOST_ASSERT(shared_st_.num_pending_requests > 0u);
0145 --shared_st_.num_pending_requests;
0146 }
0147
0148 node_type* try_get_connection()
0149 {
0150 if (!shared_st_.idle_list.empty())
0151 {
0152 node_type& res = shared_st_.idle_list.front();
0153 res.mark_as_in_use();
0154 return &res;
0155 }
0156 else
0157 {
0158 return nullptr;
0159 }
0160 }
0161
0162 template <class OpSelf>
0163 void enter_strand(OpSelf& self)
0164 {
0165 asio::dispatch(asio::bind_executor(pool_ex_, std::move(self)));
0166 }
0167
0168 template <class OpSelf>
0169 void exit_strand(OpSelf& self)
0170 {
0171 asio::post(get_executor(), std::move(self));
0172 }
0173
0174 template <class OpSelf>
0175 void wait_for_connections(OpSelf& self)
0176 {
0177
0178 if (params_.thread_safe)
0179 {
0180 shared_st_.idle_connections_cv.async_wait(asio::bind_executor(pool_ex_, std::move(self)));
0181 }
0182 else
0183 {
0184 shared_st_.idle_connections_cv.async_wait(std::move(self));
0185 }
0186 }
0187
0188 struct run_op
0189 {
0190 int resume_point_{0};
0191 std::shared_ptr<this_type> obj_;
0192 asio::cancellation_slot cancel_slot_;
0193
0194 run_op(std::shared_ptr<this_type> obj, asio::cancellation_slot slot) noexcept
0195 : obj_(std::move(obj)), cancel_slot_(slot)
0196 {
0197 }
0198
0199 struct cancel_handler
0200 {
0201 this_type* self;
0202
0203 void operator()(asio::cancellation_type_t type) const
0204 {
0205 if (run_supports_cancel_type(type))
0206 self->cancel();
0207 }
0208 };
0209
0210 template <class Self>
0211 void operator()(Self& self, error_code = {})
0212 {
0213 switch (resume_point_)
0214 {
0215 case 0:
0216
0217 if (cancel_slot_.is_connected())
0218 {
0219 cancel_slot_.template emplace<cancel_handler>(cancel_handler{obj_.get()});
0220 }
0221
0222
0223 if (obj_->params_.thread_safe)
0224 {
0225 BOOST_MYSQL_YIELD(resume_point_, 1, obj_->enter_strand(self))
0226 }
0227
0228
0229 BOOST_ASSERT(obj_->state_ == state_t::initial);
0230 obj_->state_ = state_t::running;
0231
0232
0233 obj_->create_connections();
0234
0235
0236 BOOST_MYSQL_YIELD(resume_point_, 2, obj_->cancel_timer_.async_wait(std::move(self)))
0237
0238
0239 if (obj_->params_.thread_safe)
0240 {
0241 BOOST_MYSQL_YIELD(resume_point_, 3, obj_->enter_strand(self))
0242 }
0243
0244
0245 obj_->state_ = state_t::cancelled;
0246 for (auto& conn : obj_->all_conns_)
0247 conn.cancel();
0248 obj_->shared_st_.idle_connections_cv.expires_at((ClockType::time_point::min)());
0249
0250
0251 BOOST_MYSQL_YIELD(
0252 resume_point_,
0253 4,
0254 obj_->shared_st_.conns_finished_cv.async_wait(std::move(self))
0255 )
0256
0257
0258 cancel_slot_.clear();
0259 obj_.reset();
0260 self.complete(error_code());
0261 }
0262 }
0263 };
0264
0265 struct get_connection_op
0266 {
0267
0268 std::shared_ptr<this_type> obj;
0269 diagnostics* diag;
0270
0271
0272
0273 std::shared_ptr<asio::cancellation_signal> sig;
0274
0275
0276
0277 asio::cancellation_slot parent_slot;
0278
0279
0280 int resume_point{0};
0281 error_code result_ec;
0282 node_type* result_conn{};
0283 bool has_waited{false};
0284
0285 get_connection_op(
0286 std::shared_ptr<this_type> obj,
0287 diagnostics* diag,
0288 std::shared_ptr<asio::cancellation_signal> sig,
0289 asio::cancellation_slot parent_slot
0290 ) noexcept
0291 : obj(std::move(obj)), diag(diag), sig(std::move(sig)), parent_slot(parent_slot)
0292 {
0293 }
0294
0295 bool thread_safe() const { return obj->params_.thread_safe; }
0296
0297 template <class Self>
0298 void do_complete(Self& self)
0299 {
0300 auto wr = result_ec ? ConnectionWrapper() : ConnectionWrapper(*result_conn, std::move(obj));
0301 parent_slot.clear();
0302 sig.reset();
0303 self.complete(result_ec, std::move(wr));
0304 }
0305
0306 template <class Self>
0307 void operator()(Self& self, error_code = {})
0308 {
0309 switch (resume_point)
0310 {
0311 case 0:
0312
0313
0314 self.reset_cancellation_state(asio::enable_total_cancellation());
0315
0316
0317 if (diag)
0318 diag->clear();
0319
0320
0321 if (thread_safe())
0322 {
0323 BOOST_MYSQL_YIELD(resume_point, 1, obj->enter_strand(self))
0324 }
0325
0326
0327
0328
0329 while (true)
0330 {
0331 if (obj->state_ == state_t::cancelled)
0332 {
0333
0334 result_ec = client_errc::pool_cancelled;
0335 break;
0336 }
0337 else if (get_connection_supports_cancel_type(self.cancelled()))
0338 {
0339
0340 if (obj->state_ == state_t::initial)
0341 {
0342
0343 result_ec = client_errc::pool_not_running;
0344 }
0345 else
0346 {
0347 result_ec = client_errc::no_connection_available;
0348 if (diag)
0349 *diag = obj->shared_st_.last_connect_diag;
0350 }
0351 break;
0352 }
0353
0354
0355 if ((result_conn = obj->try_get_connection()) != nullptr)
0356 {
0357
0358 break;
0359 }
0360
0361
0362 obj->enter_request_pending();
0363
0364
0365 BOOST_MYSQL_YIELD(resume_point, 2, obj->wait_for_connections(self))
0366
0367
0368 obj->exit_request_pending();
0369
0370
0371
0372 has_waited = true;
0373 }
0374
0375
0376 if (thread_safe())
0377 {
0378
0379 BOOST_MYSQL_YIELD(resume_point, 3, obj->exit_strand(self))
0380 }
0381 else if (!has_waited)
0382 {
0383
0384 BOOST_MYSQL_YIELD(
0385 resume_point,
0386 4,
0387 asio::async_immediate(self.get_io_executor(), std::move(self))
0388 )
0389 }
0390
0391
0392 do_complete(self);
0393 }
0394 }
0395 };
0396
0397
0398
0399 struct get_connection_cancel_handler
0400 {
0401
0402
0403 std::weak_ptr<asio::cancellation_signal> sig;
0404
0405
0406 std::weak_ptr<this_type> obj;
0407
0408 get_connection_cancel_handler(
0409 std::weak_ptr<asio::cancellation_signal> sig,
0410 std::weak_ptr<this_type> obj
0411 ) noexcept
0412 : sig(std::move(sig)), obj(std::move(obj))
0413 {
0414 }
0415
0416 void operator()(asio::cancellation_type_t type)
0417 {
0418 if (get_connection_supports_cancel_type(type))
0419 {
0420
0421 std::shared_ptr<this_type> obj_shared = obj.lock();
0422 if (obj_shared)
0423 {
0424
0425
0426
0427 auto sig_copy = sig;
0428 asio::dispatch(asio::bind_executor(obj_shared->strand(), [sig_copy, type]() {
0429
0430
0431 auto sig_shared = sig_copy.lock();
0432 if (sig_shared)
0433 {
0434 sig_shared->emit(type);
0435 }
0436 }));
0437 }
0438 }
0439 }
0440 };
0441
0442
0443 void cancel_unsafe() { cancel_timer_.expires_at((std::chrono::steady_clock::time_point::min)()); }
0444
0445 public:
0446 basic_pool_impl(asio::any_io_executor ex, pool_params&& params)
0447 : original_pool_ex_(std::move(ex)),
0448 pool_ex_(params.thread_safe ? asio::make_strand(original_pool_ex_) : original_pool_ex_),
0449 conn_ex_(params.connection_executor ? std::move(params.connection_executor) : original_pool_ex_),
0450 params_(make_internal_pool_params(std::move(params))),
0451 shared_st_(pool_ex_),
0452 cancel_timer_(pool_ex_, (std::chrono::steady_clock::time_point::max)())
0453 {
0454 }
0455
0456 asio::strand<asio::any_io_executor> strand()
0457 {
0458 BOOST_ASSERT(params_.thread_safe);
0459 return *pool_ex_.template target<asio::strand<asio::any_io_executor>>();
0460 }
0461
0462 using executor_type = asio::any_io_executor;
0463 executor_type get_executor() { return original_pool_ex_; }
0464
0465 void async_run(asio::any_completion_handler<void(error_code)> handler)
0466 {
0467
0468
0469 auto slot = asio::get_associated_cancellation_slot(handler);
0470 auto token_without_slot = asio::bind_cancellation_slot(asio::cancellation_slot(), std::move(handler));
0471
0472
0473 asio::async_compose<decltype(token_without_slot), void(error_code)>(
0474 run_op(shared_from_this_wrapper(), slot),
0475 token_without_slot,
0476 pool_ex_
0477 );
0478 }
0479
0480 void async_get_connection(
0481 diagnostics* diag,
0482 asio::any_completion_handler<void(error_code, ConnectionWrapper)> handler
0483 )
0484 {
0485
0486 asio::cancellation_slot parent_slot;
0487
0488
0489 std::shared_ptr<asio::cancellation_signal> sig;
0490
0491
0492
0493 if (params_.thread_safe)
0494 {
0495 parent_slot = asio::get_associated_cancellation_slot(handler);
0496 if (parent_slot.is_connected())
0497 {
0498
0499
0500
0501
0502 sig = std::make_shared<asio::cancellation_signal>();
0503
0504
0505 parent_slot.template emplace<get_connection_cancel_handler>(sig, shared_from_this_wrapper());
0506
0507
0508 handler = asio::bind_cancellation_slot(sig->slot(), std::move(handler));
0509 }
0510 }
0511
0512
0513 using handler_type = asio::any_completion_handler<void(error_code, ConnectionWrapper)>;
0514 asio::async_compose<handler_type, void(error_code, ConnectionWrapper)>(
0515 get_connection_op(shared_from_this_wrapper(), diag, std::move(sig), parent_slot),
0516 handler,
0517 pool_ex_
0518 );
0519 }
0520
0521 void cancel()
0522 {
0523 if (params_.thread_safe)
0524 {
0525
0526
0527 struct dispatch_handler
0528 {
0529 std::shared_ptr<this_type> pool_ptr;
0530
0531 using executor_type = asio::any_io_executor;
0532 executor_type get_executor() const noexcept { return pool_ptr->strand(); }
0533
0534 void operator()() const { pool_ptr->cancel_unsafe(); }
0535 };
0536
0537 asio::dispatch(dispatch_handler{shared_from_this_wrapper()});
0538 }
0539 else
0540 {
0541 cancel_unsafe();
0542 }
0543 }
0544
0545 void return_connection(node_type& node, bool should_reset) noexcept
0546 {
0547
0548 node.mark_as_collectable(should_reset);
0549
0550
0551 if (params_.thread_safe)
0552 {
0553
0554
0555 struct dispatch_handler
0556 {
0557 std::shared_ptr<this_type> pool_ptr;
0558 node_type* node_ptr;
0559
0560 using executor_type = asio::any_io_executor;
0561 executor_type get_executor() const noexcept { return pool_ptr->strand(); }
0562
0563 void operator()() const { node_ptr->notify_collectable(); }
0564 };
0565
0566
0567
0568 try
0569 {
0570 asio::dispatch(dispatch_handler{shared_from_this_wrapper(), &node});
0571 }
0572 catch (...)
0573 {
0574 }
0575 }
0576 else
0577 {
0578 node.notify_collectable();
0579 }
0580 }
0581
0582
0583 static bool run_supports_cancel_type(asio::cancellation_type_t v)
0584 {
0585
0586 return !!(v & (asio::cancellation_type_t::partial | asio::cancellation_type_t::terminal));
0587 }
0588
0589 static bool get_connection_supports_cancel_type(asio::cancellation_type_t v)
0590 {
0591
0592 return !!(
0593 v & (asio::cancellation_type_t::partial | asio::cancellation_type_t::total |
0594 asio::cancellation_type_t::terminal)
0595 );
0596 }
0597
0598 std::list<node_type>& nodes() noexcept { return all_conns_; }
0599 shared_state_type& shared_state() noexcept { return shared_st_; }
0600 internal_pool_params& params() noexcept { return params_; }
0601 asio::any_io_executor connection_ex() noexcept { return conn_ex_; }
0602 const pipeline_request& reset_pipeline_request() const { return reset_pipeline_req_; }
0603 };
0604
0605 }
0606 }
0607 }
0608
0609 #endif