Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-05 08:28:20

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_GATHER_HPP
0009 #define BOOST_COBALT_DETAIL_GATHER_HPP
0010 
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/exception.hpp>
0013 #include <boost/cobalt/detail/fork.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/detail/util.hpp>
0016 #include <boost/cobalt/detail/wrapper.hpp>
0017 #include <boost/cobalt/task.hpp>
0018 #include <boost/cobalt/this_thread.hpp>
0019 
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/bind_cancellation_slot.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023 
0024 
0025 #include <boost/core/ignore_unused.hpp>
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/system/result.hpp>
0028 #include <boost/variant2/variant.hpp>
0029 
0030 #include <array>
0031 #include <coroutine>
0032 #include <algorithm>
0033 
0034 namespace boost::cobalt::detail
0035 {
0036 
0037 
0038 template<typename ... Args>
0039 struct gather_variadic_impl
0040 {
0041   using tuple_type = std::tuple<decltype(get_awaitable_type(std::declval<Args&&>()))...>;
0042 
0043   gather_variadic_impl(Args && ... args)
0044       : args{std::forward<Args>(args)...}
0045   {
0046   }
0047 
0048   std::tuple<Args...> args;
0049 
0050   constexpr static std::size_t tuple_size = sizeof...(Args);
0051 
0052   struct awaitable : fork::static_shared_state<256 * tuple_size>
0053   {
0054     template<std::size_t ... Idx>
0055     awaitable(std::tuple<Args...> & args, std::index_sequence<Idx...>)
0056       : aws(awaitable_type_getter<Args>(std::get<Idx>(args))...)
0057     {
0058     }
0059 
0060     tuple_type aws;
0061     std::array<asio::cancellation_signal, tuple_size> cancel;
0062 
0063     template<typename T>
0064     using result_store_part = variant2::variant<
0065         variant2::monostate,
0066         void_as_monostate<co_await_result_t<T>>,
0067         std::exception_ptr>;
0068 
0069     std::tuple<result_store_part<Args>...> result;
0070 
0071 
0072     template<std::size_t Idx>
0073     void interrupt_await_step()
0074     {
0075       using type= std::tuple_element_t<Idx, std::tuple<Args...>>;
0076       using t = std::conditional_t<
0077           std::is_reference_v<std::tuple_element_t<Idx, decltype(aws)>>,
0078           co_awaitable_type<type> &,
0079           co_awaitable_type<type> &&>;
0080 
0081       if constexpr (interruptible<t>)
0082           static_cast<t>(std::get<Idx>(aws)).interrupt_await();
0083     }
0084     void interrupt_await()
0085     {
0086       mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0087           ([&](auto idx)
0088            {
0089              interrupt_await_step<idx>();
0090            });
0091     }
0092 
0093     // GCC doesn't like member funs
0094     template<std::size_t Idx>
0095     static detail::fork await_impl(awaitable & this_)
0096     BOOST_TRY
0097     {
0098       auto & aw = std::get<Idx>(this_.aws);
0099       // check manually if we're ready
0100       auto rd = aw.await_ready();
0101       if (!rd)
0102       {
0103         co_await this_.cancel[Idx].slot();
0104         // make sure the executor is set
0105         co_await detail::fork::wired_up;
0106         // do the await - this doesn't call await-ready again
0107 
0108         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0109         {
0110           co_await aw;
0111           std::get<Idx>(this_.result).template emplace<1u>();
0112         }
0113         else
0114           std::get<Idx>(this_.result).template emplace<1u>(co_await aw);
0115       }
0116       else
0117       {
0118         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0119         {
0120           aw.await_resume();
0121           std::get<Idx>(this_.result).template emplace<1u>();
0122         }
0123         else
0124           std::get<Idx>(this_.result).template emplace<1u>(aw.await_resume());
0125       }
0126     }
0127     BOOST_CATCH(...)
0128     {
0129       std::get<Idx>(this_.result).template emplace<2u>(std::current_exception());
0130     }
0131     BOOST_CATCH_END
0132 
0133     std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0134       []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0135       {
0136         return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0137       }(std::make_index_sequence<tuple_size>{})
0138     };
0139 
0140     detail::fork last_forked;
0141     std::size_t last_index = 0u;
0142 
0143     bool await_ready()
0144     {
0145       while (last_index < tuple_size)
0146       {
0147         last_forked = impls[last_index++](*this);
0148         if (!last_forked.done())
0149           return false; // one coro didn't immediately complete!
0150       }
0151       last_forked.release();
0152       return true;
0153     }
0154 
0155     template<typename H>
0156     auto await_suspend(
0157           std::coroutine_handle<H> h
0158 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0159         , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0160 #endif
0161     )
0162     {
0163 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0164       this->loc = loc;
0165 #endif
0166       this->exec = &cobalt::detail::get_executor(h);
0167       last_forked.release().resume();
0168       while (last_index < tuple_size)
0169         impls[last_index++](*this).release();
0170 
0171       if (!this->outstanding_work()) // already done, resume rightaway.
0172         return false;
0173 
0174       // arm the cancel
0175       assign_cancellation(
0176           h,
0177           [&](asio::cancellation_type ct)
0178           {
0179             for (auto & cs : cancel)
0180               cs.emit(ct);
0181           });
0182 
0183 
0184       this->coro.reset(h.address());
0185       return true;
0186     }
0187 
0188     template<typename T>
0189     using result_part = system::result<co_await_result_t<T>, std::exception_ptr>;
0190 
0191 #if _MSC_VER
0192     BOOST_NOINLINE
0193 #endif
0194     std::tuple<result_part<Args> ...> await_resume()
0195     {
0196       return mp11::tuple_transform(
0197           []<typename T>(variant2::variant<variant2::monostate, T, std::exception_ptr> & var)
0198               -> system::result<monostate_as_void<T>, std::exception_ptr>
0199           {
0200             BOOST_ASSERT(var.index() != 0u);
0201             if (var.index() == 1u)
0202             {
0203               if constexpr (std::is_same_v<T, variant2::monostate>)
0204                 return {system::in_place_value};
0205               else
0206                 return {system::in_place_value, std::move(get<1>(var))};
0207             }
0208             else
0209               return {system::in_place_error, std::move(get<2>(var))};
0210 
0211           }
0212           , result);
0213     }
0214   };
0215   awaitable operator co_await() &&
0216   {
0217     return awaitable(args, std::make_index_sequence<sizeof...(Args)>{});
0218   }
0219 };
0220 
0221 template<typename Range>
0222 struct gather_ranged_impl
0223 {
0224   Range aws;
0225 
0226   using result_type = system::result<
0227       co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>,
0228       std::exception_ptr>;
0229 
0230   using result_storage_type = variant2::variant<
0231       variant2::monostate,
0232       void_as_monostate<
0233           co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>
0234         >,
0235       std::exception_ptr>;
0236 
0237   struct awaitable : fork::shared_state
0238   {
0239     using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0240 #if !defined(BOOST_COBALT_NO_PMR)
0241     pmr::polymorphic_allocator<void> alloc{&resource};
0242     std::conditional_t<awaitable_type<type>, Range &,
0243                        pmr::vector<co_awaitable_type<type>>> aws;
0244 
0245     pmr::vector<bool> ready{std::size(aws), alloc};
0246     pmr::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0247     pmr::vector<result_storage_type> result{cancel.size(), alloc};
0248 
0249 #else
0250     std::allocator<void> alloc{};
0251     std::conditional_t<awaitable_type<type>, Range &,
0252         std::vector<co_awaitable_type<type>>> aws;
0253 
0254     std::vector<bool> ready{std::size(aws), alloc};
0255     std::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0256     std::vector<result_storage_type> result{cancel.size(), alloc};
0257 #endif
0258 
0259 
0260     awaitable(Range & aws_, std::false_type /* needs operator co_await */)
0261       : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws_))
0262       , aws{alloc}
0263       , ready{std::size(aws_), alloc}
0264       , cancel{std::size(aws_), alloc}
0265     {
0266       aws.reserve(std::size(aws_));
0267       for (auto && a : aws_)
0268       {
0269         using a_0 = std::decay_t<decltype(a)>;
0270         using a_t = std::conditional_t<
0271             std::is_lvalue_reference_v<Range>, a_0 &, a_0 &&>;
0272         aws.emplace_back(awaitable_type_getter<a_t>(static_cast<a_t>(a)));
0273       }
0274 
0275 
0276       std::transform(std::begin(this->aws),
0277                      std::end(this->aws),
0278                      std::begin(ready),
0279                      [](auto & aw) {return aw.await_ready();});
0280     }
0281     awaitable(Range & aws, std::true_type /* needs operator co_await */)
0282         : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws))
0283         , aws(aws)
0284     {
0285       std::transform(std::begin(aws), std::end(aws), std::begin(ready), [](auto & aw) {return aw.await_ready();});
0286     }
0287 
0288     awaitable(Range & aws)
0289       : awaitable(aws, std::bool_constant<awaitable_type<type>>{})
0290     {
0291     }
0292 
0293     void interrupt_await()
0294     {
0295       using t = std::conditional_t<std::is_reference_v<Range>,
0296                                    co_awaitable_type<type> &,
0297                                    co_awaitable_type<type> &&>;
0298 
0299       if constexpr (interruptible<t>)
0300         for (auto & aw : aws)
0301           static_cast<t>(aw).interrupt_await();
0302     }
0303 
0304     static detail::fork await_impl(awaitable & this_, std::size_t idx)
0305     BOOST_TRY
0306     {
0307       auto & aw = *std::next(std::begin(this_.aws), idx);
0308       auto rd = aw.await_ready();
0309       if (!rd)
0310       {
0311         co_await this_.cancel[idx].slot();
0312         co_await detail::fork::wired_up;
0313         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0314         {
0315           co_await aw;
0316           this_.result[idx].template emplace<1u>();
0317         }
0318         else
0319           this_.result[idx].template emplace<1u>(co_await aw);
0320       }
0321       else
0322       {
0323         if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0324         {
0325           aw.await_resume();
0326           this_.result[idx].template emplace<1u>();
0327         }
0328         else
0329           this_.result[idx].template emplace<1u>(aw.await_resume());
0330       }
0331     }
0332     BOOST_CATCH(...)
0333     {
0334       this_.result[idx].template emplace<2u>(std::current_exception());
0335 
0336     }
0337     BOOST_CATCH_END
0338 
0339     detail::fork last_forked;
0340     std::size_t last_index = 0u;
0341 
0342     bool await_ready()
0343     {
0344       while (last_index < cancel.size())
0345       {
0346         last_forked = await_impl(*this, last_index++);
0347         if (!last_forked.done())
0348           return false; // one coro didn't immediately complete!
0349       }
0350       last_forked.release();
0351       return true;
0352     }
0353 
0354     template<typename H>
0355     auto await_suspend(
0356           std::coroutine_handle<H> h
0357 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0358         , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0359 #endif
0360     )
0361     {
0362 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0363       this->loc = loc;
0364 #endif
0365       exec = &detail::get_executor(h);
0366 
0367       last_forked.release().resume();
0368       while (last_index < cancel.size())
0369         await_impl(*this, last_index++).release();
0370 
0371       if (!this->outstanding_work()) // already done, resume rightaway.
0372         return false;
0373 
0374       // arm the cancel
0375       assign_cancellation(
0376           h,
0377           [&](asio::cancellation_type ct)
0378           {
0379             for (auto & cs : cancel)
0380               cs.emit(ct);
0381           });
0382 
0383       this->coro.reset(h.address());
0384       return true;
0385     }
0386 
0387 #if _MSC_VER
0388     BOOST_NOINLINE
0389 #endif
0390     auto await_resume()
0391     {
0392 #if !defined(BOOST_COBALT_NO_PMR)
0393       pmr::vector<result_type> res{result.size(), this_thread::get_allocator()};
0394 #else
0395       std::vector<result_type> res(result.size());
0396 #endif
0397 
0398       std::transform(
0399           result.begin(), result.end(), res.begin(),
0400           [](result_storage_type & res) -> result_type
0401           {
0402             BOOST_ASSERT(res.index() != 0u);
0403             if (res.index() == 1u)
0404             {
0405               if constexpr (std::is_void_v<typename result_type::value_type>)
0406                 return system::in_place_value;
0407               else
0408                 return {system::in_place_value, std::move(get<1u>(res))};
0409             }
0410             else
0411               return {system::in_place_error, get<2u>(res)};
0412           });
0413 
0414       return res;
0415     }
0416   };
0417   awaitable operator co_await() && {return awaitable{aws};}
0418 };
0419 
0420 }
0421 
0422 #endif //BOOST_COBALT_DETAIL_GATHER_HPP