Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 09:44:25

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_DETAIL_THREAD_HPP
0009 #define BOOST_COBALT_DETAIL_THREAD_HPP
0010 
0011 #include <boost/cobalt/config.hpp>
0012 #include <boost/cobalt/detail/forward_cancellation.hpp>
0013 #include <boost/cobalt/detail/handler.hpp>
0014 #include <boost/cobalt/concepts.hpp>
0015 #include <boost/cobalt/op.hpp>
0016 #include <boost/cobalt/this_coro.hpp>
0017 
0018 #include <boost/asio/cancellation_signal.hpp>
0019 
0020 #include <thread>
0021 
0022 namespace boost::cobalt
0023 {
0024 
0025 struct as_tuple_tag;
0026 struct as_result_tag;
0027 
0028 namespace detail
0029 {
0030 struct thread_promise;
0031 }
0032 
0033 struct thread;
0034 
0035 namespace detail
0036 {
0037 
0038 
0039 struct signal_helper_2
0040 {
0041   asio::cancellation_signal signal;
0042 };
0043 
0044 
0045 struct thread_state
0046 {
0047   asio::io_context ctx{1u};
0048   asio::cancellation_signal signal;
0049   std::mutex mtx;
0050   std::optional<completion_handler<std::exception_ptr>> waitor;
0051   std::atomic<bool> done = false;
0052 };
0053 
0054 struct thread_promise : signal_helper_2,
0055                         promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>,
0056                         promise_throw_if_cancelled_base,
0057                         enable_awaitables<thread_promise>,
0058                         enable_await_allocator<thread_promise>,
0059                         enable_await_executor<thread_promise>,
0060                         enable_await_deferred
0061 {
0062   BOOST_COBALT_DECL thread_promise();
0063 
0064   struct initial_awaitable
0065   {
0066     bool await_ready() const {return false;}
0067     void await_suspend(std::coroutine_handle<thread_promise> h)
0068     {
0069       h.promise().mtx.unlock();
0070     }
0071 
0072     void await_resume() {}
0073   };
0074 
0075   auto initial_suspend() noexcept
0076   {
0077     return initial_awaitable{};
0078   }
0079   std::suspend_never final_suspend() noexcept
0080   {
0081     wexec_.reset();
0082     return {};
0083   }
0084 
0085 #if !defined(BOOST_NO_EXCEPTIONS)
0086   void unhandled_exception() { throw; }
0087 #endif
0088   void return_void() { }
0089 
0090   using executor_type = typename cobalt::executor;
0091   const executor_type & get_executor() const {return *exec_;}
0092 
0093 #if !defined(BOOST_COBALT_NO_PMR)
0094   using allocator_type = pmr::polymorphic_allocator<void>;
0095   using resource_type  = pmr::unsynchronized_pool_resource;
0096 
0097   resource_type * resource;
0098   allocator_type  get_allocator() const { return allocator_type(resource); }
0099 #endif
0100 
0101   using promise_cancellation_base<asio::cancellation_slot, asio::enable_total_cancellation>::await_transform;
0102   using promise_throw_if_cancelled_base::await_transform;
0103   using enable_awaitables<thread_promise>::await_transform;
0104   using enable_await_allocator<thread_promise>::await_transform;
0105   using enable_await_executor<thread_promise>::await_transform;
0106   using enable_await_deferred::await_transform;
0107 
0108   BOOST_COBALT_DECL
0109   boost::cobalt::thread get_return_object();
0110 
0111   void set_executor(asio::io_context::executor_type exec)
0112   {
0113     wexec_.emplace(exec);
0114     exec_.emplace(exec);
0115   }
0116 
0117   std::mutex mtx;
0118  private:
0119 
0120   std::optional<asio::executor_work_guard<asio::io_context::executor_type>> wexec_;
0121   std::optional<cobalt::executor> exec_;
0122 };
0123 
0124 struct thread_awaitable
0125 {
0126   asio::cancellation_slot cl;
0127   std::optional<std::tuple<std::exception_ptr>> res;
0128   bool await_ready(const boost::source_location & loc = BOOST_CURRENT_LOCATION) const
0129   {
0130     if (state_ == nullptr)
0131       boost::throw_exception(std::invalid_argument("Thread expired"), loc);
0132     std::lock_guard<std::mutex> lock{state_->mtx};
0133     return state_->done;
0134   }
0135 
0136   template<typename Promise>
0137   bool await_suspend(std::coroutine_handle<Promise> h)
0138   {
0139     BOOST_ASSERT(state_);
0140 
0141     std::lock_guard<std::mutex> lock{state_->mtx};
0142     if (state_->done)
0143       return false;
0144 
0145     if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0146       if ((cl = h.promise().get_cancellation_slot()).is_connected())
0147       {
0148         cl.assign(
0149             [st = state_](asio::cancellation_type type)
0150             {
0151               std::lock_guard<std::mutex> lock{st->mtx};
0152               asio::post(st->ctx,
0153                          [st, type]
0154                          {
0155                             BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__));
0156                             st->signal.emit(type);
0157                          });
0158             });
0159 
0160       }
0161 
0162     state_->waitor.emplace(h, res);
0163     return true;
0164   }
0165 
0166   void await_resume()
0167   {
0168     if (cl.is_connected())
0169       cl.clear();
0170     if (thread_)
0171       thread_->join();
0172     if (!res) // await_ready
0173       return;
0174     if (auto ee = std::get<0>(*res))
0175       std::rethrow_exception(ee);
0176   }
0177 
0178   system::result<void, std::exception_ptr> await_resume(const as_result_tag &)
0179   {
0180     if (cl.is_connected())
0181       cl.clear();
0182     if (thread_)
0183       thread_->join();
0184     if (!res) // await_ready
0185       return {system::in_place_value};
0186     if (auto ee = std::get<0>(*res))
0187       return {system::in_place_error, std::move(ee)};
0188 
0189     return {system::in_place_value};
0190   }
0191 
0192   std::tuple<std::exception_ptr> await_resume(const as_tuple_tag &)
0193   {
0194     if (cl.is_connected())
0195       cl.clear();
0196     if (thread_)
0197       thread_->join();
0198 
0199     return std::get<0>(*res);
0200   }
0201 
0202   explicit thread_awaitable(std::shared_ptr<detail::thread_state> state)
0203       : state_(std::move(state)) {}
0204 
0205   explicit thread_awaitable(std::thread thread,
0206                             std::shared_ptr<detail::thread_state> state)
0207       : thread_(std::move(thread)), state_(std::move(state)) {}
0208  private:
0209   std::optional<std::thread> thread_;
0210   std::shared_ptr<detail::thread_state> state_;
0211 };
0212 }
0213 
0214 }
0215 
0216 #endif //BOOST_COBALT_DETAIL_THREAD_HPP