File indexing completed on 2025-01-18 09:54:52
0001
0002
0003
0004
0005 #ifndef CPPCORO_SEQUENCE_BARRIER_HPP_INCLUDED
0006 #define CPPCORO_SEQUENCE_BARRIER_HPP_INCLUDED
0007
0008 #include <cppcoro/config.hpp>
0009 #include <cppcoro/awaitable_traits.hpp>
0010 #include <cppcoro/sequence_traits.hpp>
0011 #include <cppcoro/detail/manual_lifetime.hpp>
0012
0013 #include <atomic>
0014 #include <cassert>
0015 #include <cstdint>
0016 #include <limits>
0017 #include <optional>
0018 #include <cppcoro/coroutine.hpp>
0019
0020 namespace cppcoro
0021 {
0022 template<typename SEQUENCE, typename TRAITS>
0023 class sequence_barrier_wait_operation_base; // Forward declaration: awaiter base class, defined later in this header.
0024 // Forward declaration: scheduler-aware awaiter returned by sequence_barrier::wait_until_published().
0025 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0026 class sequence_barrier_wait_operation;
0027
0028 /// Tracks the most recently published sequence number together with a
0029 /// lock-free, intrusive singly-linked list of awaiters that are waiting for
0030 /// a target sequence number to be published.
0031 ///
0032 /// publish() stores a new sequence number and resumes every queued awaiter
0033 /// whose target does not come after it (as decided by TRAITS::precedes);
0034 /// awaiters are created by wait_until_published().
0035 ///
0036 /// SEQUENCE must be an integral type (enforced by the static_assert below).
0037 /// TRAITS supplies initial_sequence, precedes(), difference() and
0038 /// difference_type, all of which are used by this header.
0039 /// NOTE(review): the code appears to assume that only one thread calls
0040 /// publish() at a time -- confirm against the project documentation.
0041 template<
0042 typename SEQUENCE = std::size_t,
0043 typename TRAITS = sequence_traits<SEQUENCE>>
0044 class sequence_barrier
0045 {
0046 static_assert(
0047 std::is_integral_v<SEQUENCE>,
0048 "sequence_barrier requires an integral sequence type");
0049
0050 using awaiter_t = sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
0051
0052 public:
0053
0054 /// Construct a barrier whose last_published() initially reports
0055 /// 'initialSequence' (TRAITS::initial_sequence by default) and that has no awaiters.
0056 sequence_barrier(SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept
0057 : m_lastPublished(initialSequence)
0058 , m_awaiters(nullptr)
0059 {}
0060
0061 ~sequence_barrier()
0062 {
0063 // The barrier must not be destroyed while awaiters are still queued on it.
0064 assert(m_awaiters.load(std::memory_order_relaxed) == nullptr);
0065 }
0066
0067 /// Return the sequence number most recently stored by publish(), or the
0068 /// value passed to the constructor if publish() has not been called yet.
0069 ///
0070 /// The load uses memory_order_acquire and pairs with the store in publish().
0071 /// NOTE(review): presumably this is what lets a consumer safely read data
0072 /// written before the matching publish() -- confirm.
0073 SEQUENCE last_published() const noexcept
0074 {
0075 return m_lastPublished.load(std::memory_order_acquire);
0076 }
0077
0078 /// Create an awaitable that completes once 'targetSequence' has been
0079 /// published.
0080 ///
0081 /// If the target is already covered by last_published() when the operation
0082 /// is constructed, awaiting it does not suspend. Otherwise the awaiting
0083 /// coroutine is queued on this barrier and, once a satisfying sequence
0084 /// number is observed, it is resumed through 'scheduler.schedule()'.
0085 ///
0086 /// The result of the co_await expression is the last-published sequence
0087 /// number known to the operation at the time it completed.
0088 ///
0089 /// \param targetSequence  Sequence number to wait for.
0090 /// \param scheduler       Object providing schedule(); it is held by
0091 ///                        reference, so it must outlive the returned operation.
0092 /// \return The wait operation. It is [[nodiscard]]: nothing is queued
0093 ///         until the operation is awaited.
0094 template<typename SCHEDULER>
0095 [[nodiscard]]
0096 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
0097 SEQUENCE targetSequence,
0098 SCHEDULER& scheduler) const noexcept;
0099
0100 /// Publish 'sequence' as the new last-published sequence number.
0101 ///
0102 /// Stores the value and then resumes every queued awaiter whose target
0103 /// sequence does not come after 'sequence'; awaiters with later targets
0104 /// are put back on the queue.
0105 ///
0106 /// NOTE(review): nothing here checks that 'sequence' advances past the
0107 /// previously published value; callers are presumably expected to publish
0108 /// in increasing order -- confirm.
0109 /// \param sequence  The sequence number to publish.
0110 void publish(SEQUENCE sequence) noexcept;
0111
0112 private:
0113 // The awaiter base class calls add_awaiter() from its await_suspend().
0114 friend class sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
0115 // Enqueue 'awaiter'. Declared const because awaiters hold a const reference to the barrier (m_awaiters is mutable).
0116 void add_awaiter(awaiter_t* awaiter) const noexcept;
0117
0118 #if CPPCORO_COMPILER_MSVC
0119 # pragma warning(push)
0120 # pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
0121 #endif
0122
0123 // Last published sequence number; after construction it is written only by publish().
0124 alignas(CPPCORO_CPU_CACHE_LINE)
0125 std::atomic<SEQUENCE> m_lastPublished;
0126
0127 // Head of the intrusive awaiter list; modified by both publish() and add_awaiter().
0128 alignas(CPPCORO_CPU_CACHE_LINE)
0129 mutable std::atomic<awaiter_t*> m_awaiters;
0130
0131 #if CPPCORO_COMPILER_MSVC
0132 # pragma warning(pop)
0133 #endif
0134
0135 };
0136 /// Awaiter base class: queues itself on a sequence_barrier until its target sequence number has been published.
0137 template<typename SEQUENCE, typename TRAITS>
0138 class sequence_barrier_wait_operation_base
0139 {
0140 public:
0141 // Snapshots barrier.last_published() so await_ready() can skip suspension when the target is already published.
0142 explicit sequence_barrier_wait_operation_base(
0143 const sequence_barrier<SEQUENCE, TRAITS>& barrier,
0144 SEQUENCE targetSequence) noexcept
0145 : m_barrier(barrier)
0146 , m_targetSequence(targetSequence)
0147 , m_lastKnownPublished(barrier.last_published())
0148 , m_readyToResume(false)
0149 {}
0150 // A copy is a fresh, not-yet-suspended operation: m_readyToResume restarts as false; m_next and m_awaitingCoroutine are not copied.
0151 sequence_barrier_wait_operation_base(
0152 const sequence_barrier_wait_operation_base& other) noexcept
0153 : m_barrier(other.m_barrier)
0154 , m_targetSequence(other.m_targetSequence)
0155 , m_lastKnownPublished(other.m_lastKnownPublished)
0156 , m_readyToResume(false)
0157 {}
0158 // Ready without suspending when TRAITS::precedes(lastKnownPublished, target) is false.
0159 bool await_ready() const noexcept
0160 {
0161 return !TRAITS::precedes(m_lastKnownPublished, m_targetSequence);
0162 }
0163 // Queue on the barrier. Returns false (continue without suspending) if resume() already ran during add_awaiter().
0164 bool await_suspend(cppcoro::coroutine_handle<> awaitingCoroutine) noexcept
0165 {
0166 m_awaitingCoroutine = awaitingCoroutine;
0167 m_barrier.add_awaiter(this);
0168 return !m_readyToResume.exchange(true, std::memory_order_acquire);
0169 }
0170 // Result of the co_await expression: the last published sequence number known to this operation.
0171 SEQUENCE await_resume() noexcept
0172 {
0173 return m_lastKnownPublished;
0174 }
0175
0176 protected:
0177
0178 friend class sequence_barrier<SEQUENCE, TRAITS>;
0179 // Called by the barrier once the target is satisfied. Whichever of resume()/await_suspend() flips m_readyToResume second continues the coroutine.
0180 void resume() noexcept
0181 {
0182 // The release exchange here pairs with the acquire exchange in await_suspend().
0183 if (m_readyToResume.exchange(true, std::memory_order_release))
0184 {
0185 resume_impl();
0186 }
0187 }
0188 // Hook for derived classes: performs the actual resumption of m_awaitingCoroutine.
0189 virtual void resume_impl() noexcept = 0;
0190 // m_next links awaiters into the barrier's intrusive list; it is left unset until the awaiter is queued.
0191 const sequence_barrier<SEQUENCE, TRAITS>& m_barrier;
0192 const SEQUENCE m_targetSequence;
0193 SEQUENCE m_lastKnownPublished;
0194 sequence_barrier_wait_operation_base* m_next;
0195 cppcoro::coroutine_handle<> m_awaitingCoroutine;
0196 std::atomic<bool> m_readyToResume;
0197
0198 };
0199 /// Awaiter returned by sequence_barrier::wait_until_published(); it resumes the waiting coroutine through SCHEDULER::schedule().
0200 template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
0201 class sequence_barrier_wait_operation : public sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>
0202 {
0203 using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule()); // Type returned by scheduler.schedule().
0204
0205 public:
0206 sequence_barrier_wait_operation(
0207 const sequence_barrier<SEQUENCE, TRAITS>& barrier,
0208 SEQUENCE targetSequence,
0209 SCHEDULER& scheduler) noexcept
0210 : sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(barrier, targetSequence)
0211 , m_scheduler(scheduler)
0212 {}
0213 // A copy shares the scheduler reference; its schedule operation/awaiter storage starts out empty.
0214 sequence_barrier_wait_operation(
0215 const sequence_barrier_wait_operation& other) noexcept
0216 : sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(other)
0217 , m_scheduler(other.m_scheduler)
0218 {}
0219 // Destroy whatever resume_impl() constructed: awaiter first, then the operation (reverse order of construction).
0220 ~sequence_barrier_wait_operation()
0221 {
0222 if (m_isScheduleAwaiterCreated)
0223 {
0224 m_scheduleAwaiter.destruct();
0225 }
0226 if (m_isScheduleOperationCreated)
0227 {
0228 m_scheduleOperation.destruct();
0229 }
0230 }
0231 // Calls the schedule awaiter's await_resume() if one was created (its result is discarded), then returns the base-class result.
0232 decltype(auto) await_resume() noexcept(noexcept(m_scheduleAwaiter->await_resume()))
0233 {
0234 if (m_isScheduleAwaiterCreated)
0235 {
0236 m_scheduleAwaiter->await_resume();
0237 }
0238
0239 return sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
0240 }
0241
0242 private:
0243 // Invoked via resume(): hands the waiting coroutine to scheduler.schedule(), mirroring what a co_await on it would do.
0244 void resume_impl() noexcept override
0245 {
0246 try
0247 {
0248 m_scheduleOperation.construct(m_scheduler.schedule());
0249 m_isScheduleOperationCreated = true;
0250 // The awaiter is obtained from the stored operation (passed as an rvalue).
0251 m_scheduleAwaiter.construct(detail::get_awaiter(
0252 static_cast<schedule_operation&&>(*m_scheduleOperation)));
0253 m_isScheduleAwaiterCreated = true;
0254
0255 if (!m_scheduleAwaiter->await_ready())
0256 {
0257 using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine));
0258 if constexpr (std::is_void_v<await_suspend_result_t>)
0259 {
0260 m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine);
0261 return;
0262 }
0263 else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
0264 {
0265 if (m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine))
0266 {
0267 return;
0268 }
0269 }
0270 else
0271 {
0272 // Any other return type is treated as something resumable (presumably a coroutine handle): resume it.
0273 m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine).resume();
0274 return;
0275 }
0276 }
0277 }
0278 catch (...)
0279 {
0280 // Scheduling threw. This function is noexcept, so the exception is
0281 // swallowed and the coroutine is resumed inline below instead.
0282 // NOTE(review): the exception is discarded, not reported to the awaiting coroutine.
0283 }
0284
0285 // Reached when the schedule awaiter was already ready, declined to suspend, or scheduling threw.
0286 this->m_awaitingCoroutine.resume();
0287 }
0288
0289 SCHEDULER& m_scheduler;
0290 // Storage constructed on demand in resume_impl(); the flags record what the destructor must destroy.
0291 detail::manual_lifetime<schedule_operation> m_scheduleOperation;
0292 detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
0293 bool m_isScheduleOperationCreated = false;
0294 bool m_isScheduleAwaiterCreated = false;
0295 };
0296 /// Out-of-line definition: builds the wait operation bound to this barrier, 'targetSequence' and 'scheduler'.
0297 template<typename SEQUENCE, typename TRAITS>
0298 template<typename SCHEDULER>
0299 [[nodiscard]]
0300 sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> sequence_barrier<SEQUENCE, TRAITS>::wait_until_published(
0301 SEQUENCE targetSequence,
0302 SCHEDULER& scheduler) const noexcept
0303 {
0304 return sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, targetSequence, scheduler); // Nothing is queued here; queuing happens in await_suspend().
0305 }
0306 /// Store 'sequence' as the last published value, then resume the awaiters it satisfies and requeue the rest.
0307 template<typename SEQUENCE, typename TRAITS>
0308 void sequence_barrier<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
0309 {
0310 m_lastPublished.store(sequence, std::memory_order_seq_cst);
0311
0312 // Cheap check first: a plain load avoids the exchange when nobody is waiting.
0313 auto* awaiters = m_awaiters.load(std::memory_order_seq_cst);
0314 if (awaiters == nullptr)
0315 {
0316 return;
0317 }
0318
0319 // Take ownership of the whole awaiter list. add_awaiter() may detach the
0320 // list concurrently, so check again whether anything was actually
0321 // acquired.
0322 awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0323 if (awaiters == nullptr)
0324 {
0325 return;
0326 }
0327
0328 // Partition the detached list: awaiters whose target comes after 'sequence'
0329 // go to the requeue list, all others to the resume list. Each list head is
0330 // written through its tail pointer, so the heads need no initialiser here.
0331 awaiter_t* awaitersToResume;
0332 awaiter_t** awaitersToResumeTail = &awaitersToResume;
0333
0334 awaiter_t* awaitersToRequeue;
0335 awaiter_t** awaitersToRequeueTail = &awaitersToRequeue;
0336
0337 do
0338 {
0339 if (TRAITS::precedes(sequence, awaiters->m_targetSequence))
0340 {
0341 // Target not reached yet: keep this awaiter waiting.
0342 *awaitersToRequeueTail = awaiters;
0343 awaitersToRequeueTail = &awaiters->m_next;
0344 }
0345 else
0346 {
0347 // Target satisfied by 'sequence': schedule this awaiter for resumption.
0348 *awaitersToResumeTail = awaiters;
0349 awaitersToResumeTail = &awaiters->m_next;
0350 }
0351 awaiters = awaiters->m_next;
0352 } while (awaiters != nullptr);
0353
0354 // Null-terminate both lists (this also sets the head of a list that stayed empty).
0355 *awaitersToRequeueTail = nullptr;
0356 *awaitersToResumeTail = nullptr;
0357 // Put the still-waiting awaiters back, splicing them in front of any awaiters queued in the meantime.
0358 if (awaitersToRequeue != nullptr)
0359 {
0360 awaiter_t* oldHead = nullptr;
0361 while (!m_awaiters.compare_exchange_weak(
0362 oldHead,
0363 awaitersToRequeue,
0364 std::memory_order_release,
0365 std::memory_order_relaxed))
0366 {
0367 *awaitersToRequeueTail = oldHead;
0368 }
0369 }
0370 // Resume last. m_next is read before resume(), presumably because the awaiter may be gone once it has been resumed.
0371 while (awaitersToResume != nullptr)
0372 {
0373 auto* next = awaitersToResume->m_next;
0374 awaitersToResume->m_lastKnownPublished = sequence;
0375 awaitersToResume->resume();
0376 awaitersToResume = next;
0377 }
0378 }
0379 /// Enqueue 'awaiter', then re-check the published sequence so that a publish() racing with the enqueue is not missed.
0380 template<typename SEQUENCE, typename TRAITS>
0381 void sequence_barrier<SEQUENCE, TRAITS>::add_awaiter(awaiter_t* awaiter) const noexcept
0382 {
0383 SEQUENCE targetSequence = awaiter->m_targetSequence;
0384 awaiter_t* awaitersToRequeue = awaiter;
0385 awaiter_t** awaitersToRequeueTail = &awaiter->m_next;
0386
0387 SEQUENCE lastKnownPublished;
0388 awaiter_t* awaitersToResume;
0389 awaiter_t** awaitersToResumeTail = &awaitersToResume;
0390
0391 do
0392 {
0393 // Push the requeue list (initially just 'awaiter') onto the front of m_awaiters.
0394 {
0395 auto* oldHead = m_awaiters.load(std::memory_order_relaxed);
0396 do
0397 {
0398 *awaitersToRequeueTail = oldHead;
0399 } while (!m_awaiters.compare_exchange_weak(
0400 oldHead,
0401 awaitersToRequeue,
0402 std::memory_order_seq_cst,
0403 std::memory_order_relaxed));
0404 }
0405
0406 // Re-read the published sequence after the enqueue. The CAS above and this
0407 // load are both seq_cst, as are the store and the load in publish(), so
0408 // either publish() sees the newly queued awaiters or this load sees the
0409 // newly published sequence number.
0410 // 'targetSequence' is the earliest target among the awaiters that were
0411 // just queued.
0412 lastKnownPublished = m_lastPublished.load(std::memory_order_seq_cst);
0413 if (TRAITS::precedes(lastKnownPublished, targetSequence))
0414 {
0415 // Nothing that was just queued is satisfied yet; a later publish() will handle it.
0416 break;
0417 }
0418
0419 // At least one queued awaiter is already satisfied. Restart the requeue list as empty; it is rebuilt below.
0420 awaitersToRequeueTail = &awaitersToRequeue;
0421
0422 // Detach the whole awaiter list. It may contain awaiters queued by other
0423 // threads, and it may already have been taken by publish() or another
0424 // add_awaiter() call, in which case the result is null and the loop
0425 // below does nothing.
0426 auto* awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
0427 // Smallest positive distance from lastKnownPublished to a still-unsatisfied target.
0428 auto minDiff = std::numeric_limits<typename TRAITS::difference_type>::max();
0429 // Partition the detached awaiters: target still ahead -> requeue, otherwise -> resume.
0430 while (awaiters != nullptr)
0431 {
0432 const auto diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
0433 if (diff > 0)
0434 {
0435 *awaitersToRequeueTail = awaiters;
0436 awaitersToRequeueTail = &awaiters->m_next;
0437 minDiff = diff < minDiff ? diff : minDiff;
0438 }
0439 else
0440 {
0441 *awaitersToResumeTail = awaiters;
0442 awaitersToResumeTail = &awaiters->m_next;
0443 }
0444
0445 awaiters = awaiters->m_next;
0446 }
0447
0448 // Null-terminate the requeue list (this makes awaitersToRequeue null when the list is empty).
0449 *awaitersToRequeueTail = nullptr;
0450 // Earliest target among the requeued awaiters, used by the re-check on the next pass.
0451 // NOTE(review): if nothing was requeued, minDiff is still max(); the result is unused then, but the addition may overflow for signed types -- confirm against sequence_traits.
0452 targetSequence = static_cast<SEQUENCE>(lastKnownPublished + minDiff);
0453
0454 } while (awaitersToRequeue != nullptr);
0455
0456 // Null-terminate the resume list (this also sets awaitersToResume when nothing was added to it).
0457 *awaitersToResumeTail = nullptr;
0458
0459 // Resume last. m_next is read before resume(), presumably because the awaiter may be gone once it has been resumed.
0460 while (awaitersToResume != nullptr)
0461 {
0462 auto* next = awaitersToResume->m_next;
0463 awaitersToResume->m_lastKnownPublished = lastKnownPublished;
0464 awaitersToResume->resume();
0465 awaitersToResume = next;
0466 }
0467 }
0468 }
0469
0470 #endif