Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:54:52

0001 ///////////////////////////////////////////////////////////////////////////////
0002 // Copyright (c) Lewis Baker
0003 // Licenced under MIT license. See LICENSE.txt for details.
0004 ///////////////////////////////////////////////////////////////////////////////
0005 #ifndef CPPCORO_SEQUENCE_BARRIER_HPP_INCLUDED
0006 #define CPPCORO_SEQUENCE_BARRIER_HPP_INCLUDED
0007 
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/awaitable_traits.hpp>
0010 #include <cppcoro/sequence_traits.hpp>
0011 #include <cppcoro/detail/manual_lifetime.hpp>
0012 
0013 #include <atomic>
0014 #include <cassert>
0015 #include <cstdint>
0016 #include <limits>
0017 #include <optional>
0018 #include <cppcoro/coroutine.hpp>
0019 
0020 namespace cppcoro
0021 {
0022     template<typename SEQUENCE, typename TRAITS>
0023     class sequence_barrier_wait_operation_base;
0024 
0025     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0026     class sequence_barrier_wait_operation;
0027 
0028     /// A sequence barrier is a synchronisation primitive that allows a single-producer
0029     /// and multiple-consumers to coordinate with respect to a monotonically increasing
0030     /// sequence number.
0031     ///
0032     /// A single producer advances the sequence number by publishing new sequence numbers in a
0033     /// monotonically increasing order. One or more consumers can query the last-published
0034     /// sequence number and can wait until a particular sequence number has been published.
0035     ///
0036     /// A sequence barrier can be used to represent a cursor into a thread-safe producer/consumer
0037     /// ring-buffer.
0038     ///
0039     /// See the LMAX Disruptor pattern for more background:
0040     /// https://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
0041     template<
0042         typename SEQUENCE = std::size_t,
0043         typename TRAITS = sequence_traits<SEQUENCE>>
0044     class sequence_barrier
0045     {
0046         static_assert(
0047             std::is_integral_v<SEQUENCE>,
0048             "sequence_barrier requires an integral sequence type");
0049 
0050         using awaiter_t = sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
0051 
0052     public:
0053 
0054         /// Construct a sequence barrier with the specified initial sequence number
0055         /// as the initial value 'last_published()'.
0056         sequence_barrier(SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept
0057             : m_lastPublished(initialSequence)
0058             , m_awaiters(nullptr)
0059         {}
0060 
0061         ~sequence_barrier()
0062         {
0063             // Shouldn't be destructing a sequence barrier if there are still waiters.
0064             assert(m_awaiters.load(std::memory_order_relaxed) == nullptr);
0065         }
0066 
0067         /// Query the sequence number that was most recently published by the producer.
0068         ///
0069         /// You can assume that all sequence numbers prior to the returned sequence number
0070         /// have also been published. This means you can safely access all elements with
0071         /// sequence numbers up to and including the returned sequence number without any
0072         /// further synchronisation.
0073         SEQUENCE last_published() const noexcept
0074         {
0075             return m_lastPublished.load(std::memory_order_acquire);
0076         }
0077 
0078         /// Wait until a particular sequence number has been published.
0079         ///
0080         /// If the specified sequence number is not yet published then the awaiting coroutine
0081         /// will be suspended and later resumed inside the call to publish() that publishes
0082         /// the specified sequence number.
0083         ///
0084         /// \param targetSequence
0085         /// The sequence number to wait for.
0086         ///
0087         /// \return
0088         /// An awaitable that when co_await'ed will suspend the awaiting coroutine until
0089         /// the specified target sequence number has been published.
0090         /// The result of the co_await expression will be the last-known published sequence
0091         /// number. This is guaranteed not to precede \p targetSequence but may be a sequence
0092         /// number after \p targetSequence, which indicates that more elements have been
0093         /// published than you were waiting for.
0094         template<typename SCHEDULER>
0095         [[nodiscard]]
0096         sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
0097             SEQUENCE targetSequence,
0098             SCHEDULER& scheduler) const noexcept;
0099 
0100         /// Publish the specified sequence number to consumers.
0101         ///
0102         /// This publishes all sequence numbers up to and including the specified sequence
0103         /// number. This will resume any coroutine that was suspended waiting for a sequence
0104         /// number that was published by this operation.
0105         ///
0106         /// \param sequence
0107         /// The sequence number to publish. This number must not precede the current
0108         /// last_published() value. ie. the published sequence numbers must be monotonically
0109         /// increasing.
0110         void publish(SEQUENCE sequence) noexcept;
0111 
0112     private:
0113 
0114         friend class sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
0115 
0116         void add_awaiter(awaiter_t* awaiter) const noexcept;
0117 
0118 #if CPPCORO_COMPILER_MSVC
0119 # pragma warning(push)
0120 # pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
0121 #endif
0122 
0123         // First cache-line is written to by the producer only
0124         alignas(CPPCORO_CPU_CACHE_LINE)
0125         std::atomic<SEQUENCE> m_lastPublished;
0126 
0127         // Second cache-line is written to by both the producer and consumers
0128         alignas(CPPCORO_CPU_CACHE_LINE)
0129         mutable std::atomic<awaiter_t*> m_awaiters;
0130 
0131 #if CPPCORO_COMPILER_MSVC
0132 # pragma warning(pop)
0133 #endif
0134 
0135     };
0136 
0137     template<typename SEQUENCE, typename TRAITS>
0138     class sequence_barrier_wait_operation_base
0139     {
0140     public:
0141 
0142         explicit sequence_barrier_wait_operation_base(
0143             const sequence_barrier<SEQUENCE, TRAITS>& barrier,
0144             SEQUENCE targetSequence) noexcept
0145             : m_barrier(barrier)
0146             , m_targetSequence(targetSequence)
0147             , m_lastKnownPublished(barrier.last_published())
0148             , m_readyToResume(false)
0149         {}
0150 
0151         sequence_barrier_wait_operation_base(
0152             const sequence_barrier_wait_operation_base& other) noexcept
0153             : m_barrier(other.m_barrier)
0154             , m_targetSequence(other.m_targetSequence)
0155             , m_lastKnownPublished(other.m_lastKnownPublished)
0156             , m_readyToResume(false)
0157         {}
0158 
0159         bool await_ready() const noexcept
0160         {
0161             return !TRAITS::precedes(m_lastKnownPublished, m_targetSequence);
0162         }
0163 
0164         bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0165         {
0166             m_awaitingCoroutine = awaitingCoroutine;
0167             m_barrier.add_awaiter(this);
0168             return !m_readyToResume.exchange(true, std::memory_order_acquire);
0169         }
0170 
0171         SEQUENCE await_resume() noexcept
0172         {
0173             return m_lastKnownPublished;
0174         }
0175 
0176     protected:
0177 
0178         friend class sequence_barrier<SEQUENCE, TRAITS>;
0179 
0180         void resume() noexcept
0181         {
0182             // This synchronises with the exchange(true, std::memory_order_acquire) in await_suspend().
0183             if (m_readyToResume.exchange(true, std::memory_order_release))
0184             {
0185                 resume_impl();
0186             }
0187         }
0188 
0189         virtual void resume_impl() noexcept = 0;
0190 
0191         const sequence_barrier<SEQUENCE, TRAITS>& m_barrier;
0192         const SEQUENCE m_targetSequence;
0193         SEQUENCE m_lastKnownPublished;
0194         sequence_barrier_wait_operation_base* m_next;
0195         cppcoro::coroutine_handle<> m_awaitingCoroutine;
0196         std::atomic<bool> m_readyToResume;
0197 
0198     };
0199 
0200     template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0201     class sequence_barrier_wait_operation : public sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>
0202     {
0203         using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
0204 
0205     public:
0206         sequence_barrier_wait_operation(
0207             const sequence_barrier<SEQUENCE, TRAITS>& barrier,
0208             SEQUENCE targetSequence,
0209             SCHEDULER& scheduler) noexcept
0210             : sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(barrier, targetSequence)
0211             , m_scheduler(scheduler)
0212         {}
0213 
0214         sequence_barrier_wait_operation(
0215             const sequence_barrier_wait_operation& other) noexcept
0216             : sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(other)
0217             , m_scheduler(other.m_scheduler)
0218         {}
0219 
0220         ~sequence_barrier_wait_operation()
0221         {
0222             if (m_isScheduleAwaiterCreated)
0223             {
0224                 m_scheduleAwaiter.destruct();
0225             }
0226             if (m_isScheduleOperationCreated)
0227             {
0228                 m_scheduleOperation.destruct();
0229             }
0230         }
0231 
0232         decltype(auto) await_resume() noexcept(noexcept(m_scheduleAwaiter->await_resume()))
0233         {
0234             if (m_isScheduleAwaiterCreated)
0235             {
0236                 m_scheduleAwaiter->await_resume();
0237             }
0238 
0239             return sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
0240         }
0241 
0242     private:
0243 
0244         void resume_impl() noexcept override
0245         {
0246             try
0247             {
0248                 m_scheduleOperation.construct(m_scheduler.schedule());
0249                 m_isScheduleOperationCreated = true;
0250 
0251                 m_scheduleAwaiter.construct(detail::get_awaiter(
0252                     static_cast<schedule_operation&&>(*m_scheduleOperation)));
0253                 m_isScheduleAwaiterCreated = true;
0254 
0255                 if (!m_scheduleAwaiter->await_ready())
0256                 {
0257                     using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine));
0258                     if constexpr (std::is_void_v<await_suspend_result_t>)
0259                     {
0260                         m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine);
0261                         return;
0262                     }
0263                     else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
0264                     {
0265                         if (m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine))
0266                         {
0267                             return;
0268                         }
0269                     }
0270                     else
0271                     {
0272                         // Assume it returns a coroutine_handle.
0273                         m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine).resume();
0274                         return;
0275                     }
0276                 }
0277             }
0278             catch (...)
0279             {
0280                 // Ignore failure to reschedule and resume inline?
0281                 // Should we catch the exception and rethrow from await_resume()?
0282                 // Or should we require that 'co_await scheduler.schedule()' is noexcept?
0283             }
0284 
0285             // Resume outside the catch-block.
0286             this->m_awaitingCoroutine.resume();
0287         }
0288 
0289         SCHEDULER& m_scheduler;
0290         // Can't use std::optional<T> here since T could be a reference.
0291         detail::manual_lifetime<schedule_operation> m_scheduleOperation;
0292         detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
0293         bool m_isScheduleOperationCreated = false;
0294         bool m_isScheduleAwaiterCreated = false;
0295     };
0296 
0297     template<typename SEQUENCE, typename TRAITS>
0298     template<typename SCHEDULER>
0299     [[nodiscard]]
0300     sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> sequence_barrier<SEQUENCE, TRAITS>::wait_until_published(
0301         SEQUENCE targetSequence,
0302         SCHEDULER& scheduler) const noexcept
0303     {
0304         return sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, targetSequence, scheduler);
0305     }
0306 
0307     template<typename SEQUENCE, typename TRAITS>
0308     void sequence_barrier<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
0309     {
0310         m_lastPublished.store(sequence, std::memory_order_seq_cst);
0311 
0312         // Cheaper check to see if there are any awaiting coroutines.
0313         auto* awaiters = m_awaiters.load(std::memory_order_seq_cst);
0314         if (awaiters == nullptr)
0315         {
0316             return;
0317         }
0318 
0319         // Acquire the list of awaiters.
0320         // Note we may be racing with add_awaiter() which could also acquire the list of waiters
0321         // so we need to check again whether we won the race and acquired the list.
0322         awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0323         if (awaiters == nullptr)
0324         {
0325             return;
0326         }
0327 
0328         // Check the list of awaiters for ones that are now satisfied by the sequence number
0329         // we just published. Awaiters are added to either the 'awaitersToResume' list or to
0330         // the 'awaitersToRequeue' list.
0331         awaiter_t* awaitersToResume;
0332         awaiter_t** awaitersToResumeTail = &awaitersToResume;
0333 
0334         awaiter_t* awaitersToRequeue;
0335         awaiter_t** awaitersToRequeueTail = &awaitersToRequeue;
0336 
0337         do
0338         {
0339             if (TRAITS::precedes(sequence, awaiters->m_targetSequence))
0340             {
0341                 // Target sequence not reached. Append to 'requeue' list.
0342                 *awaitersToRequeueTail = awaiters;
0343                 awaitersToRequeueTail = &awaiters->m_next;
0344             }
0345             else
0346             {
0347                 // Target sequence reached. Append to 'resume' list.
0348                 *awaitersToResumeTail = awaiters;
0349                 awaitersToResumeTail = &awaiters->m_next;
0350             }
0351             awaiters = awaiters->m_next;
0352         } while (awaiters != nullptr);
0353 
0354         // Null-terminate the two lists.
0355         *awaitersToRequeueTail = nullptr;
0356         *awaitersToResumeTail = nullptr;
0357 
0358         if (awaitersToRequeue != nullptr)
0359         {
0360             awaiter_t* oldHead = nullptr;
0361             while (!m_awaiters.compare_exchange_weak(
0362                 oldHead,
0363                 awaitersToRequeue,
0364                 std::memory_order_release,
0365                 std::memory_order_relaxed))
0366             {
0367                 *awaitersToRequeueTail = oldHead;
0368             }
0369         }
0370 
0371         while (awaitersToResume != nullptr)
0372         {
0373             auto* next = awaitersToResume->m_next;
0374             awaitersToResume->m_lastKnownPublished = sequence;
0375             awaitersToResume->resume();
0376             awaitersToResume = next;
0377         }
0378     }
0379 
0380     template<typename SEQUENCE, typename TRAITS>
0381     void sequence_barrier<SEQUENCE, TRAITS>::add_awaiter(awaiter_t* awaiter) const noexcept
0382     {
0383         SEQUENCE targetSequence = awaiter->m_targetSequence;
0384         awaiter_t* awaitersToRequeue = awaiter;
0385         awaiter_t** awaitersToRequeueTail = &awaiter->m_next;
0386 
0387         SEQUENCE lastKnownPublished;
0388         awaiter_t* awaitersToResume;
0389         awaiter_t** awaitersToResumeTail = &awaitersToResume;
0390 
0391         do
0392         {
0393             // Enqueue the awaiter(s)
0394             {
0395                 auto* oldHead = m_awaiters.load(std::memory_order_relaxed);
0396                 do
0397                 {
0398                     *awaitersToRequeueTail = oldHead;
0399                 } while (!m_awaiters.compare_exchange_weak(
0400                     oldHead,
0401                     awaitersToRequeue,
0402                     std::memory_order_seq_cst,
0403                     std::memory_order_relaxed));
0404             }
0405 
0406             // Check that the sequence we were waiting for wasn't published while
0407             // we were enqueueing the waiter.
0408             // This needs to be seq_cst memory order to ensure that in the case that the producer
0409             // publishes a new sequence number concurrently with this call that we either see
0410             // their write to m_lastPublished after enqueueing our awaiter, or they see our
0411             // write to m_awaiters after their write to m_lastPublished.
0412             lastKnownPublished = m_lastPublished.load(std::memory_order_seq_cst);
0413             if (TRAITS::precedes(lastKnownPublished, targetSequence))
0414             {
0415                 // None of the the awaiters we enqueued have been satisfied yet.
0416                 break;
0417             }
0418 
0419             // Reset the requeue list to empty
0420             awaitersToRequeueTail = &awaitersToRequeue;
0421 
0422             // At least one of the awaiters we just enqueued is now satisfied by a concurrently
0423             // published sequence number. The producer thread may not have seen our write to m_awaiters
0424             // so we need to try to re-acquire the list of awaiters to ensure that the waiters that
0425             // are now satisfied are woken up.
0426             auto* awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0427 
0428             auto minDiff = std::numeric_limits<typename TRAITS::difference_type>::max();
0429 
0430             while (awaiters != nullptr)
0431             {
0432                 const auto diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
0433                 if (diff > 0)
0434                 {
0435                     *awaitersToRequeueTail = awaiters;
0436                     awaitersToRequeueTail = &awaiters->m_next;
0437                     minDiff = diff < minDiff ? diff : minDiff;
0438                 }
0439                 else
0440                 {
0441                     *awaitersToResumeTail = awaiters;
0442                     awaitersToResumeTail = &awaiters->m_next;
0443                 }
0444 
0445                 awaiters = awaiters->m_next;
0446             }
0447 
0448             // Null-terminate the list of awaiters to requeue.
0449             *awaitersToRequeueTail = nullptr;
0450 
0451             // Calculate the earliest target sequence required by any of the awaiters to requeue.
0452             targetSequence = static_cast<SEQUENCE>(lastKnownPublished + minDiff);
0453 
0454         } while (awaitersToRequeue != nullptr);
0455 
0456         // Null-terminate the list of awaiters to resume
0457         *awaitersToResumeTail = nullptr;
0458 
0459         // Resume the awaiters that are ready
0460         while (awaitersToResume != nullptr)
0461         {
0462             auto* next = awaitersToResume->m_next;
0463             awaitersToResume->m_lastKnownPublished = lastKnownPublished;
0464             awaitersToResume->resume();
0465             awaitersToResume = next;
0466         }
0467     }
0468 }
0469 
0470 #endif