File indexing completed on 2025-01-30 09:34:32
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
0009 #define BOOST_COBALT_IMPL_CHANNEL_HPP
0010
0011 #include <boost/cobalt/channel.hpp>
0012 #include <boost/cobalt/result.hpp>
0013
0014 #include <boost/asio/post.hpp>
0015
0016 namespace boost::cobalt
0017 {
0018
0019 #if !defined(BOOST_COBALT_NO_PMR)
0020 template<typename T>
0021 inline channel<T>::channel(
0022 std::size_t limit,
0023 executor executor,
0024 pmr::memory_resource * resource)
0025 : buffer_(limit, resource), executor_(executor) {}
0026 #else
0027 template<typename T>
0028 inline channel<T>::channel(
0029 std::size_t limit,
0030 executor executor)
0031 : buffer_(limit), executor_(executor) {}
0032 #endif
0033
0034 template<typename T>
0035 auto channel<T>::get_executor() -> const executor_type & {return executor_;}
0036
0037 template<typename T>
0038 bool channel<T>::is_open() const {return !is_closed_;}
0039
0040
0041 template<typename T>
0042 channel<T>::~channel()
0043 {
0044 while (!read_queue_.empty())
0045 read_queue_.front().awaited_from.reset();
0046
0047 while (!write_queue_.empty())
0048 write_queue_.front().awaited_from.reset();
0049
0050 }
0051
0052 template<typename T>
0053 void channel<T>::close()
0054 {
0055 is_closed_ = true;
0056 while (!read_queue_.empty())
0057 {
0058 auto & op = read_queue_.front();
0059 op.unlink();
0060 op.cancelled = true;
0061 op.cancel_slot.clear();
0062
0063 if (op.awaited_from)
0064 asio::post(executor_, std::move(op.awaited_from));
0065 }
0066 while (!write_queue_.empty())
0067 {
0068 auto & op = write_queue_.front();
0069 op.unlink();
0070 op.cancelled = true;
0071 op.cancel_slot.clear();
0072 if (op.awaited_from)
0073 asio::post(executor_, std::move(op.awaited_from));
0074 }
0075 }
0076
0077
0078 template<typename T>
0079 struct channel<T>::read_op::cancel_impl
0080 {
0081 read_op * op;
0082 cancel_impl(read_op * op) : op(op) {}
0083 void operator()(asio::cancellation_type)
0084 {
0085 op->cancelled = true;
0086 op->unlink();
0087 if (op->awaited_from)
0088 asio::post(
0089 op->chn->executor_,
0090 std::move(op->awaited_from));
0091 op->cancel_slot.clear();
0092 }
0093 };
0094
0095 template<typename T>
0096 template<typename Promise>
0097 std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0098 {
0099 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0100 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0101 cancel_slot.emplace<cancel_impl>(this);
0102
0103 if (awaited_from)
0104 boost::throw_exception(std::runtime_error("already-awaited"), loc);
0105 awaited_from.reset(h.address());
0106
0107 if constexpr (requires (Promise p) {p.begin_transaction();})
0108 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0109
0110 if (chn->write_queue_.empty())
0111 {
0112 chn->read_queue_.push_back(*this);
0113 return std::noop_coroutine();
0114 }
0115 else
0116 {
0117 cancel_slot.clear();
0118 auto & op = chn->write_queue_.front();
0119 op.transactional_unlink();
0120 op.direct = true;
0121 if (op.ref.index() == 0)
0122 direct = std::move(*variant2::get<0>(op.ref));
0123 else
0124 direct = *variant2::get<1>(op.ref);
0125 BOOST_ASSERT(op.awaited_from);
0126 asio::post(chn->executor_, std::move(awaited_from));
0127 return op.awaited_from.release();
0128 }
0129 }
0130
0131
0132 template<typename T>
0133 T channel<T>::read_op::await_resume()
0134 {
0135 return await_resume(as_result_tag{}).value(loc);
0136 }
0137
0138 template<typename T>
0139 std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
0140 {
0141 auto res = await_resume(as_result_tag{});
0142
0143 if (res.has_error())
0144 return {res.error(), T{}};
0145 else
0146 return {system::error_code{}, std::move(*res)};
0147
0148 }
0149
0150 template<typename T>
0151 system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
0152 {
0153 if (cancel_slot.is_connected())
0154 cancel_slot.clear();
0155
0156 if (cancelled)
0157 return {system::in_place_error, asio::error::operation_aborted};
0158
0159 T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
0160 if (!direct)
0161 chn->buffer_.pop_front();
0162
0163 if (!chn->write_queue_.empty())
0164 {
0165 auto &op = chn->write_queue_.front();
0166 BOOST_ASSERT(chn->read_queue_.empty());
0167 if (op.await_ready())
0168 {
0169 op.transactional_unlink();
0170 BOOST_ASSERT(op.awaited_from);
0171 asio::post(chn->executor_, std::move(op.awaited_from));
0172 }
0173 }
0174 return {system::in_place_value, value};
0175 }
0176
0177 template<typename T>
0178 struct channel<T>::write_op::cancel_impl
0179 {
0180 write_op * op;
0181 cancel_impl(write_op * op) : op(op) {}
0182 void operator()(asio::cancellation_type)
0183 {
0184 op->cancelled = true;
0185 op->unlink();
0186 if (op->awaited_from)
0187 asio::post(
0188 op->chn->executor_, std::move(op->awaited_from));
0189 op->cancel_slot.clear();
0190 }
0191 };
0192
0193 template<typename T>
0194 template<typename Promise>
0195 std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0196 {
0197 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0198 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0199 cancel_slot.emplace<cancel_impl>(this);
0200
0201 awaited_from.reset(h.address());
0202 if constexpr (requires (Promise p) {p.begin_transaction();})
0203 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0204
0205
0206 if (chn->read_queue_.empty())
0207 {
0208 chn->write_queue_.push_back(*this);
0209 return std::noop_coroutine();
0210 }
0211 else
0212 {
0213 cancel_slot.clear();
0214 auto & op = chn->read_queue_.front();
0215 op.transactional_unlink();
0216 if (ref.index() == 0)
0217 op.direct = std::move(*variant2::get<0>(ref));
0218 else
0219 op.direct = *variant2::get<1>(ref);
0220
0221 BOOST_ASSERT(op.awaited_from);
0222 direct = true;
0223 asio::post(chn->executor_, std::move(awaited_from));
0224
0225 return op.awaited_from.release();
0226 }
0227 }
0228
0229 template<typename T>
0230 std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
0231 {
0232 return await_resume(as_result_tag{}).error();
0233 }
0234
0235 template<typename T>
0236 void channel<T>::write_op::await_resume()
0237 {
0238 await_resume(as_result_tag{}).value(loc);
0239 }
0240
0241 template<typename T>
0242 system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
0243 {
0244 if (cancel_slot.is_connected())
0245 cancel_slot.clear();
0246 if (cancelled)
0247 boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
0248
0249
0250 if (!direct)
0251 {
0252 BOOST_ASSERT(!chn->buffer_.full());
0253 if (ref.index() == 0)
0254 chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
0255 else
0256 chn->buffer_.push_back(*variant2::get<1>(ref));
0257 }
0258
0259 if (!chn->read_queue_.empty())
0260 {
0261 auto & op = chn->read_queue_.front();
0262 BOOST_ASSERT(chn->write_queue_.empty());
0263 if (op.await_ready())
0264 {
0265 op.transactional_unlink();
0266 BOOST_ASSERT(op.awaited_from);
0267 asio::post(chn->executor_, std::move(op.awaited_from));
0268 }
0269 }
0270 return system::in_place_value;
0271 }
0272
0273 struct channel<void>::read_op::cancel_impl
0274 {
0275 read_op * op;
0276 cancel_impl(read_op * op) : op(op) {}
0277 void operator()(asio::cancellation_type)
0278 {
0279 op->cancelled = true;
0280 op->unlink();
0281 asio::post(op->chn->executor_, std::move(op->awaited_from));
0282 op->cancel_slot.clear();
0283 }
0284 };
0285
0286 struct channel<void>::write_op::cancel_impl
0287 {
0288 write_op * op;
0289 cancel_impl(write_op * op) : op(op) {}
0290 void operator()(asio::cancellation_type)
0291 {
0292 op->cancelled = true;
0293 op->unlink();
0294 asio::post(op->chn->executor_, std::move(op->awaited_from));
0295 op->cancel_slot.clear();
0296 }
0297 };
0298
0299 template<typename Promise>
0300 std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0301 {
0302 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0303 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0304 cancel_slot.emplace<cancel_impl>(this);
0305
0306 awaited_from.reset(h.address());
0307
0308 if constexpr (requires (Promise p) {p.begin_transaction();})
0309 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0310
0311
0312 if (chn->write_queue_.empty())
0313 {
0314 chn->read_queue_.push_back(*this);
0315 return std::noop_coroutine();
0316 }
0317 else
0318 {
0319 cancel_slot.clear();
0320 auto & op = chn->write_queue_.front();
0321 op.unlink();
0322 op.direct = true;
0323 BOOST_ASSERT(op.awaited_from);
0324 direct = true;
0325 asio::post(chn->executor_, std::move(awaited_from));
0326 return op.awaited_from.release();
0327 }
0328 }
0329
0330
0331 template<typename Promise>
0332 std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0333 {
0334 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0335 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0336 cancel_slot.emplace<cancel_impl>(this);
0337
0338 awaited_from.reset(h.address());
0339
0340 if constexpr (requires (Promise p) {p.begin_transaction();})
0341 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0342
0343 if (chn->read_queue_.empty())
0344 {
0345 chn->write_queue_.push_back(*this);
0346 return std::noop_coroutine();
0347 }
0348 else
0349 {
0350 cancel_slot.clear();
0351 auto & op = chn->read_queue_.front();
0352 op.unlink();
0353 op.direct = true;
0354 BOOST_ASSERT(op.awaited_from);
0355 direct = true;
0356 asio::post(chn->executor_, std::move(awaited_from));
0357 return op.awaited_from.release();
0358 }
0359 }
0360
0361 }
0362
0363 #endif