Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:52

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
0006 #define CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
0007 
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/sequence_barrier.hpp>
0010 #include <cppcoro/sequence_range.hpp>
0011 
0012 namespace cppcoro
0013 {
0014     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0015     class single_producer_sequencer_claim_one_operation;
0016 
0017     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0018     class single_producer_sequencer_claim_operation;
0019 
0020     template<
0021         typename SEQUENCE = std::size_t,
0022         typename TRAITS = sequence_traits<SEQUENCE>>
0023     class single_producer_sequencer
0024     {
0025     public:
0026 
0027         using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
0028 
0029         single_producer_sequencer(
0030             const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0031             std::size_t bufferSize,
0032             SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept
0033             : m_consumerBarrier(consumerBarrier)
0034             , m_bufferSize(bufferSize)
0035             , m_nextToClaim(initialSequence + 1)
0036             , m_producerBarrier(initialSequence)
0037         {}
0038 
0039         /// Claim a slot in the ring buffer asynchronously.
0040         ///
0041         /// \return
0042         /// Returns an operation that when awaited will suspend the coroutine until
0043         /// a slot is available for writing in the ring buffer. The result of the
0044         /// co_await expression will be the sequence number of the slot.
0045         /// The caller must publish() the claimed sequence number once they have written to
0046         /// the ring-buffer.
0047         template<typename SCHEDULER>
0048         [[nodiscard]]
0049         single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0050         claim_one(SCHEDULER& scheduler) noexcept;
0051 
0052         /// Claim one or more contiguous slots in the ring-buffer.
0053         ///
0054         /// Use this method over many calls to claim_one() when you have multiple elements to
0055         /// enqueue. This will claim as many slots as are available up to the specified count
0056         /// but may claim as few as one slot if only one slot is available.
0057         ///
0058         /// \param count
0059         /// The maximum number of slots to claim.
0060         ///
0061         /// \return
0062         /// Returns an awaitable object that when awaited returns a sequence_range that contains
0063         /// the range of sequence numbers that were claimed. Once you have written element values
0064         /// to all of the claimed slots you must publish() the sequence range in order to make
0065         /// the elements available to consumers.
0066         template<typename SCHEDULER>
0067         [[nodiscard]]
0068         single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER> claim_up_to(
0069             std::size_t count, SCHEDULER& scheduler) noexcept;
0070 
0071         /// Publish the specified sequence number.
0072         ///
0073         /// This also implies that all prior sequence numbers have already been published.
0074         void publish(SEQUENCE sequence) noexcept
0075         {
0076             m_producerBarrier.publish(sequence);
0077         }
0078 
0079         /// Publish a contiguous range of sequence numbers.
0080         ///
0081         /// You must have already published all prior sequence numbers.
0082         ///
0083         /// This is equivalent to just publishing the last sequence number in the range.
0084         void publish(const sequence_range<SEQUENCE, TRAITS>& sequences) noexcept
0085         {
0086             m_producerBarrier.publish(sequences.back());
0087         }
0088 
0089         /// Query what the last-published sequence number is.
0090         ///
0091         /// You can assume that all prior sequence numbers are also published.
0092         SEQUENCE last_published() const noexcept
0093         {
0094             return m_producerBarrier.last_published();
0095         }
0096 
0097         /// Asynchronously wait until the specified sequence number is published.
0098         ///
0099         /// \param targetSequence
0100         /// The sequence number to wait for.
0101         ///
0102         /// \return
0103         /// Returns an Awaitable type that, when awaited, will suspend the awaiting coroutine until the
0104         /// specified sequence number has been published.
0105         ///
0106         /// The result of the 'co_await barrier.wait_until_published(seq)' expression will be the
0107         /// last-published sequence number, which is guaranteed to be at least 'seq' but may be some
0108         /// subsequent sequence number if additional items were published while waiting for the
0109         /// the requested sequence number to be published.
0110         template<typename SCHEDULER>
0111         [[nodiscard]]
0112         auto wait_until_published(SEQUENCE targetSequence, SCHEDULER& scheduler) const noexcept
0113         {
0114             return m_producerBarrier.wait_until_published(targetSequence, scheduler);
0115         }
0116 
0117     private:
0118 
0119         template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0120         friend class single_producer_sequencer_claim_operation;
0121 
0122         template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0123         friend class single_producer_sequencer_claim_one_operation;
0124 
0125 #if CPPCORO_COMPILER_MSVC
0126 # pragma warning(push)
0127 # pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
0128 #endif
0129 
0130         const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
0131         const std::size_t m_bufferSize;
0132 
0133         alignas(CPPCORO_CPU_CACHE_LINE)
0134         SEQUENCE m_nextToClaim;
0135 
0136         sequence_barrier<SEQUENCE, TRAITS> m_producerBarrier;
0137 
0138 #if CPPCORO_COMPILER_MSVC
0139 # pragma warning(pop)
0140 #endif
0141     };
0142 
0143     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0144     class single_producer_sequencer_claim_one_operation
0145     {
0146     public:
0147 
0148         single_producer_sequencer_claim_one_operation(
0149             single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0150             SCHEDULER& scheduler) noexcept
0151             : m_consumerWaitOperation(
0152                 sequencer.m_consumerBarrier,
0153                 static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
0154                 scheduler)
0155             , m_sequencer(sequencer)
0156         {}
0157 
0158         bool await_ready() const noexcept
0159         {
0160             return m_consumerWaitOperation.await_ready();
0161         }
0162 
0163         auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0164         {
0165             return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
0166         }
0167 
0168         SEQUENCE await_resume() const noexcept
0169         {
0170             return m_sequencer.m_nextToClaim++;
0171         }
0172 
0173     private:
0174 
0175         sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
0176         single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0177 
0178     };
0179 
0180     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0181     class single_producer_sequencer_claim_operation
0182     {
0183     public:
0184 
0185         explicit single_producer_sequencer_claim_operation(
0186             single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0187             std::size_t count,
0188             SCHEDULER& scheduler) noexcept
0189             : m_consumerWaitOperation(
0190                 sequencer.m_consumerBarrier,
0191                 static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
0192                 scheduler)
0193             , m_sequencer(sequencer)
0194             , m_count(count)
0195         {}
0196 
0197         bool await_ready() const noexcept
0198         {
0199             return m_consumerWaitOperation.await_ready();
0200         }
0201 
0202         auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0203         {
0204             return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
0205         }
0206 
0207         sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
0208         {
0209             const SEQUENCE lastAvailableSequence =
0210                 static_cast<SEQUENCE>(m_consumerWaitOperation.await_resume() + m_sequencer.m_bufferSize);
0211             const SEQUENCE begin = m_sequencer.m_nextToClaim;
0212             const std::size_t availableCount = static_cast<std::size_t>(lastAvailableSequence - begin) + 1;
0213             const std::size_t countToClaim = std::min(m_count, availableCount);
0214             const SEQUENCE end = static_cast<SEQUENCE>(begin + countToClaim);
0215             m_sequencer.m_nextToClaim = end;
0216             return sequence_range<SEQUENCE, TRAITS>(begin, end);
0217         }
0218 
0219     private:
0220 
0221         sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
0222         single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0223         std::size_t m_count;
0224 
0225     };
0226 
0227     template<typename SEQUENCE, typename TRAITS>
0228     template<typename SCHEDULER>
0229     [[nodiscard]]
0230     single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0231     single_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
0232     {
0233         return single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
0234     }
0235 
0236     template<typename SEQUENCE, typename TRAITS>
0237     template<typename SCHEDULER>
0238     [[nodiscard]]
0239     single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0240     single_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
0241     {
0242         return single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, count, scheduler);
0243     }
0244 }
0245 
0246 #endif