File indexing completed on 2025-12-16 09:44:25
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_DETAIL_THREAD_HPP
0009 #define BOOST_COBALT_DETAIL_THREAD_HPP
0010
0011 #include <boost/cobalt/config.hpp>
0012 #include <boost/cobalt/detail/forward_cancellation.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/concepts.hpp>
0015 #include <boost/cobalt/op.hpp>
0016 #include <boost/cobalt/this_coro.hpp>
0017
0018 #include <boost/asio/cancellation_signal.hpp>
0019
0020 #include <thread>
0021
0022 namespace boost::cobalt
0023 {
0024
// Tag types accepted by the tagged await_resume overloads further down;
// only declared here.
struct as_tuple_tag;
struct as_result_tag;

// Promise type of the thread coroutine; declared early so it can be named
// before its definition below.
namespace detail { struct thread_promise; }

// Return object of thread_promise::get_return_object(); only declared here.
struct thread;
0034
0035 namespace detail
0036 {
0037
0038
0039 struct signal_helper_2
0040 {
0041 asio::cancellation_signal signal;
0042 };
0043
0044
0045 struct thread_state
0046 {
0047 asio::io_context ctx{1u};
0048 asio::cancellation_signal signal;
0049 std::mutex mtx;
0050 std::optional<completion_handler<std::exception_ptr>> waitor;
0051 std::atomic<bool> done = false;
0052 };
0053
0054 struct thread_promise : signal_helper_2,
0055 promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>,
0056 promise_throw_if_cancelled_base,
0057 enable_awaitables<thread_promise>,
0058 enable_await_allocator<thread_promise>,
0059 enable_await_executor<thread_promise>,
0060 enable_await_deferred
0061 {
0062 BOOST_COBALT_DECL thread_promise();
0063
0064 struct initial_awaitable
0065 {
0066 bool await_ready() const {return false;}
0067 void await_suspend(std::coroutine_handle<thread_promise> h)
0068 {
0069 h.promise().mtx.unlock();
0070 }
0071
0072 void await_resume() {}
0073 };
0074
0075 auto initial_suspend() noexcept
0076 {
0077 return initial_awaitable{};
0078 }
0079 std::suspend_never final_suspend() noexcept
0080 {
0081 wexec_.reset();
0082 return {};
0083 }
0084
0085 #if !defined(BOOST_NO_EXCEPTIONS)
0086 void unhandled_exception() { throw; }
0087 #endif
0088 void return_void() { }
0089
0090 using executor_type = typename cobalt::executor;
0091 const executor_type & get_executor() const {return *exec_;}
0092
0093 #if !defined(BOOST_COBALT_NO_PMR)
0094 using allocator_type = pmr::polymorphic_allocator<void>;
0095 using resource_type = pmr::unsynchronized_pool_resource;
0096
0097 resource_type * resource;
0098 allocator_type get_allocator() const { return allocator_type(resource); }
0099 #endif
0100
0101 using promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>::await_transform;
0102 using promise_throw_if_cancelled_base::await_transform;
0103 using enable_awaitables<thread_promise>::await_transform;
0104 using enable_await_allocator<thread_promise>::await_transform;
0105 using enable_await_executor<thread_promise>::await_transform;
0106 using enable_await_deferred::await_transform;
0107
0108 BOOST_COBALT_DECL
0109 boost::cobalt::thread get_return_object();
0110
0111 void set_executor(asio::io_context::executor_type exec)
0112 {
0113 wexec_.emplace(exec);
0114 exec_.emplace(exec);
0115 }
0116
0117 std::mutex mtx;
0118 private:
0119
0120 std::optional<asio::executor_work_guard<asio::io_context::executor_type>> wexec_;
0121 std::optional<cobalt::executor> exec_;
0122 };
0123
0124 struct thread_awaitable
0125 {
0126 asio::cancellation_slot cl;
0127 std::optional<std::tuple<std::exception_ptr>> res;
0128 bool await_ready(const boost::source_location & loc = BOOST_CURRENT_LOCATION) const
0129 {
0130 if (state_ == nullptr)
0131 boost::throw_exception(std::invalid_argument("Thread expired"), loc);
0132 std::lock_guard<std::mutex> lock{state_->mtx};
0133 return state_->done;
0134 }
0135
0136 template<typename Promise>
0137 bool await_suspend(std::coroutine_handle<Promise> h)
0138 {
0139 BOOST_ASSERT(state_);
0140
0141 std::lock_guard<std::mutex> lock{state_->mtx};
0142 if (state_->done)
0143 return false;
0144
0145 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0146 if ((cl = h.promise().get_cancellation_slot()).is_connected())
0147 {
0148 cl.assign(
0149 [st = state_](asio::cancellation_type type)
0150 {
0151 std::lock_guard<std::mutex> lock{st->mtx};
0152 asio::post(st->ctx,
0153 [st, type]
0154 {
0155 BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__));
0156 st->signal.emit(type);
0157 });
0158 });
0159
0160 }
0161
0162 state_->waitor.emplace(h, res);
0163 return true;
0164 }
0165
0166 void await_resume()
0167 {
0168 if (cl.is_connected())
0169 cl.clear();
0170 if (thread_)
0171 thread_->join();
0172 if (!res)
0173 return;
0174 if (auto ee = std::get<0>(*res))
0175 std::rethrow_exception(ee);
0176 }
0177
0178 system::result<void, std::exception_ptr> await_resume(const as_result_tag &)
0179 {
0180 if (cl.is_connected())
0181 cl.clear();
0182 if (thread_)
0183 thread_->join();
0184 if (!res)
0185 return {system::in_place_value};
0186 if (auto ee = std::get<0>(*res))
0187 return {system::in_place_error, std::move(ee)};
0188
0189 return {system::in_place_value};
0190 }
0191
0192 std::tuple<std::exception_ptr> await_resume(const as_tuple_tag &)
0193 {
0194 if (cl.is_connected())
0195 cl.clear();
0196 if (thread_)
0197 thread_->join();
0198
0199 return std::get<0>(*res);
0200 }
0201
0202 explicit thread_awaitable(std::shared_ptr<detail::thread_state> state)
0203 : state_(std::move(state)) {}
0204
0205 explicit thread_awaitable(std::thread thread,
0206 std::shared_ptr<detail::thread_state> state)
0207 : thread_(std::move(thread)), state_(std::move(state)) {}
0208 private:
0209 std::optional<std::thread> thread_;
0210 std::shared_ptr<detail::thread_state> state_;
0211 };
0212 }
0213
0214 }
0215
0216 #endif