File indexing completed on 2025-01-30 09:34:30
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_GATHER_HPP
0009 #define BOOST_COBALT_DETAIL_GATHER_HPP
0010
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/exception.hpp>
0013 #include <boost/cobalt/detail/fork.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/detail/util.hpp>
0016 #include <boost/cobalt/detail/wrapper.hpp>
0017 #include <boost/cobalt/task.hpp>
0018 #include <boost/cobalt/this_thread.hpp>
0019
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/bind_cancellation_slot.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023
0024
0025 #include <boost/core/ignore_unused.hpp>
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/system/result.hpp>
0028 #include <boost/variant2/variant.hpp>
0029
0030 #include <array>
0031 #include <coroutine>
0032 #include <algorithm>
0033
0034 namespace boost::cobalt::detail
0035 {
0036
0037
0038 template<typename ... Args>
0039 struct gather_variadic_impl
0040 {
0041 using tuple_type = std::tuple<decltype(get_awaitable_type(std::declval<Args&&>()))...>;
0042
0043 gather_variadic_impl(Args && ... args)
0044 : args{std::forward<Args>(args)...}
0045 {
0046 }
0047
0048 std::tuple<Args...> args;
0049
0050 constexpr static std::size_t tuple_size = sizeof...(Args);
0051
0052 struct awaitable : fork::static_shared_state<256 * tuple_size>
0053 {
0054 template<std::size_t ... Idx>
0055 awaitable(std::tuple<Args...> & args, std::index_sequence<Idx...>)
0056 : aws(awaitable_type_getter<Args>(std::get<Idx>(args))...)
0057 {
0058 }
0059
0060 tuple_type aws;
0061 std::array<asio::cancellation_signal, tuple_size> cancel;
0062
0063 template<typename T>
0064 using result_store_part = variant2::variant<
0065 variant2::monostate,
0066 void_as_monostate<co_await_result_t<T>>,
0067 std::exception_ptr>;
0068
0069 std::tuple<result_store_part<Args>...> result;
0070
0071
0072 template<std::size_t Idx>
0073 void interrupt_await_step()
0074 {
0075 using type= std::tuple_element_t<Idx, std::tuple<Args...>>;
0076 using t = std::conditional_t<
0077 std::is_reference_v<std::tuple_element_t<Idx, decltype(aws)>>,
0078 co_awaitable_type<type> &,
0079 co_awaitable_type<type> &&>;
0080
0081 if constexpr (interruptible<t>)
0082 static_cast<t>(std::get<Idx>(aws)).interrupt_await();
0083 }
0084 void interrupt_await()
0085 {
0086 mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0087 ([&](auto idx)
0088 {
0089 interrupt_await_step<idx>();
0090 });
0091 }
0092
0093
0094 template<std::size_t Idx>
0095 static detail::fork await_impl(awaitable & this_)
0096 try
0097 {
0098 auto & aw = std::get<Idx>(this_.aws);
0099
0100 auto rd = aw.await_ready();
0101 if (!rd)
0102 {
0103 co_await this_.cancel[Idx].slot();
0104
0105 co_await detail::fork::wired_up;
0106
0107
0108 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0109 {
0110 co_await aw;
0111 std::get<Idx>(this_.result).template emplace<1u>();
0112 }
0113 else
0114 std::get<Idx>(this_.result).template emplace<1u>(co_await aw);
0115 }
0116 else
0117 {
0118 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0119 {
0120 aw.await_resume();
0121 std::get<Idx>(this_.result).template emplace<1u>();
0122 }
0123 else
0124 std::get<Idx>(this_.result).template emplace<1u>(aw.await_resume());
0125 }
0126 }
0127 catch(...)
0128 {
0129 std::get<Idx>(this_.result).template emplace<2u>(std::current_exception());
0130 }
0131
0132 std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0133 []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0134 {
0135 return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0136 }(std::make_index_sequence<tuple_size>{})
0137 };
0138
0139 detail::fork last_forked;
0140 std::size_t last_index = 0u;
0141
0142 bool await_ready()
0143 {
0144 while (last_index < tuple_size)
0145 {
0146 last_forked = impls[last_index++](*this);
0147 if (!last_forked.done())
0148 return false;
0149 }
0150 last_forked.release();
0151 return true;
0152 }
0153
0154 template<typename H>
0155 auto await_suspend(
0156 std::coroutine_handle<H> h
0157 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0158 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0159 #endif
0160 )
0161 {
0162 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0163 this->loc = loc;
0164 #endif
0165 this->exec = &cobalt::detail::get_executor(h);
0166 last_forked.release().resume();
0167 while (last_index < tuple_size)
0168 impls[last_index++](*this).release();
0169
0170 if (!this->outstanding_work())
0171 return false;
0172
0173
0174 assign_cancellation(
0175 h,
0176 [&](asio::cancellation_type ct)
0177 {
0178 for (auto & cs : cancel)
0179 cs.emit(ct);
0180 });
0181
0182
0183 this->coro.reset(h.address());
0184 return true;
0185 }
0186
0187 template<typename T>
0188 using result_part = system::result<co_await_result_t<T>, std::exception_ptr>;
0189
0190 #if _MSC_VER
0191 BOOST_NOINLINE
0192 #endif
0193 std::tuple<result_part<Args> ...> await_resume()
0194 {
0195 return mp11::tuple_transform(
0196 []<typename T>(variant2::variant<variant2::monostate, T, std::exception_ptr> & var)
0197 -> system::result<monostate_as_void<T>, std::exception_ptr>
0198 {
0199 BOOST_ASSERT(var.index() != 0u);
0200 if (var.index() == 1u)
0201 {
0202 if constexpr (std::is_same_v<T, variant2::monostate>)
0203 return {system::in_place_value};
0204 else
0205 return {system::in_place_value, std::move(get<1>(var))};
0206 }
0207 else
0208 return {system::in_place_error, std::move(get<2>(var))};
0209
0210 }
0211 , result);
0212 }
0213 };
0214 awaitable operator co_await() &&
0215 {
0216 return awaitable(args, std::make_index_sequence<sizeof...(Args)>{});
0217 }
0218 };
0219
0220 template<typename Range>
0221 struct gather_ranged_impl
0222 {
0223 Range aws;
0224
0225 using result_type = system::result<
0226 co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>,
0227 std::exception_ptr>;
0228
0229 using result_storage_type = variant2::variant<
0230 variant2::monostate,
0231 void_as_monostate<
0232 co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>
0233 >,
0234 std::exception_ptr>;
0235
0236 struct awaitable : fork::shared_state
0237 {
0238 using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0239 #if !defined(BOOST_COBALT_NO_PMR)
0240 pmr::polymorphic_allocator<void> alloc{&resource};
0241 std::conditional_t<awaitable_type<type>, Range &,
0242 pmr::vector<co_awaitable_type<type>>> aws;
0243
0244 pmr::vector<bool> ready{std::size(aws), alloc};
0245 pmr::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0246 pmr::vector<result_storage_type> result{cancel.size(), alloc};
0247
0248 #else
0249 std::allocator<void> alloc{};
0250 std::conditional_t<awaitable_type<type>, Range &,
0251 std::vector<co_awaitable_type<type>>> aws;
0252
0253 std::vector<bool> ready{std::size(aws), alloc};
0254 std::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0255 std::vector<result_storage_type> result{cancel.size(), alloc};
0256 #endif
0257
0258
0259 awaitable(Range & aws_, std::false_type )
0260 : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws_))
0261 , aws{alloc}
0262 , ready{std::size(aws_), alloc}
0263 , cancel{std::size(aws_), alloc}
0264 {
0265 aws.reserve(std::size(aws_));
0266 for (auto && a : aws_)
0267 {
0268 using a_0 = std::decay_t<decltype(a)>;
0269 using a_t = std::conditional_t<
0270 std::is_lvalue_reference_v<Range>, a_0 &, a_0 &&>;
0271 aws.emplace_back(awaitable_type_getter<a_t>(static_cast<a_t>(a)));
0272 }
0273
0274
0275 std::transform(std::begin(this->aws),
0276 std::end(this->aws),
0277 std::begin(ready),
0278 [](auto & aw) {return aw.await_ready();});
0279 }
0280 awaitable(Range & aws, std::true_type )
0281 : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws))
0282 , aws(aws)
0283 {
0284 std::transform(std::begin(aws), std::end(aws), std::begin(ready), [](auto & aw) {return aw.await_ready();});
0285 }
0286
0287 awaitable(Range & aws)
0288 : awaitable(aws, std::bool_constant<awaitable_type<type>>{})
0289 {
0290 }
0291
0292 void interrupt_await()
0293 {
0294 using t = std::conditional_t<std::is_reference_v<Range>,
0295 co_awaitable_type<type> &,
0296 co_awaitable_type<type> &&>;
0297
0298 if constexpr (interruptible<t>)
0299 for (auto & aw : aws)
0300 static_cast<t>(aw).interrupt_await();
0301 }
0302
0303 static detail::fork await_impl(awaitable & this_, std::size_t idx)
0304 try
0305 {
0306 auto & aw = *std::next(std::begin(this_.aws), idx);
0307 auto rd = aw.await_ready();
0308 if (!rd)
0309 {
0310 co_await this_.cancel[idx].slot();
0311 co_await detail::fork::wired_up;
0312 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0313 {
0314 co_await aw;
0315 this_.result[idx].template emplace<1u>();
0316 }
0317 else
0318 this_.result[idx].template emplace<1u>(co_await aw);
0319 }
0320 else
0321 {
0322 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0323 {
0324 aw.await_resume();
0325 this_.result[idx].template emplace<1u>();
0326 }
0327 else
0328 this_.result[idx].template emplace<1u>(aw.await_resume());
0329 }
0330 }
0331 catch(...)
0332 {
0333 this_.result[idx].template emplace<2u>(std::current_exception());
0334
0335 }
0336
0337 detail::fork last_forked;
0338 std::size_t last_index = 0u;
0339
0340 bool await_ready()
0341 {
0342 while (last_index < cancel.size())
0343 {
0344 last_forked = await_impl(*this, last_index++);
0345 if (!last_forked.done())
0346 return false;
0347 }
0348 last_forked.release();
0349 return true;
0350 }
0351
0352 template<typename H>
0353 auto await_suspend(
0354 std::coroutine_handle<H> h
0355 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0356 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0357 #endif
0358 )
0359 {
0360 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0361 this->loc = loc;
0362 #endif
0363 exec = &detail::get_executor(h);
0364
0365 last_forked.release().resume();
0366 while (last_index < cancel.size())
0367 await_impl(*this, last_index++).release();
0368
0369 if (!this->outstanding_work())
0370 return false;
0371
0372
0373 assign_cancellation(
0374 h,
0375 [&](asio::cancellation_type ct)
0376 {
0377 for (auto & cs : cancel)
0378 cs.emit(ct);
0379 });
0380
0381 this->coro.reset(h.address());
0382 return true;
0383 }
0384
0385 #if _MSC_VER
0386 BOOST_NOINLINE
0387 #endif
0388 auto await_resume()
0389 {
0390 #if !defined(BOOST_COBALT_NO_PMR)
0391 pmr::vector<result_type> res{result.size(), this_thread::get_allocator()};
0392 #else
0393 std::vector<result_type> res(result.size());
0394 #endif
0395
0396 std::transform(
0397 result.begin(), result.end(), res.begin(),
0398 [](result_storage_type & res) -> result_type
0399 {
0400 BOOST_ASSERT(res.index() != 0u);
0401 if (res.index() == 1u)
0402 {
0403 if constexpr (std::is_void_v<typename result_type::value_type>)
0404 return system::in_place_value;
0405 else
0406 return {system::in_place_value, std::move(get<1u>(res))};
0407 }
0408 else
0409 return {system::in_place_error, get<2u>(res)};
0410 });
0411
0412 return res;
0413 }
0414 };
0415 awaitable operator co_await() && {return awaitable{aws};}
0416 };
0417
0418 }
0419
0420 #endif