Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 09:44:26

0001 //
0002 // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_COBALT_CHANNEL_HPP
0009 #define BOOST_COBALT_CHANNEL_HPP
0010 
0011 #include <boost/cobalt/this_thread.hpp>
0012 #include <boost/cobalt/unique_handle.hpp>
0013 #include <boost/cobalt/detail/util.hpp>
0014 
0015 #include <boost/asio/cancellation_signal.hpp>
0016 #include <boost/asio/cancellation_type.hpp>
0017 #include <boost/circular_buffer.hpp>
0018 #include <boost/config.hpp>
0019 #include <boost/intrusive/list.hpp>
0020 #include <boost/variant2/variant.hpp>
0021 
0022 #include <optional>
0023 
0024 namespace boost::cobalt
0025 {
0026 
0027 template<typename T>
0028 struct channel_reader;
0029 
0030 // tag::outline[]
0031 template<typename T>
0032 struct channel
0033 {
0034   // end::outline[]
       // Channel carrying values of type T between coroutines.
       // State visible in this header: a circular buffer of T, the executor,
       // a closed flag and two intrusive lists of pending read/write ops.
       // read()/write() only build awaitable op objects; the suspend/resume logic
       // is declared here and defined elsewhere (presumably boost/cobalt/impl/channel.hpp,
       // which is included at the end of this file).
0035 #if defined(BOOST_COBALT_NO_PMR)
       // without PMR there is no memory_resource parameter.
       // NOTE(review): unlike the PMR overload below this constructor is not `explicit`,
       // so a std::size_t converts implicitly to a channel - confirm this is intended.
0036   channel(std::size_t limit = 0u,
0037           executor executor = this_thread::get_executor());
0038 #else
0039   // tag::outline[]
0040   // create a channel with a buffer limit, executor & resource.
0041   explicit
0042   channel(std::size_t limit = 0u,
0043           executor executor = this_thread::get_executor(),
0044           pmr::memory_resource * resource = this_thread::get_default_resource());
0045   // end::outline[]
0046 #endif
0047   // tag::outline[]
0048   // not movable.
0049   channel(channel && rhs) noexcept = delete;
0050   channel & operator=(channel && lhs) noexcept = delete;
0051 
0052   using executor_type = executor;
0053   const executor_type & get_executor();
0054 
0055   // Closes the channel
0056   ~channel();
0057   bool is_open() const;
0058   // close the channel, will cancel all pending ops, too
0059   void close();
0060 
0061   // end::outline[]
0062  private:
       // value storage; uses a pmr allocator unless BOOST_COBALT_NO_PMR is defined.
0063 #if !defined(BOOST_COBALT_NO_PMR)
0064   boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
0065 #else
0066   boost::circular_buffer<T> buffer_;
0067 #endif
0068   executor_type executor_;
0069   bool is_closed_{false};
0070 
       // Awaitable produced by read(). It derives from an auto-unlink list hook so it
       // can be linked into read_queue_. It is an aggregate: read() initializes the
       // hook, chn and loc positionally, so the order of the first members matters.
0071   struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0072   {
0073     channel * chn;                             // channel this op reads from
0074     boost::source_location loc;                // location passed to read()
0075     bool cancelled = false;                    // set by interrupt_await()
         // NOTE(review): presumably a value handed over directly by a writer instead of
         // going through buffer_; it is only tested here, never assigned - confirm in the impl.
0076     std::optional<T> direct{};
0077     asio::cancellation_slot cancel_slot{};
         // handle resumed by interrupt_await(); presumably the coroutine suspended on
         // this op, stored by await_suspend (definition not visible here).
0078     unique_handle<void> awaited_from{nullptr};
         // optional callback, invoked with the awaiting handle right before unlinking.
0079     void (*begin_transaction)(void*) = nullptr;
0080 
         // run the begin_transaction callback (if any), then remove this op from its list.
0081     void transactional_unlink()
0082     {
0083       if (begin_transaction)
0084           begin_transaction(awaited_from.get());
0085       this->unlink();
0086     }
0087 
         // unless a direct value is already present, mark the op as cancelled and
         // resume the awaiting coroutine; release() gives up ownership of the handle first.
0088     void interrupt_await()
0089     {
0090       if (!direct)
0091       {
0092         this->cancelled = true;
0093         if (this->awaited_from)
0094           this->awaited_from.release().resume();
0095       }
0096     }
0097 
0098     struct cancel_impl;
         // no suspension needed when the buffer already holds at least one value.
0099     bool await_ready() { return !chn->buffer_.empty(); }
0100     template<typename Promise>
0101     BOOST_NOINLINE 
0102     std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
         // plain form yields T; the tagged overloads return the outcome as a
         // tuple<error_code, T> or a result<T> instead.
0103     T await_resume();
0104     std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
0105     system::result<T> await_resume(const struct as_result_tag &);
         // true if the op refers to a channel and that channel is open.
0106     explicit operator bool() const {return chn && chn->is_open();}
0107   };
0108 
       // Awaitable produced by write(). Like read_op it is a list-hooked aggregate;
       // write() initializes the hook, chn, ref and loc positionally.
0109   struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0110   {
0111     channel * chn;                             // channel this op writes to
         // pointer to the value to send. For copy-constructible T it remembers whether
         // the source was mutable (T*) or const (const T*); otherwise only T* is possible.
0112     using ref_t = std::conditional_t<
0113         std::is_copy_constructible_v<T>,
0114         variant2::variant<T*, const T*>,
0115         T*>;
0116     ref_t ref;
0117     boost::source_location loc;                // location passed to write()
         // cancelled is set by interrupt_await().
         // NOTE(review): direct is only tested here; presumably set by the impl when the
         // value went straight to a waiting reader - confirm.
0118     bool cancelled = false, direct = false;
0119     asio::cancellation_slot cancel_slot{};
0120 
         // handle resumed by interrupt_await(); presumably the coroutine suspended on this op.
0121     unique_handle<void> awaited_from{nullptr};
         // optional callback, invoked with the awaiting handle right before unlinking.
0122     void (*begin_transaction)(void*) = nullptr;
0123 
         // run the begin_transaction callback (if any), then remove this op from its list.
0124     void transactional_unlink()
0125     {
0126       if (begin_transaction)
0127           begin_transaction(awaited_from.get());
0128       this->unlink();
0129     }
0130 
         // unless the op is flagged direct, mark it cancelled and resume the awaiting
         // coroutine; release() gives up ownership of the handle first.
0131     void interrupt_await()
0132     {
0133       if (!direct)
0134       {
0135         this->cancelled = true;
0136         if (this->awaited_from)
0137           this->awaited_from.release().resume();
0138       }
0139     }
0140 
0141     struct cancel_impl;
0142 
         // no suspension needed while the buffer reports free capacity.
0143     bool await_ready() { return !chn->buffer_.full(); }
0144     template<typename Promise>
0145     BOOST_NOINLINE 
0146     std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
         // plain form yields nothing; the tagged overloads return the outcome as a
         // tuple<error_code> or a result<void> instead.
0147     void await_resume();
0148     std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0149     system::result<void> await_resume(const struct as_result_tag &);
         // true if the op refers to a channel and that channel is open.
0150     explicit operator bool() const {return chn && chn->is_open();}
0151   };
0152 
       // ops linked by the implementation; constant_time_size<false> is required
       // by the auto_unlink hooks used above.
0153   boost::intrusive::list<read_op,  intrusive::constant_time_size<false> > read_queue_;
0154   boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
0155  public:
       // build a read awaitable bound to this channel; nothing happens until it is awaited.
0156   read_op   read(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return  read_op{{}, this, loc}; }
0157 
       // The write overloads below store only the address of `value` in the op, no copy
       // is made here - presumably `value` must stay alive until the op has completed.
       // The const overloads store a const T* and require a copy-constructible T;
       // the non-const overloads store a T*.
       // NOTE(review): the reason for BOOST_NOINLINE on Windows only is not visible in this file.
0158 #if defined(BOOST_WINDOWS_API)
0159   BOOST_NOINLINE
0160 #endif
0161   write_op write(const T  && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0162     requires std::is_copy_constructible_v<T>
0163   {
0164     return write_op{{}, this, &value, loc};
0165   }
0166 
0167 #if defined(BOOST_WINDOWS_API)
0168   BOOST_NOINLINE
0169 #endif
0170   write_op write(const T  &  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0171     requires std::is_copy_constructible_v<T>
0172   {
0173     return write_op{{}, this, &value, loc};
0174   }
0175 
0176 
0177 #if defined(BOOST_WINDOWS_API)
0178   BOOST_NOINLINE
0179 #endif
0180   write_op write(      T &&  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0181   {
0182     return write_op{{}, this, &value, loc};
0183   }
0184 
0185 #if defined(BOOST_WINDOWS_API)
0186   BOOST_NOINLINE
0187 #endif
0188   write_op write(      T  &  value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0189   {
0190     return write_op{{}, this, &value, loc};
0191   }
0192   /*
0193   // tag::outline[]
0194   // an awaitable that yields T
0195   using __read_op__ = __unspecified__;
0196 
0197   // an awaitable that yields void
0198   using __write_op__ = __unspecified__;
0199 
0200   // read a value from the channel
0201   __read_op__  read();
0202 
0203   // write a value to the channel
0204   __write_op__ write(const T  && value);
0205   __write_op__ write(const T  &  value);
0206   __write_op__ write(      T &&  value);
0207   __write_op__ write(      T  &  value);
0208 
0209   // write a value to the channel if T is void
0210   __write_op__ write();  // end::outline[]
0211    */
0212   // tag::outline[]
0213 
0214 };
0215 // end::outline[]
0216 
0217 template<>
0218 struct channel<void>
0219 {
       // Specialization for channels that carry no value: instead of a buffer it keeps
       // a limit and a counter (n_). The ops' await_ready checks compare the two.
       // create a channel with a limit & executor; both are simply stored.
0220   explicit
0221   channel(std::size_t limit = 0u,
0222           executor executor = this_thread::get_executor())
0223         : limit_(limit), executor_(executor) {}
       // not movable.
0224   channel(channel &&) noexcept = delete;
0225   channel & operator=(channel && lhs) noexcept = delete;
0226 
0227   using executor_type = executor;
       // the executor passed to the constructor.
0228   const executor_type & get_executor() {return executor_;}
0229 
       // declared with BOOST_COBALT_DECL, i.e. defined out of line (not in this header).
0230   BOOST_COBALT_DECL ~channel();
0231 
0232   bool is_open() const {return !is_closed_;}
0233   BOOST_COBALT_DECL void close();
0234 
0235  private:
0236   std::size_t limit_;
       // NOTE(review): judging by the await_ready checks this counts pending items
       // (written, not yet read); it is only read in this header - confirm in the impl.
0237   std::size_t n_{0u};
0238   executor_type executor_;
0239   bool is_closed_{false};
0240 
       // Awaitable produced by read(). Derives from an auto-unlink list hook so it can
       // be linked into read_queue_. It is an aggregate: read() initializes the hook,
       // chn and loc positionally, so the order of the first members matters.
0241   struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0242   {
0243     channel * chn;                             // channel this op reads from
0244     boost::source_location loc;                // location passed to read()
         // cancelled is set by interrupt_await().
         // NOTE(review): direct is only tested here; presumably set by the impl when a
         // writer completed this read directly - confirm.
0245     bool cancelled = false, direct = false;
0246     asio::cancellation_slot cancel_slot{};
         // handle resumed by interrupt_await(); presumably the coroutine suspended on this op.
0247     unique_handle<void> awaited_from{nullptr};
         // optional callback, invoked with the awaiting handle right before unlinking.
0248     void (*begin_transaction)(void*) = nullptr;
0249 
         // run the begin_transaction callback (if any), then remove this op from its list.
0250     void transactional_unlink()
0251     {
0252       if (begin_transaction)
0253           begin_transaction(awaited_from.get());
0254       this->unlink();
0255     }
0256 
         // unless the op is flagged direct, mark it cancelled and resume the awaiting
         // coroutine; release() gives up ownership of the handle first.
0257     void interrupt_await()
0258     {
0259       if (!direct)
0260       {
0261         this->cancelled = true;
0262         if (this->awaited_from)
0263           this->awaited_from.release().resume();
0264       }
0265     }
0266 
0267     struct cancel_impl;
         // no suspension needed when the counter is non-zero.
0268     bool await_ready() { return (chn->n_ > 0); }
0269     template<typename Promise>
0270     BOOST_NOINLINE 
0271     std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
         // plain form yields nothing; the tagged overloads return the outcome as a
         // tuple<error_code> or a result<void> instead.
0272     BOOST_COBALT_DECL void await_resume();
0273     BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0274     BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
         // true if the op refers to a channel and that channel is open.
0275     explicit operator bool() const {return chn && chn->is_open();}
0276   };
0277 
       // Awaitable produced by write(); same layout and helpers as read_op.
0278   struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
0279   {
0280     channel * chn;                             // channel this op writes to
0281     boost::source_location loc;                // location passed to write()
         // cancelled is set by interrupt_await().
         // NOTE(review): direct is only tested here; presumably set by the impl when the
         // write was matched directly with a waiting reader - confirm.
0282     bool cancelled = false, direct = false;
0283     asio::cancellation_slot cancel_slot{};
         // handle resumed by interrupt_await(); presumably the coroutine suspended on this op.
0284     unique_handle<void> awaited_from{nullptr};
         // optional callback, invoked with the awaiting handle right before unlinking.
0285     void (*begin_transaction)(void*) = nullptr;
0286 
         // run the begin_transaction callback (if any), then remove this op from its list.
0287     void transactional_unlink()
0288     {
0289       if (begin_transaction)
0290           begin_transaction(awaited_from.get());
0291       this->unlink();
0292     }
0293 
         // unless the op is flagged direct, mark it cancelled and resume the awaiting
         // coroutine; release() gives up ownership of the handle first.
0294     void interrupt_await()
0295     {
0296       if (!direct)
0297       {
0298         cancelled = true;
0299         if (this->awaited_from)
0300           this->awaited_from.release().resume();
0301       }
0302     }
0303 
0304     struct cancel_impl;
         // no suspension needed while the counter is below the limit.
         // with limit_ == 0 (the default) this is always false, as n_ is unsigned.
0305     bool await_ready()
0306     {
0307       return chn->n_ < chn->limit_;
0308     }
0309 
0310     template<typename Promise>
0311     BOOST_NOINLINE 
0312     std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
0313 
         // plain form yields nothing; the tagged overloads return the outcome as a
         // tuple<error_code> or a result<void> instead.
0314     BOOST_COBALT_DECL void await_resume();
0315     BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
0316     BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
         // true if the op refers to a channel and that channel is open.
0317     explicit operator bool() const {return chn && chn->is_open();}
0318   };
0319 
       // ops linked by the implementation; constant_time_size<false> is required
       // by the auto_unlink hooks used above.
0320   boost::intrusive::list<read_op,  intrusive::constant_time_size<false> > read_queue_;
0321   boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
0322  public:
       // build a read / write awaitable bound to this channel; nothing happens until it is awaited.
0323   read_op   read(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return  read_op{{}, this, loc}; }
0324   write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION)  {return write_op{{}, this, loc}; }
0325 };
0326 
0327 template<typename T>
0328 struct channel_reader
0329 {
0330   // bind to `chan` (kept by address, not owned); `loc` is passed to every read() issued via co_await.
0331   channel_reader(channel<T> & chan, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
0332       : chan_{&chan}, loc_{loc} {}
0333 
0334   // awaiting the reader creates a fresh read op on the bound channel.
0335   auto operator co_await () { return (*chan_).read(loc_); }
0336 
0337   // true while a channel is bound and that channel reports is_open().
0338   explicit operator bool () const { return chan_ != nullptr && chan_->is_open(); }
0339 
0340  private:
0341   channel<T> * chan_;
0342   boost::source_location loc_;
0343 };
0344 
0345 }
0346 
0347 #include <boost/cobalt/impl/channel.hpp>
0348 
0349 #endif //BOOST_COBALT_CHANNEL_HPP