File indexing completed on 2025-01-30 09:34:31
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_PROMISE_HPP
0009 #define BOOST_COBALT_DETAIL_PROMISE_HPP
0010
0011 #include <boost/cobalt/detail/exception.hpp>
0012 #include <boost/cobalt/detail/forward_cancellation.hpp>
0013 #include <boost/cobalt/detail/wrapper.hpp>
0014 #include <boost/cobalt/detail/this_thread.hpp>
0015 #include <boost/cobalt/unique_handle.hpp>
0016
0017 #include <boost/asio/cancellation_signal.hpp>
0018
0019
0020
0021
0022 #include <boost/core/exchange.hpp>
0023
0024 #include <coroutine>
0025 #include <optional>
0026 #include <utility>
0027 #include <boost/asio/bind_allocator.hpp>
0028
0029 namespace boost::cobalt
0030 {
0031
0032 struct as_tuple_tag;
0033 struct as_result_tag;
0034
0035 template<typename Return>
0036 struct promise;
0037
0038 namespace detail
0039 {
0040
0041 template<typename T>
0042 struct promise_receiver;
0043
0044 template<typename T>
0045 struct promise_value_holder
0046 {
0047 std::optional<T> result;
0048 bool result_taken = false;
0049
0050 system::result<T, std::exception_ptr> get_result_value()
0051 {
0052 result_taken = true;
0053 BOOST_ASSERT(result);
0054 return {system::in_place_value, std::move(*result)};
0055 }
0056
0057 void return_value(T && ret)
0058 {
0059 result.emplace(std::move(ret));
0060 static_cast<promise_receiver<T>*>(this)->set_done();
0061 }
0062
0063 void return_value(const T & ret)
0064 {
0065 result.emplace(ret);
0066 static_cast<promise_receiver<T>*>(this)->set_done();
0067 }
0068
0069 };
0070
0071 template<>
0072 struct promise_value_holder<void>
0073 {
0074 bool result_taken = false;
0075 system::result<void, std::exception_ptr> get_result_value()
0076 {
0077 result_taken = true;
0078 return {system::in_place_value};
0079 }
0080
0081 inline void return_void();
0082 };
0083
0084
0085
0086 template<typename T>
0087 struct promise_receiver : promise_value_holder<T>
0088 {
0089 std::exception_ptr exception;
0090 system::result<T, std::exception_ptr> get_result()
0091 {
0092 if (exception && !done)
0093 return {system::in_place_error, std::exchange(exception, nullptr)};
0094 else if (exception)
0095 {
0096 this->result_taken = true;
0097 return {system::in_place_error, exception};
0098 }
0099 return this->get_result_value();
0100 }
0101 void unhandled_exception()
0102 {
0103 exception = std::current_exception();
0104 set_done();
0105 }
0106
0107 bool done = false;
0108 unique_handle<void> awaited_from{nullptr};
0109
0110 void set_done()
0111 {
0112 done = true;
0113 }
0114
0115 promise_receiver() = default;
0116 promise_receiver(promise_receiver && lhs) noexcept
0117 : promise_value_holder<T>(std::move(lhs)),
0118 exception(std::move(lhs.exception)), done(lhs.done), awaited_from(std::move(lhs.awaited_from)),
0119 reference(lhs.reference), cancel_signal(lhs.cancel_signal)
0120 {
0121 if (!done && !exception)
0122 {
0123 reference = this;
0124 lhs.exception = moved_from_exception();
0125 }
0126
0127 lhs.done = true;
0128
0129 }
0130
0131 ~promise_receiver()
0132 {
0133 if (!done && reference == this)
0134 reference = nullptr;
0135 }
0136
0137 promise_receiver(promise_receiver * &reference, asio::cancellation_signal & cancel_signal)
0138 : reference(reference), cancel_signal(cancel_signal)
0139 {
0140 reference = this;
0141 }
0142
0143 struct awaitable
0144 {
0145 promise_receiver * self;
0146 std::exception_ptr ex;
0147 asio::cancellation_slot cl;
0148
0149 awaitable(promise_receiver * self) : self(self)
0150 {
0151 }
0152
0153 awaitable(awaitable && aw) : self(aw.self)
0154 {
0155 }
0156
0157 ~awaitable ()
0158 {
0159 }
0160 bool await_ready() const { return self->done; }
0161
0162 template<typename Promise>
0163 bool await_suspend(std::coroutine_handle<Promise> h)
0164 {
0165 if (self->done)
0166 return false;
0167
0168 if (ex)
0169 return false;
0170
0171 if (self->awaited_from != nullptr)
0172 {
0173 ex = already_awaited();
0174 return false;
0175 }
0176
0177 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0178 if ((cl = h.promise().get_cancellation_slot()).is_connected())
0179 cl.emplace<forward_cancellation>(self->cancel_signal);
0180
0181 self->awaited_from.reset(h.address());
0182 return true;
0183 }
0184
0185 T await_resume(const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0186 {
0187 if (cl.is_connected())
0188 cl.clear();
0189 if (ex)
0190 std::rethrow_exception(ex);
0191 return self->get_result().value(loc);
0192 }
0193
0194 system::result<T, std::exception_ptr> await_resume(const as_result_tag &)
0195 {
0196 if (cl.is_connected())
0197 cl.clear();
0198 if (ex)
0199 return {system::in_place_error, std::move(ex)};
0200 return self->get_result();
0201 }
0202
0203 auto await_resume(const as_tuple_tag &)
0204 {
0205 if (cl.is_connected())
0206 cl.clear();
0207
0208 if constexpr (std::is_void_v<T>)
0209 {
0210 if (ex)
0211 return std::move(ex);
0212 return self->get_result().error();
0213 }
0214 else
0215 {
0216 if (ex)
0217 return std::make_tuple(std::move(ex), T{});
0218 auto res = self->get_result();
0219 if (res.has_error())
0220 return std::make_tuple(res.error(), T{});
0221 else
0222 return std::make_tuple(std::exception_ptr(), std::move(*res));
0223 }
0224 }
0225 void interrupt_await() &
0226 {
0227 if (!self)
0228 return ;
0229 ex = detached_exception();
0230 if (self->awaited_from)
0231 self->awaited_from.release().resume();
0232 }
0233 };
0234
0235 promise_receiver * &reference;
0236 asio::cancellation_signal & cancel_signal;
0237
0238 awaitable get_awaitable() {return awaitable{this};}
0239
0240
0241 void interrupt_await() &
0242 {
0243 exception = detached_exception();
0244 awaited_from.release().resume();
0245 }
0246 };
0247
0248 inline void promise_value_holder<void>::return_void()
0249 {
0250 static_cast<promise_receiver<void>*>(this)->set_done();
0251 }
0252
0253 template<typename Return>
0254 struct cobalt_promise_result
0255 {
0256 promise_receiver<Return>* receiver{nullptr};
0257 void return_value(Return && ret)
0258 {
0259 if(receiver)
0260 receiver->return_value(std::move(ret));
0261 }
0262
0263 void return_value(const Return & ret)
0264 {
0265 if(receiver)
0266 receiver->return_value(ret);
0267 }
0268
0269 };
0270
0271 template<>
0272 struct cobalt_promise_result<void>
0273 {
0274 promise_receiver<void>* receiver{nullptr};
0275 void return_void()
0276 {
0277 if(receiver)
0278 receiver->return_void();
0279 }
0280 };
0281
0282 template<typename Return>
0283 struct cobalt_promise
0284 : promise_memory_resource_base,
0285 promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>,
0286 promise_throw_if_cancelled_base,
0287 enable_awaitables<cobalt_promise<Return>>,
0288 enable_await_allocator<cobalt_promise<Return>>,
0289 enable_await_executor<cobalt_promise<Return>>,
0290 cobalt_promise_result<Return>
0291 {
0292 using promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>::await_transform;
0293 using promise_throw_if_cancelled_base::await_transform;
0294 using enable_awaitables<cobalt_promise<Return>>::await_transform;
0295 using enable_await_allocator<cobalt_promise<Return>>::await_transform;
0296 using enable_await_executor<cobalt_promise<Return>>::await_transform;
0297
0298 [[nodiscard]] promise<Return> get_return_object()
0299 {
0300 return promise<Return>{this};
0301 }
0302
0303 mutable asio::cancellation_signal signal;
0304
0305 using executor_type = executor;
0306 executor_type exec;
0307 const executor_type & get_executor() const {return exec;}
0308
0309 template<typename ... Args>
0310 cobalt_promise(Args & ...args)
0311 :
0312 #if !defined(BOOST_COBALT_NO_PMR)
0313 promise_memory_resource_base(detail::get_memory_resource_from_args(args...)),
0314 #endif
0315 exec{detail::get_executor_from_args(args...)}
0316 {
0317 this->reset_cancellation_source(signal.slot());
0318 }
0319
0320 std::suspend_never initial_suspend() {return {};}
0321 auto final_suspend() noexcept
0322 {
0323 return final_awaitable{this};
0324 }
0325
0326 void unhandled_exception()
0327 {
0328 if (this->receiver)
0329 this->receiver->unhandled_exception();
0330 else
0331 throw ;
0332 }
0333
0334 ~cobalt_promise()
0335 {
0336 if (this->receiver)
0337 {
0338 if (!this->receiver->done && !this->receiver->exception)
0339 this->receiver->exception = completed_unexpected();
0340 this->receiver->set_done();
0341 this->receiver->awaited_from.reset(nullptr);
0342 }
0343
0344 }
0345 private:
0346 struct final_awaitable
0347 {
0348 cobalt_promise * promise;
0349 bool await_ready() const noexcept
0350 {
0351 return promise->receiver && promise->receiver->awaited_from.get() == nullptr;
0352 }
0353
0354 std::coroutine_handle<void> await_suspend(std::coroutine_handle<cobalt_promise> h) noexcept
0355 {
0356 std::coroutine_handle<void> res = std::noop_coroutine();
0357 if (promise->receiver && promise->receiver->awaited_from.get() != nullptr)
0358 res = promise->receiver->awaited_from.release();
0359
0360
0361 if (auto &rec = h.promise().receiver; rec != nullptr)
0362 {
0363 if (!rec->done && !rec->exception)
0364 rec->exception = completed_unexpected();
0365 rec->set_done();
0366 rec->awaited_from.reset(nullptr);
0367 rec = nullptr;
0368 }
0369 detail::self_destroy(h);
0370 return res;
0371 }
0372
0373 void await_resume() noexcept
0374 {
0375 }
0376 };
0377
0378
0379 };
0380
0381 }
0382
0383 }
0384
0385 #endif