Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 09:44:26

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
0009 #define BOOST_COBALT_IMPL_CHANNEL_HPP
0010 
0011 #include <boost/cobalt/channel.hpp>
0012 #include <boost/cobalt/result.hpp>
0013 
0014 #include <boost/asio/post.hpp>
0015 
0016 namespace boost::cobalt
0017 {
0018 
0019 #if !defined(BOOST_COBALT_NO_PMR)
0020 template<typename T>
0021 inline channel<T>::channel(
0022     std::size_t limit,
0023     executor executor,
0024     pmr::memory_resource * resource)
0025     : buffer_(limit, pmr::polymorphic_allocator<T>(resource)), executor_(executor) {}
0026 #else
0027 template<typename T>
0028 inline channel<T>::channel(
0029     std::size_t limit,
0030     executor executor)
0031     : buffer_(limit), executor_(executor) {}
0032 #endif
0033 
0034 template<typename T>
0035 auto channel<T>::get_executor() -> const executor_type &  {return executor_;}
0036 
0037 template<typename T>
0038 bool channel<T>::is_open() const {return !is_closed_;}
0039 
0040 
0041 template<typename T>
0042 channel<T>::~channel()
0043 {
0044   while (!read_queue_.empty())
0045     read_queue_.front().awaited_from.reset();
0046 
0047   while (!write_queue_.empty())
0048     write_queue_.front().awaited_from.reset();
0049 
0050 }
0051 
0052 template<typename T>
0053 void channel<T>::close()
0054 {
0055   is_closed_ = true;
0056   while (!read_queue_.empty())
0057   {
0058     auto & op = read_queue_.front();
0059     op.unlink();
0060     op.cancelled = true;
0061     op.cancel_slot.clear();
0062 
0063     if (op.awaited_from)
0064       asio::post(executor_, std::move(op.awaited_from));
0065   }
0066   while (!write_queue_.empty())
0067   {
0068     auto & op = write_queue_.front();
0069     op.unlink();
0070     op.cancelled = true;
0071     op.cancel_slot.clear();
0072     if (op.awaited_from)
0073       asio::post(executor_, std::move(op.awaited_from));
0074   }
0075 }
0076 
0077 
0078 template<typename T>
0079 struct channel<T>::read_op::cancel_impl
0080 {
0081   read_op * op;
0082   cancel_impl(read_op * op) : op(op) {}
0083   void operator()(asio::cancellation_type)
0084   {
0085     op->cancelled = true;
0086     op->unlink();
0087     if (op->awaited_from)
0088       asio::post(
0089           op->chn->executor_,
0090           std::move(op->awaited_from));
0091     op->cancel_slot.clear();
0092   }
0093 };
0094 
0095 template<typename T>
0096 template<typename Promise>
0097 std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0098 {
0099   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0100     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0101       cancel_slot.emplace<cancel_impl>(this);
0102 
0103   if (awaited_from)
0104     boost::throw_exception(std::runtime_error("already-awaited"), loc);
0105   awaited_from.reset(h.address());
0106   // currently nothing to read
0107   if constexpr (requires (Promise p) {p.begin_transaction();})
0108     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0109 
0110   if (chn->write_queue_.empty())
0111   {
0112     chn->read_queue_.push_back(*this);
0113     return std::noop_coroutine();
0114   }
0115   else
0116   {
0117     cancel_slot.clear();
0118     auto & op = chn->write_queue_.front();
0119     // transactional_unlink can interrupt or cancel `op` through `race`, so we need to check.
0120     op.direct = true;
0121     if constexpr (std::is_copy_constructible_v<T>)
0122     {
0123       if (op.ref.index() == 0)
0124         direct = std::move(*variant2::get<0>(op.ref));
0125       else
0126         direct = *variant2::get<1>(op.ref);
0127     }
0128     else
0129       direct = std::move(*op.ref);
0130 
0131     op.transactional_unlink();
0132     BOOST_ASSERT(op.awaited_from);
0133     BOOST_ASSERT(awaited_from);
0134 
0135     asio::post(chn->executor_, std::move(awaited_from));
0136     return op.awaited_from.release();
0137   }
0138 }
0139 
0140 
0141 template<typename T>
0142 T channel<T>::read_op::await_resume()
0143 {
0144   return await_resume(as_result_tag{}).value(loc);
0145 }
0146 
0147 template<typename T>
0148 std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
0149 {
0150   auto res = await_resume(as_result_tag{});
0151 
0152   if (res.has_error())
0153     return {res.error(), T{}};
0154   else
0155     return {system::error_code{}, std::move(*res)};
0156 
0157 }
0158 
0159 template<typename T>
0160 system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
0161 {
0162   if (cancel_slot.is_connected())
0163     cancel_slot.clear();
0164 
0165   if (cancelled)
0166    return {system::in_place_error, asio::error::operation_aborted};
0167 
0168   T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
0169   if (!direct)
0170     chn->buffer_.pop_front();
0171 
0172   if (!chn->write_queue_.empty())
0173   {
0174     auto &op = chn->write_queue_.front();
0175     BOOST_ASSERT(chn->read_queue_.empty());
0176     if (op.await_ready())
0177     {
0178       op.transactional_unlink();
0179       BOOST_ASSERT(op.awaited_from);
0180       asio::post(chn->executor_, std::move(op.awaited_from));
0181     }
0182   }
0183   return {system::in_place_value, std::move(value)};
0184 }
0185 
0186 template<typename T>
0187 struct channel<T>::write_op::cancel_impl
0188 {
0189   write_op * op;
0190   cancel_impl(write_op * op) : op(op) {}
0191   void operator()(asio::cancellation_type)
0192   {
0193     op->cancelled = true;
0194     op->unlink();
0195     if (op->awaited_from)
0196       asio::post(
0197         op->chn->executor_, std::move(op->awaited_from));
0198     op->cancel_slot.clear();
0199   }
0200 };
0201 
0202 template<typename T>
0203 template<typename Promise>
0204 std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0205 {
0206   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0207     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0208       cancel_slot.emplace<cancel_impl>(this);
0209 
0210   awaited_from.reset(h.address());
0211   if constexpr (requires (Promise p) {p.begin_transaction();})
0212     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0213 
0214   if (chn->read_queue_.empty())
0215   {
0216     chn->write_queue_.push_back(*this);
0217     return std::noop_coroutine();
0218   }
0219   else
0220   {
0221     cancel_slot.clear();
0222     auto & op = chn->read_queue_.front();
0223     if constexpr (std::is_copy_constructible_v<T>)
0224     {
0225       if (ref.index() == 0)
0226         op.direct.emplace(std::move(*variant2::get<0>(ref)));
0227       else
0228         op.direct.emplace(*variant2::get<1>(ref));
0229     }
0230     else
0231       op.direct.emplace(std::move(*ref));
0232 
0233     direct = true;
0234     op.transactional_unlink();
0235 
0236     BOOST_ASSERT(op.awaited_from);
0237     BOOST_ASSERT(awaited_from);
0238     asio::post(chn->executor_, std::move(awaited_from));
0239 
0240     return op.awaited_from.release();
0241   }
0242 }
0243 
0244 template<typename T>
0245 std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
0246 {
0247   return await_resume(as_result_tag{}).error();
0248 }
0249 
0250 template<typename T>
0251 void channel<T>::write_op::await_resume()
0252 {
0253   await_resume(as_result_tag{}).value(loc);
0254 }
0255 
0256 template<typename T>
0257 system::result<void>  channel<T>::write_op::await_resume(const struct as_result_tag &)
0258 {
0259   if (cancel_slot.is_connected())
0260     cancel_slot.clear();
0261   if (cancelled)
0262     boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
0263 
0264   if (!direct)
0265   {
0266     BOOST_ASSERT(!chn->buffer_.full());
0267     if constexpr (std::is_copy_constructible_v<T>)
0268     {
0269       if (ref.index() == 0)
0270         chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
0271       else
0272         chn->buffer_.push_back(*variant2::get<1>(ref));
0273     }
0274     else
0275       chn->buffer_.push_back(std::move(*ref));
0276   }
0277 
0278   if (!chn->read_queue_.empty())
0279   {
0280     auto & op = chn->read_queue_.front();
0281     BOOST_ASSERT(chn->write_queue_.empty());
0282     if (op.await_ready())
0283     {
0284       op.transactional_unlink();
0285       BOOST_ASSERT(op.awaited_from);
0286       asio::post(chn->executor_, std::move(op.awaited_from));
0287     }
0288   }
0289   return system::in_place_value;
0290 }
0291 
0292 struct channel<void>::read_op::cancel_impl
0293 {
0294   read_op * op;
0295   cancel_impl(read_op * op) : op(op) {}
0296   void operator()(asio::cancellation_type)
0297   {
0298     op->cancelled = true;
0299     op->unlink();
0300     asio::post(op->chn->executor_, std::move(op->awaited_from));
0301     op->cancel_slot.clear();
0302   }
0303 };
0304 
0305 struct channel<void>::write_op::cancel_impl
0306 {
0307   write_op * op;
0308   cancel_impl(write_op * op) : op(op) {}
0309   void operator()(asio::cancellation_type)
0310   {
0311     op->cancelled = true;
0312     op->unlink();
0313     asio::post(op->chn->executor_, std::move(op->awaited_from));
0314     op->cancel_slot.clear();
0315   }
0316 };
0317 
0318 template<typename Promise>
0319 std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0320 {
0321   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0322     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0323       cancel_slot.emplace<cancel_impl>(this);
0324 
0325   awaited_from.reset(h.address());
0326 
0327   if constexpr (requires (Promise p) {p.begin_transaction();})
0328     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0329 
0330   // nothing to read currently, enqueue
0331   if (chn->write_queue_.empty())
0332   {
0333     chn->read_queue_.push_back(*this);
0334     return std::noop_coroutine();
0335   }
0336   else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
0337   {
0338     cancel_slot.clear();
0339     auto & op = chn->write_queue_.front();
0340     op.direct = true;
0341     direct = true;
0342     op.transactional_unlink();
0343 
0344     BOOST_ASSERT(op.awaited_from);
0345     BOOST_ASSERT(awaited_from);
0346     asio::post(chn->executor_, std::move(awaited_from));
0347     return op.awaited_from.release();
0348   }
0349 }
0350 
0351 
0352 template<typename Promise>
0353 std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0354 {
0355   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0356     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0357       cancel_slot.emplace<cancel_impl>(this);
0358 
0359   awaited_from.reset(h.address());
0360   // currently nothing to read
0361   if constexpr (requires (Promise p) {p.begin_transaction();})
0362     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0363 
0364   if (chn->read_queue_.empty())
0365   {
0366     chn->write_queue_.push_back(*this);
0367     return std::noop_coroutine();
0368   }
0369   else
0370   {
0371     cancel_slot.clear();
0372     auto & op = chn->read_queue_.front();
0373     op.direct = true; // let interrupt_await know that we'll be resuming it!
0374     direct = true;
0375     op.transactional_unlink();
0376 
0377     BOOST_ASSERT(op.awaited_from);
0378     BOOST_ASSERT(awaited_from);
0379 
0380     asio::post(chn->executor_, std::move(awaited_from));
0381     return op.awaited_from.release();
0382   }
0383 }
0384 
0385 }
0386 
0387 #endif //BOOST_COBALT_IMPL_CHANNEL_HPP