Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:24:41

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_JOIN_HPP
0009 #define BOOST_COBALT_DETAIL_JOIN_HPP
0010 
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/exception.hpp>
0013 #include <boost/cobalt/detail/fork.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/detail/util.hpp>
0016 #include <boost/cobalt/detail/wrapper.hpp>
0017 #include <boost/cobalt/task.hpp>
0018 #include <boost/cobalt/this_thread.hpp>
0019 
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/bind_cancellation_slot.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 
0024 
0025 #include <boost/core/ignore_unused.hpp>
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/system/result.hpp>
0028 #include <boost/variant2/variant.hpp>
0029 
0030 #include <array>
0031 #include <coroutine>
0032 #include <algorithm>
0033 
0034 namespace boost::cobalt::detail
0035 {
0036 
0037 template<typename ... Args>
0038 struct join_variadic_impl
0039 {
0040   using tuple_type = std::tuple<decltype(get_awaitable_type(std::declval<Args&&>()))...>;
0041 
0042   join_variadic_impl(Args && ... args)
0043       : args{std::forward<Args>(args)...}
0044   {
0045   }
0046 
0047   std::tuple<Args...> args;
0048 
0049   constexpr static std::size_t tuple_size = sizeof...(Args);
0050 
0051   struct awaitable : fork::static_shared_state<256 * tuple_size>
0052   {
0053     template<std::size_t ... Idx>
0054     awaitable(std::tuple<Args...> & args, std::index_sequence<Idx...>) :
0055         aws(awaitable_type_getter<Args>(std::get<Idx>(args))...)
0056     {
0057     }
0058 
0059     tuple_type aws;
0060 
0061     std::array<asio::cancellation_signal, tuple_size> cancel_;
0062     template<typename > constexpr static auto make_null() {return nullptr;};
0063     std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0064 
0065     constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ...);
0066     template<typename T>
0067     using result_store_part =
0068         std::optional<void_as_monostate<co_await_result_t<T>>>;
0069 
0070     std::conditional_t<all_void,
0071                        variant2::monostate,
0072                        std::tuple<result_store_part<Args>...>> result;
0073     std::exception_ptr error;
0074 
0075     template<std::size_t Idx>
0076     void cancel_step()
0077     {
0078       auto &r = cancel[Idx];
0079       if (r)
0080         std::exchange(r, nullptr)->emit(asio::cancellation_type::all);
0081     }
0082 
0083     void cancel_all()
0084     {
0085       mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0086           ([&](auto idx)
0087            {
0088              cancel_step<idx>();
0089            });
0090     }
0091 
0092 
0093 
0094     template<std::size_t Idx>
0095     void interrupt_await_step()
0096     {
0097       using type = std::tuple_element_t<Idx, tuple_type>;
0098       using t = std::conditional_t<std::is_reference_v<std::tuple_element_t<Idx, std::tuple<Args...>>>,
0099           type &,
0100           type &&>;
0101 
0102       if constexpr (interruptible<t>)
0103         if (this->cancel[Idx] != nullptr)
0104           static_cast<t>(std::get<Idx>(aws)).interrupt_await();
0105     }
0106 
0107     void interrupt_await()
0108     {
0109       mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0110           ([&](auto idx)
0111            {
0112              interrupt_await_step<idx>();
0113            });
0114     }
0115 
0116     // GCC doesn't like member funs
0117     template<std::size_t Idx>
0118     static detail::fork await_impl(awaitable & this_)
0119     BOOST_TRY
0120     {
0121       auto & aw = std::get<Idx>(this_.aws);
0122       // check manually if we're ready
0123       auto rd = aw.await_ready();
0124       if (!rd)
0125       {
0126         this_.cancel[Idx] = &this_.cancel_[Idx];
0127         co_await this_.cancel[Idx]->slot();
0128         // make sure the executor is set
0129         co_await detail::fork::wired_up;
0130         // do the await - this doesn't call await-ready again
0131 
0132         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0133         {
0134           co_await aw;
0135           if constexpr (!all_void)
0136             std::get<Idx>(this_.result).emplace();
0137         }
0138         else
0139           std::get<Idx>(this_.result).emplace(co_await aw);
0140       }
0141       else
0142       {
0143         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0144         {
0145           aw.await_resume();
0146           if constexpr (!all_void)
0147             std::get<Idx>(this_.result).emplace();
0148         }
0149         else
0150           std::get<Idx>(this_.result).emplace(aw.await_resume());
0151       }
0152 
0153     }
0154     BOOST_CATCH(...)
0155     {
0156       if (!this_.error)
0157            this_.error = std::current_exception();
0158       this_.cancel_all();
0159     }
0160     BOOST_CATCH_END
0161 
0162     std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0163         []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0164         {
0165           return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0166         }(std::make_index_sequence<tuple_size>{})
0167     };
0168 
0169     detail::fork last_forked;
0170     std::size_t last_index = 0u;
0171 
0172     bool await_ready()
0173     {
0174       while (last_index < tuple_size)
0175       {
0176         last_forked = impls[last_index++](*this);
0177         if (!last_forked.done())
0178           return false; // one coro didn't immediately complete!
0179       }
0180       last_forked.release();
0181       return true;
0182     }
0183 
0184     template<typename H>
0185     auto await_suspend(
0186           std::coroutine_handle<H> h
0187 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0188         , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0189 #endif
0190     )
0191     {
0192 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0193       this->loc = loc;
0194 #endif
0195       this->exec = detail::get_executor(h);
0196       last_forked.release().resume();
0197       while (last_index < tuple_size)
0198         impls[last_index++](*this).release();
0199 
0200       if (error)
0201         cancel_all();
0202 
0203       if (!this->outstanding_work()) // already done, resume rightaway.
0204         return false;
0205 
0206       // arm the cancel
0207       assign_cancellation(
0208           h,
0209           [&](asio::cancellation_type ct)
0210           {
0211             for (auto cs : cancel)
0212               if (cs)
0213                 cs->emit(ct);
0214           });
0215 
0216       this->coro.reset(h.address());
0217       return true;
0218     }
0219 
0220 #if _MSC_VER
0221     BOOST_NOINLINE
0222 #endif
0223     auto await_resume()
0224     {
0225       if (error)
0226         std::rethrow_exception(error);
0227       if constexpr(!all_void)
0228         return mp11::tuple_transform(
0229             []<typename T>(std::optional<T> & var)
0230                 -> T
0231             {
0232               BOOST_ASSERT(var.has_value());
0233               return std::move(*var);
0234             }, result);
0235     }
0236 
0237     auto await_resume(const as_tuple_tag &)
0238     {
0239       using t = decltype(await_resume());
0240       if constexpr(!all_void)
0241       {
0242         if (error)
0243           return std::make_tuple(error, t{});
0244         else
0245           return std::make_tuple(std::current_exception(),
0246                                  mp11::tuple_transform(
0247               []<typename T>(std::optional<T> & var)
0248                   -> T
0249               {
0250                 BOOST_ASSERT(var.has_value());
0251                 return std::move(*var);
0252               }, result));
0253       }
0254       else
0255         return std::make_tuple(error);
0256     }
0257 
0258     auto await_resume(const as_result_tag &)
0259     {
0260       using t = decltype(await_resume());
0261       using rt = system::result<t, std::exception_ptr>;
0262       if (error)
0263         return rt(system::in_place_error, error);
0264 
0265       if constexpr(!all_void)
0266         return mp11::tuple_transform(
0267             []<typename T>(std::optional<T> & var)
0268                 -> rt
0269             {
0270               BOOST_ASSERT(var.has_value());
0271               return std::move(*var);
0272             }, result);
0273       else
0274         return rt{system::in_place_value};
0275     }
0276   };
0277   awaitable operator co_await() &&
0278   {
0279     return awaitable(args, std::make_index_sequence<sizeof...(Args)>{});
0280   }
0281 };
0282 
0283 template<typename Range>
0284 struct join_ranged_impl
0285 {
0286   Range aws;
0287 
0288   using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0289 
0290   constexpr static std::size_t result_size =
0291       sizeof(std::conditional_t<std::is_void_v<result_type>, variant2::monostate, result_type>);
0292 
0293   struct awaitable : fork::shared_state
0294   {
0295     struct dummy
0296     {
0297       template<typename ... Args>
0298       dummy(Args && ...) {}
0299     };
0300 
0301     using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0302 #if !defined(BOOST_COBALT_NO_PMR)
0303     pmr::polymorphic_allocator<void> alloc{&resource};
0304 
0305     std::conditional_t<awaitable_type<type>, Range &,
0306                        pmr::vector<co_awaitable_type<type>>> aws;
0307 
0308     pmr::vector<bool> ready{std::size(aws), alloc};
0309     pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0310     pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0311 
0312 
0313 
0314     std::conditional_t<
0315         std::is_void_v<result_type>,
0316         dummy,
0317         pmr::vector<std::optional<void_as_monostate<result_type>>>>
0318           result{
0319           cancel.size(),
0320           alloc};
0321 #else
0322     std::allocator<void> alloc;
0323     std::conditional_t<awaitable_type<type>, Range &,  std::vector<co_awaitable_type<type>>> aws;
0324 
0325     std::vector<bool> ready{std::size(aws), alloc};
0326     std::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0327     std::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0328 
0329     std::conditional_t<
0330         std::is_void_v<result_type>,
0331         dummy,
0332         std::vector<std::optional<void_as_monostate<result_type>>>>
0333           result{
0334           cancel.size(),
0335           alloc};
0336 #endif
0337     std::exception_ptr error;
0338 
0339     awaitable(Range & aws_, std::false_type /* needs  operator co_await */)
0340       :  fork::shared_state((512 + sizeof(co_awaitable_type<type>) + result_size) * std::size(aws_))
0341       , aws{alloc}
0342       , ready{std::size(aws_), alloc}
0343       , cancel_{std::size(aws_), alloc}
0344       , cancel{std::size(aws_), alloc}
0345     {
0346       aws.reserve(std::size(aws_));
0347       for (auto && a : aws_)
0348       {
0349         using a_0 = std::decay_t<decltype(a)>;
0350         using a_t = std::conditional_t<
0351             std::is_lvalue_reference_v<Range>, a_0 &, a_0 &&>;
0352         aws.emplace_back(awaitable_type_getter<a_t>(static_cast<a_t>(a)));
0353       }
0354 
0355       std::transform(std::begin(this->aws),
0356                      std::end(this->aws),
0357                      std::begin(ready),
0358                      [](auto & aw) {return aw.await_ready();});
0359     }
0360     awaitable(Range & aws, std::true_type /* needs operator co_await */)
0361         : fork::shared_state((512 + sizeof(co_awaitable_type<type>) + result_size) * std::size(aws))
0362         , aws(aws)
0363     {
0364       std::transform(std::begin(aws), std::end(aws), std::begin(ready), [](auto & aw) {return aw.await_ready();});
0365     }
0366 
0367     awaitable(Range & aws)
0368       : awaitable(aws, std::bool_constant<awaitable_type<type>>{})
0369     {
0370     }
0371 
0372 
0373 
0374     void cancel_all()
0375     {
0376       for (auto & r : cancel)
0377         if (r)
0378           std::exchange(r, nullptr)->emit(asio::cancellation_type::all);
0379     }
0380 
0381     void interrupt_await()
0382     {
0383       using t = std::conditional_t<std::is_reference_v<Range>,
0384           co_awaitable_type<type> &,
0385           co_awaitable_type<type> &&>;
0386 
0387       if constexpr (interruptible<t>)
0388       {
0389         std::size_t idx = 0u;
0390         for (auto & aw : aws)
0391           if (cancel[idx])
0392             static_cast<t>(aw).interrupt_await();
0393       }
0394     }
0395 
0396 
0397     static detail::fork await_impl(awaitable & this_, std::size_t idx)
0398     BOOST_TRY
0399     {
0400       auto & aw = *std::next(std::begin(this_.aws), idx);
0401       auto rd = aw.await_ready();
0402       if (!rd)
0403       {
0404         this_.cancel[idx] = &this_.cancel_[idx];
0405         co_await this_.cancel[idx]->slot();
0406         co_await detail::fork::wired_up;
0407         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0408           co_await aw;
0409         else
0410           this_.result[idx].emplace(co_await aw);
0411       }
0412       else
0413       {
0414         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0415           aw.await_resume();
0416         else
0417           this_.result[idx].emplace(aw.await_resume());
0418       }
0419     }
0420     BOOST_CATCH(...)
0421     {
0422       if (!this_.error)
0423         this_.error = std::current_exception();
0424       this_.cancel_all();
0425     }
0426     BOOST_CATCH_END
0427 
0428     detail::fork last_forked;
0429     std::size_t last_index = 0u;
0430 
0431     bool await_ready()
0432     {
0433       while (last_index < cancel.size())
0434       {
0435         last_forked = await_impl(*this, last_index++);
0436         if (!last_forked.done())
0437           return false; // one coro didn't immediately complete!
0438       }
0439       last_forked.release();
0440       return true;
0441     }
0442 
0443 
0444     template<typename H>
0445     auto await_suspend(
0446         std::coroutine_handle<H> h
0447 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0448         , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0449 #endif
0450     )
0451     {
0452 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0453       this->loc = loc;
0454 #endif
0455       exec = detail::get_executor(h);
0456 
0457       last_forked.release().resume();
0458       while (last_index < cancel.size())
0459         await_impl(*this, last_index++).release();
0460 
0461       if (error)
0462         cancel_all();
0463 
0464       if (!this->outstanding_work()) // already done, resume right away.
0465         return false;
0466 
0467       // arm the cancel
0468       assign_cancellation(
0469           h,
0470           [&](asio::cancellation_type ct)
0471           {
0472             for (auto cs : cancel)
0473               if (cs)
0474                 cs->emit(ct);
0475           });
0476 
0477 
0478       this->coro.reset(h.address());
0479       return true;
0480     }
0481 
0482     auto await_resume(const as_tuple_tag & )
0483     {
0484 #if defined(BOOST_COBALT_NO_PMR)
0485       std::vector<result_type> rr;
0486 #else
0487       pmr::vector<result_type> rr{this_thread::get_allocator()};
0488 #endif
0489 
0490       if (error)
0491         return std::make_tuple(error, rr);
0492       if constexpr (!std::is_void_v<result_type>)
0493       {
0494         rr.reserve(result.size());
0495         for (auto & t : result)
0496           rr.push_back(*std::move(t));
0497         return std::make_tuple(std::exception_ptr(), std::move(rr));
0498       }
0499     }
0500 
0501     auto await_resume(const as_result_tag & )
0502     {
0503 #if defined(BOOST_COBALT_NO_PMR)
0504       std::vector<result_type> rr;
0505 #else
0506       pmr::vector<result_type> rr{this_thread::get_allocator()};
0507 #endif
0508 
0509       if (error)
0510         return system::result<decltype(rr), std::exception_ptr>(error);
0511       if constexpr (!std::is_void_v<result_type>)
0512       {
0513         rr.reserve(result.size());
0514         for (auto & t : result)
0515           rr.push_back(*std::move(t));
0516         return system::result<decltype(rr), std::exception_ptr>(std::move(rr));
0517       }
0518     }
0519 
0520 #if _MSC_VER
0521     BOOST_NOINLINE
0522 #endif
0523     auto await_resume()
0524     {
0525       if (error)
0526         std::rethrow_exception(error);
0527       if constexpr (!std::is_void_v<result_type>)
0528       {
0529 #if defined(BOOST_COBALT_NO_PMR)
0530         std::vector<result_type> rr;
0531 #else
0532         pmr::vector<result_type> rr{this_thread::get_allocator()};
0533 #endif
0534         rr.reserve(result.size());
0535         for (auto & t : result)
0536           rr.push_back(*std::move(t));
0537         return rr;
0538       }
0539     }
0540   };
0541   awaitable operator co_await() && {return awaitable{aws};}
0542 };
0543 
0544 }
0545 
0546 
0547 #endif //BOOST_COBALT_DETAIL_JOIN_HPP