Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:52

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
0006 #define CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
0007 
#include <cppcoro/config.hpp>
#include <cppcoro/sequence_barrier.hpp>
#include <cppcoro/sequence_range.hpp>
#include <cppcoro/sequence_traits.hpp>

#include <cppcoro/detail/manual_lifetime.hpp>

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <type_traits>
#include <utility>
0018 
0019 namespace cppcoro
0020 {
0021     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0022     class multi_producer_sequencer_claim_one_operation;
0023 
0024     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0025     class multi_producer_sequencer_claim_operation;
0026 
0027     template<typename SEQUENCE, typename TRAITS>
0028     class multi_producer_sequencer_wait_operation_base;
0029 
0030     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0031     class multi_producer_sequencer_wait_operation;
0032 
0033     /// A multi-producer sequencer is a thread-synchronisation primitive that can be
0034     /// used to synchronise access to a ring-buffer of power-of-two size where you
0035     /// have multiple producers concurrently claiming slots in the ring-buffer and
0036     /// publishing items.
0037     ///
0038     /// When a writer wants to write to a slot in the buffer it first atomically
0039     /// increments a counter by the number of slots it wishes to allocate.
0040     /// It then waits until all of those slots have become available and then
0041     /// returns the range of sequence numbers allocated back to the caller.
0042     /// The caller then writes to those slots and when done publishes them by
0043     /// writing the sequence numbers published to each of the slots to the
0044     /// corresponding element of an array of equal size to the ring buffer.
0045     /// When a reader wants to check if the next sequence number is available
0046     /// it then simply needs to read from the corresponding slot in this array
0047     /// to check if the value stored there is equal to the sequence number it
0048     /// is wanting to read.
0049     ///
0050     /// This means concurrent writers are wait-free when there is space available
0051     /// in the ring buffer, requiring a single atomic fetch-add operation as the
0052     /// only contended write operation. All other writes are to memory locations
0053     /// owned by a particular writer. Concurrent writers can publish items out of
0054     /// order so that one writer does not hold up other writers until the ring
0055     /// buffer fills up.
0056     template<
0057         typename SEQUENCE = std::size_t,
0058         typename TRAITS = sequence_traits<SEQUENCE>>
0059     class multi_producer_sequencer
0060     {
0061     public:
0062 
0063         multi_producer_sequencer(
0064             const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0065             std::size_t bufferSize,
0066             SEQUENCE initialSequence = TRAITS::initial_sequence);
0067 
0068         /// The size of the circular buffer. This will be a power-of-two.
0069         std::size_t buffer_size() const noexcept { return m_sequenceMask + 1; }
0070 
0071         /// Lookup the last-known-published sequence number after the specified
0072         /// sequence number.
0073         SEQUENCE last_published_after(SEQUENCE lastKnownPublished) const noexcept;
0074 
0075         /// Wait until the specified target sequence number has been published.
0076         ///
0077         /// Returns an awaitable type that when co_awaited will suspend the awaiting
0078         /// coroutine until the specified 'targetSequence' number and all prior sequence
0079         /// numbers have been published.
0080         template<typename SCHEDULER>
0081         multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
0082             SEQUENCE targetSequence,
0083             SEQUENCE lastKnownPublished,
0084             SCHEDULER& scheduler) const noexcept;
0085 
0086         /// Query if there are currently any slots available for claiming.
0087         ///
0088         /// Note that this return-value is only approximate if you have multiple producers
0089         /// since immediately after returning true another thread may have claimed the
0090         /// last available slot.
0091         bool any_available() const noexcept;
0092 
0093         /// Claim a single slot in the buffer and wait until that slot becomes available.
0094         ///
0095         /// Returns an Awaitable type that yields the sequence number of the slot that
0096         /// was claimed.
0097         ///
0098         /// Once the producer has claimed a slot then they are free to write to that
0099         /// slot within the ring buffer. Once the value has been initialised the item
0100         /// must be published by calling the .publish() method, passing the sequence
0101         /// number.
0102         template<typename SCHEDULER>
0103         multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0104         claim_one(SCHEDULER& scheduler) noexcept;
0105 
0106         /// Claim a contiguous range of sequence numbers corresponding to slots within
0107         /// a ring-buffer.
0108         ///
0109         /// This will claim at most the specified count of sequence numbers but may claim
0110         /// fewer if there are only fewer entries available in the buffer. But will claim
0111         /// at least one sequence number.
0112         ///
0113         /// Returns an awaitable that will yield a sequence_range object containing the
0114         /// sequence numbers that were claimed.
0115         ///
0116         /// The caller is responsible for ensuring that they publish every element of the
0117         /// returned sequence range by calling .publish().
0118         template<typename SCHEDULER>
0119         multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0120         claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept;
0121 
0122         /// Publish the element with the specified sequence number, making it available
0123         /// to consumers.
0124         ///
0125         /// Note that different sequence numbers may be published by different producer
0126         /// threads out of order. A sequence number will not become available to consumers
0127         /// until all preceding sequence numbers have also been published.
0128         ///
0129         /// \param sequence
0130         /// The sequence number of the elemnt to publish
0131         /// This sequence number must have been previously acquired via a call to 'claim_one()'
0132         /// or 'claim_up_to()'.
0133         void publish(SEQUENCE sequence) noexcept;
0134 
0135         /// Publish a contiguous range of sequence numbers, making each of them available
0136         /// to consumers.
0137         ///
0138         /// This is equivalent to calling publish(seq) for each sequence number, seq, in
0139         /// the specified range, but is more efficient since it only checks to see if
0140         /// there are coroutines that need to be woken up once.
0141         void publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept;
0142 
0143     private:
0144 
0145         template<typename SEQUENCE2, typename TRAITS2>
0146         friend class multi_producer_sequencer_wait_operation_base;
0147 
0148         template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0149         friend class multi_producer_sequencer_claim_operation;
0150 
0151         template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0152         friend class multi_producer_sequencer_claim_one_operation;
0153 
0154         void resume_ready_awaiters() noexcept;
0155         void add_awaiter(multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept;
0156 
0157 #if CPPCORO_COMPILER_MSVC
0158 # pragma warning(push)
0159 # pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
0160 #endif
0161 
0162         const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
0163         const std::size_t m_sequenceMask;
0164         const std::unique_ptr<std::atomic<SEQUENCE>[]> m_published;
0165 
0166         alignas(CPPCORO_CPU_CACHE_LINE)
0167         std::atomic<SEQUENCE> m_nextToClaim;
0168 
0169         alignas(CPPCORO_CPU_CACHE_LINE)
0170         mutable std::atomic<multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>*> m_awaiters;
0171 
0172 #if CPPCORO_COMPILER_MSVC
0173 # pragma warning(pop)
0174 #endif
0175 
0176     };
0177 
0178     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0179     class multi_producer_sequencer_claim_awaiter
0180     {
0181     public:
0182 
0183         multi_producer_sequencer_claim_awaiter(
0184             const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0185             std::size_t bufferSize,
0186             const sequence_range<SEQUENCE, TRAITS>& claimedRange,
0187             SCHEDULER& scheduler) noexcept
0188             : m_barrierWait(consumerBarrier, claimedRange.back() - bufferSize, scheduler)
0189             , m_claimedRange(claimedRange)
0190         {}
0191 
0192         bool await_ready() const noexcept
0193         {
0194             return m_barrierWait.await_ready();
0195         }
0196 
0197         auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0198         {
0199             return m_barrierWait.await_suspend(awaitingCoroutine);
0200         }
0201 
0202         sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
0203         {
0204             return m_claimedRange;
0205         }
0206 
0207     private:
0208 
0209         sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_barrierWait;
0210         sequence_range<SEQUENCE, TRAITS> m_claimedRange;
0211 
0212     };
0213 
0214     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0215     class multi_producer_sequencer_claim_operation
0216     {
0217     public:
0218 
0219         multi_producer_sequencer_claim_operation(
0220             multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0221             std::size_t count,
0222             SCHEDULER& scheduler) noexcept
0223             : m_sequencer(sequencer)
0224             , m_count(count < sequencer.buffer_size() ? count : sequencer.buffer_size())
0225             , m_scheduler(scheduler)
0226         {
0227         }
0228 
0229         multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
0230         {
0231             // We wait until the awaitable is actually co_await'ed before we claim the
0232             // range of elements. If we claimed them earlier, then it may be possible for
0233             // the caller to fail to co_await the result eg. due to an exception, which
0234             // would leave the sequence numbers unable to be published and would eventually
0235             // deadlock consumers that waited on them.
0236             //
0237             // TODO: We could try and acquire only as many as are available if fewer than
0238             // m_count elements are available. This would complicate the logic here somewhat
0239             // as we'd need to use a compare-exchange instead.
0240             const SEQUENCE first = m_sequencer.m_nextToClaim.fetch_add(m_count, std::memory_order_relaxed);
0241             return multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
0242                 m_sequencer.m_consumerBarrier,
0243                 m_sequencer.buffer_size(),
0244                 sequence_range<SEQUENCE, TRAITS>{ first, first + m_count },
0245                 m_scheduler
0246             };
0247         }
0248 
0249     private:
0250 
0251         multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0252         std::size_t m_count;
0253         SCHEDULER& m_scheduler;
0254 
0255     };
0256 
0257     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0258     class multi_producer_sequencer_claim_one_awaiter
0259     {
0260     public:
0261 
0262         multi_producer_sequencer_claim_one_awaiter(
0263             const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0264             std::size_t bufferSize,
0265             SEQUENCE claimedSequence,
0266             SCHEDULER& scheduler) noexcept
0267             : m_waitOp(consumerBarrier, claimedSequence - bufferSize, scheduler)
0268             , m_claimedSequence(claimedSequence)
0269         {}
0270 
0271         bool await_ready() const noexcept
0272         {
0273             return m_waitOp.await_ready();
0274         }
0275 
0276         auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0277         {
0278             return m_waitOp.await_suspend(awaitingCoroutine);
0279         }
0280 
0281         SEQUENCE await_resume() noexcept
0282         {
0283             return m_claimedSequence;
0284         }
0285 
0286     private:
0287 
0288         sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_waitOp;
0289         SEQUENCE m_claimedSequence;
0290 
0291     };
0292 
0293     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0294     class multi_producer_sequencer_claim_one_operation
0295     {
0296     public:
0297 
0298         multi_producer_sequencer_claim_one_operation(
0299             multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0300             SCHEDULER& scheduler) noexcept
0301             : m_sequencer(sequencer)
0302             , m_scheduler(scheduler)
0303         {}
0304 
0305         multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
0306         {
0307             return multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
0308                 m_sequencer.m_consumerBarrier,
0309                 m_sequencer.buffer_size(),
0310                 m_sequencer.m_nextToClaim.fetch_add(1, std::memory_order_relaxed),
0311                 m_scheduler
0312             };
0313         }
0314 
0315     private:
0316 
0317         multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0318         SCHEDULER& m_scheduler;
0319 
0320     };
0321 
0322     template<typename SEQUENCE, typename TRAITS>
0323     class multi_producer_sequencer_wait_operation_base
0324     {
0325     public:
0326 
0327         multi_producer_sequencer_wait_operation_base(
0328             const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0329             SEQUENCE targetSequence,
0330             SEQUENCE lastKnownPublished) noexcept
0331             : m_sequencer(sequencer)
0332             , m_targetSequence(targetSequence)
0333             , m_lastKnownPublished(lastKnownPublished)
0334             , m_readyToResume(false)
0335         {}
0336 
0337         multi_producer_sequencer_wait_operation_base(
0338             const multi_producer_sequencer_wait_operation_base& other) noexcept
0339             : m_sequencer(other.m_sequencer)
0340             , m_targetSequence(other.m_targetSequence)
0341             , m_lastKnownPublished(other.m_lastKnownPublished)
0342             , m_readyToResume(false)
0343         {}
0344 
0345         bool await_ready() const noexcept
0346         {
0347             return !TRAITS::precedes(m_lastKnownPublished, m_targetSequence);
0348         }
0349 
0350         bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0351         {
0352             m_awaitingCoroutine = awaitingCoroutine;
0353 
0354             m_sequencer.add_awaiter(this);
0355 
0356             // Mark the waiter as ready to resume.
0357             // If it was already marked as ready-to-resume within the call to add_awaiter() or
0358             // on another thread then this exchange() will return true. In this case we want to
0359             // resume immediately and continue execution by returning false.
0360             return !m_readyToResume.exchange(true, std::memory_order_acquire);
0361         }
0362 
0363         SEQUENCE await_resume() noexcept
0364         {
0365             return m_lastKnownPublished;
0366         }
0367 
0368     protected:
0369 
0370         friend class multi_producer_sequencer<SEQUENCE, TRAITS>;
0371 
0372         void resume(SEQUENCE lastKnownPublished) noexcept
0373         {
0374             m_lastKnownPublished = lastKnownPublished;
0375             if (m_readyToResume.exchange(true, std::memory_order_release))
0376             {
0377                 resume_impl();
0378             }
0379         }
0380 
0381         virtual void resume_impl() noexcept = 0;
0382 
0383         const multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0384         SEQUENCE m_targetSequence;
0385         SEQUENCE m_lastKnownPublished;
0386         multi_producer_sequencer_wait_operation_base* m_next;
0387         cppcoro::coroutine_handle<> m_awaitingCoroutine;
0388         std::atomic<bool> m_readyToResume;
0389     };
0390 
0391     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0392     class multi_producer_sequencer_wait_operation :
0393         public multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>
0394     {
0395         using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
0396 
0397     public:
0398 
0399         multi_producer_sequencer_wait_operation(
0400             const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0401             SEQUENCE targetSequence,
0402             SEQUENCE lastKnownPublished,
0403             SCHEDULER& scheduler) noexcept
0404             : multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(sequencer, targetSequence, lastKnownPublished)
0405             , m_scheduler(scheduler)
0406         {}
0407 
0408         multi_producer_sequencer_wait_operation(
0409             const multi_producer_sequencer_wait_operation& other) noexcept
0410             : multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(other)
0411             , m_scheduler(other.m_scheduler)
0412         {}
0413 
0414         ~multi_producer_sequencer_wait_operation()
0415         {
0416             if (m_isScheduleAwaiterCreated)
0417             {
0418                 m_scheduleAwaiter.destruct();
0419             }
0420             if (m_isScheduleOperationCreated)
0421             {
0422                 m_scheduleOperation.destruct();
0423             }
0424         }
0425 
0426         SEQUENCE await_resume() noexcept(noexcept(m_scheduleOperation->await_resume()))
0427         {
0428             if (m_isScheduleOperationCreated)
0429             {
0430                 m_scheduleOperation->await_resume();
0431             }
0432 
0433             return multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
0434         }
0435 
0436     private:
0437 
0438         void resume_impl() noexcept override
0439         {
0440             try
0441             {
0442                 m_scheduleOperation.construct(m_scheduler.schedule());
0443                 m_isScheduleOperationCreated = true;
0444 
0445                 m_scheduleAwaiter.construct(detail::get_awaiter(
0446                     static_cast<schedule_operation&&>(*m_scheduleOperation)));
0447                 m_isScheduleAwaiterCreated = true;
0448 
0449                 if (!m_scheduleAwaiter->await_ready())
0450                 {
0451                     using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine));
0452                     if constexpr (std::is_void_v<await_suspend_result_t>)
0453                     {
0454                         m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine);
0455                         return;
0456                     }
0457                     else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
0458                     {
0459                         if (m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine))
0460                         {
0461                             return;
0462                         }
0463                     }
0464                     else
0465                     {
0466                         // Assume it returns a coroutine_handle.
0467                         m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine).resume();
0468                         return;
0469                     }
0470                 }
0471             }
0472             catch (...)
0473             {
0474                 // Ignore failure to reschedule and resume inline?
0475                 // Should we catch the exception and rethrow from await_resume()?
0476                 // Or should we require that 'co_await scheduler.schedule()' is noexcept?
0477             }
0478 
0479             // Resume outside the catch-block.
0480             this->m_awaitingCoroutine.resume();
0481         }
0482 
0483         SCHEDULER& m_scheduler;
0484         // Can't use std::optional<T> here since T could be a reference.
0485         detail::manual_lifetime<schedule_operation> m_scheduleOperation;
0486         detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
0487         bool m_isScheduleOperationCreated = false;
0488         bool m_isScheduleAwaiterCreated = false;
0489 
0490     };
0491 
0492     template<typename SEQUENCE, typename TRAITS>
0493     multi_producer_sequencer<SEQUENCE, TRAITS>::multi_producer_sequencer(
0494         const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0495         std::size_t bufferSize,
0496         SEQUENCE initialSequence)
0497         : m_consumerBarrier(consumerBarrier)
0498         , m_sequenceMask(bufferSize - 1)
0499         , m_published(std::make_unique<std::atomic<SEQUENCE>[]>(bufferSize))
0500         , m_nextToClaim(initialSequence + 1)
0501         , m_awaiters(nullptr)
0502     {
0503         // bufferSize must be a positive power-of-two
0504         assert(bufferSize > 0 && (bufferSize & (bufferSize - 1)) == 0);
0505         // but must be no larger than the max diff value.
0506         using diff_t = typename TRAITS::difference_type;
0507         using unsigned_diff_t = std::make_unsigned_t<diff_t>;
0508         constexpr unsigned_diff_t maxSize = static_cast<unsigned_diff_t>(std::numeric_limits<diff_t>::max());
0509         assert(bufferSize <= maxSize);
0510 
0511         SEQUENCE seq = initialSequence - (bufferSize - 1);
0512         do
0513         {
0514 #ifdef __cpp_lib_atomic_value_initialization
0515             m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
0516 #else // ^^^ __cpp_lib_atomic_value_initialization // !__cpp_lib_atomic_value_initialization vvv
0517             std::atomic_init(&m_published[seq & m_sequenceMask], seq);
0518 #endif // !__cpp_lib_atomic_value_initialization
0519         } while (seq++ != initialSequence);
0520     }
0521 
0522     template<typename SEQUENCE, typename TRAITS>
0523     SEQUENCE multi_producer_sequencer<SEQUENCE, TRAITS>::last_published_after(
0524         SEQUENCE lastKnownPublished) const noexcept
0525     {
0526         const auto mask = m_sequenceMask;
0527         SEQUENCE seq = lastKnownPublished + 1;
0528         while (m_published[seq & mask].load(std::memory_order_acquire) == seq)
0529         {
0530             lastKnownPublished = seq++;
0531         }
0532         return lastKnownPublished;
0533     }
0534 
0535     template<typename SEQUENCE, typename TRAITS>
0536     template<typename SCHEDULER>
0537     multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>
0538     multi_producer_sequencer<SEQUENCE, TRAITS>::wait_until_published(
0539         SEQUENCE targetSequence,
0540         SEQUENCE lastKnownPublished,
0541         SCHEDULER& scheduler) const noexcept
0542     {
0543         return multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>{
0544             *this, targetSequence, lastKnownPublished, scheduler
0545         };
0546     }
0547 
0548     template<typename SEQUENCE, typename TRAITS>
0549     bool multi_producer_sequencer<SEQUENCE, TRAITS>::any_available() const noexcept
0550     {
0551         return TRAITS::precedes(
0552             m_nextToClaim.load(std::memory_order_relaxed),
0553             m_consumerBarrier.last_published() + buffer_size());
0554     }
0555 
0556     template<typename SEQUENCE, typename TRAITS>
0557     template<typename SCHEDULER>
0558     multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0559     multi_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
0560     {
0561         return multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
0562     }
0563 
0564     template<typename SEQUENCE, typename TRAITS>
0565     template<typename SCHEDULER>
0566     multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0567     multi_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
0568     {
0569         return multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, count, scheduler };
0570     }
0571 
0572     template<typename SEQUENCE, typename TRAITS>
0573     void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
0574     {
0575         m_published[sequence & m_sequenceMask].store(sequence, std::memory_order_seq_cst);
0576 
0577         // Resume any waiters that might have been satisfied by this publish operation.
0578         resume_ready_awaiters();
0579     }
0580 
0581     template<typename SEQUENCE, typename TRAITS>
0582     void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept
0583     {
0584         if (range.empty())
0585         {
0586             return;
0587         }
0588 
0589         // Publish all but the first sequence number using relaxed atomics.
0590         // No consumer should be reading those subsequent sequence numbers until they've seen
0591         // that the first sequence number in the range is published.
0592         for (SEQUENCE seq : range.skip(1))
0593         {
0594             m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
0595         }
0596 
0597         // Now publish the first sequence number with seq_cst semantics.
0598         m_published[range.front() & m_sequenceMask].store(range.front(), std::memory_order_seq_cst);
0599 
0600         // Resume any waiters that might have been satisfied by this publish operation.
0601         resume_ready_awaiters();
0602     }
0603 
0604     template<typename SEQUENCE, typename TRAITS>
0605     void multi_producer_sequencer<SEQUENCE, TRAITS>::resume_ready_awaiters() noexcept
0606     {
0607         using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
0608 
0609         awaiter_t* awaiters = m_awaiters.load(std::memory_order_seq_cst);
0610         if (awaiters == nullptr)
0611         {
0612             // No awaiters
0613             return;
0614         }
0615 
0616         // There were some awaiters. Try to acquire the list of waiters with an
0617         // atomic exchange as we might be racing with other consumers/producers.
0618         awaiters = m_awaiters.exchange(nullptr, std::memory_order_seq_cst);
0619         if (awaiters == nullptr)
0620         {
0621             // Didn't acquire the list
0622             // Some other thread is now responsible for resuming them. Our job is done.
0623             return;
0624         }
0625 
0626         SEQUENCE lastKnownPublished;
0627 
0628         awaiter_t* awaitersToResume;
0629         awaiter_t** awaitersToResumeTail = &awaitersToResume;
0630 
0631         awaiter_t* awaitersToRequeue;
0632         awaiter_t** awaitersToRequeueTail = &awaitersToRequeue;
0633 
0634         do
0635         {
0636             using diff_t = typename TRAITS::difference_type;
0637 
0638             lastKnownPublished = last_published_after(awaiters->m_lastKnownPublished);
0639 
0640             // First scan the list of awaiters and split them into 'requeue' and 'resume' lists.
0641             auto minDiff = std::numeric_limits<diff_t>::max();
0642             do
0643             {
0644                 auto diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
0645                 if (diff > 0)
0646                 {
0647                     // Not ready yet.
0648                     minDiff = diff < minDiff ? diff : minDiff;
0649                     *awaitersToRequeueTail = awaiters;
0650                     awaitersToRequeueTail = &awaiters->m_next;
0651                 }
0652                 else
0653                 {
0654                     *awaitersToResumeTail = awaiters;
0655                     awaitersToResumeTail = &awaiters->m_next;
0656                 }
0657                 awaiters->m_lastKnownPublished = lastKnownPublished;
0658                 awaiters = awaiters->m_next;
0659             } while (awaiters != nullptr);
0660 
0661             // Null-terinate the requeue list
0662             *awaitersToRequeueTail = nullptr;
0663 
0664             if (awaitersToRequeue != nullptr)
0665             {
0666                 // Requeue the waiters that are not ready yet.
0667                 awaiter_t* oldHead = nullptr;
0668                 while (!m_awaiters.compare_exchange_weak(oldHead, awaitersToRequeue, std::memory_order_seq_cst, std::memory_order_relaxed))
0669                 {
0670                     *awaitersToRequeueTail = oldHead;
0671                 }
0672 
0673                 // Reset the awaitersToRequeue list
0674                 awaitersToRequeueTail = &awaitersToRequeue;
0675 
0676                 const SEQUENCE earliestTargetSequence = lastKnownPublished + minDiff;
0677 
0678                 // Now we need to check again to see if any of the waiters we just enqueued
0679                 // is now satisfied by a concurrent call to publish().
0680                 //
0681                 // We need to be a bit more careful here since we are no longer holding any
0682                 // awaiters and so producers/consumers may advance the sequence number arbitrarily
0683                 // far. If the sequence number advances more than buffer_size() ahead of the
0684                 // earliestTargetSequence then the m_published[] array may have sequence numbers
0685                 // that have advanced beyond earliestTargetSequence, potentially even wrapping
0686                 // sequence numbers around to then be preceding where they were before. If this
0687                 // happens then we don't need to worry about resuming any awaiters that were waiting
0688                 // for 'earliestTargetSequence' since some other thread has already resumed them.
0689                 // So the only case we need to worry about here is when all m_published entries for
0690                 // sequence numbers in range [lastKnownPublished + 1, earliestTargetSequence] have
0691                 // published sequence numbers that match the range.
0692                 const auto sequenceMask = m_sequenceMask;
0693                 SEQUENCE seq = lastKnownPublished + 1;
0694                 while (m_published[seq & sequenceMask].load(std::memory_order_seq_cst) == seq)
0695                 {
0696                     lastKnownPublished = seq;
0697                     if (seq == earliestTargetSequence)
0698                     {
0699                         // At least one of the awaiters we just published is now satisfied.
0700                         // Reacquire the list of awaiters and continue around the outer loop.
0701                         awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0702                         break;
0703                     }
0704                     ++seq;
0705                 }
0706             }
0707         } while (awaiters != nullptr);
0708 
0709         // Null-terminate list of awaiters to resume.
0710         *awaitersToResumeTail = nullptr;
0711 
0712         while (awaitersToResume != nullptr)
0713         {
0714             awaiter_t* next = awaitersToResume->m_next;
0715             awaitersToResume->resume(lastKnownPublished);
0716             awaitersToResume = next;
0717         }
0718     }
0719 
0720     template<typename SEQUENCE, typename TRAITS>
0721     void multi_producer_sequencer<SEQUENCE, TRAITS>::add_awaiter(
0722         multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept
0723     {
0724         using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
0725 
0726         SEQUENCE targetSequence = awaiter->m_targetSequence;
0727         SEQUENCE lastKnownPublished = awaiter->m_lastKnownPublished;
0728 
0729         awaiter_t* awaitersToEnqueue = awaiter;
0730         awaiter_t** awaitersToEnqueueTail = &awaiter->m_next;
0731 
0732         awaiter_t* awaitersToResume;
0733         awaiter_t** awaitersToResumeTail = &awaitersToResume;
0734 
0735         const SEQUENCE sequenceMask = m_sequenceMask;
0736 
0737         do
0738         {
0739             // Enqueue the awaiters.
0740             {
0741                 awaiter_t* oldHead = m_awaiters.load(std::memory_order_relaxed);
0742                 do
0743                 {
0744                     *awaitersToEnqueueTail = oldHead;
0745                 } while (!m_awaiters.compare_exchange_weak(
0746                     oldHead,
0747                     awaitersToEnqueue,
0748                     std::memory_order_seq_cst,
0749                     std::memory_order_relaxed));
0750             }
0751 
0752             // Reset list of waiters
0753             awaitersToEnqueueTail = &awaitersToEnqueue;
0754 
0755             // Check to see if the last-known published sequence number has advanced
0756             // while we were enqueuing the awaiters. Need to use seq_cst memory order
0757             // here to ensure that if there are concurrent calls to publish() that would
0758             // wake up any of the awaiters we just enqueued that either we will see their
0759             // write to m_published slots or they will see our write to m_awaiters.
0760             //
0761             // Note also, that we are assuming that the last-known published sequence is
0762             // not going to advance more than buffer_size() ahead of targetSequence since
0763             // there is at least one consumer that won't be resumed and so thus can't
0764             // publish the sequence number it's waiting for to its sequence_barrier and so
0765             // producers won't be able to claim its slot in the buffer.
0766             //
0767             // TODO: Check whether we can weaken the memory order here to just use 'seq_cst' on the
0768             // first .load() and then use 'acquire' on subsequent .load().
0769             while (m_published[(lastKnownPublished + 1) & sequenceMask].load(std::memory_order_seq_cst) == (lastKnownPublished + 1))
0770             {
0771                 ++lastKnownPublished;
0772             }
0773 
0774             if (!TRAITS::precedes(lastKnownPublished, targetSequence))
0775             {
0776                 // At least one awaiter we just enqueued has now been satisified.
0777                 // To ensure it is woken up we need to reacquire the list of awaiters and resume
0778                 awaiter_t* awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0779 
0780                 using diff_t = typename TRAITS::difference_type;
0781 
0782                 diff_t minDiff = std::numeric_limits<diff_t>::max();
0783 
0784                 while (awaiters != nullptr)
0785                 {
0786                     diff_t diff = TRAITS::difference(targetSequence, lastKnownPublished);
0787                     if (diff > 0)
0788                     {
0789                         // Not yet ready.
0790                         minDiff = diff < minDiff ? diff : minDiff;
0791                         *awaitersToEnqueueTail = awaiters;
0792                         awaitersToEnqueueTail = &awaiters->m_next;
0793                         awaiters->m_lastKnownPublished = lastKnownPublished;
0794                     }
0795                     else
0796                     {
0797                         // Now ready.
0798                         *awaitersToResumeTail = awaiters;
0799                         awaitersToResumeTail = &awaiters->m_next;
0800                     }
0801                     awaiters = awaiters->m_next;
0802                 }
0803 
0804                 // Calculate the earliest sequence number that any awaiters in the
0805                 // awaitersToEnqueue list are waiting for. We'll use this next time
0806                 // around the loop.
0807                 targetSequence = static_cast<SEQUENCE>(lastKnownPublished + minDiff);
0808             }
0809 
0810             // Null-terminate list of awaiters to enqueue.
0811             *awaitersToEnqueueTail = nullptr;
0812 
0813         } while (awaitersToEnqueue != nullptr);
0814 
0815         // Null-terminate awaiters to resume.
0816         *awaitersToResumeTail = nullptr;
0817 
0818         // Finally, resume any awaiters we've found that are ready to go.
0819         while (awaitersToResume != nullptr)
0820         {
0821             // Read m_next before calling .resume() as resuming could destroy the awaiter.
0822             awaiter_t* next = awaitersToResume->m_next;
0823             awaitersToResume->resume(lastKnownPublished);
0824             awaitersToResume = next;
0825         }
0826     }
0827 }
0828 
0829 #endif