Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-11-15 09:04:16

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_RACE_HPP
0009 #define BOOST_COBALT_DETAIL_RACE_HPP
0010 
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/fork.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/result.hpp>
0016 #include <boost/cobalt/this_thread.hpp>
0017 #include <boost/cobalt/detail/util.hpp>
0018 
0019 #include <boost/asio/bind_allocator.hpp>
0020 #include <boost/asio/bind_cancellation_slot.hpp>
0021 #include <boost/asio/bind_executor.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 #include <boost/asio/associated_cancellation_slot.hpp>
0024 
0025 
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/core/demangle.hpp>
0028 #include <boost/core/span.hpp>
0029 #include <boost/variant2/variant.hpp>
0030 
0031 #include <coroutine>
0032 #include <optional>
0033 #include <algorithm>
0034 
0035 
0036 namespace boost::cobalt::detail
0037 {
0038 
0039 struct left_race_tag {};
0040 
0041 // helpers it determining the type of things;
0042 template<typename Base, // range of aw
0043          typename Awaitable = Base>
0044 struct race_traits
0045 {
0046   // for a ranges race this is based on the range, not the AW in it.
0047   constexpr static bool is_lvalue = std::is_lvalue_reference_v<Base>;
0048 
0049   // what the value is supposed to be cast to before the co_await_operator
0050   using awaitable = std::conditional_t<is_lvalue, std::decay_t<Awaitable> &, Awaitable &&>;
0051 
0052   // do we need operator co_await
0053   constexpr static bool is_actual = awaitable_type<awaitable>;
0054 
0055   // the type with .await_ functions & interrupt_await
0056   using actual_awaitable
0057         = std::conditional_t<
0058             is_actual,
0059               awaitable,
0060               decltype(get_awaitable_type(std::declval<awaitable>()))>;
0061 
0062   // the type to be used with interruptible
0063   using interruptible_type
0064         = std::conditional_t<
0065               std::is_lvalue_reference_v<Base>,
0066               std::decay_t<actual_awaitable> &,
0067               std::decay_t<actual_awaitable> &&>;
0068 
0069   constexpr static bool interruptible =
0070       cobalt::interruptible<interruptible_type>;
0071 
0072   static void do_interrupt(std::decay_t<actual_awaitable> & aw)
0073   {
0074     if constexpr (interruptible)
0075         static_cast<interruptible_type>(aw).interrupt_await();
0076   }
0077 
0078 };
0079 
0080 struct interruptible_base
0081 {
0082   virtual void interrupt_await() = 0;
0083 };
0084 
0085 template<asio::cancellation_type Ct, typename URBG, typename ... Args>
0086 struct race_variadic_impl
0087 {
0088 
0089   template<typename URBG_>
0090   race_variadic_impl(URBG_ && g, Args && ... args)
0091       : args{std::forward<Args>(args)...}, g(std::forward<URBG_>(g))
0092   {
0093   }
0094 
0095   std::tuple<Args...> args;
0096   URBG g;
0097 
0098   constexpr static std::size_t tuple_size = sizeof...(Args);
0099 
0100   struct awaitable : fork::static_shared_state<256 * tuple_size>
0101   {
0102 
0103 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0104     boost::source_location loc;
0105 #endif
0106 
0107     template<std::size_t ... Idx>
0108     awaitable(std::tuple<Args...> & args, URBG & g, std::index_sequence<Idx...>) :
0109         aws{args}
0110     {
0111       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0112         std::shuffle(impls.begin(), impls.end(), g);
0113       std::fill(working.begin(), working.end(), nullptr);
0114     }
0115 
0116     std::tuple<Args...> & aws;
0117     std::array<asio::cancellation_signal, tuple_size> cancel_;
0118 
0119     template<typename > constexpr static auto make_null() {return nullptr;};
0120     std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0121 
0122     std::array<interruptible_base*, tuple_size> working;
0123 
0124     std::size_t index{std::numeric_limits<std::size_t>::max()};
0125 
0126     constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ... );
0127     std::optional<variant2::variant<void_as_monostate<co_await_result_t<Args>>...>> result;
0128     std::exception_ptr error;
0129 
0130     bool has_result() const
0131     {
0132       return index != std::numeric_limits<std::size_t>::max();
0133     }
0134 
0135     void cancel_all()
0136     {
0137       interrupt_await();
0138       for (auto i = 0u; i < tuple_size; i++)
0139         if (auto &r = cancel[i]; r)
0140           std::exchange(r, nullptr)->emit(Ct);
0141     }
0142 
0143     void interrupt_await()
0144     {
0145       for (auto i : working)
0146         if (i)
0147           i->interrupt_await();
0148     }
0149 
0150     template<typename T, typename Error>
0151     void assign_error(system::result<T, Error> & res)
0152     try
0153     {
0154       std::move(res).value(loc);
0155     }
0156     catch(...)
0157     {
0158       error = std::current_exception();
0159     }
0160 
0161     template<typename T>
0162     void assign_error(system::result<T, std::exception_ptr> & res)
0163     {
0164       error = std::move(res).error();
0165     }
0166 
0167     template<std::size_t Idx>
0168     static detail::fork await_impl(awaitable & this_)
0169     try
0170     {
0171       using traits = race_traits<mp11::mp_at_c<mp11::mp_list<Args...>, Idx>>;
0172 
0173       typename traits::actual_awaitable aw_{
0174           get_awaitable_type(
0175               static_cast<typename traits::awaitable>(std::get<Idx>(this_.aws))
0176               )
0177       };
0178 
0179       as_result_t aw{aw_};
0180 
0181 
0182       struct interruptor final : interruptible_base
0183       {
0184         std::decay_t<typename traits::actual_awaitable> & aw;
0185         interruptor(std::decay_t<typename traits::actual_awaitable> & aw) : aw(aw) {}
0186         void interrupt_await() override
0187         {
0188           traits::do_interrupt(aw);
0189         }
0190       };
0191       interruptor in{aw_};
0192       //if constexpr (traits::interruptible)
0193         this_.working[Idx] = &in;
0194 
0195       auto transaction = [&this_, idx = Idx] {
0196         if (this_.has_result())
0197           boost::throw_exception(std::runtime_error("Another transaction already started"));
0198         this_.cancel[idx] = nullptr;
0199         // reserve the index early bc
0200         this_.index = idx;
0201         this_.cancel_all();
0202       };
0203 
0204       co_await fork::set_transaction_function(transaction);
0205       // check manually if we're ready
0206       auto rd = aw.await_ready();
0207       if (!rd)
0208       {
0209         this_.cancel[Idx] = &this_.cancel_[Idx];
0210         co_await this_.cancel[Idx]->slot();
0211         // make sure the executor is set
0212         co_await detail::fork::wired_up;
0213 
0214         // do the await - this doesn't call await-ready again
0215         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0216         {
0217           auto res = co_await aw;
0218           if (!this_.has_result())
0219           {
0220             this_.index = Idx;
0221             if (res.has_error())
0222               this_.assign_error(res);
0223           }
0224           if constexpr(!all_void)
0225             if (this_.index == Idx && !res.has_error())
0226               this_.result.emplace(variant2::in_place_index<Idx>);
0227         }
0228         else
0229         {
0230           auto val = co_await aw;
0231           if (!this_.has_result())
0232             this_.index = Idx;
0233           if (this_.index == Idx)
0234           {
0235             if (val.has_error())
0236               this_.assign_error(val);
0237             else
0238               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(val));
0239           }
0240         }
0241         this_.cancel[Idx] = nullptr;
0242       }
0243       else
0244       {
0245         if (!this_.has_result())
0246           this_.index = Idx;
0247         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0248         {
0249           auto res = aw.await_resume();
0250           if (this_.index == Idx)
0251           {
0252             if (res.has_error())
0253               this_.assign_error(res);
0254             else
0255               this_.result.emplace(variant2::in_place_index<Idx>);
0256           }
0257         }
0258         else
0259         {
0260           if (this_.index == Idx)
0261           {
0262             auto res = aw.await_resume();
0263             if (res.has_error())
0264               this_.assign_error(res);
0265             else
0266               this_.result.emplace(variant2::in_place_index<Idx>, *std::move(res));
0267           }
0268           else
0269             aw.await_resume();
0270         }
0271         this_.cancel[Idx] = nullptr;
0272       }
0273       this_.cancel_all();
0274       this_.working[Idx] = nullptr;
0275     }
0276     catch(...)
0277     {
0278       if (!this_.has_result())
0279         this_.index = Idx;
0280       if (this_.index == Idx)
0281         this_.error = std::current_exception();
0282       this_.working[Idx] = nullptr;
0283     }
0284 
0285     std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0286         []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0287         {
0288           return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0289         }(std::make_index_sequence<tuple_size>{})
0290     };
0291 
0292     detail::fork last_forked;
0293 
0294     bool await_ready()
0295     {
0296       last_forked = impls[0](*this);
0297       return last_forked.done();
0298     }
0299 
0300     template<typename H>
0301     auto await_suspend(
0302         std::coroutine_handle<H> h,
0303         const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0304     {
0305       this->loc = loc;
0306 
0307       this->exec = &cobalt::detail::get_executor(h);
0308       last_forked.release().resume();
0309 
0310       if (!this->outstanding_work()) // already done, resume rightaway.
0311         return false;
0312 
0313       for (std::size_t idx = 1u;
0314            idx < tuple_size; idx++) // we'
0315       {
0316         auto l = impls[idx](*this);
0317         const auto d = l.done();
0318         l.release();
0319         if (d)
0320           break;
0321       }
0322 
0323       if (!this->outstanding_work()) // already done, resume rightaway.
0324         return false;
0325 
0326       // arm the cancel
0327       assign_cancellation(
0328           h,
0329           [&](asio::cancellation_type ct)
0330           {
0331             for (auto & cs : cancel)
0332               if (cs)
0333                 cs->emit(ct);
0334           });
0335 
0336       this->coro.reset(h.address());
0337       return true;
0338     }
0339 
0340 #if _MSC_VER
0341     BOOST_NOINLINE
0342 #endif
0343     auto await_resume()
0344     {
0345       if (error)
0346         std::rethrow_exception(error);
0347       if constexpr (all_void)
0348         return index;
0349       else
0350         return std::move(*result);
0351     }
0352 
0353     auto await_resume(const as_tuple_tag &)
0354     {
0355       if constexpr (all_void)
0356         return std::make_tuple(error, index);
0357       else
0358         return std::make_tuple(error, std::move(*result));
0359     }
0360 
0361     auto await_resume(const as_result_tag & )
0362         -> system::result<std::conditional_t<all_void, std::size_t, variant2::variant<void_as_monostate<co_await_result_t<Args>>...>>, std::exception_ptr>
0363     {
0364       if (error)
0365         return {system::in_place_error, error};
0366       if constexpr (all_void)
0367         return {system::in_place_value, index};
0368       else
0369         return {system::in_place_value, std::move(*result)};
0370     }
0371   };
0372   awaitable operator co_await() &&
0373   {
0374     return awaitable{args, g, std::make_index_sequence<tuple_size>{}};
0375   }
0376 };
0377 
0378 
0379 template<asio::cancellation_type Ct, typename URBG, typename Range>
0380 struct race_ranged_impl
0381 {
0382 
0383   using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0384   template<typename URBG_>
0385   race_ranged_impl(URBG_ && g, Range && rng)
0386       : range{std::forward<Range>(rng)}, g(std::forward<URBG_>(g))
0387   {
0388   }
0389 
0390   Range range;
0391   URBG g;
0392 
0393   struct awaitable : fork::shared_state
0394   {
0395 
0396 #if !defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0397     boost::source_location loc;
0398 #endif
0399 
0400     using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0401     using traits = race_traits<Range, type>;
0402 
0403     std::size_t index{std::numeric_limits<std::size_t>::max()};
0404 
0405     std::conditional_t<
0406         std::is_void_v<result_type>,
0407         variant2::monostate,
0408         std::optional<result_type>> result;
0409 
0410     std::exception_ptr error;
0411 
0412 #if !defined(BOOST_COBALT_NO_PMR)
0413     pmr::monotonic_buffer_resource res;
0414     pmr::polymorphic_allocator<void> alloc{&resource};
0415 
0416     Range &aws;
0417 
0418     struct dummy
0419     {
0420       template<typename ... Args>
0421       dummy(Args && ...) {}
0422     };
0423 
0424     std::conditional_t<traits::interruptible,
0425       pmr::vector<std::decay_t<typename traits::actual_awaitable>*>,
0426       dummy> working{std::size(aws), alloc};
0427 
0428     /* all below `reorder` is reordered
0429      *
0430      * cancel[idx] is for aws[reorder[idx]]
0431     */
0432     pmr::vector<std::size_t> reorder{std::size(aws), alloc};
0433     pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0434     pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0435 
0436 #else
0437     Range &aws;
0438 
0439     struct dummy
0440     {
0441       template<typename ... Args>
0442       dummy(Args && ...) {}
0443     };
0444 
0445     std::conditional_t<traits::interruptible,
0446         std::vector<std::decay_t<typename traits::actual_awaitable>*>,
0447     dummy> working{std::size(aws), std::allocator<void>()};
0448 
0449     /* all below `reorder` is reordered
0450      *
0451      * cancel[idx] is for aws[reorder[idx]]
0452     */
0453     std::vector<std::size_t> reorder{std::size(aws), std::allocator<void>()};
0454     std::vector<asio::cancellation_signal> cancel_{std::size(aws), std::allocator<void>()};
0455     std::vector<asio::cancellation_signal*> cancel{std::size(aws), std::allocator<void>()};
0456 
0457 #endif
0458 
0459     bool has_result() const {return index != std::numeric_limits<std::size_t>::max(); }
0460 
0461 
0462     awaitable(Range & aws, URBG & g)
0463         : fork::shared_state((256 + sizeof(co_awaitable_type<type>) + sizeof(std::size_t)) * std::size(aws))
0464         , aws(aws)
0465     {
0466       std::generate(reorder.begin(), reorder.end(), [i = std::size_t(0u)]() mutable {return i++;});
0467       if constexpr (traits::interruptible)
0468         std::fill(working.begin(), working.end(), nullptr);
0469       if constexpr (!std::is_same_v<URBG, left_race_tag>)
0470         std::shuffle(reorder.begin(), reorder.end(), g);
0471     }
0472 
0473     void cancel_all()
0474     {
0475       interrupt_await();
0476       for (auto & r : cancel)
0477         if (r)
0478           std::exchange(r, nullptr)->emit(Ct);
0479     }
0480     void interrupt_await()
0481     {
0482       if constexpr (traits::interruptible)
0483         for (auto aw : working)
0484           if (aw)
0485             traits::do_interrupt(*aw);
0486     }
0487 
0488 
0489     template<typename T, typename Error>
0490     void assign_error(system::result<T, Error> & res)
0491     try
0492     {
0493       std::move(res).value(loc);
0494     }
0495     catch(...)
0496     {
0497       error = std::current_exception();
0498     }
0499 
0500     template<typename T>
0501     void assign_error(system::result<T, std::exception_ptr> & res)
0502     {
0503       error = std::move(res).error();
0504     }
0505 
0506     static detail::fork await_impl(awaitable & this_, std::size_t idx)
0507     try
0508     {
0509       typename traits::actual_awaitable aw_{
0510           get_awaitable_type(
0511               static_cast<typename traits::awaitable>(*std::next(std::begin(this_.aws), idx))
0512               )};
0513 
0514       as_result_t aw{aw_};
0515 
0516       if constexpr (traits::interruptible)
0517         this_.working[idx] = &aw_;
0518 
0519       auto transaction = [&this_, idx = idx] {
0520         if (this_.has_result())
0521           boost::throw_exception(std::runtime_error("Another transaction already started"));
0522         this_.cancel[idx] = nullptr;
0523         // reserve the index early bc
0524         this_.index = idx;
0525         this_.cancel_all();
0526       };
0527 
0528       co_await fork::set_transaction_function(transaction);
0529       // check manually if we're ready
0530       auto rd = aw.await_ready();
0531       if (!rd)
0532       {
0533         this_.cancel[idx] = &this_.cancel_[idx];
0534         co_await this_.cancel[idx]->slot();
0535         // make sure the executor is set
0536         co_await detail::fork::wired_up;
0537 
0538         // do the await - this doesn't call await-ready again
0539         if constexpr (std::is_void_v<result_type>)
0540         {
0541           auto res = co_await aw;
0542           if (!this_.has_result())
0543           {
0544             if (res.has_error())
0545               this_.assign_error(res);
0546             this_.index = idx;
0547           }
0548         }
0549         else
0550         {
0551           auto val = co_await aw;
0552           if (!this_.has_result())
0553             this_.index = idx;
0554           if (this_.index == idx)
0555           {
0556             if (val.has_error())
0557               this_.assign_error(val);
0558             else
0559               this_.result.emplace(*std::move(val));
0560           }
0561         }
0562         this_.cancel[idx] = nullptr;
0563       }
0564       else
0565       {
0566 
0567         if (!this_.has_result())
0568           this_.index = idx;
0569         if constexpr (std::is_void_v<decltype(aw_.await_resume())>)
0570         {
0571           auto val = aw.await_resume();
0572           if (val.has_error())
0573             this_.assign_error(val);
0574         }
0575         else
0576         {
0577           if (this_.index == idx)
0578           {
0579             auto val = aw.await_resume();
0580             if (val.has_error())
0581               this_.assign_error(val);
0582             else
0583               this_.result.emplace(*std::move(val));
0584           }
0585           else
0586             aw.await_resume();
0587         }
0588         this_.cancel[idx] = nullptr;
0589       }
0590       this_.cancel_all();
0591       if constexpr (traits::interruptible)
0592         this_.working[idx] = nullptr;
0593     }
0594     catch(...)
0595     {
0596       if (!this_.has_result())
0597         this_.index = idx;
0598       if (this_.index == idx)
0599         this_.error = std::current_exception();
0600       if constexpr (traits::interruptible)
0601         this_.working[idx] = nullptr;
0602     }
0603 
0604     detail::fork last_forked;
0605 
0606     bool await_ready()
0607     {
0608       last_forked = await_impl(*this, reorder.front());
0609       return last_forked.done();
0610     }
0611 
0612     template<typename H>
0613     auto await_suspend(std::coroutine_handle<H> h,
0614                        const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0615     {
0616       this->loc = loc;
0617       this->exec = &detail::get_executor(h);
0618       last_forked.release().resume();
0619 
0620       if (!this->outstanding_work()) // already done, resume rightaway.
0621         return false;
0622 
0623       for (auto itr = std::next(reorder.begin());
0624            itr < reorder.end(); std::advance(itr, 1)) // we'
0625       {
0626         auto l = await_impl(*this, *itr);
0627         auto d = l.done();
0628         l.release();
0629         if (d)
0630           break;
0631       }
0632 
0633       if (!this->outstanding_work()) // already done, resume rightaway.
0634         return false;
0635 
0636       // arm the cancel
0637       assign_cancellation(
0638           h,
0639           [&](asio::cancellation_type ct)
0640           {
0641             for (auto & cs : cancel)
0642               if (cs)
0643                 cs->emit(ct);
0644           });
0645 
0646       this->coro.reset(h.address());
0647       return true;
0648     }
0649 
0650 #if _MSC_VER
0651     BOOST_NOINLINE
0652 #endif
0653     auto await_resume()
0654     {
0655       if (error)
0656         std::rethrow_exception(error);
0657       if constexpr (std::is_void_v<result_type>)
0658         return index;
0659       else
0660         return std::make_pair(index, *result);
0661     }
0662 
0663     auto await_resume(const as_tuple_tag &)
0664     {
0665       if constexpr (std::is_void_v<result_type>)
0666         return std::make_tuple(error, index);
0667       else
0668         return std::make_tuple(error, std::make_pair(index, std::move(*result)));
0669     }
0670 
0671     auto await_resume(const as_result_tag & )
0672     -> system::result<result_type, std::exception_ptr>
0673     {
0674       if (error)
0675         return {system::in_place_error, error};
0676       if constexpr (std::is_void_v<result_type>)
0677         return {system::in_place_value, index};
0678       else
0679         return {system::in_place_value, std::make_pair(index, std::move(*result))};
0680     }
0681 
0682   };
0683   awaitable operator co_await() &&
0684   {
0685     return awaitable{range, g};
0686   }
0687 };
0688 
0689 }
0690 
0691 #endif //BOOST_COBALT_DETAIL_RACE_HPP