File indexing completed on 2025-01-30 09:34:31
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_TASK_HPP
0009 #define BOOST_COBALT_DETAIL_TASK_HPP
0010
0011 #include <boost/cobalt/detail/exception.hpp>
0012 #include <boost/cobalt/detail/forward_cancellation.hpp>
0013 #include <boost/cobalt/detail/wrapper.hpp>
0014 #include <boost/cobalt/detail/this_thread.hpp>
0015
0016 #include <boost/asio/bind_allocator.hpp>
0017 #include <boost/asio/cancellation_signal.hpp>
0018
0019
0020 #include <coroutine>
0021 #include <optional>
0022 #include <utility>
0023
0024 namespace boost::cobalt
0025 {
0026
0027 struct as_tuple_tag;
0028 struct as_result_tag;
0029
0030 template<typename Return>
0031 struct task;
0032
0033 namespace detail
0034 {
0035
0036 template<typename T>
0037 struct task_receiver;
0038
0039 template<typename T>
0040 struct task_value_holder
0041 {
0042 std::optional<T> result;
0043 bool result_taken = false;
0044
0045 system::result<T, std::exception_ptr> get_result_value()
0046 {
0047 result_taken = true;
0048 BOOST_ASSERT(result);
0049 return {system::in_place_value, std::move(*result)};
0050 }
0051
0052 void return_value(T && ret)
0053 {
0054 result.emplace(std::move(ret));
0055 static_cast<task_receiver<T>*>(this)->set_done();
0056 }
0057 void return_value(const T & ret)
0058 {
0059 result.emplace(ret);
0060 static_cast<task_receiver<T>*>(this)->set_done();
0061 }
0062 };
0063
0064 template<>
0065 struct task_value_holder<void>
0066 {
0067 bool result_taken = false;
0068 system::result<void, std::exception_ptr> get_result_value()
0069 {
0070 result_taken = true;
0071 return {system::in_place_value};
0072 }
0073
0074 inline void return_void();
0075 };
0076
0077
0078 template<typename T>
0079 struct task_promise;
0080
0081 template<typename T>
0082 struct task_receiver : task_value_holder<T>
0083 {
0084 std::exception_ptr exception;
0085 system::result<T, std::exception_ptr> get_result()
0086 {
0087 if (exception && !done)
0088 return {system::in_place_error, std::exchange(exception, nullptr)};
0089 else if (exception)
0090 {
0091 this->result_taken = true;
0092 return {system::in_place_error, exception};
0093 }
0094 return this->get_result_value();
0095 }
0096
0097 void unhandled_exception()
0098 {
0099 exception = std::current_exception();
0100 set_done();
0101 }
0102
0103 bool done = false;
0104 unique_handle<void> awaited_from{nullptr};
0105
0106 void set_done()
0107 {
0108 done = true;
0109 }
0110
0111 task_receiver() = default;
0112 task_receiver(task_receiver && lhs)
0113 : task_value_holder<T>(std::move(lhs)),
0114 exception(std::move(lhs.exception)), done(lhs.done), awaited_from(std::move(lhs.awaited_from)),
0115 promise(lhs.promise)
0116 {
0117 if (!done && !exception)
0118 {
0119 promise->receiver = this;
0120 lhs.exception = moved_from_exception();
0121 }
0122
0123 lhs.done = true;
0124 }
0125
0126 ~task_receiver()
0127 {
0128 if (!done && promise && promise->receiver == this)
0129 {
0130 promise->receiver = nullptr;
0131 if (!promise->started)
0132 std::coroutine_handle<task_promise<T>>::from_promise(*promise).destroy();
0133 }
0134 }
0135
0136 task_receiver(task_promise<T> * promise)
0137 : promise(promise)
0138 {
0139 promise->receiver = this;
0140 }
0141
0142 struct awaitable
0143 {
0144 task_receiver * self;
0145 asio::cancellation_slot cl;
0146 awaitable(task_receiver * self) : self(self)
0147 {
0148 }
0149
0150 awaitable(awaitable && aw) : self(aw.self)
0151 {
0152 }
0153
0154 ~awaitable ()
0155 {
0156 }
0157
0158 bool await_ready() const { return self->done; }
0159
0160 template<typename Promise>
0161 BOOST_NOINLINE std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h)
0162 {
0163 if (self->done)
0164 return std::coroutine_handle<void>::from_address(h.address());
0165
0166 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0167 if ((cl = h.promise().get_cancellation_slot()).is_connected())
0168 cl.emplace<forward_cancellation>(self->promise->signal);
0169
0170
0171 if constexpr (requires (Promise p) {p.get_executor();})
0172 self->promise->exec.emplace(h.promise().get_executor());
0173 else
0174 self->promise->exec.emplace(this_thread::get_executor());
0175 self->promise->exec_ = self->promise->exec->get_executor();
0176 self->awaited_from.reset(h.address());
0177
0178 return std::coroutine_handle<task_promise<T>>::from_promise(*self->promise);
0179 }
0180
0181 T await_resume(const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0182 {
0183 if (cl.is_connected())
0184 cl.clear();
0185
0186 return self->get_result().value(loc);
0187 }
0188
0189 system::result<T, std::exception_ptr> await_resume(const as_result_tag &)
0190 {
0191 if (cl.is_connected())
0192 cl.clear();
0193 return self->get_result();
0194 }
0195
0196 auto await_resume(const as_tuple_tag &)
0197 {
0198 if (cl.is_connected())
0199 cl.clear();
0200 auto res = self->get_result();
0201 if constexpr (std::is_void_v<T>)
0202 return res.error();
0203 else
0204 {
0205 if (res.has_error())
0206 return std::make_tuple(res.error(), T{});
0207 else
0208 return std::make_tuple(std::exception_ptr(), std::move(*res));
0209 }
0210
0211 }
0212
0213 void interrupt_await() &
0214 {
0215 if (!self)
0216 return ;
0217 self->exception = detached_exception();
0218 if (self->awaited_from)
0219 self->awaited_from.release().resume();
0220 }
0221 };
0222
0223 task_promise<T> * promise;
0224
0225 awaitable get_awaitable() {return awaitable{this};}
0226
0227
0228 void interrupt_await() &
0229 {
0230 exception = detached_exception();
0231 awaited_from.release().resume();
0232 }
0233 };
0234
0235 inline void task_value_holder<void>::return_void()
0236 {
0237 static_cast<task_receiver<void>*>(this)->set_done();
0238 }
0239
0240 template<typename Return>
0241 struct task_promise_result
0242 {
0243 task_receiver<Return>* receiver{nullptr};
0244 void return_value(Return && ret)
0245 {
0246 if(receiver)
0247 receiver->return_value(std::move(ret));
0248 }
0249 void return_value(const Return & ret)
0250 {
0251 if(receiver)
0252 receiver->return_value(ret);
0253 }
0254 };
0255
0256 template<>
0257 struct task_promise_result<void>
0258 {
0259 task_receiver<void>* receiver{nullptr};
0260 void return_void()
0261 {
0262 if(receiver)
0263 receiver->return_void();
0264 }
0265 };
0266
0267 struct async_initiate_spawn;
0268
0269 template<typename Return>
0270 struct task_promise
0271 : promise_memory_resource_base,
0272 promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>,
0273 promise_throw_if_cancelled_base,
0274 enable_awaitables<task_promise<Return>>,
0275 enable_await_allocator<task_promise<Return>>,
0276 enable_await_executor<task_promise<Return>>,
0277 task_promise_result<Return>
0278 {
0279 using promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>::await_transform;
0280 using promise_throw_if_cancelled_base::await_transform;
0281 using enable_awaitables<task_promise<Return>>::await_transform;
0282 using enable_await_allocator<task_promise<Return>>::await_transform;
0283 using enable_await_executor<task_promise<Return>>::await_transform;
0284
0285 [[nodiscard]] task<Return> get_return_object()
0286 {
0287 return task<Return>{this};
0288 }
0289
0290 mutable asio::cancellation_signal signal;
0291
0292 using executor_type = executor;
0293 std::optional<asio::executor_work_guard<executor_type>> exec;
0294 std::optional<executor_type> exec_;
0295 const executor_type & get_executor() const
0296 {
0297 if (!exec)
0298 throw_exception(asio::bad_executor());
0299 BOOST_ASSERT(exec_);
0300 return *exec_;
0301 }
0302
0303 template<typename ... Args>
0304 task_promise(Args & ...args)
0305 #if !defined(BOOST_COBALT_NO_PMR)
0306 : promise_memory_resource_base(detail::get_memory_resource_from_args_global(args...))
0307 #endif
0308 {
0309 this->reset_cancellation_source(signal.slot());
0310 }
0311
0312 struct initial_awaitable
0313 {
0314 task_promise * promise;
0315
0316 bool await_ready() const noexcept {return false;}
0317 void await_suspend(std::coroutine_handle<>) {}
0318
0319 void await_resume()
0320 {
0321 promise->started = true;
0322 }
0323 };
0324
0325 auto initial_suspend()
0326 {
0327
0328 return initial_awaitable{this};
0329 }
0330
0331 struct final_awaitable
0332 {
0333 task_promise * promise;
0334 bool await_ready() const noexcept
0335 {
0336 return promise->receiver && promise->receiver->awaited_from.get() == nullptr;
0337 }
0338
0339 BOOST_NOINLINE
0340 auto await_suspend(std::coroutine_handle<task_promise> h) noexcept
0341 {
0342 std::coroutine_handle<void> res = std::noop_coroutine();
0343 if (promise->receiver && promise->receiver->awaited_from.get() != nullptr)
0344 res = promise->receiver->awaited_from.release();
0345
0346
0347 if (auto & rec = h.promise().receiver; rec != nullptr)
0348 {
0349 if (!rec->done && !rec->exception)
0350 rec->exception = completed_unexpected();
0351 rec->set_done();
0352 rec->awaited_from.reset(nullptr);
0353 rec = nullptr;
0354 }
0355 detail::self_destroy(h);
0356 return res;
0357 }
0358
0359 void await_resume() noexcept
0360 {
0361 }
0362 };
0363
0364 auto final_suspend() noexcept
0365 {
0366 return final_awaitable{this};
0367 }
0368
0369 void unhandled_exception()
0370 {
0371 if (this->receiver)
0372 this->receiver->unhandled_exception();
0373 else
0374 throw ;
0375 }
0376
0377 ~task_promise()
0378 {
0379 if (this->receiver)
0380 {
0381 if (!this->receiver->done && !this->receiver->exception)
0382 this->receiver->exception = completed_unexpected();
0383 this->receiver->set_done();
0384 this->receiver->awaited_from.reset(nullptr);
0385 }
0386 }
0387 bool started = false;
0388 friend struct async_initiate;
0389 };
0390
0391 }
0392
0393 }
0394
0395 #endif