File indexing completed on 2025-12-16 09:44:26
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_COBALT_CHANNEL_HPP
0009 #define BOOST_COBALT_CHANNEL_HPP
0010
0011 #include <boost/cobalt/this_thread.hpp>
0012 #include <boost/cobalt/unique_handle.hpp>
0013 #include <boost/cobalt/detail/util.hpp>
0014
0015 #include <boost/asio/cancellation_signal.hpp>
0016 #include <boost/asio/cancellation_type.hpp>
0017 #include <boost/circular_buffer.hpp>
0018 #include <boost/config.hpp>
0019 #include <boost/intrusive/list.hpp>
0020 #include <boost/variant2/variant.hpp>
0021
0022 #include <optional>
0023
0024 namespace boost::cobalt
0025 {
0026
0027 template<typename T>
0028 struct channel_reader;
0029
0030
0031 template<typename T>
0032 struct channel
0033 {
0034
0035 #if defined(BOOST_COBALT_NO_PMR)
0036 channel(std::size_t limit = 0u,
0037 executor executor = this_thread::get_executor());
0038 #else
0039
0040
0041 explicit
0042 channel(std::size_t limit = 0u,
0043 executor executor = this_thread::get_executor(),
0044 pmr::memory_resource * resource = this_thread::get_default_resource());
0045
0046 #endif
0047
0048
0049 channel(channel && rhs) noexcept = delete;
0050 channel & operator=(channel && lhs) noexcept = delete;
0051
0052 using executor_type = executor;
0053 const executor_type & get_executor();
0054
0055
0056 ~channel();
0057 bool is_open() const;
0058
0059 void close();
0060
0061
0062 private:
0063 #if !defined(BOOST_COBALT_NO_PMR)
0064 boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
0065 #else
0066 boost::circular_buffer<T> buffer_;
0067 #endif
0068 executor_type executor_;
0069 bool is_closed_{false};
0070
0071 struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0072 {
0073 channel * chn;
0074 boost::source_location loc;
0075 bool cancelled = false;
0076 std::optional<T> direct{};
0077 asio::cancellation_slot cancel_slot{};
0078 unique_handle<void> awaited_from{nullptr};
0079 void (*begin_transaction)(void*) = nullptr;
0080
0081 void transactional_unlink()
0082 {
0083 if (begin_transaction)
0084 begin_transaction(awaited_from.get());
0085 this->unlink();
0086 }
0087
0088 void interrupt_await()
0089 {
0090 if (!direct)
0091 {
0092 this->cancelled = true;
0093 if (this->awaited_from)
0094 this->awaited_from.release().resume();
0095 }
0096 }
0097
0098 struct cancel_impl;
0099 bool await_ready() { return !chn->buffer_.empty(); }
0100 template<typename Promise>
0101 BOOST_NOINLINE
0102 std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
0103 T await_resume();
0104 std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
0105 system::result<T> await_resume(const struct as_result_tag &);
0106 explicit operator bool() const {return chn && chn->is_open();}
0107 };
0108
0109 struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0110 {
0111 channel * chn;
0112 using ref_t = std::conditional_t<
0113 std::is_copy_constructible_v<T>,
0114 variant2::variant<T*, const T*>,
0115 T*>;
0116 ref_t ref;
0117 boost::source_location loc;
0118 bool cancelled = false, direct = false;
0119 asio::cancellation_slot cancel_slot{};
0120
0121 unique_handle<void> awaited_from{nullptr};
0122 void (*begin_transaction)(void*) = nullptr;
0123
0124 void transactional_unlink()
0125 {
0126 if (begin_transaction)
0127 begin_transaction(awaited_from.get());
0128 this->unlink();
0129 }
0130
0131 void interrupt_await()
0132 {
0133 if (!direct)
0134 {
0135 this->cancelled = true;
0136 if (this->awaited_from)
0137 this->awaited_from.release().resume();
0138 }
0139 }
0140
0141 struct cancel_impl;
0142
0143 bool await_ready() { return !chn->buffer_.full(); }
0144 template<typename Promise>
0145 BOOST_NOINLINE
0146 std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
0147 void await_resume();
0148 std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0149 system::result<void> await_resume(const struct as_result_tag &);
0150 explicit operator bool() const {return chn && chn->is_open();}
0151 };
0152
0153 boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
0154 boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
0155 public:
0156 read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
0157
0158 #if defined(BOOST_WINDOWS_API)
0159 BOOST_NOINLINE
0160 #endif
0161 write_op write(const T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0162 requires std::is_copy_constructible_v<T>
0163 {
0164 return write_op{{}, this, &value, loc};
0165 }
0166
0167 #if defined(BOOST_WINDOWS_API)
0168 BOOST_NOINLINE
0169 #endif
0170 write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0171 requires std::is_copy_constructible_v<T>
0172 {
0173 return write_op{{}, this, &value, loc};
0174 }
0175
0176
0177 #if defined(BOOST_WINDOWS_API)
0178 BOOST_NOINLINE
0179 #endif
0180 write_op write( T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0181 {
0182 return write_op{{}, this, &value, loc};
0183 }
0184
0185 #if defined(BOOST_WINDOWS_API)
0186 BOOST_NOINLINE
0187 #endif
0188 write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0189 {
0190 return write_op{{}, this, &value, loc};
0191 }
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211
0212
0213
0214 };
0215
0216
0217 template<>
0218 struct channel<void>
0219 {
0220 explicit
0221 channel(std::size_t limit = 0u,
0222 executor executor = this_thread::get_executor())
0223 : limit_(limit), executor_(executor) {}
0224 channel(channel &&) noexcept = delete;
0225 channel & operator=(channel && lhs) noexcept = delete;
0226
0227 using executor_type = executor;
0228 const executor_type & get_executor() {return executor_;}
0229
0230 BOOST_COBALT_DECL ~channel();
0231
0232 bool is_open() const {return !is_closed_;}
0233 BOOST_COBALT_DECL void close();
0234
0235 private:
0236 std::size_t limit_;
0237 std::size_t n_{0u};
0238 executor_type executor_;
0239 bool is_closed_{false};
0240
0241 struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0242 {
0243 channel * chn;
0244 boost::source_location loc;
0245 bool cancelled = false, direct = false;
0246 asio::cancellation_slot cancel_slot{};
0247 unique_handle<void> awaited_from{nullptr};
0248 void (*begin_transaction)(void*) = nullptr;
0249
0250 void transactional_unlink()
0251 {
0252 if (begin_transaction)
0253 begin_transaction(awaited_from.get());
0254 this->unlink();
0255 }
0256
0257 void interrupt_await()
0258 {
0259 if (!direct)
0260 {
0261 this->cancelled = true;
0262 if (this->awaited_from)
0263 this->awaited_from.release().resume();
0264 }
0265 }
0266
0267 struct cancel_impl;
0268 bool await_ready() { return (chn->n_ > 0); }
0269 template<typename Promise>
0270 BOOST_NOINLINE
0271 std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
0272 BOOST_COBALT_DECL void await_resume();
0273 BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0274 BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
0275 explicit operator bool() const {return chn && chn->is_open();}
0276 };
0277
0278 struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0279 {
0280 channel * chn;
0281 boost::source_location loc;
0282 bool cancelled = false, direct = false;
0283 asio::cancellation_slot cancel_slot{};
0284 unique_handle<void> awaited_from{nullptr};
0285 void (*begin_transaction)(void*) = nullptr;
0286
0287 void transactional_unlink()
0288 {
0289 if (begin_transaction)
0290 begin_transaction(awaited_from.get());
0291 this->unlink();
0292 }
0293
0294 void interrupt_await()
0295 {
0296 if (!direct)
0297 {
0298 cancelled = true;
0299 if (this->awaited_from)
0300 this->awaited_from.release().resume();
0301 }
0302 }
0303
0304 struct cancel_impl;
0305 bool await_ready()
0306 {
0307 return chn->n_ < chn->limit_;
0308 }
0309
0310 template<typename Promise>
0311 BOOST_NOINLINE
0312 std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
0313
0314 BOOST_COBALT_DECL void await_resume();
0315 BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0316 BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
0317 explicit operator bool() const {return chn && chn->is_open();}
0318 };
0319
0320 boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
0321 boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
0322 public:
0323 read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
0324 write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{{}, this, loc}; }
0325 };
0326
0327 template<typename T>
0328 struct channel_reader
0329 {
0330 channel_reader(channel<T> & chan,
0331 const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {}
0332
0333 auto operator co_await ()
0334 {
0335 return chan_->read(loc_);
0336 }
0337
0338 explicit operator bool () const {return chan_ && chan_->is_open();}
0339
0340 private:
0341 channel<T> * chan_;
0342 boost::source_location loc_;
0343 };
0344
0345 }
0346
0347 #include <boost/cobalt/impl/channel.hpp>
0348
0349 #endif