File indexing completed on 2024-11-15 09:04:16
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_RACE_HPP
0009 #define BOOST_COBALT_DETAIL_RACE_HPP
0010
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/fork.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/result.hpp>
0016 #include <boost/cobalt/this_thread.hpp>
0017 #include <boost/cobalt/detail/util.hpp>
0018
0019 #include <boost/asio/bind_allocator.hpp>
0020 #include <boost/asio/bind_cancellation_slot.hpp>
0021 #include <boost/asio/bind_executor.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024
0025
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/core/demangle.hpp>
0028 #include <boost/core/span.hpp>
0029 #include <boost/variant2/variant.hpp>
0030
0031 #include <coroutine>
0032 #include <optional>
0033 #include <algorithm>
0034
0035
0036 namespace boost::cobalt::detail
0037 {
0038
// Tag type used in place of a random bit generator (the `URBG` parameter of
// the race implementations below). When `URBG` is this type the
// implementations skip their std::shuffle step and start the awaitables in
// the order they were given.
struct left_race_tag
{
};
0040
0041
0042 template<typename Base,
0043 typename Awaitable = Base>
0044 struct race_traits
0045 {
0046
0047 constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
0048
0049
0050 using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
0051
0052
0053 constexpr static bool is_actual = awaitable_type<awaitable>;
0054
0055
0056 using actual_awaitable
0057 = std::conditional_t<
0058 is_actual,
0059 awaitable,
0060 decltype(get_awaitable_type(std::declval<awaitable>()))>;
0061
0062
0063 using interruptible_type
0064 = std::conditional_t<
0065 std::is_lvalue_reference_v<Base>,
0066 std::decay_t<actual_awaitable> &,
0067 std::decay_t<actual_awaitable> &&>;
0068
0069 constexpr static bool interruptible =
0070 cobalt::interruptible<interruptible_type>;
0071
0072 static void do_interrupt(std::decay_t<actual_awaitable> & aw)
0073 {
0074 if constexpr (interruptible)
0075 static_cast<interruptible_type>(aw).interrupt_await();
0076 }
0077
0078 };
0079
// Type-erased handle to something that can be interrupted.
// race_variadic_impl stores raw, non-owning pointers to this interface so it
// can interrupt awaitables of different static types through one array.
struct interruptible_base
{
  // Ask the underlying awaitable to interrupt its wait.
  virtual void interrupt_await() = 0;

 protected:
  // Objects are never owned or destroyed through this interface, so the
  // destructor is protected and non-virtual: deleting through a base pointer
  // becomes a compile error instead of undefined behaviour.
  ~interruptible_base() = default;
};
0084
0085 template<asio::cancellation_type Ct, typename URBG, typename ... Args>
0086 struct race_variadic_impl
0087 {
0088
0089 template<typename URBG_>
0090 race_variadic_impl(URBG_ && g, Args && ... args)
0091 : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
0092 {
0093 }
0094
0095 std::tuple<Args...> args;
0096 URBG g;
0097
0098 constexpr static std::size_t tuple_size = sizeof...(Args);
0099
0100 struct awaitable : fork::static_shared_state<256 * tuple_size>
0101 {
0102
0103 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0104 boost::source_location loc;
0105 #endif
0106
0107 template<std::size_t ... Idx>
0108 awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
0109 aws{args}
0110 {
0111 if constexpr (!std::is_same_v<URBG, left_race_tag>)
0112 std::shuffle(impls.begin(), impls.end(), g);
0113 std::fill(working.begin(), working.end(), nullptr);
0114 }
0115
0116 std::tuple<Args...> & aws;
0117 std::array<asio::cancellation_signal, tuple_size> cancel_;
0118
0119 template<typename > constexpr static auto make_null() {return nullptr;};
0120 std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0121
0122 std::array<interruptible_base*, tuple_size> working;
0123
0124 std::size_t index{std::numeric_limits<std::size_t>::max()};
0125
0126 constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
0127 std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
0128 std::exception_ptr error;
0129
0130 bool has_result() const
0131 {
0132 return index != std::numeric_limits<std::size_t>::max();
0133 }
0134
0135 void cancel_all()
0136 {
0137 interrupt_await();
0138 for (auto i = 0u; i < tuple_size; i++)
0139 if (auto &r = cancel[i]; r)
0140 std::exchange(r, nullptr)->emit(Ct);
0141 }
0142
0143 void interrupt_await()
0144 {
0145 for (auto i : working)
0146 if (i)
0147 i->interrupt_await();
0148 }
0149
0150 template<typename T, typename Error>
0151 void assign_error(system::result<T, Error> & res)
0152 try
0153 {
0154 std::move(res).value(loc);
0155 }
0156 catch(...)
0157 {
0158 error = std::current_exception();
0159 }
0160
0161 template<typename T>
0162 void assign_error(system::result<T, std::exception_ptr> & res)
0163 {
0164 error = std::move(res).error();
0165 }
0166
0167 template<std::size_t Idx>
0168 static detail::fork await_impl(awaitable & this_)
0169 try
0170 {
0171 using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
0172
0173 typename traits::actual_awaitable aw_{
0174 get_awaitable_type(
0175 static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
0176 )
0177 };
0178
0179 as_result_t aw{aw_};
0180
0181
0182 struct interruptor final : interruptible_base
0183 {
0184 std::decay_t<typename traits::actual_awaitable> & aw;
0185 interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
0186 void interrupt_await() override
0187 {
0188 traits::do_interrupt(aw);
0189 }
0190 };
0191 interruptor in{aw_};
0192
0193 this_.working[Idx] = ∈
0194
0195 auto transaction = [&this_, idx = Idx] {
0196 if (this_.has_result())
0197 boost::throw_exception(std::runtime_error("Another transaction already started"));
0198 this_.cancel[idx] = nullptr;
0199
0200 this_.index = idx;
0201 this_.cancel_all();
0202 };
0203
0204 co_await fork::set_transaction_function(transaction);
0205
0206 auto rd = aw.await_ready();
0207 if (!rd)
0208 {
0209 this_.cancel[Idx] = &this_.cancel_[Idx];
0210 co_await this_.cancel[Idx]->slot();
0211
0212 co_await detail::fork::wired_up;
0213
0214
0215 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0216 {
0217 auto res = co_await aw;
0218 if (!this_.has_result())
0219 {
0220 this_.index = Idx;
0221 if (res.has_error())
0222 this_.assign_error(res);
0223 }
0224 if constexpr(!all_void)
0225 if (this_.index == Idx && !res.has_error())
0226 this_.result.emplace(variant2::in_place_index<Idx>);
0227 }
0228 else
0229 {
0230 auto val = co_await aw;
0231 if (!this_.has_result())
0232 this_.index = Idx;
0233 if (this_.index == Idx)
0234 {
0235 if (val.has_error())
0236 this_.assign_error(val);
0237 else
0238 this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
0239 }
0240 }
0241 this_.cancel[Idx] = nullptr;
0242 }
0243 else
0244 {
0245 if (!this_.has_result())
0246 this_.index = Idx;
0247 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0248 {
0249 auto res = aw.await_resume();
0250 if (this_.index == Idx)
0251 {
0252 if (res.has_error())
0253 this_.assign_error(res);
0254 else
0255 this_.result.emplace(variant2::in_place_index<Idx>);
0256 }
0257 }
0258 else
0259 {
0260 if (this_.index == Idx)
0261 {
0262 auto res = aw.await_resume();
0263 if (res.has_error())
0264 this_.assign_error(res);
0265 else
0266 this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
0267 }
0268 else
0269 aw.await_resume();
0270 }
0271 this_.cancel[Idx] = nullptr;
0272 }
0273 this_.cancel_all();
0274 this_.working[Idx] = nullptr;
0275 }
0276 catch(...)
0277 {
0278 if (!this_.has_result())
0279 this_.index = Idx;
0280 if (this_.index == Idx)
0281 this_.error = std::current_exception();
0282 this_.working[Idx] = nullptr;
0283 }
0284
0285 std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0286 []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0287 {
0288 return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0289 }(std::make_index_sequence<tuple_size>{})
0290 };
0291
0292 detail::fork last_forked;
0293
0294 bool await_ready()
0295 {
0296 last_forked = impls[0](*this);
0297 return last_forked.done();
0298 }
0299
0300 template<typename H>
0301 auto await_suspend(
0302 std::coroutine_handle<H> h,
0303 const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0304 {
0305 this->loc = loc;
0306
0307 this->exec = &cobalt::detail::get_executor(h);
0308 last_forked.release().resume();
0309
0310 if (!this->outstanding_work())
0311 return false;
0312
0313 for (std::size_t idx = 1u;
0314 idx < tuple_size; idx++)
0315 {
0316 auto l = impls[idx](*this);
0317 const auto d = l.done();
0318 l.release();
0319 if (d)
0320 break;
0321 }
0322
0323 if (!this->outstanding_work())
0324 return false;
0325
0326
0327 assign_cancellation(
0328 h,
0329 [&](asio::cancellation_type ct)
0330 {
0331 for (auto & cs : cancel)
0332 if (cs)
0333 cs->emit(ct);
0334 });
0335
0336 this->coro.reset(h.address());
0337 return true;
0338 }
0339
0340 #if _MSC_VER
0341 BOOST_NOINLINE
0342 #endif
0343 auto await_resume()
0344 {
0345 if (error)
0346 std::rethrow_exception(error);
0347 if constexpr (all_void)
0348 return index;
0349 else
0350 return std::move(*result);
0351 }
0352
0353 auto await_resume(const as_tuple_tag &)
0354 {
0355 if constexpr (all_void)
0356 return std::make_tuple(error, index);
0357 else
0358 return std::make_tuple(error, std::move(*result));
0359 }
0360
0361 auto await_resume(const as_result_tag & )
0362 -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
0363 {
0364 if (error)
0365 return {system::in_place_error, error};
0366 if constexpr (all_void)
0367 return {system::in_place_value, index};
0368 else
0369 return {system::in_place_value, std::move(*result)};
0370 }
0371 };
0372 awaitable operator co_await() &&
0373 {
0374 return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
0375 }
0376 };
0377
0378
0379 template<asio::cancellation_type Ct, typename URBG, typename Range>
0380 struct race_ranged_impl
0381 {
0382
0383 using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0384 template<typename URBG_>
0385 race_ranged_impl(URBG_ && g, Range && rng)
0386 : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
0387 {
0388 }
0389
0390 Range range;
0391 URBG g;
0392
0393 struct awaitable : fork::shared_state
0394 {
0395
0396 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0397 boost::source_location loc;
0398 #endif
0399
0400 using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0401 using traits = race_traits<Range, type>;
0402
0403 std::size_t index{std::numeric_limits<std::size_t>::max()};
0404
0405 std::conditional_t<
0406 std::is_void_v<result_type>,
0407 variant2::monostate,
0408 std::optional<result_type>> result;
0409
0410 std::exception_ptr error;
0411
0412 #if !defined(BOOST_COBALT_NO_PMR)
0413 pmr::monotonic_buffer_resource res;
0414 pmr::polymorphic_allocator<void> alloc{&resource};
0415
0416 Range &aws;
0417
0418 struct dummy
0419 {
0420 template<typename ... Args>
0421 dummy(Args && ...) {}
0422 };
0423
0424 std::conditional_t<traits::interruptible,
0425 pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
0426 dummy> working{std::size(aws), alloc};
0427
0428
0429
0430
0431
0432 pmr::vector<std::size_t> reorder{std::size(aws), alloc};
0433 pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0434 pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0435
0436 #else
0437 Range &aws;
0438
0439 struct dummy
0440 {
0441 template<typename ... Args>
0442 dummy(Args && ...) {}
0443 };
0444
0445 std::conditional_t<traits::interruptible,
0446 std::vector<std::decay_t<typename traits::actual_awaitable>*>,
0447 dummy> working{std::size(aws), std::allocator<void>()};
0448
0449
0450
0451
0452
0453 std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
0454 std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
0455 std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
0456
0457 #endif
0458
0459 bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
0460
0461
0462 awaitable(Range & aws, URBG & g)
0463 : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
0464 , aws(aws)
0465 {
0466 std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
0467 if constexpr (traits::interruptible)
0468 std::fill(working.begin(), working.end(), nullptr);
0469 if constexpr (!std::is_same_v<URBG, left_race_tag>)
0470 std::shuffle(reorder.begin(), reorder.end(), g);
0471 }
0472
0473 void cancel_all()
0474 {
0475 interrupt_await();
0476 for (auto & r : cancel)
0477 if (r)
0478 std::exchange(r, nullptr)->emit(Ct);
0479 }
0480 void interrupt_await()
0481 {
0482 if constexpr (traits::interruptible)
0483 for (auto aw : working)
0484 if (aw)
0485 traits::do_interrupt(*aw);
0486 }
0487
0488
0489 template<typename T, typename Error>
0490 void assign_error(system::result<T, Error> & res)
0491 try
0492 {
0493 std::move(res).value(loc);
0494 }
0495 catch(...)
0496 {
0497 error = std::current_exception();
0498 }
0499
0500 template<typename T>
0501 void assign_error(system::result<T, std::exception_ptr> & res)
0502 {
0503 error = std::move(res).error();
0504 }
0505
0506 static detail::fork await_impl(awaitable & this_, std::size_t idx)
0507 try
0508 {
0509 typename traits::actual_awaitable aw_{
0510 get_awaitable_type(
0511 static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
0512 )};
0513
0514 as_result_t aw{aw_};
0515
0516 if constexpr (traits::interruptible)
0517 this_.working[idx] = &aw_;
0518
0519 auto transaction = [&this_, idx = idx] {
0520 if (this_.has_result())
0521 boost::throw_exception(std::runtime_error("Another transaction already started"));
0522 this_.cancel[idx] = nullptr;
0523
0524 this_.index = idx;
0525 this_.cancel_all();
0526 };
0527
0528 co_await fork::set_transaction_function(transaction);
0529
0530 auto rd = aw.await_ready();
0531 if (!rd)
0532 {
0533 this_.cancel[idx] = &this_.cancel_[idx];
0534 co_await this_.cancel[idx]->slot();
0535
0536 co_await detail::fork::wired_up;
0537
0538
0539 if constexpr (std::is_void_v<result_type>)
0540 {
0541 auto res = co_await aw;
0542 if (!this_.has_result())
0543 {
0544 if (res.has_error())
0545 this_.assign_error(res);
0546 this_.index = idx;
0547 }
0548 }
0549 else
0550 {
0551 auto val = co_await aw;
0552 if (!this_.has_result())
0553 this_.index = idx;
0554 if (this_.index == idx)
0555 {
0556 if (val.has_error())
0557 this_.assign_error(val);
0558 else
0559 this_.result.emplace(*std::move(val));
0560 }
0561 }
0562 this_.cancel[idx] = nullptr;
0563 }
0564 else
0565 {
0566
0567 if (!this_.has_result())
0568 this_.index = idx;
0569 if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0570 {
0571 auto val = aw.await_resume();
0572 if (val.has_error())
0573 this_.assign_error(val);
0574 }
0575 else
0576 {
0577 if (this_.index == idx)
0578 {
0579 auto val = aw.await_resume();
0580 if (val.has_error())
0581 this_.assign_error(val);
0582 else
0583 this_.result.emplace(*std::move(val));
0584 }
0585 else
0586 aw.await_resume();
0587 }
0588 this_.cancel[idx] = nullptr;
0589 }
0590 this_.cancel_all();
0591 if constexpr (traits::interruptible)
0592 this_.working[idx] = nullptr;
0593 }
0594 catch(...)
0595 {
0596 if (!this_.has_result())
0597 this_.index = idx;
0598 if (this_.index == idx)
0599 this_.error = std::current_exception();
0600 if constexpr (traits::interruptible)
0601 this_.working[idx] = nullptr;
0602 }
0603
0604 detail::fork last_forked;
0605
0606 bool await_ready()
0607 {
0608 last_forked = await_impl(*this, reorder.front());
0609 return last_forked.done();
0610 }
0611
0612 template<typename H>
0613 auto await_suspend(std::coroutine_handle<H> h,
0614 const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0615 {
0616 this->loc = loc;
0617 this->exec = &detail::get_executor(h);
0618 last_forked.release().resume();
0619
0620 if (!this->outstanding_work())
0621 return false;
0622
0623 for (auto itr = std::next(reorder.begin());
0624 itr < reorder.end(); std::advance(itr, 1))
0625 {
0626 auto l = await_impl(*this, *itr);
0627 auto d = l.done();
0628 l.release();
0629 if (d)
0630 break;
0631 }
0632
0633 if (!this->outstanding_work())
0634 return false;
0635
0636
0637 assign_cancellation(
0638 h,
0639 [&](asio::cancellation_type ct)
0640 {
0641 for (auto & cs : cancel)
0642 if (cs)
0643 cs->emit(ct);
0644 });
0645
0646 this->coro.reset(h.address());
0647 return true;
0648 }
0649
0650 #if _MSC_VER
0651 BOOST_NOINLINE
0652 #endif
0653 auto await_resume()
0654 {
0655 if (error)
0656 std::rethrow_exception(error);
0657 if constexpr (std::is_void_v<result_type>)
0658 return index;
0659 else
0660 return std::make_pair(index, *result);
0661 }
0662
0663 auto await_resume(const as_tuple_tag &)
0664 {
0665 if constexpr (std::is_void_v<result_type>)
0666 return std::make_tuple(error, index);
0667 else
0668 return std::make_tuple(error, std::make_pair(index, std::move(*result)));
0669 }
0670
0671 auto await_resume(const as_result_tag & )
0672 -> system::result<result_type, std::exception_ptr>
0673 {
0674 if (error)
0675 return {system::in_place_error, error};
0676 if constexpr (std::is_void_v<result_type>)
0677 return {system::in_place_value, index};
0678 else
0679 return {system::in_place_value, std::make_pair(index, std::move(*result))};
0680 }
0681
0682 };
0683 awaitable operator co_await() &&
0684 {
0685 return awaitable{range, g};
0686 }
0687 };
0688
0689 }
0690
0691 #endif