Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-01 08:09:57

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_RACE_HPP
0009 #define BOOST_COBALT_DETAIL_RACE_HPP
0010 
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/fork.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/result.hpp>
0016 #include <boost/cobalt/this_thread.hpp>
0017 #include <boost/cobalt/detail/util.hpp>
0018 
0019 #include <boost/asio/bind_allocator.hpp>
0020 #include <boost/asio/bind_cancellation_slot.hpp>
0021 #include <boost/asio/bind_executor.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 #include <boost/core/no_exceptions_support.hpp>
0025 
0026 
0027 #include <boost/intrusive_ptr.hpp>
0028 #include <boost/core/demangle.hpp>
0029 #include <boost/core/span.hpp>
0030 #include <boost/variant2/variant.hpp>
0031 
0032 #include <coroutine>
0033 #include <optional>
0034 #include <algorithm>
0035 
0036 
0037 namespace boost::cobalt::detail
0038 {
0039 
0040 struct left_race_tag {};
0041 
0042 // helpers it determining the type of things;
0043 template<typename Base, // range of aw
0044          typename Awaitable = Base>
0045 struct race_traits
0046 {
0047   // for a ranges race this is based on the range, not the AW in it.
0048   constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
0049 
0050   // what the value is supposed to be cast to before the co_await_operator
0051   using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
0052 
0053   // do we need operator co_await
0054   constexpr static bool is_actual = awaitable_type<awaitable>;
0055 
0056   // the type with .await_ functions & interrupt_await
0057   using actual_awaitable
0058         = std::conditional_t<
0059             is_actual,
0060               awaitable,
0061               decltype(get_awaitable_type(std::declval<awaitable>()))>;
0062 
0063   // the type to be used with interruptible
0064   using interruptible_type
0065         = std::conditional_t<
0066               std::is_lvalue_reference_v<Base>,
0067               std::decay_t<actual_awaitable> &,
0068               std::decay_t<actual_awaitable> &&>;
0069 
0070   constexpr static bool interruptible =
0071       cobalt::interruptible<interruptible_type>;
0072 
0073   static void do_interrupt(std::decay_t<actual_awaitable> & aw)
0074   {
0075     if constexpr (interruptible)
0076         static_cast<interruptible_type>(aw).interrupt_await();
0077   }
0078 
0079 };
0080 
0081 struct interruptible_base
0082 {
0083   virtual void interrupt_await() = 0;
0084 };
0085 
0086 template<asio::cancellation_type Ct, typename URBG, typename ... Args>
0087 struct race_variadic_impl
0088 {
0089 
0090   template<typename URBG_>
0091   race_variadic_impl(URBG_ && g, Args && ... args)
0092       : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
0093   {
0094   }
0095 
0096   std::tuple<Args...> args;
0097   URBG g;
0098 
0099   constexpr static std::size_t tuple_size = sizeof...(Args);
0100 
0101   struct awaitable : fork::static_shared_state<256 * tuple_size>
0102   {
0103 
0104 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0105     boost::source_location loc;
0106 #endif
0107 
0108     template<std::size_t ... Idx>
0109     awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
0110         aws{args}
0111     {
0112       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0113         std::shuffle(impls.begin(), impls.end(), g);
0114       std::fill(working.begin(), working.end(), nullptr);
0115     }
0116 
0117     std::tuple<Args...> & aws;
0118     std::array<asio::cancellation_signal, tuple_size> cancel_;
0119 
0120     template<typename > constexpr static auto make_null() {return nullptr;};
0121     std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0122 
0123     std::array<interruptible_base*, tuple_size> working;
0124 
0125     std::size_t index{std::numeric_limits<std::size_t>::max()};
0126 
0127     constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
0128     std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
0129     std::exception_ptr error;
0130 
0131     bool has_result() const
0132     {
0133       return index != std::numeric_limits<std::size_t>::max();
0134     }
0135 
0136     void cancel_all()
0137     {
0138       interrupt_await();
0139       for (auto i = 0u; i < tuple_size; i++)
0140         if (auto &r = cancel[i]; r)
0141           std::exchange(r, nullptr)->emit(Ct);
0142     }
0143 
0144     void interrupt_await()
0145     {
0146       for (auto i : working)
0147         if (i)
0148           i->interrupt_await();
0149     }
0150 
0151     template<typename T, typename Error>
0152     void assign_error(system::result<T, Error> & res)
0153     BOOST_TRY
0154     {
0155       std::move(res).value(loc);
0156     }
0157     BOOST_CATCH(...)
0158     {
0159       error = std::current_exception();
0160     }
0161     BOOST_CATCH_END
0162 
0163     template<typename T>
0164     void assign_error(system::result<T, std::exception_ptr> & res)
0165     {
0166       error = std::move(res).error();
0167     }
0168 
0169     template<std::size_t Idx>
0170     static detail::fork await_impl(awaitable & this_)
0171     BOOST_TRY
0172     {
0173       using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
0174 
0175       typename traits::actual_awaitable aw_{
0176           get_awaitable_type(
0177               static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
0178               )
0179       };
0180 
0181       as_result_t aw{aw_};
0182 
0183 
0184       struct interruptor final : interruptible_base
0185       {
0186         std::decay_t<typename traits::actual_awaitable> & aw;
0187         interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
0188         void interrupt_await() override
0189         {
0190           traits::do_interrupt(aw);
0191         }
0192       };
0193       interruptor in{aw_};
0194       //if constexpr (traits::interruptible)
0195         this_.working[Idx] = &in;
0196 
0197       auto transaction = [&this_, idx = Idx] {
0198         if (this_.has_result())
0199           boost::throw_exception(std::runtime_error("Another transaction already started"));
0200         this_.cancel[idx] = nullptr;
0201         // reserve the index early bc
0202         this_.index = idx;
0203         this_.cancel_all();
0204       };
0205 
0206       co_await fork::set_transaction_function(transaction);
0207       // check manually if we're ready
0208       auto rd = aw.await_ready();
0209       if (!rd)
0210       {
0211         this_.cancel[Idx] = &this_.cancel_[Idx];
0212         co_await this_.cancel[Idx]->slot();
0213         // make sure the executor is set
0214         co_await detail::fork::wired_up;
0215 
0216         // do the await - this doesn't call await-ready again
0217         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0218         {
0219           auto res = co_await aw;
0220           if (!this_.has_result())
0221           {
0222             this_.index = Idx;
0223             if (res.has_error())
0224               this_.assign_error(res);
0225           }
0226           if constexpr(!all_void)
0227             if (this_.index == Idx && !res.has_error())
0228               this_.result.emplace(variant2::in_place_index<Idx>);
0229         }
0230         else
0231         {
0232           auto val = co_await aw;
0233           if (!this_.has_result())
0234             this_.index = Idx;
0235           if (this_.index == Idx)
0236           {
0237             if (val.has_error())
0238               this_.assign_error(val);
0239             else
0240               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
0241           }
0242         }
0243         this_.cancel[Idx] = nullptr;
0244       }
0245       else
0246       {
0247         if (!this_.has_result())
0248           this_.index = Idx;
0249         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0250         {
0251           auto res = aw.await_resume();
0252           if (this_.index == Idx)
0253           {
0254             if (res.has_error())
0255               this_.assign_error(res);
0256             else
0257               this_.result.emplace(variant2::in_place_index<Idx>);
0258           }
0259         }
0260         else
0261         {
0262           if (this_.index == Idx)
0263           {
0264             auto res = aw.await_resume();
0265             if (res.has_error())
0266               this_.assign_error(res);
0267             else
0268               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
0269           }
0270           else
0271             aw.await_resume();
0272         }
0273         this_.cancel[Idx] = nullptr;
0274       }
0275       this_.cancel_all();
0276       this_.working[Idx] = nullptr;
0277     }
0278     BOOST_CATCH(...)
0279     {
0280       if (!this_.has_result())
0281         this_.index = Idx;
0282       if (this_.index == Idx)
0283         this_.error = std::current_exception();
0284       this_.working[Idx] = nullptr;
0285     }
0286     BOOST_CATCH_END
0287 
0288     std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0289         []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0290         {
0291           return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0292         }(std::make_index_sequence<tuple_size>{})
0293     };
0294 
0295     detail::fork last_forked;
0296 
0297     bool await_ready()
0298     {
0299       last_forked = impls[0](*this);
0300       return last_forked.done();
0301     }
0302 
0303     template<typename H>
0304     auto await_suspend(
0305         std::coroutine_handle<H> h,
0306         const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0307     {
0308       this->loc = loc;
0309 
0310       this->exec = &cobalt::detail::get_executor(h);
0311       last_forked.release().resume();
0312 
0313       if (!this->outstanding_work()) // already done, resume rightaway.
0314         return false;
0315 
0316       for (std::size_t idx = 1u;
0317            idx < tuple_size; idx++) // we'
0318       {
0319         auto l = impls[idx](*this);
0320         const auto d = l.done();
0321         l.release();
0322         if (d)
0323           break;
0324       }
0325 
0326       if (!this->outstanding_work()) // already done, resume rightaway.
0327         return false;
0328 
0329       // arm the cancel
0330       assign_cancellation(
0331           h,
0332           [&](asio::cancellation_type ct)
0333           {
0334             for (auto & cs : cancel)
0335               if (cs)
0336                 cs->emit(ct);
0337           });
0338 
0339       this->coro.reset(h.address());
0340       return true;
0341     }
0342 
0343 #if _MSC_VER
0344     BOOST_NOINLINE
0345 #endif
0346     auto await_resume()
0347     {
0348       if (error)
0349         std::rethrow_exception(error);
0350       if constexpr (all_void)
0351         return index;
0352       else
0353         return std::move(*result);
0354     }
0355 
0356     auto await_resume(const as_tuple_tag &)
0357     {
0358       if constexpr (all_void)
0359         return std::make_tuple(error, index);
0360       else
0361         return std::make_tuple(error, std::move(*result));
0362     }
0363 
0364     auto await_resume(const as_result_tag & )
0365         -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
0366     {
0367       if (error)
0368         return {system::in_place_error, error};
0369       if constexpr (all_void)
0370         return {system::in_place_value, index};
0371       else
0372         return {system::in_place_value, std::move(*result)};
0373     }
0374   };
0375   awaitable operator co_await() &&
0376   {
0377     return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
0378   }
0379 };
0380 
0381 
0382 template<asio::cancellation_type Ct, typename URBG, typename Range>
0383 struct race_ranged_impl
0384 {
0385 
0386   using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0387   template<typename URBG_>
0388   race_ranged_impl(URBG_ && g, Range && rng)
0389       : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
0390   {
0391   }
0392 
0393   Range range;
0394   URBG g;
0395 
0396   struct awaitable : fork::shared_state
0397   {
0398 
0399 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0400     boost::source_location loc;
0401 #endif
0402 
0403     using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0404     using traits = race_traits<Range, type>;
0405 
0406     std::size_t index{std::numeric_limits<std::size_t>::max()};
0407 
0408     std::conditional_t<
0409         std::is_void_v<result_type>,
0410         variant2::monostate,
0411         std::optional<result_type>> result;
0412 
0413     std::exception_ptr error;
0414 
0415 #if !defined(BOOST_COBALT_NO_PMR)
0416     pmr::monotonic_buffer_resource res;
0417     pmr::polymorphic_allocator<void> alloc{&resource};
0418 
0419     Range &aws;
0420 
0421     struct dummy
0422     {
0423       template<typename ... Args>
0424       dummy(Args && ...) {}
0425     };
0426 
0427     std::conditional_t<traits::interruptible,
0428       pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
0429       dummy> working{std::size(aws), alloc};
0430 
0431     /* all below `reorder` is reordered
0432      *
0433      * cancel[idx] is for aws[reorder[idx]]
0434     */
0435     pmr::vector<std::size_t> reorder{std::size(aws), alloc};
0436     pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0437     pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0438 
0439 #else
0440     Range &aws;
0441 
0442     struct dummy
0443     {
0444       template<typename ... Args>
0445       dummy(Args && ...) {}
0446     };
0447 
0448     std::conditional_t<traits::interruptible,
0449         std::vector<std::decay_t<typename traits::actual_awaitable>*>,
0450     dummy> working{std::size(aws), std::allocator<void>()};
0451 
0452     /* all below `reorder` is reordered
0453      *
0454      * cancel[idx] is for aws[reorder[idx]]
0455     */
0456     std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
0457     std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
0458     std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
0459 
0460 #endif
0461 
0462     bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
0463 
0464 
0465     awaitable(Range & aws, URBG & g)
0466         : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
0467         , aws(aws)
0468     {
0469       std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
0470       if constexpr (traits::interruptible)
0471         std::fill(working.begin(), working.end(), nullptr);
0472       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0473         std::shuffle(reorder.begin(), reorder.end(), g);
0474     }
0475 
0476     void cancel_all()
0477     {
0478       interrupt_await();
0479       for (auto & r : cancel)
0480         if (r)
0481           std::exchange(r, nullptr)->emit(Ct);
0482     }
0483     void interrupt_await()
0484     {
0485       if constexpr (traits::interruptible)
0486         for (auto aw : working)
0487           if (aw)
0488             traits::do_interrupt(*aw);
0489     }
0490 
0491 
0492     template<typename T, typename Error>
0493     void assign_error(system::result<T, Error> & res)
0494     BOOST_TRY
0495     {
0496       std::move(res).value(loc);
0497     }
0498     BOOST_CATCH(...)
0499     {
0500       error = std::current_exception();
0501     }
0502     BOOST_CATCH_END
0503 
0504     template<typename T>
0505     void assign_error(system::result<T, std::exception_ptr> & res)
0506     {
0507       error = std::move(res).error();
0508     }
0509 
0510     static detail::fork await_impl(awaitable & this_, std::size_t idx)
0511     BOOST_TRY
0512     {
0513       typename traits::actual_awaitable aw_{
0514           get_awaitable_type(
0515               static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
0516               )};
0517 
0518       as_result_t aw{aw_};
0519 
0520       if constexpr (traits::interruptible)
0521         this_.working[idx] = &aw_;
0522 
0523       auto transaction = [&this_, idx = idx] {
0524         if (this_.has_result())
0525           boost::throw_exception(std::runtime_error("Another transaction already started"));
0526         this_.cancel[idx] = nullptr;
0527         // reserve the index early bc
0528         this_.index = idx;
0529         this_.cancel_all();
0530       };
0531 
0532       co_await fork::set_transaction_function(transaction);
0533       // check manually if we're ready
0534       auto rd = aw.await_ready();
0535       if (!rd)
0536       {
0537         this_.cancel[idx] = &this_.cancel_[idx];
0538         co_await this_.cancel[idx]->slot();
0539         // make sure the executor is set
0540         co_await detail::fork::wired_up;
0541 
0542         // do the await - this doesn't call await-ready again
0543         if constexpr (std::is_void_v<result_type>)
0544         {
0545           auto res = co_await aw;
0546           if (!this_.has_result())
0547           {
0548             if (res.has_error())
0549               this_.assign_error(res);
0550             this_.index = idx;
0551           }
0552         }
0553         else
0554         {
0555           auto val = co_await aw;
0556           if (!this_.has_result())
0557             this_.index = idx;
0558           if (this_.index == idx)
0559           {
0560             if (val.has_error())
0561               this_.assign_error(val);
0562             else
0563               this_.result.emplace(*std::move(val));
0564           }
0565         }
0566         this_.cancel[idx] = nullptr;
0567       }
0568       else
0569       {
0570 
0571         if (!this_.has_result())
0572           this_.index = idx;
0573         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0574         {
0575           auto val = aw.await_resume();
0576           if (val.has_error())
0577             this_.assign_error(val);
0578         }
0579         else
0580         {
0581           if (this_.index == idx)
0582           {
0583             auto val = aw.await_resume();
0584             if (val.has_error())
0585               this_.assign_error(val);
0586             else
0587               this_.result.emplace(*std::move(val));
0588           }
0589           else
0590             aw.await_resume();
0591         }
0592         this_.cancel[idx] = nullptr;
0593       }
0594       this_.cancel_all();
0595       if constexpr (traits::interruptible)
0596         this_.working[idx] = nullptr;
0597     }
0598     BOOST_CATCH(...)
0599     {
0600       if (!this_.has_result())
0601         this_.index = idx;
0602       if (this_.index == idx)
0603         this_.error = std::current_exception();
0604       if constexpr (traits::interruptible)
0605         this_.working[idx] = nullptr;
0606     }
0607     BOOST_CATCH_END
0608 
0609     detail::fork last_forked;
0610 
0611     bool await_ready()
0612     {
0613       last_forked = await_impl(*this, reorder.front());
0614       return last_forked.done();
0615     }
0616 
0617     template<typename H>
0618     auto await_suspend(std::coroutine_handle<H> h,
0619                        const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0620     {
0621       this->loc = loc;
0622       this->exec = &detail::get_executor(h);
0623       last_forked.release().resume();
0624 
0625       if (!this->outstanding_work()) // already done, resume rightaway.
0626         return false;
0627 
0628       for (auto itr = std::next(reorder.begin());
0629            itr < reorder.end(); std::advance(itr, 1)) // we'
0630       {
0631         auto l = await_impl(*this, *itr);
0632         auto d = l.done();
0633         l.release();
0634         if (d)
0635           break;
0636       }
0637 
0638       if (!this->outstanding_work()) // already done, resume rightaway.
0639         return false;
0640 
0641       // arm the cancel
0642       assign_cancellation(
0643           h,
0644           [&](asio::cancellation_type ct)
0645           {
0646             for (auto & cs : cancel)
0647               if (cs)
0648                 cs->emit(ct);
0649           });
0650 
0651       this->coro.reset(h.address());
0652       return true;
0653     }
0654 
0655 #if _MSC_VER
0656     BOOST_NOINLINE
0657 #endif
0658     auto await_resume()
0659     {
0660       if (error)
0661         std::rethrow_exception(error);
0662       if constexpr (std::is_void_v<result_type>)
0663         return index;
0664       else
0665         return std::make_pair(index, *result);
0666     }
0667 
0668     auto await_resume(const as_tuple_tag &)
0669     {
0670       if constexpr (std::is_void_v<result_type>)
0671         return std::make_tuple(error, index);
0672       else
0673         return std::make_tuple(error, std::make_pair(index, std::move(*result)));
0674     }
0675 
0676     auto await_resume(const as_result_tag & )
0677     -> system::result<result_type, std::exception_ptr>
0678     {
0679       if (error)
0680         return {system::in_place_error, error};
0681       if constexpr (std::is_void_v<result_type>)
0682         return {system::in_place_value, index};
0683       else
0684         return {system::in_place_value, std::make_pair(index, std::move(*result))};
0685     }
0686 
0687   };
0688   awaitable operator co_await() &&
0689   {
0690     return awaitable{range, g};
0691   }
0692 };
0693 
0694 }
0695 
0696 #endif //BOOST_COBALT_DETAIL_RACE_HPP