File indexing completed on 2025-01-18 09:54:52
0001
0002
0003
0004
0005 #ifndef CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
0006 #define CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
0007
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/sequence_barrier.hpp>
0010 #include <cppcoro/sequence_range.hpp>
0011
0012 namespace cppcoro
0013 {
// Awaitable types handed out by single_producer_sequencer. They are declared
// here because the sequencer names them in its member signatures and in its
// friend declarations, while their definitions come after the sequencer.
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_one_operation;

template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_operation;
0019
0020 template<
0021 typename SEQUENCE = std::size_t,
0022 typename TRAITS = sequence_traits<SEQUENCE>>
0023 class single_producer_sequencer
0024 {
0025 public:
0026
0027 using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
0028
0029 single_producer_sequencer(
0030 const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0031 std::size_t bufferSize,
0032 SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept
0033 : m_consumerBarrier(consumerBarrier)
0034 , m_bufferSize(bufferSize)
0035 , m_nextToClaim(initialSequence + 1)
0036 , m_producerBarrier(initialSequence)
0037 {}
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047 template<typename SCHEDULER>
0048 [[nodiscard]]
0049 single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0050 claim_one(SCHEDULER& scheduler) noexcept;
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066 template<typename SCHEDULER>
0067 [[nodiscard]]
0068 single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER> claim_up_to(
0069 std::size_t count, SCHEDULER& scheduler) noexcept;
0070
0071
0072
0073
0074 void publish(SEQUENCE sequence) noexcept
0075 {
0076 m_producerBarrier.publish(sequence);
0077 }
0078
0079
0080
0081
0082
0083
0084 void publish(const sequence_range<SEQUENCE, TRAITS>& sequences) noexcept
0085 {
0086 m_producerBarrier.publish(sequences.back());
0087 }
0088
0089
0090
0091
0092 SEQUENCE last_published() const noexcept
0093 {
0094 return m_producerBarrier.last_published();
0095 }
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108
0109
0110 template<typename SCHEDULER>
0111 [[nodiscard]]
0112 auto wait_until_published(SEQUENCE targetSequence, SCHEDULER& scheduler) const noexcept
0113 {
0114 return m_producerBarrier.wait_until_published(targetSequence, scheduler);
0115 }
0116
0117 private:
0118
0119 template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0120 friend class single_producer_sequencer_claim_operation;
0121
0122 template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0123 friend class single_producer_sequencer_claim_one_operation;
0124
0125 #if CPPCORO_COMPILER_MSVC
0126 # pragma warning(push)
0127 # pragma warning(disable : 4324)
0128 #endif
0129
0130 const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
0131 const std::size_t m_bufferSize;
0132
0133 alignas(CPPCORO_CPU_CACHE_LINE)
0134 SEQUENCE m_nextToClaim;
0135
0136 sequence_barrier<SEQUENCE, TRAITS> m_producerBarrier;
0137
0138 #if CPPCORO_COMPILER_MSVC
0139 # pragma warning(pop)
0140 #endif
0141 };
0142
0143 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0144 class single_producer_sequencer_claim_one_operation
0145 {
0146 public:
0147
0148 single_producer_sequencer_claim_one_operation(
0149 single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0150 SCHEDULER& scheduler) noexcept
0151 : m_consumerWaitOperation(
0152 sequencer.m_consumerBarrier,
0153 static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
0154 scheduler)
0155 , m_sequencer(sequencer)
0156 {}
0157
0158 bool await_ready() const noexcept
0159 {
0160 return m_consumerWaitOperation.await_ready();
0161 }
0162
0163 auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0164 {
0165 return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
0166 }
0167
0168 SEQUENCE await_resume() const noexcept
0169 {
0170 return m_sequencer.m_nextToClaim++;
0171 }
0172
0173 private:
0174
0175 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
0176 single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0177
0178 };
0179
0180 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0181 class single_producer_sequencer_claim_operation
0182 {
0183 public:
0184
0185 explicit single_producer_sequencer_claim_operation(
0186 single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0187 std::size_t count,
0188 SCHEDULER& scheduler) noexcept
0189 : m_consumerWaitOperation(
0190 sequencer.m_consumerBarrier,
0191 static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
0192 scheduler)
0193 , m_sequencer(sequencer)
0194 , m_count(count)
0195 {}
0196
0197 bool await_ready() const noexcept
0198 {
0199 return m_consumerWaitOperation.await_ready();
0200 }
0201
0202 auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0203 {
0204 return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
0205 }
0206
0207 sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
0208 {
0209 const SEQUENCE lastAvailableSequence =
0210 static_cast<SEQUENCE>(m_consumerWaitOperation.await_resume() + m_sequencer.m_bufferSize);
0211 const SEQUENCE begin = m_sequencer.m_nextToClaim;
0212 const std::size_t availableCount = static_cast<std::size_t>(lastAvailableSequence - begin) + 1;
0213 const std::size_t countToClaim = std::min(m_count, availableCount);
0214 const SEQUENCE end = static_cast<SEQUENCE>(begin + countToClaim);
0215 m_sequencer.m_nextToClaim = end;
0216 return sequence_range<SEQUENCE, TRAITS>(begin, end);
0217 }
0218
0219 private:
0220
0221 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
0222 single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0223 std::size_t m_count;
0224
0225 };
0226
0227 template<typename SEQUENCE, typename TRAITS>
0228 template<typename SCHEDULER>
0229 [[nodiscard]]
0230 single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0231 single_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
0232 {
0233 return single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
0234 }
0235
0236 template<typename SEQUENCE, typename TRAITS>
0237 template<typename SCHEDULER>
0238 [[nodiscard]]
0239 single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0240 single_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
0241 {
0242 return single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, count, scheduler);
0243 }
0244 }
0245
0246 #endif