Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:51

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_ASYNC_GENERATOR_HPP_INCLUDED
0006 #define CPPCORO_ASYNC_GENERATOR_HPP_INCLUDED
0007 
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/fmap.hpp>
0010 
0011 #include <exception>
0012 #include <atomic>
0013 #include <iterator>
0014 #include <type_traits>
0015 #include <cppcoro/coroutine.hpp>
0016 #include <functional>
0017 #include <cassert>
0018 
0019 namespace cppcoro
0020 {
0021     template<typename T>
0022     class async_generator;
0023 
0024 #if CPPCORO_COMPILER_SUPPORTS_SYMMETRIC_TRANSFER
0025 
0026     namespace detail
0027     {
0028         template<typename T>
0029         class async_generator_iterator;
0030         class async_generator_yield_operation;
0031         class async_generator_advance_operation;
0032 
0033         class async_generator_promise_base
0034         {
0035         public:
0036 
0037             async_generator_promise_base() noexcept
0038                 : m_exception(nullptr)
0039             {
0040                 // Other variables left intentionally uninitialised as they're
0041                 // only referenced in certain states by which time they should
0042                 // have been initialised.
0043             }
0044 
0045             async_generator_promise_base(const async_generator_promise_base& other) = delete;
0046             async_generator_promise_base& operator=(const async_generator_promise_base& other) = delete;
0047 
0048             cppcoro::suspend_always initial_suspend() const noexcept
0049             {
0050                 return {};
0051             }
0052 
0053             async_generator_yield_operation final_suspend() noexcept;
0054 
0055             void unhandled_exception() noexcept
0056             {
0057                 m_exception = std::current_exception();
0058             }
0059 
0060             void return_void() noexcept
0061             {
0062             }
0063 
0064             /// Query if the generator has reached the end of the sequence.
0065             ///
0066             /// Only valid to call after resuming from an awaited advance operation.
0067             /// i.e. Either a begin() or iterator::operator++() operation.
0068             bool finished() const noexcept
0069             {
0070                 return m_currentValue == nullptr;
0071             }
0072 
0073             void rethrow_if_unhandled_exception()
0074             {
0075                 if (m_exception)
0076                 {
0077                     std::rethrow_exception(std::move(m_exception));
0078                 }
0079             }
0080 
0081         protected:
0082 
0083             async_generator_yield_operation internal_yield_value() noexcept;
0084 
0085         private:
0086 
0087             friend class async_generator_yield_operation;
0088             friend class async_generator_advance_operation;
0089 
0090             std::exception_ptr m_exception;
0091 
0092             cppcoro::coroutine_handle<> m_consumerCoroutine;
0093 
0094         protected:
0095 
0096             void* m_currentValue;
0097         };
0098 
0099         class async_generator_yield_operation final
0100         {
0101         public:
0102 
0103             async_generator_yield_operation(cppcoro::coroutine_handle<> consumer) noexcept
0104                 : m_consumer(consumer)
0105             {}
0106 
0107             bool await_ready() const noexcept
0108             {
0109                 return false;
0110             }
0111 
0112             cppcoro::coroutine_handle<>
0113             await_suspend([[maybe_unused]] cppcoro::coroutine_handle<> producer) noexcept
0114             {
0115                 return m_consumer;
0116             }
0117 
0118             void await_resume() noexcept {}
0119 
0120         private:
0121 
0122             cppcoro::coroutine_handle<> m_consumer;
0123 
0124         };
0125 
0126         inline async_generator_yield_operation async_generator_promise_base::final_suspend() noexcept
0127         {
0128             m_currentValue = nullptr;
0129             return internal_yield_value();
0130         }
0131 
0132         inline async_generator_yield_operation async_generator_promise_base::internal_yield_value() noexcept
0133         {
0134             return async_generator_yield_operation{ m_consumerCoroutine };
0135         }
0136 
0137         class async_generator_advance_operation
0138         {
0139         protected:
0140 
0141             async_generator_advance_operation(std::nullptr_t) noexcept
0142                 : m_promise(nullptr)
0143                 , m_producerCoroutine(nullptr)
0144             {}
0145 
0146             async_generator_advance_operation(
0147                 async_generator_promise_base& promise,
0148                 cppcoro::coroutine_handle<> producerCoroutine) noexcept
0149                 : m_promise(std::addressof(promise))
0150                 , m_producerCoroutine(producerCoroutine)
0151             {
0152             }
0153 
0154         public:
0155 
0156             bool await_ready() const noexcept { return false; }
0157 
0158             cppcoro::coroutine_handle<>
0159                 await_suspend(cppcoro::coroutine_handle<> consumerCoroutine) noexcept
0160             {
0161                 m_promise->m_consumerCoroutine = consumerCoroutine;
0162                 return m_producerCoroutine;
0163             }
0164 
0165         protected:
0166 
0167             async_generator_promise_base* m_promise;
0168             cppcoro::coroutine_handle<> m_producerCoroutine;
0169 
0170         };
0171 
0172         template<typename T>
0173         class async_generator_promise final : public async_generator_promise_base
0174         {
0175             using value_type = std::remove_reference_t<T>;
0176 
0177         public:
0178 
0179             async_generator_promise() noexcept = default;
0180 
0181             async_generator<T> get_return_object() noexcept;
0182 
0183             async_generator_yield_operation yield_value(value_type& value) noexcept
0184             {
0185                 m_currentValue = std::addressof(value);
0186                 return internal_yield_value();
0187             }
0188 
0189             async_generator_yield_operation yield_value(value_type&& value) noexcept
0190             {
0191                 return yield_value(value);
0192             }
0193 
0194             T& value() const noexcept
0195             {
0196                 return *static_cast<T*>(m_currentValue);
0197             }
0198 
0199         };
0200 
0201         template<typename T>
0202         class async_generator_promise<T&&> final : public async_generator_promise_base
0203         {
0204         public:
0205 
0206             async_generator_promise() noexcept = default;
0207 
0208             async_generator<T> get_return_object() noexcept;
0209 
0210             async_generator_yield_operation yield_value(T&& value) noexcept
0211             {
0212                 m_currentValue = std::addressof(value);
0213                 return internal_yield_value();
0214             }
0215 
0216             T&& value() const noexcept
0217             {
0218                 return std::move(*static_cast<T*>(m_currentValue));
0219             }
0220 
0221         };
0222 
0223         template<typename T>
0224         class async_generator_increment_operation final : public async_generator_advance_operation
0225         {
0226         public:
0227 
0228             async_generator_increment_operation(async_generator_iterator<T>& iterator) noexcept
0229                 : async_generator_advance_operation(iterator.m_coroutine.promise(), iterator.m_coroutine)
0230                 , m_iterator(iterator)
0231             {}
0232 
0233             async_generator_iterator<T>& await_resume();
0234 
0235         private:
0236 
0237             async_generator_iterator<T>& m_iterator;
0238 
0239         };
0240 
0241         template<typename T>
0242         class async_generator_iterator final
0243         {
0244             using promise_type = async_generator_promise<T>;
0245             using handle_type = cppcoro::coroutine_handle<promise_type>;
0246 
0247         public:
0248 
0249             using iterator_category = std::input_iterator_tag;
0250             // Not sure what type should be used for difference_type as we don't
0251             // allow calculating difference between two iterators.
0252             using difference_type = std::ptrdiff_t;
0253             using value_type = std::remove_reference_t<T>;
0254             using reference = std::add_lvalue_reference_t<T>;
0255             using pointer = std::add_pointer_t<value_type>;
0256 
0257             async_generator_iterator(std::nullptr_t) noexcept
0258                 : m_coroutine(nullptr)
0259             {}
0260 
0261             async_generator_iterator(handle_type coroutine) noexcept
0262                 : m_coroutine(coroutine)
0263             {}
0264 
0265             async_generator_increment_operation<T> operator++() noexcept
0266             {
0267                 return async_generator_increment_operation<T>{ *this };
0268             }
0269 
0270             reference operator*() const noexcept
0271             {
0272                 return m_coroutine.promise().value();
0273             }
0274 
0275             bool operator==(const async_generator_iterator& other) const noexcept
0276             {
0277                 return m_coroutine == other.m_coroutine;
0278             }
0279 
0280             bool operator!=(const async_generator_iterator& other) const noexcept
0281             {
0282                 return !(*this == other);
0283             }
0284 
0285         private:
0286 
0287             friend class async_generator_increment_operation<T>;
0288 
0289             handle_type m_coroutine;
0290 
0291         };
0292 
0293         template<typename T>
0294         async_generator_iterator<T>& async_generator_increment_operation<T>::await_resume()
0295         {
0296             if (m_promise->finished())
0297             {
0298                 // Update iterator to end()
0299                 m_iterator = async_generator_iterator<T>{ nullptr };
0300                 m_promise->rethrow_if_unhandled_exception();
0301             }
0302 
0303             return m_iterator;
0304         }
0305 
0306         template<typename T>
0307         class async_generator_begin_operation final : public async_generator_advance_operation
0308         {
0309             using promise_type = async_generator_promise<T>;
0310             using handle_type = cppcoro::coroutine_handle<promise_type>;
0311 
0312         public:
0313 
0314             async_generator_begin_operation(std::nullptr_t) noexcept
0315                 : async_generator_advance_operation(nullptr)
0316             {}
0317 
0318             async_generator_begin_operation(handle_type producerCoroutine) noexcept
0319                 : async_generator_advance_operation(producerCoroutine.promise(), producerCoroutine)
0320             {}
0321 
0322             bool await_ready() const noexcept
0323             {
0324                 return m_promise == nullptr || async_generator_advance_operation::await_ready();
0325             }
0326 
0327             async_generator_iterator<T> await_resume()
0328             {
0329                 if (m_promise == nullptr)
0330                 {
0331                     // Called begin() on the empty generator.
0332                     return async_generator_iterator<T>{ nullptr };
0333                 }
0334                 else if (m_promise->finished())
0335                 {
0336                     // Completed without yielding any values.
0337                     m_promise->rethrow_if_unhandled_exception();
0338                     return async_generator_iterator<T>{ nullptr };
0339                 }
0340 
0341                 return async_generator_iterator<T>{
0342                     handle_type::from_promise(*static_cast<promise_type*>(m_promise))
0343                 };
0344             }
0345         };
0346     }
0347 
0348     template<typename T>
0349     class [[nodiscard]] async_generator
0350     {
0351     public:
0352 
0353         using promise_type = detail::async_generator_promise<T>;
0354         using iterator = detail::async_generator_iterator<T>;
0355 
0356         async_generator() noexcept
0357             : m_coroutine(nullptr)
0358         {}
0359 
0360         explicit async_generator(promise_type& promise) noexcept
0361             : m_coroutine(cppcoro::coroutine_handle<promise_type>::from_promise(promise))
0362         {}
0363 
0364         async_generator(async_generator&& other) noexcept
0365             : m_coroutine(other.m_coroutine)
0366         {
0367             other.m_coroutine = nullptr;
0368         }
0369 
0370         ~async_generator()
0371         {
0372             if (m_coroutine)
0373             {
0374                 m_coroutine.destroy();
0375             }
0376         }
0377 
0378         async_generator& operator=(async_generator&& other) noexcept
0379         {
0380             async_generator temp(std::move(other));
0381             swap(temp);
0382             return *this;
0383         }
0384 
0385         async_generator(const async_generator&) = delete;
0386         async_generator& operator=(const async_generator&) = delete;
0387 
0388         auto begin() noexcept
0389         {
0390             if (!m_coroutine)
0391             {
0392                 return detail::async_generator_begin_operation<T>{ nullptr };
0393             }
0394 
0395             return detail::async_generator_begin_operation<T>{ m_coroutine };
0396         }
0397 
0398         auto end() noexcept
0399         {
0400             return iterator{ nullptr };
0401         }
0402 
0403         void swap(async_generator& other) noexcept
0404         {
0405             using std::swap;
0406             swap(m_coroutine, other.m_coroutine);
0407         }
0408 
0409     private:
0410 
0411         cppcoro::coroutine_handle<promise_type> m_coroutine;
0412 
0413     };
0414 
0415     template<typename T>
0416     void swap(async_generator<T>& a, async_generator<T>& b) noexcept
0417     {
0418         a.swap(b);
0419     }
0420 
0421     namespace detail
0422     {
0423         template<typename T>
0424         async_generator<T> async_generator_promise<T>::get_return_object() noexcept
0425         {
0426             return async_generator<T>{ *this };
0427         }
0428     }
0429 #else // !CPPCORO_COMPILER_SUPPORTS_SYMMETRIC_TRANSFER
0430 
0431     namespace detail
0432     {
0433         template<typename T>
0434         class async_generator_iterator;
0435         class async_generator_yield_operation;
0436         class async_generator_advance_operation;
0437 
0438         class async_generator_promise_base
0439         {
0440         public:
0441 
0442             async_generator_promise_base() noexcept
0443                 : m_state(state::value_ready_producer_suspended)
0444                 , m_exception(nullptr)
0445             {
0446                 // Other variables left intentionally uninitialised as they're
0447                 // only referenced in certain states by which time they should
0448                 // have been initialised.
0449             }
0450 
0451             async_generator_promise_base(const async_generator_promise_base& other) = delete;
0452             async_generator_promise_base& operator=(const async_generator_promise_base& other) = delete;
0453 
0454             cppcoro::suspend_always initial_suspend() const noexcept
0455             {
0456                 return {};
0457             }
0458 
0459             async_generator_yield_operation final_suspend() noexcept;
0460 
0461             void unhandled_exception() noexcept
0462             {
0463                 // Don't bother capturing the exception if we have been cancelled
0464                 // as there is no consumer that will see it.
0465                 if (m_state.load(std::memory_order_relaxed) != state::cancelled)
0466                 {
0467                     m_exception = std::current_exception();
0468                 }
0469             }
0470 
0471             void return_void() noexcept
0472             {
0473             }
0474 
0475             /// Query if the generator has reached the end of the sequence.
0476             ///
0477             /// Only valid to call after resuming from an awaited advance operation.
0478             /// i.e. Either a begin() or iterator::operator++() operation.
0479             bool finished() const noexcept
0480             {
0481                 return m_currentValue == nullptr;
0482             }
0483 
0484             void rethrow_if_unhandled_exception()
0485             {
0486                 if (m_exception)
0487                 {
0488                     std::rethrow_exception(std::move(m_exception));
0489                 }
0490             }
0491 
0492             /// Request that the generator cancel generation of new items.
0493             ///
0494             /// \return
0495             /// Returns true if the request was completed synchronously and the associated
0496             /// producer coroutine is now available to be destroyed. In which case the caller
0497             /// is expected to call destroy() on the coroutine_handle.
0498             /// Returns false if the producer coroutine was not at a suitable suspend-point.
0499             /// The coroutine will be destroyed when it next reaches a co_yield or co_return
0500             /// statement.
0501             bool request_cancellation() noexcept
0502             {
0503                 const auto previousState = m_state.exchange(state::cancelled, std::memory_order_acq_rel);
0504 
0505                 // Not valid to destroy async_generator<T> object if consumer coroutine still suspended
0506                 // in a co_await for next item.
0507                 assert(previousState != state::value_not_ready_consumer_suspended);
0508 
0509                 // A coroutine should only ever be cancelled once, from the destructor of the
0510                 // owning async_generator<T> object.
0511                 assert(previousState != state::cancelled);
0512 
0513                 return previousState == state::value_ready_producer_suspended;
0514             }
0515 
0516         protected:
0517 
0518             async_generator_yield_operation internal_yield_value() noexcept;
0519 
0520         private:
0521 
0522             friend class async_generator_yield_operation;
0523             friend class async_generator_advance_operation;
0524 
0525             // State transition diagram
0526             //   VNRCA - value_not_ready_consumer_active
0527             //   VNRCS - value_not_ready_consumer_suspended
0528             //   VRPA  - value_ready_producer_active
0529             //   VRPS  - value_ready_producer_suspended
0530             //
0531             //       A         +---  VNRCA --[C]--> VNRCS   yield_value()
0532             //       |         |     |  A           |  A       |   .
0533             //       |        [C]   [P] |          [P] |       |   .
0534             //       |         |     | [C]          | [C]      |   .
0535             //       |         |     V  |           V  |       |   .
0536             //  operator++/    |     VRPS <--[P]--- VRPA       V   |
0537             //  begin()        |      |              |             |
0538             //                 |     [C]            [C]            |
0539             //                 |      +----+     +---+             |
0540             //                 |           |     |                 |
0541             //                 |           V     V                 V
0542             //                 +--------> cancelled         ~async_generator()
0543             //
0544             // [C] - Consumer performs this transition
0545             // [P] - Producer performs this transition
0546             enum class state
0547             {
0548                 value_not_ready_consumer_active,
0549                 value_not_ready_consumer_suspended,
0550                 value_ready_producer_active,
0551                 value_ready_producer_suspended,
0552                 cancelled
0553             };
0554 
0555             std::atomic<state> m_state;
0556 
0557             std::exception_ptr m_exception;
0558 
0559             cppcoro::coroutine_handle<> m_consumerCoroutine;
0560 
0561         protected:
0562 
0563             void* m_currentValue;
0564         };
0565 
0566         class async_generator_yield_operation final
0567         {
0568             using state = async_generator_promise_base::state;
0569 
0570         public:
0571 
0572             async_generator_yield_operation(async_generator_promise_base& promise, state initialState) noexcept
0573                 : m_promise(promise)
0574                 , m_initialState(initialState)
0575             {}
0576 
0577             bool await_ready() const noexcept
0578             {
0579                 return m_initialState == state::value_not_ready_consumer_suspended;
0580             }
0581 
0582             bool await_suspend(cppcoro::coroutine_handle<> producer) noexcept;
0583 
0584             void await_resume() noexcept {}
0585 
0586         private:
0587             async_generator_promise_base& m_promise;
0588             state m_initialState;
0589         };
0590 
0591         inline async_generator_yield_operation async_generator_promise_base::final_suspend() noexcept
0592         {
0593             m_currentValue = nullptr;
0594             return internal_yield_value();
0595         }
0596 
0597         inline async_generator_yield_operation async_generator_promise_base::internal_yield_value() noexcept
0598         {
0599             state currentState = m_state.load(std::memory_order_acquire);
0600             assert(currentState != state::value_ready_producer_active);
0601             assert(currentState != state::value_ready_producer_suspended);
0602 
0603             if (currentState == state::value_not_ready_consumer_suspended)
0604             {
0605                 // Only need relaxed memory order since we're resuming the
0606                 // consumer on the same thread.
0607                 m_state.store(state::value_ready_producer_active, std::memory_order_relaxed);
0608 
0609                 // Resume the consumer.
0610                 // It might ask for another value before returning, in which case it'll
0611                 // transition to value_not_ready_consumer_suspended and we can return from
0612                 // yield_value without suspending, otherwise we should try to suspend
0613                 // the producer in which case the consumer will wake us up again
0614                 // when it wants the next value.
0615                 m_consumerCoroutine.resume();
0616 
0617                 // Need to use acquire semantics here since it's possible that the
0618                 // consumer might have asked for the next value on a different thread
0619                 // which executed concurrently with the call to m_consumerCoro on the
0620                 // current thread above.
0621                 currentState = m_state.load(std::memory_order_acquire);
0622             }
0623 
0624             return async_generator_yield_operation{ *this, currentState };
0625         }
0626 
0627         inline bool async_generator_yield_operation::await_suspend(
0628             cppcoro::coroutine_handle<> producer) noexcept
0629         {
0630             state currentState = m_initialState;
0631             if (currentState == state::value_not_ready_consumer_active)
0632             {
0633                 bool producerSuspended = m_promise.m_state.compare_exchange_strong(
0634                     currentState,
0635                     state::value_ready_producer_suspended,
0636                     std::memory_order_release,
0637                     std::memory_order_acquire);
0638                 if (producerSuspended)
0639                 {
0640                     return true;
0641                 }
0642 
0643                 if (currentState == state::value_not_ready_consumer_suspended)
0644                 {
0645                     // Can get away with using relaxed memory semantics here since we're
0646                     // resuming the consumer on the current thread.
0647                     m_promise.m_state.store(state::value_ready_producer_active, std::memory_order_relaxed);
0648 
0649                     m_promise.m_consumerCoroutine.resume();
0650 
0651                     // The consumer might have asked for another value before returning, in which case
0652                     // it'll transition to value_not_ready_consumer_suspended and we can return without
0653                     // suspending, otherwise we should try to suspend the producer, in which case the
0654                     // consumer will wake us up again when it wants the next value.
0655                     //
0656                     // Need to use acquire semantics here since it's possible that the consumer might
0657                     // have asked for the next value on a different thread which executed concurrently
0658                     // with the call to m_consumerCoro.resume() above.
0659                     currentState = m_promise.m_state.load(std::memory_order_acquire);
0660                     if (currentState == state::value_not_ready_consumer_suspended)
0661                     {
0662                         return false;
0663                     }
0664                 }
0665             }
0666 
0667             // By this point the consumer has been resumed if required and is now active.
0668 
0669             if (currentState == state::value_ready_producer_active)
0670             {
0671                 // Try to suspend the producer.
0672                 // If we failed to suspend then it's either because the consumer destructed, transitioning
0673                 // the state to cancelled, or requested the next item, transitioning the state to value_not_ready_consumer_suspended.
0674                 const bool suspendedProducer = m_promise.m_state.compare_exchange_strong(
0675                     currentState,
0676                     state::value_ready_producer_suspended,
0677                     std::memory_order_release,
0678                     std::memory_order_acquire);
0679                 if (suspendedProducer)
0680                 {
0681                     return true;
0682                 }
0683 
0684                 if (currentState == state::value_not_ready_consumer_suspended)
0685                 {
0686                     // Consumer has asked for the next value.
0687                     return false;
0688                 }
0689             }
0690 
0691             assert(currentState == state::cancelled);
0692 
0693             // async_generator object has been destroyed and we're now at a
0694             // co_yield/co_return suspension point so we can just destroy
0695             // the coroutine.
0696             producer.destroy();
0697 
0698             return true;
0699         }
0700 
0701         class async_generator_advance_operation
0702         {
0703             using state = async_generator_promise_base::state;
0704 
0705         protected:
0706 
0707             async_generator_advance_operation(std::nullptr_t) noexcept
0708                 : m_promise(nullptr)
0709                 , m_producerCoroutine(nullptr)
0710             {}
0711 
0712             async_generator_advance_operation(
0713                 async_generator_promise_base& promise,
0714                 cppcoro::coroutine_handle<> producerCoroutine) noexcept
0715                 : m_promise(std::addressof(promise))
0716                 , m_producerCoroutine(producerCoroutine)
0717             {
0718                 state initialState = promise.m_state.load(std::memory_order_acquire);
0719                 if (initialState == state::value_ready_producer_suspended)
0720                 {
0721                     // Can use relaxed memory order here as we will be resuming the producer
0722                     // on the same thread.
0723                     promise.m_state.store(state::value_not_ready_consumer_active, std::memory_order_relaxed);
0724 
0725                     producerCoroutine.resume();
0726 
0727                     // Need to use acquire memory order here since it's possible that the
0728                     // coroutine may have transferred execution to another thread and
0729                     // completed on that other thread before the call to resume() returns.
0730                     initialState = promise.m_state.load(std::memory_order_acquire);
0731                 }
0732 
0733                 m_initialState = initialState;
0734             }
0735 
0736         public:
0737 
0738             bool await_ready() const noexcept
0739             {
0740                 return m_initialState == state::value_ready_producer_suspended;
0741             }
0742 
0743             bool await_suspend(cppcoro::coroutine_handle<> consumerCoroutine) noexcept
0744             {
0745                 m_promise->m_consumerCoroutine = consumerCoroutine;
0746 
0747                 auto currentState = m_initialState;
0748                 if (currentState == state::value_ready_producer_active)
0749                 {
0750                     // A potential race between whether consumer or producer coroutine
0751                     // suspends first. Resolve the race using a compare-exchange.
0752                     if (m_promise->m_state.compare_exchange_strong(
0753                         currentState,
0754                         state::value_not_ready_consumer_suspended,
0755                         std::memory_order_release,
0756                         std::memory_order_acquire))
0757                     {
0758                         return true;
0759                     }
0760 
0761                     assert(currentState == state::value_ready_producer_suspended);
0762 
0763                     m_promise->m_state.store(state::value_not_ready_consumer_active, std::memory_order_relaxed);
0764 
0765                     m_producerCoroutine.resume();
0766 
0767                     currentState = m_promise->m_state.load(std::memory_order_acquire);
0768                     if (currentState == state::value_ready_producer_suspended)
0769                     {
0770                         // Producer coroutine produced a value synchronously.
0771                         return false;
0772                     }
0773                 }
0774 
0775                 assert(currentState == state::value_not_ready_consumer_active);
0776 
0777                 // Try to suspend consumer coroutine, transitioning to value_not_ready_consumer_suspended.
0778                 // This could be racing with producer making the next value available and suspending
0779                 // (transition to value_ready_producer_suspended) so we use compare_exchange to decide who
0780                 // wins the race.
0781                 // If compare_exchange succeeds then consumer suspended (and we return true).
0782                 // If it fails then producer yielded next value and suspended and we can return
0783                 // synchronously without suspended (ie. return false).
0784                 return m_promise->m_state.compare_exchange_strong(
0785                     currentState,
0786                     state::value_not_ready_consumer_suspended,
0787                     std::memory_order_release,
0788                     std::memory_order_acquire);
0789             }
0790 
0791         protected:
0792 
0793             async_generator_promise_base* m_promise;
0794             cppcoro::coroutine_handle<> m_producerCoroutine;
0795 
0796         private:
0797 
0798             state m_initialState;
0799 
0800         };
0801 
0802         template<typename T>
0803         class async_generator_promise final : public async_generator_promise_base
0804         {
0805             using value_type = std::remove_reference_t<T>;
0806 
0807         public:
0808 
0809             async_generator_promise() noexcept = default;
0810 
0811             async_generator<T> get_return_object() noexcept;
0812 
0813             async_generator_yield_operation yield_value(value_type& value) noexcept
0814             {
0815                 m_currentValue = std::addressof(value);
0816                 return internal_yield_value();
0817             }
0818 
0819             async_generator_yield_operation yield_value(value_type&& value) noexcept
0820             {
0821                 return yield_value(value);
0822             }
0823 
0824             T& value() const noexcept
0825             {
0826                 return *static_cast<T*>(m_currentValue);
0827             }
0828 
0829         };
0830 
0831         template<typename T>
0832         class async_generator_promise<T&&> final : public async_generator_promise_base
0833         {
0834         public:
0835 
0836             async_generator_promise() noexcept = default;
0837 
0838             async_generator<T> get_return_object() noexcept;
0839 
0840             async_generator_yield_operation yield_value(T&& value) noexcept
0841             {
0842                 m_currentValue = std::addressof(value);
0843                 return internal_yield_value();
0844             }
0845 
0846             T&& value() const noexcept
0847             {
0848                 return std::move(*static_cast<T*>(m_currentValue));
0849             }
0850 
0851         };
0852 
0853         template<typename T>
0854         class async_generator_increment_operation final : public async_generator_advance_operation
0855         {
0856         public:
0857 
0858             async_generator_increment_operation(async_generator_iterator<T>& iterator) noexcept
0859                 : async_generator_advance_operation(iterator.m_coroutine.promise(), iterator.m_coroutine)
0860                 , m_iterator(iterator)
0861             {}
0862 
0863             async_generator_iterator<T>& await_resume();
0864 
0865         private:
0866 
0867             async_generator_iterator<T>& m_iterator;
0868 
0869         };
0870 
0871         template<typename T>
0872         class async_generator_iterator final
0873         {
0874             using promise_type = async_generator_promise<T>;
0875             using handle_type = cppcoro::coroutine_handle<promise_type>;
0876 
0877         public:
0878 
0879             using iterator_category = std::input_iterator_tag;
0880             // Not sure what type should be used for difference_type as we don't
0881             // allow calculating difference between two iterators.
0882             using difference_type = std::ptrdiff_t;
0883             using value_type = std::remove_reference_t<T>;
0884             using reference = std::add_lvalue_reference_t<T>;
0885             using pointer = std::add_pointer_t<value_type>;
0886 
0887             async_generator_iterator(std::nullptr_t) noexcept
0888                 : m_coroutine(nullptr)
0889             {}
0890 
0891             async_generator_iterator(handle_type coroutine) noexcept
0892                 : m_coroutine(coroutine)
0893             {}
0894 
0895             async_generator_increment_operation<T> operator++() noexcept
0896             {
0897                 return async_generator_increment_operation<T>{ *this };
0898             }
0899 
0900             reference operator*() const noexcept
0901             {
0902                 return m_coroutine.promise().value();
0903             }
0904 
0905             bool operator==(const async_generator_iterator& other) const noexcept
0906             {
0907                 return m_coroutine == other.m_coroutine;
0908             }
0909 
0910             bool operator!=(const async_generator_iterator& other) const noexcept
0911             {
0912                 return !(*this == other);
0913             }
0914 
0915         private:
0916 
0917             friend class async_generator_increment_operation<T>;
0918 
0919             handle_type m_coroutine;
0920 
0921         };
0922 
0923         template<typename T>
0924         async_generator_iterator<T>& async_generator_increment_operation<T>::await_resume()
0925         {
0926             if (m_promise->finished())
0927             {
0928                 // Update iterator to end()
0929                 m_iterator = async_generator_iterator<T>{ nullptr };
0930                 m_promise->rethrow_if_unhandled_exception();
0931             }
0932 
0933             return m_iterator;
0934         }
0935 
0936         template<typename T>
0937         class async_generator_begin_operation final : public async_generator_advance_operation
0938         {
0939             using promise_type = async_generator_promise<T>;
0940             using handle_type = cppcoro::coroutine_handle<promise_type>;
0941 
0942         public:
0943 
0944             async_generator_begin_operation(std::nullptr_t) noexcept
0945                 : async_generator_advance_operation(nullptr)
0946             {}
0947 
0948             async_generator_begin_operation(handle_type producerCoroutine) noexcept
0949                 : async_generator_advance_operation(producerCoroutine.promise(), producerCoroutine)
0950             {}
0951 
0952             bool await_ready() const noexcept
0953             {
0954                 return m_promise == nullptr || async_generator_advance_operation::await_ready();
0955             }
0956 
0957             async_generator_iterator<T> await_resume()
0958             {
0959                 if (m_promise == nullptr)
0960                 {
0961                     // Called begin() on the empty generator.
0962                     return async_generator_iterator<T>{ nullptr };
0963                 }
0964                 else if (m_promise->finished())
0965                 {
0966                     // Completed without yielding any values.
0967                     m_promise->rethrow_if_unhandled_exception();
0968                     return async_generator_iterator<T>{ nullptr };
0969                 }
0970 
0971                 return async_generator_iterator<T>{
0972                     handle_type::from_promise(*static_cast<promise_type*>(m_promise))
0973                 };
0974             }
0975         };
0976     }
0977 
0978     template<typename T>
0979     class async_generator
0980     {
0981     public:
0982 
0983         using promise_type = detail::async_generator_promise<T>;
0984         using iterator = detail::async_generator_iterator<T>;
0985 
0986         async_generator() noexcept
0987             : m_coroutine(nullptr)
0988         {}
0989 
0990         explicit async_generator(promise_type& promise) noexcept
0991             : m_coroutine(cppcoro::coroutine_handle<promise_type>::from_promise(promise))
0992         {}
0993 
0994         async_generator(async_generator&& other) noexcept
0995             : m_coroutine(other.m_coroutine)
0996         {
0997             other.m_coroutine = nullptr;
0998         }
0999 
1000         ~async_generator()
1001         {
1002             if (m_coroutine)
1003             {
1004                 if (m_coroutine.promise().request_cancellation())
1005                 {
1006                     m_coroutine.destroy();
1007                 }
1008             }
1009         }
1010 
1011         async_generator& operator=(async_generator&& other) noexcept
1012         {
1013             async_generator temp(std::move(other));
1014             swap(temp);
1015             return *this;
1016         }
1017 
1018         async_generator(const async_generator&) = delete;
1019         async_generator& operator=(const async_generator&) = delete;
1020 
1021         auto begin() noexcept
1022         {
1023             if (!m_coroutine)
1024             {
1025                 return detail::async_generator_begin_operation<T>{ nullptr };
1026             }
1027 
1028             return detail::async_generator_begin_operation<T>{ m_coroutine };
1029         }
1030 
1031         auto end() noexcept
1032         {
1033             return iterator{ nullptr };
1034         }
1035 
1036         void swap(async_generator& other) noexcept
1037         {
1038             using std::swap;
1039             swap(m_coroutine, other.m_coroutine);
1040         }
1041 
1042     private:
1043 
1044         cppcoro::coroutine_handle<promise_type> m_coroutine;
1045 
1046     };
1047 
1048     template<typename T>
1049     void swap(async_generator<T>& a, async_generator<T>& b) noexcept
1050     {
1051         a.swap(b);
1052     }
1053 
1054     namespace detail
1055     {
1056         template<typename T>
1057         async_generator<T> async_generator_promise<T>::get_return_object() noexcept
1058         {
1059             return async_generator<T>{ *this };
1060         }
1061     }
1062 #endif // !CPPCORO_COMPILER_SUPPORTS_SYMMETRIC_TRANSFER
1063 
1064     template<typename FUNC, typename T>
1065     async_generator<std::invoke_result_t<FUNC&, decltype(*std::declval<typename async_generator<T>::iterator&>())>> fmap(
1066         FUNC func,
1067         async_generator<T> source)
1068     {
1069         static_assert(
1070             !std::is_reference_v<FUNC>,
1071             "Passing by reference to async_generator<T> coroutine is unsafe. "
1072             "Use std::ref or std::cref to explicitly pass by reference.");
1073 
1074         // Explicitly hand-coding the loop here rather than using range-based
1075         // for loop since it's difficult to std::forward<???> the value of a
1076         // range-based for-loop, preserving the value category of operator*
1077         // return-value.
1078         auto it = co_await source.begin();
1079         const auto itEnd = source.end();
1080         while (it != itEnd)
1081         {
1082             co_yield std::invoke(func, *it);
1083             (void)co_await ++it;
1084         }
1085     }
1086 }
1087 
1088 #endif