File indexing completed on 2025-07-05 08:28:21
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_JOIN_HPP
0009 #define BOOST_COBALT_DETAIL_JOIN_HPP
0010
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/exception.hpp>
0013 #include <boost/cobalt/detail/fork.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/detail/util.hpp>
0016 #include <boost/cobalt/detail/wrapper.hpp>
0017 #include <boost/cobalt/task.hpp>
0018 #include <boost/cobalt/this_thread.hpp>
0019
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/bind_cancellation_slot.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023
0024
0025 #include <boost/core/ignore_unused.hpp>
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/system/result.hpp>
0028 #include <boost/variant2/variant.hpp>
0029
0030 #include <array>
0031 #include <coroutine>
0032 #include <algorithm>
0033
0034 namespace boost::cobalt::detail
0035 {
0036
0037 template<typename ... Args>
0038 struct join_variadic_impl
0039 {
0040 using tuple_type = std::tuple<decltype(get_awaitable_type(std::declval<Args&&>()))...>;
0041
0042 join_variadic_impl(Args && ... args)
0043 : args{std::forward<Args>(args)...}
0044 {
0045 }
0046
0047 std::tuple<Args...> args;
0048
0049 constexpr static std::size_t tuple_size = sizeof...(Args);
0050
0051 struct awaitable : fork::static_shared_state<256 * tuple_size>
0052 {
0053 template<std::size_t ... Idx>
0054 awaitable(std::tuple<Args...> & args, std::index_sequence<Idx...>) :
0055 aws(awaitable_type_getter<Args>(std::get<Idx>(args))...)
0056 {
0057 }
0058
0059 tuple_type aws;
0060
0061 std::array<asio::cancellation_signal, tuple_size> cancel_;
0062 template<typename > constexpr static auto make_null() {return nullptr;};
0063 std::array<asio::cancellation_signal*, tuple_size> cancel = {make_null<Args>()...};
0064
0065 constexpr static bool all_void = (std::is_void_v<co_await_result_t<Args>> && ...);
0066 template<typename T>
0067 using result_store_part =
0068 std::optional<void_as_monostate<co_await_result_t<T>>>;
0069
0070 std::conditional_t<all_void,
0071 variant2::monostate,
0072 std::tuple<result_store_part<Args>...>> result;
0073 std::exception_ptr error;
0074
0075 template<std::size_t Idx>
0076 void cancel_step()
0077 {
0078 auto &r = cancel[Idx];
0079 if (r)
0080 std::exchange(r, nullptr)->emit(asio::cancellation_type::all);
0081 }
0082
0083 void cancel_all()
0084 {
0085 mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0086 ([&](auto idx)
0087 {
0088 cancel_step<idx>();
0089 });
0090 }
0091
0092
0093
0094 template<std::size_t Idx>
0095 void interrupt_await_step()
0096 {
0097 using type = std::tuple_element_t<Idx, tuple_type>;
0098 using t = std::conditional_t<std::is_reference_v<std::tuple_element_t<Idx, std::tuple<Args...>>>,
0099 type &,
0100 type &&>;
0101
0102 if constexpr (interruptible<t>)
0103 if (this->cancel[Idx] != nullptr)
0104 static_cast<t>(std::get<Idx>(aws)).interrupt_await();
0105 }
0106
0107 void interrupt_await()
0108 {
0109 mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0110 ([&](auto idx)
0111 {
0112 interrupt_await_step<idx>();
0113 });
0114 }
0115
0116
0117 template<std::size_t Idx>
0118 static detail::fork await_impl(awaitable & this_)
0119 BOOST_TRY
0120 {
0121 auto & aw = std::get<Idx>(this_.aws);
0122
0123 auto rd = aw.await_ready();
0124 if (!rd)
0125 {
0126 this_.cancel[Idx] = &this_.cancel_[Idx];
0127 co_await this_.cancel[Idx]->slot();
0128
0129 co_await detail::fork::wired_up;
0130
0131
0132 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0133 {
0134 co_await aw;
0135 if constexpr (!all_void)
0136 std::get<Idx>(this_.result).emplace();
0137 }
0138 else
0139 std::get<Idx>(this_.result).emplace(co_await aw);
0140 }
0141 else
0142 {
0143 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0144 {
0145 aw.await_resume();
0146 if constexpr (!all_void)
0147 std::get<Idx>(this_.result).emplace();
0148 }
0149 else
0150 std::get<Idx>(this_.result).emplace(aw.await_resume());
0151 }
0152
0153 }
0154 BOOST_CATCH(...)
0155 {
0156 if (!this_.error)
0157 this_.error = std::current_exception();
0158 this_.cancel_all();
0159 }
0160 BOOST_CATCH_END
0161
0162 std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0163 []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0164 {
0165 return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0166 }(std::make_index_sequence<tuple_size>{})
0167 };
0168
0169 detail::fork last_forked;
0170 std::size_t last_index = 0u;
0171
0172 bool await_ready()
0173 {
0174 while (last_index < tuple_size)
0175 {
0176 last_forked = impls[last_index++](*this);
0177 if (!last_forked.done())
0178 return false;
0179 }
0180 last_forked.release();
0181 return true;
0182 }
0183
0184 template<typename H>
0185 auto await_suspend(
0186 std::coroutine_handle<H> h
0187 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0188 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0189 #endif
0190 )
0191 {
0192 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0193 this->loc = loc;
0194 #endif
0195 this->exec = &detail::get_executor(h);
0196 last_forked.release().resume();
0197 while (last_index < tuple_size)
0198 impls[last_index++](*this).release();
0199
0200 if (error)
0201 cancel_all();
0202
0203 if (!this->outstanding_work())
0204 return false;
0205
0206
0207 assign_cancellation(
0208 h,
0209 [&](asio::cancellation_type ct)
0210 {
0211 for (auto cs : cancel)
0212 if (cs)
0213 cs->emit(ct);
0214 });
0215
0216 this->coro.reset(h.address());
0217 return true;
0218 }
0219
0220 #if _MSC_VER
0221 BOOST_NOINLINE
0222 #endif
0223 auto await_resume()
0224 {
0225 if (error)
0226 std::rethrow_exception(error);
0227 if constexpr(!all_void)
0228 return mp11::tuple_transform(
0229 []<typename T>(std::optional<T> & var)
0230 -> T
0231 {
0232 BOOST_ASSERT(var.has_value());
0233 return std::move(*var);
0234 }, result);
0235 }
0236
0237 auto await_resume(const as_tuple_tag &)
0238 {
0239 using t = decltype(await_resume());
0240 if constexpr(!all_void)
0241 {
0242 if (error)
0243 return std::make_tuple(error, t{});
0244 else
0245 return std::make_tuple(std::current_exception(),
0246 mp11::tuple_transform(
0247 []<typename T>(std::optional<T> & var)
0248 -> T
0249 {
0250 BOOST_ASSERT(var.has_value());
0251 return std::move(*var);
0252 }, result));
0253 }
0254 else
0255 return std::make_tuple(error);
0256 }
0257
0258 auto await_resume(const as_result_tag &)
0259 {
0260 using t = decltype(await_resume());
0261 using rt = system::result<t, std::exception_ptr>;
0262 if (error)
0263 return rt(system::in_place_error, error);
0264
0265 if constexpr(!all_void)
0266 return mp11::tuple_transform(
0267 []<typename T>(std::optional<T> & var)
0268 -> rt
0269 {
0270 BOOST_ASSERT(var.has_value());
0271 return std::move(*var);
0272 }, result);
0273 else
0274 return rt{system::in_place_value};
0275 }
0276 };
0277 awaitable operator co_await() &&
0278 {
0279 return awaitable(args, std::make_index_sequence<sizeof...(Args)>{});
0280 }
0281 };
0282
0283 template<typename Range>
0284 struct join_ranged_impl
0285 {
0286 Range aws;
0287
0288 using result_type = co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>;
0289
0290 constexpr static std::size_t result_size =
0291 sizeof(std::conditional_t<std::is_void_v<result_type>, variant2::monostate, result_type>);
0292
0293 struct awaitable : fork::shared_state
0294 {
0295 struct dummy
0296 {
0297 template<typename ... Args>
0298 dummy(Args && ...) {}
0299 };
0300
0301 using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0302 #if !defined(BOOST_COBALT_NO_PMR)
0303 pmr::polymorphic_allocator<void> alloc{&resource};
0304
0305 std::conditional_t<awaitable_type<type>, Range &,
0306 pmr::vector<co_awaitable_type<type>>> aws;
0307
0308 pmr::vector<bool> ready{std::size(aws), alloc};
0309 pmr::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0310 pmr::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0311
0312
0313
0314 std::conditional_t<
0315 std::is_void_v<result_type>,
0316 dummy,
0317 pmr::vector<std::optional<void_as_monostate<result_type>>>>
0318 result{
0319 cancel.size(),
0320 alloc};
0321 #else
0322 std::allocator<void> alloc;
0323 std::conditional_t<awaitable_type<type>, Range &, std::vector<co_awaitable_type<type>>> aws;
0324
0325 std::vector<bool> ready{std::size(aws), alloc};
0326 std::vector<asio::cancellation_signal> cancel_{std::size(aws), alloc};
0327 std::vector<asio::cancellation_signal*> cancel{std::size(aws), alloc};
0328
0329 std::conditional_t<
0330 std::is_void_v<result_type>,
0331 dummy,
0332 std::vector<std::optional<void_as_monostate<result_type>>>>
0333 result{
0334 cancel.size(),
0335 alloc};
0336 #endif
0337 std::exception_ptr error;
0338
0339 awaitable(Range & aws_, std::false_type )
0340 : fork::shared_state((512 + sizeof(co_awaitable_type<type>) + result_size) * std::size(aws_))
0341 , aws{alloc}
0342 , ready{std::size(aws_), alloc}
0343 , cancel_{std::size(aws_), alloc}
0344 , cancel{std::size(aws_), alloc}
0345 {
0346 aws.reserve(std::size(aws_));
0347 for (auto && a : aws_)
0348 {
0349 using a_0 = std::decay_t<decltype(a)>;
0350 using a_t = std::conditional_t<
0351 std::is_lvalue_reference_v<Range>, a_0 &, a_0 &&>;
0352 aws.emplace_back(awaitable_type_getter<a_t>(static_cast<a_t>(a)));
0353 }
0354
0355 std::transform(std::begin(this->aws),
0356 std::end(this->aws),
0357 std::begin(ready),
0358 [](auto & aw) {return aw.await_ready();});
0359 }
0360 awaitable(Range & aws, std::true_type )
0361 : fork::shared_state((512 + sizeof(co_awaitable_type<type>) + result_size) * std::size(aws))
0362 , aws(aws)
0363 {
0364 std::transform(std::begin(aws), std::end(aws), std::begin(ready), [](auto & aw) {return aw.await_ready();});
0365 }
0366
0367 awaitable(Range & aws)
0368 : awaitable(aws, std::bool_constant<awaitable_type<type>>{})
0369 {
0370 }
0371
0372 void cancel_all()
0373 {
0374 for (auto & r : cancel)
0375 if (r)
0376 std::exchange(r, nullptr)->emit(asio::cancellation_type::all);
0377 }
0378
0379 void interrupt_await()
0380 {
0381 using t = std::conditional_t<std::is_reference_v<Range>,
0382 co_awaitable_type<type> &,
0383 co_awaitable_type<type> &&>;
0384
0385 if constexpr (interruptible<t>)
0386 {
0387 std::size_t idx = 0u;
0388 for (auto & aw : aws)
0389 if (cancel[idx])
0390 static_cast<t>(aw).interrupt_await();
0391 }
0392 }
0393
0394
0395 static detail::fork await_impl(awaitable & this_, std::size_t idx)
0396 BOOST_TRY
0397 {
0398 auto & aw = *std::next(std::begin(this_.aws), idx);
0399 auto rd = aw.await_ready();
0400 if (!rd)
0401 {
0402 this_.cancel[idx] = &this_.cancel_[idx];
0403 co_await this_.cancel[idx]->slot();
0404 co_await detail::fork::wired_up;
0405 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0406 co_await aw;
0407 else
0408 this_.result[idx].emplace(co_await aw);
0409 }
0410 else
0411 {
0412 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0413 aw.await_resume();
0414 else
0415 this_.result[idx].emplace(aw.await_resume());
0416 }
0417 }
0418 BOOST_CATCH(...)
0419 {
0420 if (!this_.error)
0421 this_.error = std::current_exception();
0422 this_.cancel_all();
0423 }
0424 BOOST_CATCH_END
0425
0426 detail::fork last_forked;
0427 std::size_t last_index = 0u;
0428
0429 bool await_ready()
0430 {
0431 while (last_index < cancel.size())
0432 {
0433 last_forked = await_impl(*this, last_index++);
0434 if (!last_forked.done())
0435 return false;
0436 }
0437 last_forked.release();
0438 return true;
0439 }
0440
0441
0442 template<typename H>
0443 auto await_suspend(
0444 std::coroutine_handle<H> h
0445 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0446 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0447 #endif
0448 )
0449 {
0450 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0451 this->loc = loc;
0452 #endif
0453 exec = &detail::get_executor(h);
0454
0455 last_forked.release().resume();
0456 while (last_index < cancel.size())
0457 await_impl(*this, last_index++).release();
0458
0459 if (error)
0460 cancel_all();
0461
0462 if (!this->outstanding_work())
0463 return false;
0464
0465
0466 assign_cancellation(
0467 h,
0468 [&](asio::cancellation_type ct)
0469 {
0470 for (auto cs : cancel)
0471 if (cs)
0472 cs->emit(ct);
0473 });
0474
0475
0476 this->coro.reset(h.address());
0477 return true;
0478 }
0479
0480 auto await_resume(const as_tuple_tag & )
0481 {
0482 #if defined(BOOST_COBALT_NO_PMR)
0483 std::vector<result_type> rr;
0484 #else
0485 pmr::vector<result_type> rr{this_thread::get_allocator()};
0486 #endif
0487
0488 if (error)
0489 return std::make_tuple(error, rr);
0490 if constexpr (!std::is_void_v<result_type>)
0491 {
0492 rr.reserve(result.size());
0493 for (auto & t : result)
0494 rr.push_back(*std::move(t));
0495 return std::make_tuple(std::exception_ptr(), std::move(rr));
0496 }
0497 }
0498
0499 auto await_resume(const as_result_tag & )
0500 {
0501 #if defined(BOOST_COBALT_NO_PMR)
0502 std::vector<result_type> rr;
0503 #else
0504 pmr::vector<result_type> rr{this_thread::get_allocator()};
0505 #endif
0506
0507 if (error)
0508 return system::result<decltype(rr), std::exception_ptr>(error);
0509 if constexpr (!std::is_void_v<result_type>)
0510 {
0511 rr.reserve(result.size());
0512 for (auto & t : result)
0513 rr.push_back(*std::move(t));
0514 return rr;
0515 }
0516 }
0517
0518 #if _MSC_VER
0519 BOOST_NOINLINE
0520 #endif
0521 auto await_resume()
0522 {
0523 if (error)
0524 std::rethrow_exception(error);
0525 if constexpr (!std::is_void_v<result_type>)
0526 {
0527 #if defined(BOOST_COBALT_NO_PMR)
0528 std::vector<result_type> rr;
0529 #else
0530 pmr::vector<result_type> rr{this_thread::get_allocator()};
0531 #endif
0532 rr.reserve(result.size());
0533 for (auto & t : result)
0534 rr.push_back(*std::move(t));
0535 return rr;
0536 }
0537 }
0538 };
0539 awaitable operator co_await() && {return awaitable{aws};}
0540 };
0541
0542 }
0543
0544
0545 #endif