File indexing completed on 2025-07-05 08:28:20
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_GATHER_HPP
0009 #define BOOST_COBALT_DETAIL_GATHER_HPP
0010
0011 #include <boost/cobalt/detail/await_result_helper.hpp>
0012 #include <boost/cobalt/detail/exception.hpp>
0013 #include <boost/cobalt/detail/fork.hpp>
0014 #include <boost/cobalt/detail/forward_cancellation.hpp>
0015 #include <boost/cobalt/detail/util.hpp>
0016 #include <boost/cobalt/detail/wrapper.hpp>
0017 #include <boost/cobalt/task.hpp>
0018 #include <boost/cobalt/this_thread.hpp>
0019
0020 #include <boost/asio/associated_cancellation_slot.hpp>
0021 #include <boost/asio/bind_cancellation_slot.hpp>
0022 #include <boost/asio/cancellation_signal.hpp>
0023
0024
0025 #include <boost/core/ignore_unused.hpp>
0026 #include <boost/intrusive_ptr.hpp>
0027 #include <boost/system/result.hpp>
0028 #include <boost/variant2/variant.hpp>
0029
0030 #include <array>
0031 #include <coroutine>
0032 #include <algorithm>
0033
0034 namespace boost::cobalt::detail
0035 {
0036
0037
0038 template<typename ... Args>
0039 struct gather_variadic_impl
0040 {
0041 using tuple_type = std::tuple<decltype(get_awaitable_type(std::declval<Args&&>()))...>;
0042
0043 gather_variadic_impl(Args && ... args)
0044 : args{std::forward<Args>(args)...}
0045 {
0046 }
0047
0048 std::tuple<Args...> args;
0049
0050 constexpr static std::size_t tuple_size = sizeof...(Args);
0051
0052 struct awaitable : fork::static_shared_state<256 * tuple_size>
0053 {
0054 template<std::size_t ... Idx>
0055 awaitable(std::tuple<Args...> & args, std::index_sequence<Idx...>)
0056 : aws(awaitable_type_getter<Args>(std::get<Idx>(args))...)
0057 {
0058 }
0059
0060 tuple_type aws;
0061 std::array<asio::cancellation_signal, tuple_size> cancel;
0062
0063 template<typename T>
0064 using result_store_part = variant2::variant<
0065 variant2::monostate,
0066 void_as_monostate<co_await_result_t<T>>,
0067 std::exception_ptr>;
0068
0069 std::tuple<result_store_part<Args>...> result;
0070
0071
0072 template<std::size_t Idx>
0073 void interrupt_await_step()
0074 {
0075 using type= std::tuple_element_t<Idx, std::tuple<Args...>>;
0076 using t = std::conditional_t<
0077 std::is_reference_v<std::tuple_element_t<Idx, decltype(aws)>>,
0078 co_awaitable_type<type> &,
0079 co_awaitable_type<type> &&>;
0080
0081 if constexpr (interruptible<t>)
0082 static_cast<t>(std::get<Idx>(aws)).interrupt_await();
0083 }
0084 void interrupt_await()
0085 {
0086 mp11::mp_for_each<mp11::mp_iota_c<sizeof...(Args)>>
0087 ([&](auto idx)
0088 {
0089 interrupt_await_step<idx>();
0090 });
0091 }
0092
0093
0094 template<std::size_t Idx>
0095 static detail::fork await_impl(awaitable & this_)
0096 BOOST_TRY
0097 {
0098 auto & aw = std::get<Idx>(this_.aws);
0099
0100 auto rd = aw.await_ready();
0101 if (!rd)
0102 {
0103 co_await this_.cancel[Idx].slot();
0104
0105 co_await detail::fork::wired_up;
0106
0107
0108 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0109 {
0110 co_await aw;
0111 std::get<Idx>(this_.result).template emplace<1u>();
0112 }
0113 else
0114 std::get<Idx>(this_.result).template emplace<1u>(co_await aw);
0115 }
0116 else
0117 {
0118 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0119 {
0120 aw.await_resume();
0121 std::get<Idx>(this_.result).template emplace<1u>();
0122 }
0123 else
0124 std::get<Idx>(this_.result).template emplace<1u>(aw.await_resume());
0125 }
0126 }
0127 BOOST_CATCH(...)
0128 {
0129 std::get<Idx>(this_.result).template emplace<2u>(std::current_exception());
0130 }
0131 BOOST_CATCH_END
0132
0133 std::array<detail::fork(*)(awaitable&), tuple_size> impls {
0134 []<std::size_t ... Idx>(std::index_sequence<Idx...>)
0135 {
0136 return std::array<detail::fork(*)(awaitable&), tuple_size>{&await_impl<Idx>...};
0137 }(std::make_index_sequence<tuple_size>{})
0138 };
0139
0140 detail::fork last_forked;
0141 std::size_t last_index = 0u;
0142
0143 bool await_ready()
0144 {
0145 while (last_index < tuple_size)
0146 {
0147 last_forked = impls[last_index++](*this);
0148 if (!last_forked.done())
0149 return false;
0150 }
0151 last_forked.release();
0152 return true;
0153 }
0154
0155 template<typename H>
0156 auto await_suspend(
0157 std::coroutine_handle<H> h
0158 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0159 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0160 #endif
0161 )
0162 {
0163 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0164 this->loc = loc;
0165 #endif
0166 this->exec = &cobalt::detail::get_executor(h);
0167 last_forked.release().resume();
0168 while (last_index < tuple_size)
0169 impls[last_index++](*this).release();
0170
0171 if (!this->outstanding_work())
0172 return false;
0173
0174
0175 assign_cancellation(
0176 h,
0177 [&](asio::cancellation_type ct)
0178 {
0179 for (auto & cs : cancel)
0180 cs.emit(ct);
0181 });
0182
0183
0184 this->coro.reset(h.address());
0185 return true;
0186 }
0187
0188 template<typename T>
0189 using result_part = system::result<co_await_result_t<T>, std::exception_ptr>;
0190
0191 #if _MSC_VER
0192 BOOST_NOINLINE
0193 #endif
0194 std::tuple<result_part<Args> ...> await_resume()
0195 {
0196 return mp11::tuple_transform(
0197 []<typename T>(variant2::variant<variant2::monostate, T, std::exception_ptr> & var)
0198 -> system::result<monostate_as_void<T>, std::exception_ptr>
0199 {
0200 BOOST_ASSERT(var.index() != 0u);
0201 if (var.index() == 1u)
0202 {
0203 if constexpr (std::is_same_v<T, variant2::monostate>)
0204 return {system::in_place_value};
0205 else
0206 return {system::in_place_value, std::move(get<1>(var))};
0207 }
0208 else
0209 return {system::in_place_error, std::move(get<2>(var))};
0210
0211 }
0212 , result);
0213 }
0214 };
0215 awaitable operator co_await() &&
0216 {
0217 return awaitable(args, std::make_index_sequence<sizeof...(Args)>{});
0218 }
0219 };
0220
0221 template<typename Range>
0222 struct gather_ranged_impl
0223 {
0224 Range aws;
0225
0226 using result_type = system::result<
0227 co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>,
0228 std::exception_ptr>;
0229
0230 using result_storage_type = variant2::variant<
0231 variant2::monostate,
0232 void_as_monostate<
0233 co_await_result_t<std::decay_t<decltype(*std::begin(std::declval<Range>()))>>
0234 >,
0235 std::exception_ptr>;
0236
0237 struct awaitable : fork::shared_state
0238 {
0239 using type = std::decay_t<decltype(*std::begin(std::declval<Range>()))>;
0240 #if !defined(BOOST_COBALT_NO_PMR)
0241 pmr::polymorphic_allocator<void> alloc{&resource};
0242 std::conditional_t<awaitable_type<type>, Range &,
0243 pmr::vector<co_awaitable_type<type>>> aws;
0244
0245 pmr::vector<bool> ready{std::size(aws), alloc};
0246 pmr::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0247 pmr::vector<result_storage_type> result{cancel.size(), alloc};
0248
0249 #else
0250 std::allocator<void> alloc{};
0251 std::conditional_t<awaitable_type<type>, Range &,
0252 std::vector<co_awaitable_type<type>>> aws;
0253
0254 std::vector<bool> ready{std::size(aws), alloc};
0255 std::vector<asio::cancellation_signal> cancel{std::size(aws), alloc};
0256 std::vector<result_storage_type> result{cancel.size(), alloc};
0257 #endif
0258
0259
0260 awaitable(Range & aws_, std::false_type )
0261 : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws_))
0262 , aws{alloc}
0263 , ready{std::size(aws_), alloc}
0264 , cancel{std::size(aws_), alloc}
0265 {
0266 aws.reserve(std::size(aws_));
0267 for (auto && a : aws_)
0268 {
0269 using a_0 = std::decay_t<decltype(a)>;
0270 using a_t = std::conditional_t<
0271 std::is_lvalue_reference_v<Range>, a_0 &, a_0 &&>;
0272 aws.emplace_back(awaitable_type_getter<a_t>(static_cast<a_t>(a)));
0273 }
0274
0275
0276 std::transform(std::begin(this->aws),
0277 std::end(this->aws),
0278 std::begin(ready),
0279 [](auto & aw) {return aw.await_ready();});
0280 }
0281 awaitable(Range & aws, std::true_type )
0282 : fork::shared_state((512 + sizeof(co_awaitable_type<type>)) * std::size(aws))
0283 , aws(aws)
0284 {
0285 std::transform(std::begin(aws), std::end(aws), std::begin(ready), [](auto & aw) {return aw.await_ready();});
0286 }
0287
0288 awaitable(Range & aws)
0289 : awaitable(aws, std::bool_constant<awaitable_type<type>>{})
0290 {
0291 }
0292
0293 void interrupt_await()
0294 {
0295 using t = std::conditional_t<std::is_reference_v<Range>,
0296 co_awaitable_type<type> &,
0297 co_awaitable_type<type> &&>;
0298
0299 if constexpr (interruptible<t>)
0300 for (auto & aw : aws)
0301 static_cast<t>(aw).interrupt_await();
0302 }
0303
0304 static detail::fork await_impl(awaitable & this_, std::size_t idx)
0305 BOOST_TRY
0306 {
0307 auto & aw = *std::next(std::begin(this_.aws), idx);
0308 auto rd = aw.await_ready();
0309 if (!rd)
0310 {
0311 co_await this_.cancel[idx].slot();
0312 co_await detail::fork::wired_up;
0313 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0314 {
0315 co_await aw;
0316 this_.result[idx].template emplace<1u>();
0317 }
0318 else
0319 this_.result[idx].template emplace<1u>(co_await aw);
0320 }
0321 else
0322 {
0323 if constexpr (std::is_void_v<decltype(aw.await_resume())>)
0324 {
0325 aw.await_resume();
0326 this_.result[idx].template emplace<1u>();
0327 }
0328 else
0329 this_.result[idx].template emplace<1u>(aw.await_resume());
0330 }
0331 }
0332 BOOST_CATCH(...)
0333 {
0334 this_.result[idx].template emplace<2u>(std::current_exception());
0335
0336 }
0337 BOOST_CATCH_END
0338
0339 detail::fork last_forked;
0340 std::size_t last_index = 0u;
0341
0342 bool await_ready()
0343 {
0344 while (last_index < cancel.size())
0345 {
0346 last_forked = await_impl(*this, last_index++);
0347 if (!last_forked.done())
0348 return false;
0349 }
0350 last_forked.release();
0351 return true;
0352 }
0353
0354 template<typename H>
0355 auto await_suspend(
0356 std::coroutine_handle<H> h
0357 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0358 , const boost::source_location & loc = BOOST_CURRENT_LOCATION
0359 #endif
0360 )
0361 {
0362 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0363 this->loc = loc;
0364 #endif
0365 exec = &detail::get_executor(h);
0366
0367 last_forked.release().resume();
0368 while (last_index < cancel.size())
0369 await_impl(*this, last_index++).release();
0370
0371 if (!this->outstanding_work())
0372 return false;
0373
0374
0375 assign_cancellation(
0376 h,
0377 [&](asio::cancellation_type ct)
0378 {
0379 for (auto & cs : cancel)
0380 cs.emit(ct);
0381 });
0382
0383 this->coro.reset(h.address());
0384 return true;
0385 }
0386
0387 #if _MSC_VER
0388 BOOST_NOINLINE
0389 #endif
0390 auto await_resume()
0391 {
0392 #if !defined(BOOST_COBALT_NO_PMR)
0393 pmr::vector<result_type> res{result.size(), this_thread::get_allocator()};
0394 #else
0395 std::vector<result_type> res(result.size());
0396 #endif
0397
0398 std::transform(
0399 result.begin(), result.end(), res.begin(),
0400 [](result_storage_type & res) -> result_type
0401 {
0402 BOOST_ASSERT(res.index() != 0u);
0403 if (res.index() == 1u)
0404 {
0405 if constexpr (std::is_void_v<typename result_type::value_type>)
0406 return system::in_place_value;
0407 else
0408 return {system::in_place_value, std::move(get<1u>(res))};
0409 }
0410 else
0411 return {system::in_place_error, get<2u>(res)};
0412 });
0413
0414 return res;
0415 }
0416 };
0417 awaitable operator co_await() && {return awaitable{aws};}
0418 };
0419
0420 }
0421
0422 #endif