File indexing completed on 2025-01-18 09:54:52
0001
0002
0003
0004
0005 #ifndef CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
0006 #define CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
0007
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/sequence_barrier.hpp>
0010 #include <cppcoro/sequence_range.hpp>
0011 #include <cppcoro/sequence_traits.hpp>
0012
0013 #include <cppcoro/detail/manual_lifetime.hpp>
0014
0015 #include <atomic>
0016 #include <cstdint>
0017 #include <cassert>
0018
0019 namespace cppcoro
0020 {
0021 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0022 class multi_producer_sequencer_claim_one_operation;
0023
0024 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0025 class multi_producer_sequencer_claim_operation;
0026
0027 template<typename SEQUENCE, typename TRAITS>
0028 class multi_producer_sequencer_wait_operation_base;
0029
0030 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0031 class multi_producer_sequencer_wait_operation;
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056 template<
0057 typename SEQUENCE = std::size_t,
0058 typename TRAITS = sequence_traits<SEQUENCE>>
0059 class multi_producer_sequencer
0060 {
0061 public:
0062
0063 multi_producer_sequencer(
0064 const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0065 std::size_t bufferSize,
0066 SEQUENCE initialSequence = TRAITS::initial_sequence);
0067
0068
0069 std::size_t buffer_size() const noexcept { return m_sequenceMask + 1; }
0070
0071
0072
0073 SEQUENCE last_published_after(SEQUENCE lastKnownPublished) const noexcept;
0074
0075
0076
0077
0078
0079
0080 template<typename SCHEDULER>
0081 multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
0082 SEQUENCE targetSequence,
0083 SEQUENCE lastKnownPublished,
0084 SCHEDULER& scheduler) const noexcept;
0085
0086
0087
0088
0089
0090
0091 bool any_available() const noexcept;
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102 template<typename SCHEDULER>
0103 multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0104 claim_one(SCHEDULER& scheduler) noexcept;
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118 template<typename SCHEDULER>
0119 multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0120 claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept;
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133 void publish(SEQUENCE sequence) noexcept;
0134
0135
0136
0137
0138
0139
0140
0141 void publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept;
0142
0143 private:
0144
0145 template<typename SEQUENCE2, typename TRAITS2>
0146 friend class multi_producer_sequencer_wait_operation_base;
0147
0148 template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0149 friend class multi_producer_sequencer_claim_operation;
0150
0151 template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
0152 friend class multi_producer_sequencer_claim_one_operation;
0153
0154 void resume_ready_awaiters() noexcept;
0155 void add_awaiter(multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept;
0156
0157 #if CPPCORO_COMPILER_MSVC
0158 # pragma warning(push)
0159 # pragma warning(disable : 4324)
0160 #endif
0161
0162 const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
0163 const std::size_t m_sequenceMask;
0164 const std::unique_ptr<std::atomic<SEQUENCE>[]> m_published;
0165
0166 alignas(CPPCORO_CPU_CACHE_LINE)
0167 std::atomic<SEQUENCE> m_nextToClaim;
0168
0169 alignas(CPPCORO_CPU_CACHE_LINE)
0170 mutable std::atomic<multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>*> m_awaiters;
0171
0172 #if CPPCORO_COMPILER_MSVC
0173 # pragma warning(pop)
0174 #endif
0175
0176 };
0177
0178 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0179 class multi_producer_sequencer_claim_awaiter
0180 {
0181 public:
0182
0183 multi_producer_sequencer_claim_awaiter(
0184 const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0185 std::size_t bufferSize,
0186 const sequence_range<SEQUENCE, TRAITS>& claimedRange,
0187 SCHEDULER& scheduler) noexcept
0188 : m_barrierWait(consumerBarrier, claimedRange.back() - bufferSize, scheduler)
0189 , m_claimedRange(claimedRange)
0190 {}
0191
0192 bool await_ready() const noexcept
0193 {
0194 return m_barrierWait.await_ready();
0195 }
0196
0197 auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0198 {
0199 return m_barrierWait.await_suspend(awaitingCoroutine);
0200 }
0201
0202 sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
0203 {
0204 return m_claimedRange;
0205 }
0206
0207 private:
0208
0209 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_barrierWait;
0210 sequence_range<SEQUENCE, TRAITS> m_claimedRange;
0211
0212 };
0213
0214 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0215 class multi_producer_sequencer_claim_operation
0216 {
0217 public:
0218
0219 multi_producer_sequencer_claim_operation(
0220 multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0221 std::size_t count,
0222 SCHEDULER& scheduler) noexcept
0223 : m_sequencer(sequencer)
0224 , m_count(count < sequencer.buffer_size() ? count : sequencer.buffer_size())
0225 , m_scheduler(scheduler)
0226 {
0227 }
0228
0229 multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
0230 {
0231
0232
0233
0234
0235
0236
0237
0238
0239
0240 const SEQUENCE first = m_sequencer.m_nextToClaim.fetch_add(m_count, std::memory_order_relaxed);
0241 return multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
0242 m_sequencer.m_consumerBarrier,
0243 m_sequencer.buffer_size(),
0244 sequence_range<SEQUENCE, TRAITS>{ first, first + m_count },
0245 m_scheduler
0246 };
0247 }
0248
0249 private:
0250
0251 multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0252 std::size_t m_count;
0253 SCHEDULER& m_scheduler;
0254
0255 };
0256
0257 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0258 class multi_producer_sequencer_claim_one_awaiter
0259 {
0260 public:
0261
0262 multi_producer_sequencer_claim_one_awaiter(
0263 const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0264 std::size_t bufferSize,
0265 SEQUENCE claimedSequence,
0266 SCHEDULER& scheduler) noexcept
0267 : m_waitOp(consumerBarrier, claimedSequence - bufferSize, scheduler)
0268 , m_claimedSequence(claimedSequence)
0269 {}
0270
0271 bool await_ready() const noexcept
0272 {
0273 return m_waitOp.await_ready();
0274 }
0275
0276 auto await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0277 {
0278 return m_waitOp.await_suspend(awaitingCoroutine);
0279 }
0280
0281 SEQUENCE await_resume() noexcept
0282 {
0283 return m_claimedSequence;
0284 }
0285
0286 private:
0287
0288 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_waitOp;
0289 SEQUENCE m_claimedSequence;
0290
0291 };
0292
0293 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0294 class multi_producer_sequencer_claim_one_operation
0295 {
0296 public:
0297
0298 multi_producer_sequencer_claim_one_operation(
0299 multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0300 SCHEDULER& scheduler) noexcept
0301 : m_sequencer(sequencer)
0302 , m_scheduler(scheduler)
0303 {}
0304
0305 multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
0306 {
0307 return multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
0308 m_sequencer.m_consumerBarrier,
0309 m_sequencer.buffer_size(),
0310 m_sequencer.m_nextToClaim.fetch_add(1, std::memory_order_relaxed),
0311 m_scheduler
0312 };
0313 }
0314
0315 private:
0316
0317 multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0318 SCHEDULER& m_scheduler;
0319
0320 };
0321
0322 template<typename SEQUENCE, typename TRAITS>
0323 class multi_producer_sequencer_wait_operation_base
0324 {
0325 public:
0326
0327 multi_producer_sequencer_wait_operation_base(
0328 const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0329 SEQUENCE targetSequence,
0330 SEQUENCE lastKnownPublished) noexcept
0331 : m_sequencer(sequencer)
0332 , m_targetSequence(targetSequence)
0333 , m_lastKnownPublished(lastKnownPublished)
0334 , m_readyToResume(false)
0335 {}
0336
0337 multi_producer_sequencer_wait_operation_base(
0338 const multi_producer_sequencer_wait_operation_base& other) noexcept
0339 : m_sequencer(other.m_sequencer)
0340 , m_targetSequence(other.m_targetSequence)
0341 , m_lastKnownPublished(other.m_lastKnownPublished)
0342 , m_readyToResume(false)
0343 {}
0344
0345 bool await_ready() const noexcept
0346 {
0347 return !TRAITS::precedes(m_lastKnownPublished, m_targetSequence);
0348 }
0349
0350 bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0351 {
0352 m_awaitingCoroutine = awaitingCoroutine;
0353
0354 m_sequencer.add_awaiter(this);
0355
0356
0357
0358
0359
0360 return !m_readyToResume.exchange(true, std::memory_order_acquire);
0361 }
0362
0363 SEQUENCE await_resume() noexcept
0364 {
0365 return m_lastKnownPublished;
0366 }
0367
0368 protected:
0369
0370 friend class multi_producer_sequencer<SEQUENCE, TRAITS>;
0371
0372 void resume(SEQUENCE lastKnownPublished) noexcept
0373 {
0374 m_lastKnownPublished = lastKnownPublished;
0375 if (m_readyToResume.exchange(true, std::memory_order_release))
0376 {
0377 resume_impl();
0378 }
0379 }
0380
0381 virtual void resume_impl() noexcept = 0;
0382
0383 const multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
0384 SEQUENCE m_targetSequence;
0385 SEQUENCE m_lastKnownPublished;
0386 multi_producer_sequencer_wait_operation_base* m_next;
0387 cppcoro::coroutine_handle<> m_awaitingCoroutine;
0388 std::atomic<bool> m_readyToResume;
0389 };
0390
0391 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0392 class multi_producer_sequencer_wait_operation :
0393 public multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>
0394 {
0395 using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
0396
0397 public:
0398
0399 multi_producer_sequencer_wait_operation(
0400 const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
0401 SEQUENCE targetSequence,
0402 SEQUENCE lastKnownPublished,
0403 SCHEDULER& scheduler) noexcept
0404 : multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(sequencer, targetSequence, lastKnownPublished)
0405 , m_scheduler(scheduler)
0406 {}
0407
0408 multi_producer_sequencer_wait_operation(
0409 const multi_producer_sequencer_wait_operation& other) noexcept
0410 : multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(other)
0411 , m_scheduler(other.m_scheduler)
0412 {}
0413
0414 ~multi_producer_sequencer_wait_operation()
0415 {
0416 if (m_isScheduleAwaiterCreated)
0417 {
0418 m_scheduleAwaiter.destruct();
0419 }
0420 if (m_isScheduleOperationCreated)
0421 {
0422 m_scheduleOperation.destruct();
0423 }
0424 }
0425
0426 SEQUENCE await_resume() noexcept(noexcept(m_scheduleOperation->await_resume()))
0427 {
0428 if (m_isScheduleOperationCreated)
0429 {
0430 m_scheduleOperation->await_resume();
0431 }
0432
0433 return multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
0434 }
0435
0436 private:
0437
0438 void resume_impl() noexcept override
0439 {
0440 try
0441 {
0442 m_scheduleOperation.construct(m_scheduler.schedule());
0443 m_isScheduleOperationCreated = true;
0444
0445 m_scheduleAwaiter.construct(detail::get_awaiter(
0446 static_cast<schedule_operation&&>(*m_scheduleOperation)));
0447 m_isScheduleAwaiterCreated = true;
0448
0449 if (!m_scheduleAwaiter->await_ready())
0450 {
0451 using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine));
0452 if constexpr (std::is_void_v<await_suspend_result_t>)
0453 {
0454 m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine);
0455 return;
0456 }
0457 else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
0458 {
0459 if (m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine))
0460 {
0461 return;
0462 }
0463 }
0464 else
0465 {
0466
0467 m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine).resume();
0468 return;
0469 }
0470 }
0471 }
0472 catch (...)
0473 {
0474
0475
0476
0477 }
0478
0479
0480 this->m_awaitingCoroutine.resume();
0481 }
0482
0483 SCHEDULER& m_scheduler;
0484
0485 detail::manual_lifetime<schedule_operation> m_scheduleOperation;
0486 detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
0487 bool m_isScheduleOperationCreated = false;
0488 bool m_isScheduleAwaiterCreated = false;
0489
0490 };
0491
0492 template<typename SEQUENCE, typename TRAITS>
0493 multi_producer_sequencer<SEQUENCE, TRAITS>::multi_producer_sequencer(
0494 const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
0495 std::size_t bufferSize,
0496 SEQUENCE initialSequence)
0497 : m_consumerBarrier(consumerBarrier)
0498 , m_sequenceMask(bufferSize - 1)
0499 , m_published(std::make_unique<std::atomic<SEQUENCE>[]>(bufferSize))
0500 , m_nextToClaim(initialSequence + 1)
0501 , m_awaiters(nullptr)
0502 {
0503
0504 assert(bufferSize > 0 && (bufferSize & (bufferSize - 1)) == 0);
0505
0506 using diff_t = typename TRAITS::difference_type;
0507 using unsigned_diff_t = std::make_unsigned_t<diff_t>;
0508 constexpr unsigned_diff_t maxSize = static_cast<unsigned_diff_t>(std::numeric_limits<diff_t>::max());
0509 assert(bufferSize <= maxSize);
0510
0511 SEQUENCE seq = initialSequence - (bufferSize - 1);
0512 do
0513 {
0514 #ifdef __cpp_lib_atomic_value_initialization
0515 m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
0516 #else
0517 std::atomic_init(&m_published[seq & m_sequenceMask], seq);
0518 #endif
0519 } while (seq++ != initialSequence);
0520 }
0521
0522 template<typename SEQUENCE, typename TRAITS>
0523 SEQUENCE multi_producer_sequencer<SEQUENCE, TRAITS>::last_published_after(
0524 SEQUENCE lastKnownPublished) const noexcept
0525 {
0526 const auto mask = m_sequenceMask;
0527 SEQUENCE seq = lastKnownPublished + 1;
0528 while (m_published[seq & mask].load(std::memory_order_acquire) == seq)
0529 {
0530 lastKnownPublished = seq++;
0531 }
0532 return lastKnownPublished;
0533 }
0534
0535 template<typename SEQUENCE, typename TRAITS>
0536 template<typename SCHEDULER>
0537 multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>
0538 multi_producer_sequencer<SEQUENCE, TRAITS>::wait_until_published(
0539 SEQUENCE targetSequence,
0540 SEQUENCE lastKnownPublished,
0541 SCHEDULER& scheduler) const noexcept
0542 {
0543 return multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>{
0544 *this, targetSequence, lastKnownPublished, scheduler
0545 };
0546 }
0547
0548 template<typename SEQUENCE, typename TRAITS>
0549 bool multi_producer_sequencer<SEQUENCE, TRAITS>::any_available() const noexcept
0550 {
0551 return TRAITS::precedes(
0552 m_nextToClaim.load(std::memory_order_relaxed),
0553 m_consumerBarrier.last_published() + buffer_size());
0554 }
0555
0556 template<typename SEQUENCE, typename TRAITS>
0557 template<typename SCHEDULER>
0558 multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
0559 multi_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
0560 {
0561 return multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
0562 }
0563
0564 template<typename SEQUENCE, typename TRAITS>
0565 template<typename SCHEDULER>
0566 multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
0567 multi_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
0568 {
0569 return multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, count, scheduler };
0570 }
0571
0572 template<typename SEQUENCE, typename TRAITS>
0573 void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
0574 {
0575 m_published[sequence & m_sequenceMask].store(sequence, std::memory_order_seq_cst);
0576
0577
0578 resume_ready_awaiters();
0579 }
0580
0581 template<typename SEQUENCE, typename TRAITS>
0582 void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept
0583 {
0584 if (range.empty())
0585 {
0586 return;
0587 }
0588
0589
0590
0591
0592 for (SEQUENCE seq : range.skip(1))
0593 {
0594 m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
0595 }
0596
0597
0598 m_published[range.front() & m_sequenceMask].store(range.front(), std::memory_order_seq_cst);
0599
0600
0601 resume_ready_awaiters();
0602 }
0603
0604 template<typename SEQUENCE, typename TRAITS>
0605 void multi_producer_sequencer<SEQUENCE, TRAITS>::resume_ready_awaiters() noexcept
0606 {
0607 using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
0608
0609 awaiter_t* awaiters = m_awaiters.load(std::memory_order_seq_cst);
0610 if (awaiters == nullptr)
0611 {
0612
0613 return;
0614 }
0615
0616
0617
0618 awaiters = m_awaiters.exchange(nullptr, std::memory_order_seq_cst);
0619 if (awaiters == nullptr)
0620 {
0621
0622
0623 return;
0624 }
0625
0626 SEQUENCE lastKnownPublished;
0627
0628 awaiter_t* awaitersToResume;
0629 awaiter_t** awaitersToResumeTail = &awaitersToResume;
0630
0631 awaiter_t* awaitersToRequeue;
0632 awaiter_t** awaitersToRequeueTail = &awaitersToRequeue;
0633
0634 do
0635 {
0636 using diff_t = typename TRAITS::difference_type;
0637
0638 lastKnownPublished = last_published_after(awaiters->m_lastKnownPublished);
0639
0640
0641 auto minDiff = std::numeric_limits<diff_t>::max();
0642 do
0643 {
0644 auto diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
0645 if (diff > 0)
0646 {
0647
0648 minDiff = diff < minDiff ? diff : minDiff;
0649 *awaitersToRequeueTail = awaiters;
0650 awaitersToRequeueTail = &awaiters->m_next;
0651 }
0652 else
0653 {
0654 *awaitersToResumeTail = awaiters;
0655 awaitersToResumeTail = &awaiters->m_next;
0656 }
0657 awaiters->m_lastKnownPublished = lastKnownPublished;
0658 awaiters = awaiters->m_next;
0659 } while (awaiters != nullptr);
0660
0661
0662 *awaitersToRequeueTail = nullptr;
0663
0664 if (awaitersToRequeue != nullptr)
0665 {
0666
0667 awaiter_t* oldHead = nullptr;
0668 while (!m_awaiters.compare_exchange_weak(oldHead, awaitersToRequeue, std::memory_order_seq_cst, std::memory_order_relaxed))
0669 {
0670 *awaitersToRequeueTail = oldHead;
0671 }
0672
0673
0674 awaitersToRequeueTail = &awaitersToRequeue;
0675
0676 const SEQUENCE earliestTargetSequence = lastKnownPublished + minDiff;
0677
0678
0679
0680
0681
0682
0683
0684
0685
0686
0687
0688
0689
0690
0691
0692 const auto sequenceMask = m_sequenceMask;
0693 SEQUENCE seq = lastKnownPublished + 1;
0694 while (m_published[seq & sequenceMask].load(std::memory_order_seq_cst) == seq)
0695 {
0696 lastKnownPublished = seq;
0697 if (seq == earliestTargetSequence)
0698 {
0699
0700
0701 awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0702 break;
0703 }
0704 ++seq;
0705 }
0706 }
0707 } while (awaiters != nullptr);
0708
0709
0710 *awaitersToResumeTail = nullptr;
0711
0712 while (awaitersToResume != nullptr)
0713 {
0714 awaiter_t* next = awaitersToResume->m_next;
0715 awaitersToResume->resume(lastKnownPublished);
0716 awaitersToResume = next;
0717 }
0718 }
0719
0720 template<typename SEQUENCE, typename TRAITS>
0721 void multi_producer_sequencer<SEQUENCE, TRAITS>::add_awaiter(
0722 multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept
0723 {
0724 using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
0725
0726 SEQUENCE targetSequence = awaiter->m_targetSequence;
0727 SEQUENCE lastKnownPublished = awaiter->m_lastKnownPublished;
0728
0729 awaiter_t* awaitersToEnqueue = awaiter;
0730 awaiter_t** awaitersToEnqueueTail = &awaiter->m_next;
0731
0732 awaiter_t* awaitersToResume;
0733 awaiter_t** awaitersToResumeTail = &awaitersToResume;
0734
0735 const SEQUENCE sequenceMask = m_sequenceMask;
0736
0737 do
0738 {
0739
0740 {
0741 awaiter_t* oldHead = m_awaiters.load(std::memory_order_relaxed);
0742 do
0743 {
0744 *awaitersToEnqueueTail = oldHead;
0745 } while (!m_awaiters.compare_exchange_weak(
0746 oldHead,
0747 awaitersToEnqueue,
0748 std::memory_order_seq_cst,
0749 std::memory_order_relaxed));
0750 }
0751
0752
0753 awaitersToEnqueueTail = &awaitersToEnqueue;
0754
0755
0756
0757
0758
0759
0760
0761
0762
0763
0764
0765
0766
0767
0768
0769 while (m_published[(lastKnownPublished + 1) & sequenceMask].load(std::memory_order_seq_cst) == (lastKnownPublished + 1))
0770 {
0771 ++lastKnownPublished;
0772 }
0773
0774 if (!TRAITS::precedes(lastKnownPublished, targetSequence))
0775 {
0776
0777
0778 awaiter_t* awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0779
0780 using diff_t = typename TRAITS::difference_type;
0781
0782 diff_t minDiff = std::numeric_limits<diff_t>::max();
0783
0784 while (awaiters != nullptr)
0785 {
0786 diff_t diff = TRAITS::difference(targetSequence, lastKnownPublished);
0787 if (diff > 0)
0788 {
0789
0790 minDiff = diff < minDiff ? diff : minDiff;
0791 *awaitersToEnqueueTail = awaiters;
0792 awaitersToEnqueueTail = &awaiters->m_next;
0793 awaiters->m_lastKnownPublished = lastKnownPublished;
0794 }
0795 else
0796 {
0797
0798 *awaitersToResumeTail = awaiters;
0799 awaitersToResumeTail = &awaiters->m_next;
0800 }
0801 awaiters = awaiters->m_next;
0802 }
0803
0804
0805
0806
0807 targetSequence = static_cast<SEQUENCE>(lastKnownPublished + minDiff);
0808 }
0809
0810
0811 *awaitersToEnqueueTail = nullptr;
0812
0813 } while (awaitersToEnqueue != nullptr);
0814
0815
0816 *awaitersToResumeTail = nullptr;
0817
0818
0819 while (awaitersToResume != nullptr)
0820 {
0821
0822 awaiter_t* next = awaitersToResume->m_next;
0823 awaitersToResume->resume(lastKnownPublished);
0824 awaitersToResume = next;
0825 }
0826 }
0827 }
0828
0829 #endif