File indexing completed on 2025-07-01 08:09:57
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_RACE_HPP
0009 #define BOOST_COBALT_DETAIL_RACE_HPP
0010
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/fork.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/result.hpp>
0016 #include <boost/cobalt/this_thread.hpp>
0017 #include <boost/cobalt/detail/util.hpp>
0018
0019 #include <boost/asio/bind_allocator.hpp>
0020 #include <boost/asio/bind_cancellation_slot.hpp>
0021 #include <boost/asio/bind_executor.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 #include <boost/core/no_exceptions_support.hpp>
0025
0026
0027 #include <boost/intrusive_ptr.hpp>
0028 #include <boost/core/demangle.hpp>
0029 #include <boost/core/span.hpp>
0030 #include <boost/variant2/variant.hpp>
0031
0032 #include <coroutine>
0033 #include <optional>
0034 #include <algorithm>
0035
0036
0037 namespace boost::cobalt::detail
0038 {
0039
0040 struct left_race_tag {};
0041
0042
0043 template<typename Base,
0044 typename Awaitable = Base>
0045 struct race_traits
0046 {
0047
0048 constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
0049
0050
0051 using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
0052
0053
0054 constexpr static bool is_actual = awaitable_type<awaitable>;
0055
0056
0057 using actual_awaitable
0058 = std::conditional_t<
0059 is_actual,
0060 awaitable,
0061 decltype(get_awaitable_type(std::declval<awaitable>()))>;
0062
0063
0064 using interruptible_type
0065 = std::conditional_t<
0066 std::is_lvalue_reference_v<Base>,
0067 std::decay_t<actual_awaitable> &,
0068 std::decay_t<actual_awaitable> &&>;
0069
0070 constexpr static bool interruptible =
0071 cobalt::interruptible<interruptible_type>;
0072
0073 static void do_interrupt(std::decay_t<actual_awaitable> & aw)
0074 {
0075 if constexpr (interruptible)
0076 static_cast<interruptible_type>(aw).interrupt_await();
0077 }
0078
0079 };
0080
0081 struct interruptible_base
0082 {
0083 virtual void interrupt_await() = 0;
0084 };
0085
0086 template<asio::cancellation_type Ct, typename URBG, typename ... Args>
0087 struct race_variadic_impl
0088 {
0089
0090 template<typename URBG_>
0091 race_variadic_impl(URBG_ && g, Args && ... args)
0092 : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
0093 {
0094 }
0095
0096 std::tuple<Args...> args;
0097 URBG g;
0098
0099 constexpr static std::size_t tuple_size = sizeof...(Args);
0100
0101 struct awaitable : fork::static_shared_state<256 * tuple_size>
0102 {
0103
0104 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0105 boost::source_location loc;
0106 #endif
0107
0108 template<std::size_t ... Idx>
0109 awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
0110 aws{args}
0111 {
0112 if constexpr (!std::is_same_v<URBG, left_race_tag>)
0113 std::shuffle(impls.begin(), impls.end(), g);
0114 std::fill(working.begin(), working.end(), nullptr);
0115 }
0116
0117 std::tuple<Args...> & aws;
0118 std::array<asio::cancellation_signal, tuple_size> cancel_;
0119
0120 template<typename > constexpr static auto make_null() {return nullptr;};
0121 std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0122
0123 std::array<interruptible_base*, tuple_size> working;
0124
0125 std::size_t index{std::numeric_limits<std::size_t>::max()};
0126
0127 constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
0128 std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
0129 std::exception_ptr error;
0130
0131 bool has_result() const
0132 {
0133 return index != std::numeric_limits<std::size_t>::max();
0134 }
0135
0136 void cancel_all()
0137 {
0138 interrupt_await();
0139 for (auto i = 0u; i < tuple_size; i++)
0140 if (auto &r = cancel[i]; r)
0141 std::exchange(r, nullptr)->emit(Ct);
0142 }
0143
0144 void interrupt_await()
0145 {
0146 for (auto i : working)
0147 if (i)
0148 i->interrupt_await();
0149 }
0150
0151 template<typename T, typename Error>
0152 void assign_error(system::result<T, Error> & res)
0153 BOOST_TRY
0154 {
0155 std::move(res).value(loc);
0156 }
0157 BOOST_CATCH(...)
0158 {
0159 error = std::current_exception();
0160 }
0161 BOOST_CATCH_END
0162
0163 template<typename T>
0164 void assign_error(system::result<T, std::exception_ptr> & res)
0165 {
0166 error = std::move(res).error();
0167 }
0168
0169 template<std::size_t Idx>
0170 static detail::fork await_impl(awaitable & this_)
0171 BOOST_TRY
0172 {
0173 using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
0174
0175 typename traits::actual_awaitable aw_{
0176 get_awaitable_type(
0177 static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
0178 )
0179 };
0180
0181 as_result_t aw{aw_};
0182
0183
0184 struct interruptor final : interruptible_base
0185 {
0186 std::decay_t<typename traits::actual_awaitable> & aw;
0187 interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
0188 void interrupt_await() override
0189 {
0190 traits::do_interrupt(aw);
0191 }
0192 };
0193 interruptor in{aw_};
0194
0195 this_.working[Idx] = ∈
0196
0197 auto transaction = [&this_, idx = Idx] {
0198 if (this_.has_result())
0199 boost::throw_exception(std::runtime_error("Another transaction already started"));
0200 this_.cancel[idx] = nullptr;
0201
0202 this_.index = idx;
0203 this_.cancel_all();
0204 };
0205
0206 co_await fork::set_transaction_function(transaction);
0207
0208 auto rd = aw.await_ready();
0209 if (!rd)
0210 {
0211 this_.cancel[Idx] = &this_.cancel_[Idx];
0212 co_await this_.cancel[Idx]->slot();
0213
0214 co_await detail::fork::wired_up;
0215
0216
0217 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0218 {
0219 auto res = co_await aw;
0220 if (!this_.has_result())
0221 {
0222 this_.index = Idx;
0223 if (res.has_error())
0224 this_.assign_error(res);
0225 }
0226 if constexpr(!all_void)
0227 if (this_.index == Idx && !res.has_error())
0228 this_.result.emplace(variant2::in_place_index<Idx>);
0229 }
0230 else
0231 {
0232 auto val = co_await aw;
0233 if (!this_.has_result())
0234 this_.index = Idx;
0235 if (this_.index == Idx)
0236 {
0237 if (val.has_error())
0238 this_.assign_error(val);
0239 else
0240 this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
0241 }
0242 }
0243 this_.cancel[Idx] = nullptr;
0244 }
0245 else
0246 {
0247 if (!this_.has_result())
0248 this_.index = Idx;
0249 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0250 {
0251 auto res = aw.await_resume();
0252 if (this_.index == Idx)
0253 {
0254 if (res.has_error())
0255 this_.assign_error(res);
0256 else
0257 this_.result.emplace(variant2::in_place_index<Idx>);
0258 }
0259 }
0260 else
0261 {
0262 if (this_.index == Idx)
0263 {
0264 auto res = aw.await_resume();
0265 if (res.has_error())
0266 this_.assign_error(res);
0267 else
0268 this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
0269 }
0270 else
0271 aw.await_resume();
0272 }
0273 this_.cancel[Idx] = nullptr;
0274 }
0275 this_.cancel_all();
0276 this_.working[Idx] = nullptr;
0277 }
0278 BOOST_CATCH(...)
0279 {
0280 if (!this_.has_result())
0281 this_.index = Idx;
0282 if (this_.index == Idx)
0283 this_.error = std::current_exception();
0284 this_.working[Idx] = nullptr;
0285 }
0286 BOOST_CATCH_END
0287
0288 std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0289 []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0290 {
0291 return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0292 }(std::make_index_sequence<tuple_size>{})
0293 };
0294
0295 detail::fork last_forked;
0296
0297 bool await_ready()
0298 {
0299 last_forked = impls[0](*this);
0300 return last_forked.done();
0301 }
0302
0303 template<typename H>
0304 auto await_suspend(
0305 std::coroutine_handle<H> h,
0306 const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0307 {
0308 this->loc = loc;
0309
0310 this->exec = &cobalt::detail::get_executor(h);
0311 last_forked.release().resume();
0312
0313 if (!this->outstanding_work())
0314 return false;
0315
0316 for (std::size_t idx = 1u;
0317 idx < tuple_size; idx++)
0318 {
0319 auto l = impls[idx](*this);
0320 const auto d = l.done();
0321 l.release();
0322 if (d)
0323 break;
0324 }
0325
0326 if (!this->outstanding_work())
0327 return false;
0328
0329
0330 assign_cancellation(
0331 h,
0332 [&](asio::cancellation_type ct)
0333 {
0334 for (auto & cs : cancel)
0335 if (cs)
0336 cs->emit(ct);
0337 });
0338
0339 this->coro.reset(h.address());
0340 return true;
0341 }
0342
0343 #if _MSC_VER
0344 BOOST_NOINLINE
0345 #endif
0346 auto await_resume()
0347 {
0348 if (error)
0349 std::rethrow_exception(error);
0350 if constexpr (all_void)
0351 return index;
0352 else
0353 return std::move(*result);
0354 }
0355
0356 auto await_resume(const as_tuple_tag &)
0357 {
0358 if constexpr (all_void)
0359 return std::make_tuple(error, index);
0360 else
0361 return std::make_tuple(error, std::move(*result));
0362 }
0363
0364 auto await_resume(const as_result_tag & )
0365 -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
0366 {
0367 if (error)
0368 return {system::in_place_error, error};
0369 if constexpr (all_void)
0370 return {system::in_place_value, index};
0371 else
0372 return {system::in_place_value, std::move(*result)};
0373 }
0374 };
0375 awaitable operator co_await() &&
0376 {
0377 return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
0378 }
0379 };
0380
0381
0382 template<asio::cancellation_type Ct, typename URBG, typename Range>
0383 struct race_ranged_impl
0384 {
0385
0386 using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0387 template<typename URBG_>
0388 race_ranged_impl(URBG_ && g, Range && rng)
0389 : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
0390 {
0391 }
0392
0393 Range range;
0394 URBG g;
0395
0396 struct awaitable : fork::shared_state
0397 {
0398
0399 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0400 boost::source_location loc;
0401 #endif
0402
0403 using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0404 using traits = race_traits<Range, type>;
0405
0406 std::size_t index{std::numeric_limits<std::size_t>::max()};
0407
0408 std::conditional_t<
0409 std::is_void_v<result_type>,
0410 variant2::monostate,
0411 std::optional<result_type>> result;
0412
0413 std::exception_ptr error;
0414
0415 #if !defined(BOOST_COBALT_NO_PMR)
0416 pmr::monotonic_buffer_resource res;
0417 pmr::polymorphic_allocator<void> alloc{&resource};
0418
0419 Range &aws;
0420
0421 struct dummy
0422 {
0423 template<typename ... Args>
0424 dummy(Args && ...) {}
0425 };
0426
0427 std::conditional_t<traits::interruptible,
0428 pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
0429 dummy> working{std::size(aws), alloc};
0430
0431
0432
0433
0434
0435 pmr::vector<std::size_t> reorder{std::size(aws), alloc};
0436 pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0437 pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0438
0439 #else
0440 Range &aws;
0441
0442 struct dummy
0443 {
0444 template<typename ... Args>
0445 dummy(Args && ...) {}
0446 };
0447
0448 std::conditional_t<traits::interruptible,
0449 std::vector<std::decay_t<typename traits::actual_awaitable>*>,
0450 dummy> working{std::size(aws), std::allocator<void>()};
0451
0452
0453
0454
0455
0456 std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
0457 std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
0458 std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
0459
0460 #endif
0461
0462 bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
0463
0464
0465 awaitable(Range & aws, URBG & g)
0466 : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
0467 , aws(aws)
0468 {
0469 std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
0470 if constexpr (traits::interruptible)
0471 std::fill(working.begin(), working.end(), nullptr);
0472 if constexpr (!std::is_same_v<URBG, left_race_tag>)
0473 std::shuffle(reorder.begin(), reorder.end(), g);
0474 }
0475
0476 void cancel_all()
0477 {
0478 interrupt_await();
0479 for (auto & r : cancel)
0480 if (r)
0481 std::exchange(r, nullptr)->emit(Ct);
0482 }
0483 void interrupt_await()
0484 {
0485 if constexpr (traits::interruptible)
0486 for (auto aw : working)
0487 if (aw)
0488 traits::do_interrupt(*aw);
0489 }
0490
0491
0492 template<typename T, typename Error>
0493 void assign_error(system::result<T, Error> & res)
0494 BOOST_TRY
0495 {
0496 std::move(res).value(loc);
0497 }
0498 BOOST_CATCH(...)
0499 {
0500 error = std::current_exception();
0501 }
0502 BOOST_CATCH_END
0503
0504 template<typename T>
0505 void assign_error(system::result<T, std::exception_ptr> & res)
0506 {
0507 error = std::move(res).error();
0508 }
0509
0510 static detail::fork await_impl(awaitable & this_, std::size_t idx)
0511 BOOST_TRY
0512 {
0513 typename traits::actual_awaitable aw_{
0514 get_awaitable_type(
0515 static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
0516 )};
0517
0518 as_result_t aw{aw_};
0519
0520 if constexpr (traits::interruptible)
0521 this_.working[idx] = &aw_;
0522
0523 auto transaction = [&this_, idx = idx] {
0524 if (this_.has_result())
0525 boost::throw_exception(std::runtime_error("Another transaction already started"));
0526 this_.cancel[idx] = nullptr;
0527
0528 this_.index = idx;
0529 this_.cancel_all();
0530 };
0531
0532 co_await fork::set_transaction_function(transaction);
0533
0534 auto rd = aw.await_ready();
0535 if (!rd)
0536 {
0537 this_.cancel[idx] = &this_.cancel_[idx];
0538 co_await this_.cancel[idx]->slot();
0539
0540 co_await detail::fork::wired_up;
0541
0542
0543 if constexpr (std::is_void_v<result_type>)
0544 {
0545 auto res = co_await aw;
0546 if (!this_.has_result())
0547 {
0548 if (res.has_error())
0549 this_.assign_error(res);
0550 this_.index = idx;
0551 }
0552 }
0553 else
0554 {
0555 auto val = co_await aw;
0556 if (!this_.has_result())
0557 this_.index = idx;
0558 if (this_.index == idx)
0559 {
0560 if (val.has_error())
0561 this_.assign_error(val);
0562 else
0563 this_.result.emplace(*std::move(val));
0564 }
0565 }
0566 this_.cancel[idx] = nullptr;
0567 }
0568 else
0569 {
0570
0571 if (!this_.has_result())
0572 this_.index = idx;
0573 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0574 {
0575 auto val = aw.await_resume();
0576 if (val.has_error())
0577 this_.assign_error(val);
0578 }
0579 else
0580 {
0581 if (this_.index == idx)
0582 {
0583 auto val = aw.await_resume();
0584 if (val.has_error())
0585 this_.assign_error(val);
0586 else
0587 this_.result.emplace(*std::move(val));
0588 }
0589 else
0590 aw.await_resume();
0591 }
0592 this_.cancel[idx] = nullptr;
0593 }
0594 this_.cancel_all();
0595 if constexpr (traits::interruptible)
0596 this_.working[idx] = nullptr;
0597 }
0598 BOOST_CATCH(...)
0599 {
0600 if (!this_.has_result())
0601 this_.index = idx;
0602 if (this_.index == idx)
0603 this_.error = std::current_exception();
0604 if constexpr (traits::interruptible)
0605 this_.working[idx] = nullptr;
0606 }
0607 BOOST_CATCH_END
0608
0609 detail::fork last_forked;
0610
0611 bool await_ready()
0612 {
0613 last_forked = await_impl(*this, reorder.front());
0614 return last_forked.done();
0615 }
0616
0617 template<typename H>
0618 auto await_suspend(std::coroutine_handle<H> h,
0619 const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0620 {
0621 this->loc = loc;
0622 this->exec = &detail::get_executor(h);
0623 last_forked.release().resume();
0624
0625 if (!this->outstanding_work())
0626 return false;
0627
0628 for (auto itr = std::next(reorder.begin());
0629 itr < reorder.end(); std::advance(itr, 1))
0630 {
0631 auto l = await_impl(*this, *itr);
0632 auto d = l.done();
0633 l.release();
0634 if (d)
0635 break;
0636 }
0637
0638 if (!this->outstanding_work())
0639 return false;
0640
0641
0642 assign_cancellation(
0643 h,
0644 [&](asio::cancellation_type ct)
0645 {
0646 for (auto & cs : cancel)
0647 if (cs)
0648 cs->emit(ct);
0649 });
0650
0651 this->coro.reset(h.address());
0652 return true;
0653 }
0654
0655 #if _MSC_VER
0656 BOOST_NOINLINE
0657 #endif
0658 auto await_resume()
0659 {
0660 if (error)
0661 std::rethrow_exception(error);
0662 if constexpr (std::is_void_v<result_type>)
0663 return index;
0664 else
0665 return std::make_pair(index, *result);
0666 }
0667
0668 auto await_resume(const as_tuple_tag &)
0669 {
0670 if constexpr (std::is_void_v<result_type>)
0671 return std::make_tuple(error, index);
0672 else
0673 return std::make_tuple(error, std::make_pair(index, std::move(*result)));
0674 }
0675
0676 auto await_resume(const as_result_tag & )
0677 -> system::result<result_type, std::exception_ptr>
0678 {
0679 if (error)
0680 return {system::in_place_error, error};
0681 if constexpr (std::is_void_v<result_type>)
0682 return {system::in_place_value, index};
0683 else
0684 return {system::in_place_value, std::make_pair(index, std::move(*result))};
0685 }
0686
0687 };
0688 awaitable operator co_await() &&
0689 {
0690 return awaitable{range, g};
0691 }
0692 };
0693
0694 }
0695
0696 #endif