File indexing completed on 2025-08-28 08:27:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025 #pragma once
0026
0027 #include <atomic>
0028 #include <cassert>
0029 #include <cstdlib>
0030 #include <memory>
0031 #include <stdexcept>
0032 #include <type_traits>
0033 #include <utility>
0034
0035 namespace arrow_vendored {
0036 namespace folly {
0037
0038
// Architecture detection used to pick the false-sharing padding size below.
// The helper macros are #undef'd immediately after use so that this header
// does not leak FOLLY_ARM / FOLLY_S390X into every translation unit that
// includes it.
#if defined(__arm__)
#define FOLLY_ARM 1
#else
#define FOLLY_ARM 0
#endif

#if defined(__s390x__)
#define FOLLY_S390X 1
#else
#define FOLLY_S390X 0
#endif

// Plain constexpr (implicitly internal linkage) instead of an anonymous
// namespace: anonymous namespaces in headers create a distinct entity per
// translation unit and are discouraged; constexpr gives the same effect.
constexpr bool kIsArchArm = FOLLY_ARM == 1;
constexpr bool kIsArchS390X = FOLLY_S390X == 1;

#undef FOLLY_ARM
#undef FOLLY_S390X
0055
0056
0057 namespace {
0058
0059 constexpr std::size_t hardware_destructive_interference_size =
0060 (kIsArchArm || kIsArchS390X) ? 64 : 128;
0061
0062 }
0063
0064
0065
0066
0067
// ProducerConsumerQueue: a lock-free single-producer, single-consumer
// (SPSC) ring buffer.
//
// Exactly one thread may call Write() while exactly one (other) thread
// calls Read()/FrontPtr()/PopFront().  The synchronization scheme — each
// side loads its *own* index with memory_order_relaxed and the *other*
// side's index with memory_order_acquire, publishing with
// memory_order_release — is only correct under this one-writer/one-reader
// discipline.
template <class T>
struct ProducerConsumerQueue {
  typedef T value_type;

  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;

  // size must be >= 2 (asserted in debug builds only).
  //
  // Note that the number of usable slots in the queue at any given time is
  // actually (size - 1): one slot is kept vacant to distinguish "full" from
  // "empty", so IsFull() returns true after size - 1 insertions into an
  // initially empty queue.
  //
  // Storage is raw malloc'd memory; elements are constructed in place by
  // Write() and destroyed by Read()/PopFront().  Throws std::bad_alloc if
  // the allocation fails.
  explicit ProducerConsumerQueue(uint32_t size)
      : size_(size),
        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
        readIndex_(0),
        writeIndex_(0) {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }

  ~ProducerConsumerQueue() {
    // Destroy anything still sitting in the queue.  No synchronization is
    // needed here: by the time the destructor runs, only one thread may be
    // touching the queue.
    if (!std::is_trivially_destructible<T>::value) {
      size_t readIndex = readIndex_;
      size_t endIndex = writeIndex_;
      while (readIndex != endIndex) {
        records_[readIndex].~T();
        if (++readIndex == size_) {
          readIndex = 0;  // wrap around the ring
        }
      }
    }

    std::free(records_);
  }

  // Constructs a T from recordArgs in the next free slot.
  // Returns true on success, false when the queue is full.
  // Producer thread only.
  template <class... Args>
  bool Write(Args&&... recordArgs) {
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      // Release store: publishes the construction above to the consumer's
      // acquire load of writeIndex_.  Must happen after placement-new.
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }

    // queue is full
    return false;
  }

  // Moves (or copies, if T has no move assignment) the front item into
  // record, destroys the slot, and advances the read index.
  // Returns false if the queue was empty.  Consumer thread only.
  bool Read(T& record) {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    // Release store: hands the now-destroyed slot back to the producer.
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // Pointer to the front item, or nullptr if the queue is empty.  The
  // pointed-to slot stays valid until Read()/PopFront() consumes it.
  // Consumer thread only.
  T* FrontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }

  // Destroys the front item and advances the read index.  The queue must
  // not be empty (asserted in debug builds only).  Consumer thread only.
  void PopFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }

  // Snapshot only: by the time this returns, the other thread may already
  // have changed the answer.
  bool IsEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
           writeIndex_.load(std::memory_order_acquire);
  }

  // Snapshot only, same caveat as IsEmpty().  "Full" means the next write
  // slot would collide with the read index (the reserved vacant slot).
  bool IsFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      return false;
    }

    // queue is full
    return true;
  }

  // Approximate number of items in the queue:
  // * If called by the consumer, the true size may be greater (the producer
  //   may be adding items concurrently).
  // * If called by the producer, the true size may be less (the consumer
  //   may be removing items concurrently).
  // * Calling this from any other thread is undefined.
  size_t SizeGuess() const {
    int ret = writeIndex_.load(std::memory_order_acquire) -
              readIndex_.load(std::memory_order_acquire);
    if (ret < 0) {
      ret += size_;
    }
    return ret;
  }

  // Maximum number of items the queue can hold (one slot is reserved).
  size_t capacity() const { return size_ - 1; }

 private:
  using AtomicIndex = std::atomic<unsigned int>;

  // pad0_/pad1_/pad2_ keep the consumer-owned readIndex_ and the
  // producer-owned writeIndex_ on separate cache lines, avoiding false
  // sharing between the two threads.
  char pad0_[hardware_destructive_interference_size];
  const uint32_t size_;
  T* const records_;

  AtomicIndex readIndex_;
  char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
  AtomicIndex writeIndex_;

  char pad2_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};
0215
0216 }
0217 }