Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 09:34:32

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
0009 #define BOOST_COBALT_IMPL_CHANNEL_HPP
0010 
0011 #include <boost/cobalt/channel.hpp>
0012 #include <boost/cobalt/result.hpp>
0013 
0014 #include <boost/asio/post.hpp>
0015 
0016 namespace boost::cobalt
0017 {
0018 
0019 #if !defined(BOOST_COBALT_NO_PMR)
0020 template<typename T>
0021 inline channel<T>::channel(
0022     std::size_t limit,
0023     executor executor,
0024     pmr::memory_resource * resource)
0025     : buffer_(limit, resource), executor_(executor) {}
0026 #else
0027 template<typename T>
0028 inline channel<T>::channel(
0029     std::size_t limit,
0030     executor executor)
0031     : buffer_(limit), executor_(executor) {}
0032 #endif
0033 
0034 template<typename T>
0035 auto channel<T>::get_executor() -> const executor_type &  {return executor_;}
0036 
0037 template<typename T>
0038 bool channel<T>::is_open() const {return !is_closed_;}
0039 
0040 
0041 template<typename T>
0042 channel<T>::~channel()
0043 {
0044   while (!read_queue_.empty())
0045     read_queue_.front().awaited_from.reset();
0046 
0047   while (!write_queue_.empty())
0048     write_queue_.front().awaited_from.reset();
0049 
0050 }
0051 
0052 template<typename T>
0053 void channel<T>::close()
0054 {
0055   is_closed_ = true;
0056   while (!read_queue_.empty())
0057   {
0058     auto & op = read_queue_.front();
0059     op.unlink();
0060     op.cancelled = true;
0061     op.cancel_slot.clear();
0062 
0063     if (op.awaited_from)
0064       asio::post(executor_, std::move(op.awaited_from));
0065   }
0066   while (!write_queue_.empty())
0067   {
0068     auto & op = write_queue_.front();
0069     op.unlink();
0070     op.cancelled = true;
0071     op.cancel_slot.clear();
0072     if (op.awaited_from)
0073       asio::post(executor_, std::move(op.awaited_from));
0074   }
0075 }
0076 
0077 
0078 template<typename T>
0079 struct channel<T>::read_op::cancel_impl
0080 {
0081   read_op * op;
0082   cancel_impl(read_op * op) : op(op) {}
0083   void operator()(asio::cancellation_type)
0084   {
0085     op->cancelled = true;
0086     op->unlink();
0087     if (op->awaited_from)
0088       asio::post(
0089           op->chn->executor_,
0090           std::move(op->awaited_from));
0091     op->cancel_slot.clear();
0092   }
0093 };
0094 
0095 template<typename T>
0096 template<typename Promise>
0097 std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0098 {
0099   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0100     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0101       cancel_slot.emplace<cancel_impl>(this);
0102 
0103   if (awaited_from)
0104     boost::throw_exception(std::runtime_error("already-awaited"), loc);
0105   awaited_from.reset(h.address());
0106   // currently nothing to read
0107   if constexpr (requires (Promise p) {p.begin_transaction();})
0108     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0109 
0110   if (chn->write_queue_.empty())
0111   {
0112     chn->read_queue_.push_back(*this);
0113     return std::noop_coroutine();
0114   }
0115   else
0116   {
0117     cancel_slot.clear();
0118     auto & op = chn->write_queue_.front();
0119     op.transactional_unlink();
0120     op.direct = true;
0121     if (op.ref.index() == 0)
0122       direct = std::move(*variant2::get<0>(op.ref));
0123     else
0124       direct = *variant2::get<1>(op.ref);
0125     BOOST_ASSERT(op.awaited_from);
0126     asio::post(chn->executor_, std::move(awaited_from));
0127     return op.awaited_from.release();
0128   }
0129 }
0130 
0131 
0132 template<typename T>
0133 T channel<T>::read_op::await_resume()
0134 {
0135   return await_resume(as_result_tag{}).value(loc);
0136 }
0137 
0138 template<typename T>
0139 std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
0140 {
0141   auto res = await_resume(as_result_tag{});
0142 
0143   if (res.has_error())
0144     return {res.error(), T{}};
0145   else
0146     return {system::error_code{}, std::move(*res)};
0147 
0148 }
0149 
0150 template<typename T>
0151 system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
0152 {
0153   if (cancel_slot.is_connected())
0154     cancel_slot.clear();
0155 
0156   if (cancelled)
0157    return {system::in_place_error, asio::error::operation_aborted};
0158 
0159   T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
0160   if (!direct)
0161     chn->buffer_.pop_front();
0162 
0163   if (!chn->write_queue_.empty())
0164   {
0165     auto &op = chn->write_queue_.front();
0166     BOOST_ASSERT(chn->read_queue_.empty());
0167     if (op.await_ready())
0168     {
0169       op.transactional_unlink();
0170       BOOST_ASSERT(op.awaited_from);
0171       asio::post(chn->executor_, std::move(op.awaited_from));
0172     }
0173   }
0174   return {system::in_place_value, value};
0175 }
0176 
0177 template<typename T>
0178 struct channel<T>::write_op::cancel_impl
0179 {
0180   write_op * op;
0181   cancel_impl(write_op * op) : op(op) {}
0182   void operator()(asio::cancellation_type)
0183   {
0184     op->cancelled = true;
0185     op->unlink();
0186     if (op->awaited_from)
0187       asio::post(
0188         op->chn->executor_, std::move(op->awaited_from));
0189     op->cancel_slot.clear();
0190   }
0191 };
0192 
0193 template<typename T>
0194 template<typename Promise>
0195 std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0196 {
0197   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0198     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0199       cancel_slot.emplace<cancel_impl>(this);
0200 
0201   awaited_from.reset(h.address());
0202   if constexpr (requires (Promise p) {p.begin_transaction();})
0203     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0204 
0205   // currently nothing to read
0206   if (chn->read_queue_.empty())
0207   {
0208     chn->write_queue_.push_back(*this);
0209     return std::noop_coroutine();
0210   }
0211   else
0212   {
0213     cancel_slot.clear();
0214     auto & op = chn->read_queue_.front();
0215     op.transactional_unlink();
0216     if (ref.index() == 0)
0217       op.direct = std::move(*variant2::get<0>(ref));
0218     else
0219       op.direct = *variant2::get<1>(ref);
0220 
0221     BOOST_ASSERT(op.awaited_from);
0222     direct = true;
0223     asio::post(chn->executor_, std::move(awaited_from));
0224 
0225     return op.awaited_from.release();
0226   }
0227 }
0228 
0229 template<typename T>
0230 std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
0231 {
0232   return await_resume(as_result_tag{}).error();
0233 }
0234 
0235 template<typename T>
0236 void channel<T>::write_op::await_resume()
0237 {
0238   await_resume(as_result_tag{}).value(loc);
0239 }
0240 
0241 template<typename T>
0242 system::result<void>  channel<T>::write_op::await_resume(const struct as_result_tag &)
0243 {
0244   if (cancel_slot.is_connected())
0245     cancel_slot.clear();
0246   if (cancelled)
0247     boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
0248 
0249 
0250   if (!direct)
0251   {
0252     BOOST_ASSERT(!chn->buffer_.full());
0253     if (ref.index() == 0)
0254       chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
0255     else
0256       chn->buffer_.push_back(*variant2::get<1>(ref));
0257   }
0258 
0259   if (!chn->read_queue_.empty())
0260   {
0261     auto & op = chn->read_queue_.front();
0262     BOOST_ASSERT(chn->write_queue_.empty());
0263     if (op.await_ready())
0264     {
0265       op.transactional_unlink();
0266       BOOST_ASSERT(op.awaited_from);
0267       asio::post(chn->executor_, std::move(op.awaited_from));
0268     }
0269   }
0270   return system::in_place_value;
0271 }
0272 
0273 struct channel<void>::read_op::cancel_impl
0274 {
0275   read_op * op;
0276   cancel_impl(read_op * op) : op(op) {}
0277   void operator()(asio::cancellation_type)
0278   {
0279     op->cancelled = true;
0280     op->unlink();
0281     asio::post(op->chn->executor_, std::move(op->awaited_from));
0282     op->cancel_slot.clear();
0283   }
0284 };
0285 
0286 struct channel<void>::write_op::cancel_impl
0287 {
0288   write_op * op;
0289   cancel_impl(write_op * op) : op(op) {}
0290   void operator()(asio::cancellation_type)
0291   {
0292     op->cancelled = true;
0293     op->unlink();
0294     asio::post(op->chn->executor_, std::move(op->awaited_from));
0295     op->cancel_slot.clear();
0296   }
0297 };
0298 
0299 template<typename Promise>
0300 std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0301 {
0302   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0303     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0304       cancel_slot.emplace<cancel_impl>(this);
0305 
0306   awaited_from.reset(h.address());
0307 
0308   if constexpr (requires (Promise p) {p.begin_transaction();})
0309     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0310 
0311   // nothing to read currently, enqueue
0312   if (chn->write_queue_.empty())
0313   {
0314     chn->read_queue_.push_back(*this);
0315     return std::noop_coroutine();
0316   }
0317   else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
0318   {
0319     cancel_slot.clear();
0320     auto & op = chn->write_queue_.front();
0321     op.unlink();
0322     op.direct = true;
0323     BOOST_ASSERT(op.awaited_from);
0324     direct = true;
0325     asio::post(chn->executor_, std::move(awaited_from));
0326     return op.awaited_from.release();
0327   }
0328 }
0329 
0330 
0331 template<typename Promise>
0332 std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0333 {
0334   if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0335     if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0336       cancel_slot.emplace<cancel_impl>(this);
0337 
0338   awaited_from.reset(h.address());
0339   // currently nothing to read
0340   if constexpr (requires (Promise p) {p.begin_transaction();})
0341     begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0342 
0343   if (chn->read_queue_.empty())
0344   {
0345     chn->write_queue_.push_back(*this);
0346     return std::noop_coroutine();
0347   }
0348   else
0349   {
0350     cancel_slot.clear();
0351     auto & op = chn->read_queue_.front();
0352     op.unlink();
0353     op.direct = true;
0354     BOOST_ASSERT(op.awaited_from);
0355     direct = true;
0356     asio::post(chn->executor_, std::move(awaited_from));
0357     return op.awaited_from.release();
0358   }
0359 }
0360 
0361 }
0362 
0363 #endif //BOOST_COBALT_IMPL_CHANNEL_HPP