File indexing completed on 2025-12-16 09:44:26
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
0009 #define BOOST_COBALT_IMPL_CHANNEL_HPP
0010
0011 #include <boost/cobalt/channel.hpp>
0012 #include <boost/cobalt/result.hpp>
0013
0014 #include <boost/asio/post.hpp>
0015
0016 namespace boost::cobalt
0017 {
0018
0019 #if !defined(BOOST_COBALT_NO_PMR)
0020 template<typename T>
0021 inline channel<T>::channel(
0022 std::size_t limit,
0023 executor executor,
0024 pmr::memory_resource * resource)
0025 : buffer_(limit, pmr::polymorphic_allocator<T>(resource)), executor_(executor) {}
0026 #else
0027 template<typename T>
0028 inline channel<T>::channel(
0029 std::size_t limit,
0030 executor executor)
0031 : buffer_(limit), executor_(executor) {}
0032 #endif
0033
0034 template<typename T>
0035 auto channel<T>::get_executor() -> const executor_type & {return executor_;}
0036
0037 template<typename T>
0038 bool channel<T>::is_open() const {return !is_closed_;}
0039
0040
0041 template<typename T>
0042 channel<T>::~channel()
0043 {
0044 while (!read_queue_.empty())
0045 read_queue_.front().awaited_from.reset();
0046
0047 while (!write_queue_.empty())
0048 write_queue_.front().awaited_from.reset();
0049
0050 }
0051
0052 template<typename T>
0053 void channel<T>::close()
0054 {
0055 is_closed_ = true;
0056 while (!read_queue_.empty())
0057 {
0058 auto & op = read_queue_.front();
0059 op.unlink();
0060 op.cancelled = true;
0061 op.cancel_slot.clear();
0062
0063 if (op.awaited_from)
0064 asio::post(executor_, std::move(op.awaited_from));
0065 }
0066 while (!write_queue_.empty())
0067 {
0068 auto & op = write_queue_.front();
0069 op.unlink();
0070 op.cancelled = true;
0071 op.cancel_slot.clear();
0072 if (op.awaited_from)
0073 asio::post(executor_, std::move(op.awaited_from));
0074 }
0075 }
0076
0077
0078 template<typename T>
0079 struct channel<T>::read_op::cancel_impl
0080 {
0081 read_op * op;
0082 cancel_impl(read_op * op) : op(op) {}
0083 void operator()(asio::cancellation_type)
0084 {
0085 op->cancelled = true;
0086 op->unlink();
0087 if (op->awaited_from)
0088 asio::post(
0089 op->chn->executor_,
0090 std::move(op->awaited_from));
0091 op->cancel_slot.clear();
0092 }
0093 };
0094
0095 template<typename T>
0096 template<typename Promise>
0097 std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0098 {
0099 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0100 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0101 cancel_slot.emplace<cancel_impl>(this);
0102
0103 if (awaited_from)
0104 boost::throw_exception(std::runtime_error("already-awaited"), loc);
0105 awaited_from.reset(h.address());
0106
0107 if constexpr (requires (Promise p) {p.begin_transaction();})
0108 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0109
0110 if (chn->write_queue_.empty())
0111 {
0112 chn->read_queue_.push_back(*this);
0113 return std::noop_coroutine();
0114 }
0115 else
0116 {
0117 cancel_slot.clear();
0118 auto & op = chn->write_queue_.front();
0119
0120 op.direct = true;
0121 if constexpr (std::is_copy_constructible_v<T>)
0122 {
0123 if (op.ref.index() == 0)
0124 direct = std::move(*variant2::get<0>(op.ref));
0125 else
0126 direct = *variant2::get<1>(op.ref);
0127 }
0128 else
0129 direct = std::move(*op.ref);
0130
0131 op.transactional_unlink();
0132 BOOST_ASSERT(op.awaited_from);
0133 BOOST_ASSERT(awaited_from);
0134
0135 asio::post(chn->executor_, std::move(awaited_from));
0136 return op.awaited_from.release();
0137 }
0138 }
0139
0140
0141 template<typename T>
0142 T channel<T>::read_op::await_resume()
0143 {
0144 return await_resume(as_result_tag{}).value(loc);
0145 }
0146
0147 template<typename T>
0148 std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
0149 {
0150 auto res = await_resume(as_result_tag{});
0151
0152 if (res.has_error())
0153 return {res.error(), T{}};
0154 else
0155 return {system::error_code{}, std::move(*res)};
0156
0157 }
0158
0159 template<typename T>
0160 system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
0161 {
0162 if (cancel_slot.is_connected())
0163 cancel_slot.clear();
0164
0165 if (cancelled)
0166 return {system::in_place_error, asio::error::operation_aborted};
0167
0168 T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
0169 if (!direct)
0170 chn->buffer_.pop_front();
0171
0172 if (!chn->write_queue_.empty())
0173 {
0174 auto &op = chn->write_queue_.front();
0175 BOOST_ASSERT(chn->read_queue_.empty());
0176 if (op.await_ready())
0177 {
0178 op.transactional_unlink();
0179 BOOST_ASSERT(op.awaited_from);
0180 asio::post(chn->executor_, std::move(op.awaited_from));
0181 }
0182 }
0183 return {system::in_place_value, std::move(value)};
0184 }
0185
0186 template<typename T>
0187 struct channel<T>::write_op::cancel_impl
0188 {
0189 write_op * op;
0190 cancel_impl(write_op * op) : op(op) {}
0191 void operator()(asio::cancellation_type)
0192 {
0193 op->cancelled = true;
0194 op->unlink();
0195 if (op->awaited_from)
0196 asio::post(
0197 op->chn->executor_, std::move(op->awaited_from));
0198 op->cancel_slot.clear();
0199 }
0200 };
0201
0202 template<typename T>
0203 template<typename Promise>
0204 std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0205 {
0206 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0207 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0208 cancel_slot.emplace<cancel_impl>(this);
0209
0210 awaited_from.reset(h.address());
0211 if constexpr (requires (Promise p) {p.begin_transaction();})
0212 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0213
0214 if (chn->read_queue_.empty())
0215 {
0216 chn->write_queue_.push_back(*this);
0217 return std::noop_coroutine();
0218 }
0219 else
0220 {
0221 cancel_slot.clear();
0222 auto & op = chn->read_queue_.front();
0223 if constexpr (std::is_copy_constructible_v<T>)
0224 {
0225 if (ref.index() == 0)
0226 op.direct.emplace(std::move(*variant2::get<0>(ref)));
0227 else
0228 op.direct.emplace(*variant2::get<1>(ref));
0229 }
0230 else
0231 op.direct.emplace(std::move(*ref));
0232
0233 direct = true;
0234 op.transactional_unlink();
0235
0236 BOOST_ASSERT(op.awaited_from);
0237 BOOST_ASSERT(awaited_from);
0238 asio::post(chn->executor_, std::move(awaited_from));
0239
0240 return op.awaited_from.release();
0241 }
0242 }
0243
0244 template<typename T>
0245 std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
0246 {
0247 return await_resume(as_result_tag{}).error();
0248 }
0249
0250 template<typename T>
0251 void channel<T>::write_op::await_resume()
0252 {
0253 await_resume(as_result_tag{}).value(loc);
0254 }
0255
0256 template<typename T>
0257 system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
0258 {
0259 if (cancel_slot.is_connected())
0260 cancel_slot.clear();
0261 if (cancelled)
0262 boost::throw_exception(system::system_error(asio::error::operation_aborted), loc);
0263
0264 if (!direct)
0265 {
0266 BOOST_ASSERT(!chn->buffer_.full());
0267 if constexpr (std::is_copy_constructible_v<T>)
0268 {
0269 if (ref.index() == 0)
0270 chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
0271 else
0272 chn->buffer_.push_back(*variant2::get<1>(ref));
0273 }
0274 else
0275 chn->buffer_.push_back(std::move(*ref));
0276 }
0277
0278 if (!chn->read_queue_.empty())
0279 {
0280 auto & op = chn->read_queue_.front();
0281 BOOST_ASSERT(chn->write_queue_.empty());
0282 if (op.await_ready())
0283 {
0284 op.transactional_unlink();
0285 BOOST_ASSERT(op.awaited_from);
0286 asio::post(chn->executor_, std::move(op.awaited_from));
0287 }
0288 }
0289 return system::in_place_value;
0290 }
0291
0292 struct channel<void>::read_op::cancel_impl
0293 {
0294 read_op * op;
0295 cancel_impl(read_op * op) : op(op) {}
0296 void operator()(asio::cancellation_type)
0297 {
0298 op->cancelled = true;
0299 op->unlink();
0300 asio::post(op->chn->executor_, std::move(op->awaited_from));
0301 op->cancel_slot.clear();
0302 }
0303 };
0304
0305 struct channel<void>::write_op::cancel_impl
0306 {
0307 write_op * op;
0308 cancel_impl(write_op * op) : op(op) {}
0309 void operator()(asio::cancellation_type)
0310 {
0311 op->cancelled = true;
0312 op->unlink();
0313 asio::post(op->chn->executor_, std::move(op->awaited_from));
0314 op->cancel_slot.clear();
0315 }
0316 };
0317
0318 template<typename Promise>
0319 std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
0320 {
0321 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0322 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0323 cancel_slot.emplace<cancel_impl>(this);
0324
0325 awaited_from.reset(h.address());
0326
0327 if constexpr (requires (Promise p) {p.begin_transaction();})
0328 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0329
0330
0331 if (chn->write_queue_.empty())
0332 {
0333 chn->read_queue_.push_back(*this);
0334 return std::noop_coroutine();
0335 }
0336 else
0337 {
0338 cancel_slot.clear();
0339 auto & op = chn->write_queue_.front();
0340 op.direct = true;
0341 direct = true;
0342 op.transactional_unlink();
0343
0344 BOOST_ASSERT(op.awaited_from);
0345 BOOST_ASSERT(awaited_from);
0346 asio::post(chn->executor_, std::move(awaited_from));
0347 return op.awaited_from.release();
0348 }
0349 }
0350
0351
0352 template<typename Promise>
0353 std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
0354 {
0355 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
0356 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
0357 cancel_slot.emplace<cancel_impl>(this);
0358
0359 awaited_from.reset(h.address());
0360
0361 if constexpr (requires (Promise p) {p.begin_transaction();})
0362 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
0363
0364 if (chn->read_queue_.empty())
0365 {
0366 chn->write_queue_.push_back(*this);
0367 return std::noop_coroutine();
0368 }
0369 else
0370 {
0371 cancel_slot.clear();
0372 auto & op = chn->read_queue_.front();
0373 op.direct = true;
0374 direct = true;
0375 op.transactional_unlink();
0376
0377 BOOST_ASSERT(op.awaited_from);
0378 BOOST_ASSERT(awaited_from);
0379
0380 asio::post(chn->executor_, std::move(awaited_from));
0381 return op.awaited_from.release();
0382 }
0383 }
0384
0385 }
0386
0387 #endif