Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-16 08:29:51

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_RACE_HPP
0009 #define BOOST_COBALT_DETAIL_RACE_HPP
0010 
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/fork.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/result.hpp>
0016 #include <boost/cobalt/this_thread.hpp>
0017 #include <boost/cobalt/detail/util.hpp>
0018 
0019 #include <boost/asio/bind_allocator.hpp>
0020 #include <boost/asio/bind_cancellation_slot.hpp>
0021 #include <boost/asio/bind_executor.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 #include <boost/core/no_exceptions_support.hpp>
0025 
0026 
0027 #include <boost/intrusive_ptr.hpp>
0028 #include <boost/core/demangle.hpp>
0029 #include <boost/core/span.hpp>
0030 #include <boost/variant2/variant.hpp>
0031 
0032 #include <coroutine>
0033 #include <optional>
0034 #include <algorithm>
0035 
0036 
0037 namespace boost::cobalt::detail
0038 {
0039 
// Tag type used in place of a random generator (URBG): when the race
// implementations below see this type they skip std::shuffle, so the
// awaitables are started in the order in which they were passed.
struct left_race_tag {};
0041 
0042 // helpers it determining the type of things;
0043 template<typename Base, // range of aw
0044          typename Awaitable = Base>
0045 struct race_traits
0046 {
0047   // for a ranges race this is based on the range, not the AW in it.
0048   constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
0049 
0050   // what the value is supposed to be cast to before the co_await_operator
0051   using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
0052 
0053   // do we need operator co_await
0054   constexpr static bool is_actual = awaitable_type<awaitable>;
0055 
0056   // the type with .await_ functions & interrupt_await
0057   using actual_awaitable
0058         = std::conditional_t<
0059             is_actual,
0060               awaitable,
0061               decltype(get_awaitable_type(std::declval<awaitable>()))>;
0062 
0063   // the type to be used with interruptible
0064   using interruptible_type
0065         = std::conditional_t<
0066               std::is_lvalue_reference_v<Base>,
0067               std::decay_t<actual_awaitable> &,
0068               std::decay_t<actual_awaitable> &&>;
0069 
0070   constexpr static bool interruptible =
0071       cobalt::interruptible<interruptible_type>;
0072 
0073   static void do_interrupt(std::decay_t<actual_awaitable> & aw)
0074   {
0075     if constexpr (interruptible)
0076         static_cast<interruptible_type>(aw).interrupt_await();
0077   }
0078 
0079 };
0080 
// Type-erased interface through which race_variadic_impl::awaitable calls
// interrupt_await() on awaitables of different static types (it stores
// `interruptible_base*` entries in its `working` array).
struct interruptible_base
{
  // Forwards an interruption request to the wrapped awaitable.
  virtual void interrupt_await() = 0;

  // Polymorphic base: the virtual destructor keeps destruction through a
  // pointer to the base well defined.
  virtual ~interruptible_base() = default;
};
0085 
0086 template<asio::cancellation_type Ct, typename URBG, typename ... Args>
0087 struct race_variadic_impl
0088 {
0089 
0090   template<typename URBG_>
0091   race_variadic_impl(URBG_ && g, Args && ... args)
0092       : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
0093   {
0094   }
0095 
0096   std::tuple<Args...> args;
0097   URBG g;
0098 
0099   constexpr static std::size_t tuple_size = sizeof...(Args);
0100 
0101   struct awaitable : fork::static_shared_state<256 * tuple_size>
0102   {
0103 
0104     boost::source_location loc;
0105 
0106     template<std::size_t ... Idx>
0107     awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
0108         aws{args}
0109     {
0110       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0111         std::shuffle(impls.begin(), impls.end(), g);
0112       std::fill(working.begin(), working.end(), nullptr);
0113     }
0114 
0115     std::tuple<Args...> & aws;
0116     std::array<asio::cancellation_signal, tuple_size> cancel_;
0117 
0118     template<typename > constexpr static auto make_null() {return nullptr;};
0119     std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0120 
0121     std::array<interruptible_base*, tuple_size> working;
0122 
0123     std::size_t index{std::numeric_limits<std::size_t>::max()};
0124 
0125     constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
0126     std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
0127     std::exception_ptr error;
0128 
0129     bool has_result() const
0130     {
0131       return index != std::numeric_limits<std::size_t>::max();
0132     }
0133 
0134     void cancel_all()
0135     {
0136       interrupt_await();
0137       for (auto i = 0u; i < tuple_size; i++)
0138         if (auto &r = cancel[i]; r)
0139           std::exchange(r, nullptr)->emit(Ct);
0140     }
0141 
0142     void interrupt_await()
0143     {
0144       for (auto i : working)
0145         if (i)
0146           i->interrupt_await();
0147     }
0148 
0149     template<typename T, typename Error>
0150     void assign_error(system::result<T, Error> & res)
0151     BOOST_TRY
0152     {
0153       std::move(res).value(loc);
0154     }
0155     BOOST_CATCH(...)
0156     {
0157       error = std::current_exception();
0158     }
0159     BOOST_CATCH_END
0160 
0161     template<typename T>
0162     void assign_error(system::result<T, std::exception_ptr> & res)
0163     {
0164       error = std::move(res).error();
0165     }
0166 
0167     template<std::size_t Idx>
0168     static detail::fork await_impl(awaitable & this_)
0169     BOOST_TRY
0170     {
0171       using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
0172 
0173       typename traits::actual_awaitable aw_{
0174           get_awaitable_type(
0175               static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
0176               )
0177       };
0178 
0179       as_result_t aw{aw_};
0180 
0181 
0182       struct interruptor final : interruptible_base
0183       {
0184         std::decay_t<typename traits::actual_awaitable> & aw;
0185         interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
0186         void interrupt_await() override
0187         {
0188           traits::do_interrupt(aw);
0189         }
0190       };
0191       interruptor in{aw_};
0192       //if constexpr (traits::interruptible)
0193         this_.working[Idx] = &in;
0194 
0195       auto transaction = [&this_, idx = Idx] {
0196         if (this_.has_result())
0197           boost::throw_exception(std::runtime_error("Another transaction already started"));
0198         this_.cancel[idx] = nullptr;
0199         // reserve the index early bc
0200         this_.index = idx;
0201         this_.cancel_all();
0202       };
0203 
0204       co_await fork::set_transaction_function(transaction);
0205       // check manually if we're ready
0206       auto rd = aw.await_ready();
0207       if (!rd)
0208       {
0209         this_.cancel[Idx] = &this_.cancel_[Idx];
0210         co_await this_.cancel[Idx]->slot();
0211         // make sure the executor is set
0212         co_await detail::fork::wired_up;
0213 
0214         // do the await - this doesn't call await-ready again
0215         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0216         {
0217           auto res = co_await aw;
0218           if (!this_.has_result())
0219           {
0220             this_.index = Idx;
0221             if (res.has_error())
0222               this_.assign_error(res);
0223           }
0224           if constexpr(!all_void)
0225             if (this_.index == Idx && !res.has_error())
0226               this_.result.emplace(variant2::in_place_index<Idx>);
0227         }
0228         else
0229         {
0230           auto val = co_await aw;
0231           if (!this_.has_result())
0232             this_.index = Idx;
0233           if (this_.index == Idx)
0234           {
0235             if (val.has_error())
0236               this_.assign_error(val);
0237             else
0238               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
0239           }
0240         }
0241         this_.cancel[Idx] = nullptr;
0242       }
0243       else
0244       {
0245         if (!this_.has_result())
0246           this_.index = Idx;
0247         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0248         {
0249           auto res = aw.await_resume();
0250           if (this_.index == Idx)
0251           {
0252             if (res.has_error())
0253               this_.assign_error(res);
0254             else
0255               this_.result.emplace(variant2::in_place_index<Idx>);
0256           }
0257         }
0258         else
0259         {
0260           if (this_.index == Idx)
0261           {
0262             auto res = aw.await_resume();
0263             if (res.has_error())
0264               this_.assign_error(res);
0265             else
0266               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
0267           }
0268           else
0269             aw.await_resume();
0270         }
0271         this_.cancel[Idx] = nullptr;
0272       }
0273       this_.cancel_all();
0274       this_.working[Idx] = nullptr;
0275     }
0276     BOOST_CATCH(...)
0277     {
0278       if (!this_.has_result())
0279         this_.index = Idx;
0280       if (this_.index == Idx)
0281         this_.error = std::current_exception();
0282       this_.working[Idx] = nullptr;
0283     }
0284     BOOST_CATCH_END
0285 
0286     std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0287         []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0288         {
0289           return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0290         }(std::make_index_sequence<tuple_size>{})
0291     };
0292 
0293     detail::fork last_forked;
0294 
0295     bool await_ready()
0296     {
0297       last_forked = impls[0](*this);
0298       return last_forked.done();
0299     }
0300 
0301     template<typename H>
0302     auto await_suspend(
0303         std::coroutine_handle<H> h,
0304         const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0305     {
0306       this->loc = loc;
0307 
0308       this->exec = cobalt::detail::get_executor(h);
0309       last_forked.release().resume();
0310 
0311       if (!this->outstanding_work()) // already done, resume rightaway.
0312         return false;
0313 
0314       for (std::size_t idx = 1u;
0315            idx < tuple_size; idx++) // we'
0316       {
0317         auto l = impls[idx](*this);
0318         const auto d = l.done();
0319         l.release();
0320         if (d)
0321           break;
0322       }
0323 
0324       if (!this->outstanding_work()) // already done, resume rightaway.
0325         return false;
0326 
0327       // arm the cancel
0328       assign_cancellation(
0329           h,
0330           [&](asio::cancellation_type ct)
0331           {
0332             for (auto & cs : cancel)
0333               if (cs)
0334                 cs->emit(ct);
0335           });
0336 
0337       this->coro.reset(h.address());
0338       return true;
0339     }
0340 
0341 #if _MSC_VER
0342     BOOST_NOINLINE
0343 #endif
0344     auto await_resume()
0345     {
0346       if (error)
0347         std::rethrow_exception(error);
0348       if constexpr (all_void)
0349         return index;
0350       else
0351         return std::move(*result);
0352     }
0353 
0354     auto await_resume(const as_tuple_tag &)
0355     {
0356       if constexpr (all_void)
0357         return std::make_tuple(error, index);
0358       else
0359         return std::make_tuple(error, std::move(*result));
0360     }
0361 
0362     auto await_resume(const as_result_tag & )
0363         -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
0364     {
0365       if (error)
0366         return {system::in_place_error, error};
0367       if constexpr (all_void)
0368         return {system::in_place_value, index};
0369       else
0370         return {system::in_place_value, std::move(*result)};
0371     }
0372   };
0373   awaitable operator co_await() &&
0374   {
0375     return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
0376   }
0377 };
0378 
0379 
0380 template<asio::cancellation_type Ct, typename URBG, typename Range>
0381 struct race_ranged_impl
0382 {
0383 
0384   using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0385   template<typename URBG_>
0386   race_ranged_impl(URBG_ && g, Range && rng)
0387       : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
0388   {
0389   }
0390 
0391   Range range;
0392   URBG g;
0393 
0394   struct awaitable : fork::shared_state
0395   {
0396 
0397 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0398     boost::source_location loc;
0399 #endif
0400 
0401     using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0402     using traits = race_traits<Range, type>;
0403 
0404     std::size_t index{std::numeric_limits<std::size_t>::max()};
0405 
0406     std::conditional_t<
0407         std::is_void_v<result_type>,
0408         variant2::monostate,
0409         std::optional<result_type>> result;
0410 
0411     std::exception_ptr error;
0412 
0413 #if !defined(BOOST_COBALT_NO_PMR)
0414     pmr::polymorphic_allocator<void> alloc{&resource};
0415 
0416     Range &aws;
0417 
0418     struct dummy
0419     {
0420       template<typename ... Args>
0421       dummy(Args && ...) {}
0422     };
0423 
0424     std::conditional_t<traits::interruptible,
0425       pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
0426       dummy> working{std::size(aws), alloc};
0427 
0428     /* all below `reorder` is reordered
0429      *
0430      * cancel[idx] is for aws[reorder[idx]]
0431     */
0432     pmr::vector<std::size_t> reorder{std::size(aws), alloc};
0433     pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0434     pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0435 
0436 #else
0437     Range &aws;
0438 
0439     struct dummy
0440     {
0441       template<typename ... Args>
0442       dummy(Args && ...) {}
0443     };
0444 
0445     std::conditional_t<traits::interruptible,
0446         std::vector<std::decay_t<typename traits::actual_awaitable>*>,
0447     dummy> working{std::size(aws), std::allocator<void>()};
0448 
0449     /* all below `reorder` is reordered
0450      *
0451      * cancel[idx] is for aws[reorder[idx]]
0452     */
0453     std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
0454     std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
0455     std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
0456 
0457 #endif
0458 
0459     bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
0460 
0461 
0462     awaitable(Range & aws, URBG & g)
0463         : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
0464         , aws(aws)
0465     {
0466       std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
0467       if constexpr (traits::interruptible)
0468         std::fill(working.begin(), working.end(), nullptr);
0469       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0470         std::shuffle(reorder.begin(), reorder.end(), g);
0471     }
0472 
0473     void cancel_all()
0474     {
0475       interrupt_await();
0476       for (auto & r : cancel)
0477         if (r)
0478           std::exchange(r, nullptr)->emit(Ct);
0479     }
0480     void interrupt_await()
0481     {
0482       if constexpr (traits::interruptible)
0483         for (auto aw : working)
0484           if (aw)
0485             traits::do_interrupt(*aw);
0486     }
0487 
0488 
0489     template<typename T, typename Error>
0490     void assign_error(system::result<T, Error> & res)
0491     BOOST_TRY
0492     {
0493       std::move(res).value(loc);
0494     }
0495     BOOST_CATCH(...)
0496     {
0497       error = std::current_exception();
0498     }
0499     BOOST_CATCH_END
0500 
0501     template<typename T>
0502     void assign_error(system::result<T, std::exception_ptr> & res)
0503     {
0504       error = std::move(res).error();
0505     }
0506 
0507     static detail::fork await_impl(awaitable & this_, std::size_t idx)
0508     BOOST_TRY
0509     {
0510       typename traits::actual_awaitable aw_{
0511           get_awaitable_type(
0512               static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
0513               )};
0514 
0515       as_result_t aw{aw_};
0516 
0517       if constexpr (traits::interruptible)
0518         this_.working[idx] = &aw_;
0519 
0520       auto transaction = [&this_, idx = idx] {
0521         if (this_.has_result())
0522           boost::throw_exception(std::runtime_error("Another transaction already started"));
0523         this_.cancel[idx] = nullptr;
0524         // reserve the index early bc
0525         this_.index = idx;
0526         this_.cancel_all();
0527       };
0528 
0529       co_await fork::set_transaction_function(transaction);
0530       // check manually if we're ready
0531       auto rd = aw.await_ready();
0532       if (!rd)
0533       {
0534         this_.cancel[idx] = &this_.cancel_[idx];
0535         co_await this_.cancel[idx]->slot();
0536         // make sure the executor is set
0537         co_await detail::fork::wired_up;
0538 
0539         // do the await - this doesn't call await-ready again
0540         if constexpr (std::is_void_v<result_type>)
0541         {
0542           auto res = co_await aw;
0543           if (!this_.has_result())
0544           {
0545             if (res.has_error())
0546               this_.assign_error(res);
0547             this_.index = idx;
0548           }
0549         }
0550         else
0551         {
0552           auto val = co_await aw;
0553           if (!this_.has_result())
0554             this_.index = idx;
0555           if (this_.index == idx)
0556           {
0557             if (val.has_error())
0558               this_.assign_error(val);
0559             else
0560               this_.result.emplace(*std::move(val));
0561           }
0562         }
0563         this_.cancel[idx] = nullptr;
0564       }
0565       else
0566       {
0567 
0568         if (!this_.has_result())
0569           this_.index = idx;
0570         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0571         {
0572           auto val = aw.await_resume();
0573           if (val.has_error())
0574             this_.assign_error(val);
0575         }
0576         else
0577         {
0578           if (this_.index == idx)
0579           {
0580             auto val = aw.await_resume();
0581             if (val.has_error())
0582               this_.assign_error(val);
0583             else
0584               this_.result.emplace(*std::move(val));
0585           }
0586           else
0587             aw.await_resume();
0588         }
0589         this_.cancel[idx] = nullptr;
0590       }
0591       this_.cancel_all();
0592       if constexpr (traits::interruptible)
0593         this_.working[idx] = nullptr;
0594     }
0595     BOOST_CATCH(...)
0596     {
0597       if (!this_.has_result())
0598         this_.index = idx;
0599       if (this_.index == idx)
0600         this_.error = std::current_exception();
0601       if constexpr (traits::interruptible)
0602         this_.working[idx] = nullptr;
0603     }
0604     BOOST_CATCH_END
0605 
0606     detail::fork last_forked;
0607 
0608     bool await_ready()
0609     {
0610       last_forked = await_impl(*this, reorder.front());
0611       return last_forked.done();
0612     }
0613 
0614     template<typename H>
0615     auto await_suspend(std::coroutine_handle<H> h,
0616                        const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0617     {
0618       this->loc = loc;
0619       this->exec = detail::get_executor(h);
0620       last_forked.release().resume();
0621 
0622       if (!this->outstanding_work()) // already done, resume rightaway.
0623         return false;
0624 
0625       for (auto itr = std::next(reorder.begin());
0626            itr < reorder.end(); std::advance(itr, 1)) // we'
0627       {
0628         auto l = await_impl(*this, *itr);
0629         auto d = l.done();
0630         l.release();
0631         if (d)
0632           break;
0633       }
0634 
0635       if (!this->outstanding_work()) // already done, resume rightaway.
0636         return false;
0637 
0638       // arm the cancel
0639       assign_cancellation(
0640           h,
0641           [&](asio::cancellation_type ct)
0642           {
0643             for (auto & cs : cancel)
0644               if (cs)
0645                 cs->emit(ct);
0646           });
0647 
0648       this->coro.reset(h.address());
0649       return true;
0650     }
0651 
0652 #if _MSC_VER
0653     BOOST_NOINLINE
0654 #endif
0655     auto await_resume()
0656     {
0657       if (error)
0658         std::rethrow_exception(error);
0659       if constexpr (std::is_void_v<result_type>)
0660         return index;
0661       else
0662         return std::make_pair(index, *result);
0663     }
0664 
0665     auto await_resume(const as_tuple_tag &)
0666     {
0667       if constexpr (std::is_void_v<result_type>)
0668         return std::make_tuple(error, index);
0669       else
0670         return std::make_tuple(error, std::make_pair(index, std::move(*result)));
0671     }
0672 
0673     auto await_resume(const as_result_tag & )
0674     -> system::result<std::conditional_t<std::is_void_v<result_type>, std::size_t, std::pair<std::size_t, result_type>>, std::exception_ptr>
0675     {
0676       if (error)
0677         return {system::in_place_error, error};
0678       if constexpr (std::is_void_v<result_type>)
0679         return {system::in_place_value, index};
0680       else
0681         return {system::in_place_value, std::make_pair(index, std::move(*result))};
0682     }
0683 
0684   };
0685   awaitable operator co_await() &&
0686   {
0687     return awaitable{range, g};
0688   }
0689 };
0690 
0691 }
0692 
0693 #endif //BOOST_COBALT_DETAIL_RACE_HPP