Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:17

0001 // Vendored from git tag v2021.02.15.00
0002 
0003 /*
0004  * Copyright (c) Facebook, Inc. and its affiliates.
0005  *
0006  * Licensed under the Apache License, Version 2.0 (the "License");
0007  * you may not use this file except in compliance with the License.
0008  * You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 // @author Bo Hu (bhu@fb.com)
0020 // @author Jordan DeLong (delong.j@fb.com)
0021 
0022 // This file has been modified as part of Apache Arrow to conform to
0023 // Apache Arrow's coding conventions
0024 
0025 #pragma once
0026 
0027 #include <atomic>
0028 #include <cassert>
0029 #include <cstdlib>
0030 #include <memory>
0031 #include <stdexcept>
0032 #include <type_traits>
0033 #include <utility>
0034 
0035 namespace arrow_vendored {
0036 namespace folly {
0037 
// Vendored from folly/Portability.h
//
// Compile-time architecture detection used below to choose a cache-line
// size estimate.  NOTE(review): an unnamed namespace in a header gives
// every translation unit its own copy of these constants; harmless for
// constexpr values, though `inline constexpr` at namespace scope would
// be the modern header idiom.
namespace {
#if defined(__arm__)
#define FOLLY_ARM 1
#else
#define FOLLY_ARM 0
#endif

#if defined(__s390x__)
#define FOLLY_S390X 1
#else
#define FOLLY_S390X 0
#endif

constexpr bool kIsArchArm = FOLLY_ARM == 1;
constexpr bool kIsArchS390X = FOLLY_S390X == 1;
}  // namespace

// Vendored from folly/lang/Align.h
namespace {

// Minimum separation (in bytes) between two objects to avoid false
// sharing between concurrently-accessed data; used to pad the queue's
// read/write indices apart.  64 on ARM and s390x, 128 elsewhere —
// presumably the larger value accounts for adjacent-cache-line
// prefetching on x86; matches folly upstream.
constexpr std::size_t hardware_destructive_interference_size =
    (kIsArchArm || kIsArchS390X) ? 64 : 128;

}  // namespace
0063 
/*
 * ProducerConsumerQueue is a one producer and one consumer queue
 * without locks.
 *
 * Implementation notes (review):
 *  - Fixed-capacity ring buffer over raw malloc'd storage.  Slots are
 *    constructed with placement-new in Write() and destroyed in
 *    Read()/PopFront(), so T need not be default-constructible.
 *  - One slot is always left unused to distinguish "full" from "empty";
 *    hence capacity() == size_ - 1.
 *  - Thread-safety contract: exactly ONE producer thread may call
 *    Write()/IsFull() and exactly ONE consumer thread may call
 *    Read()/FrontPtr()/PopFront()/IsEmpty().  Each side loads its own
 *    index relaxed (it is the only writer of that index) and the other
 *    side's index with acquire, pairing with the release store that
 *    published it.
 */
template <class T>
struct ProducerConsumerQueue {
  typedef T value_type;

  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;

  // size must be >= 2.
  //
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // IsFull() will return true after size-1 insertions.
  //
  // Throws std::bad_alloc if the backing allocation fails.
  // NOTE(review): the size >= 2 precondition is enforced only by
  // assert(), i.e. not in release builds; and std::malloc does not
  // honor alignof(T) beyond max_align_t, so over-aligned T is not
  // supported — confirm against callers if such types are queued.
  explicit ProducerConsumerQueue(uint32_t size)
      : size_(size),
        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
        readIndex_(0),
        writeIndex_(0) {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }

  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    if (!std::is_trivially_destructible<T>::value) {
      size_t readIndex = readIndex_;
      size_t endIndex = writeIndex_;
      while (readIndex != endIndex) {
        records_[readIndex].~T();
        if (++readIndex == size_) {
          readIndex = 0;
        }
      }
    }

    std::free(records_);
  }

  // Construct a new element in place at the back of the queue from
  // recordArgs (producer thread only).  Returns false, constructing
  // nothing, if the queue is full.
  template <class... Args>
  bool Write(Args&&... recordArgs) {
    // relaxed: this thread is the only writer of writeIndex_.
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    // acquire pairs with the consumer's release store in Read()/PopFront():
    // once a slot is observed free, its previous occupant has been
    // fully destroyed.
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      // release publishes the newly constructed element to the consumer.
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }

    // queue is full
    return false;
  }

  // move the value at the front of the queue to given variable
  // (consumer thread only).  Returns false if the queue is empty.
  bool Read(T& record) {
    // relaxed: this thread is the only writer of readIndex_.
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    // acquire pairs with the producer's release store in Write().
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    // release: the producer may reuse the slot only after the
    // destructor above has completed.
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // pointer to the value at the front of the queue (for use in-place) or
  // nullptr if empty.  Consumer thread only; the pointer stays valid
  // until the consumer pops the element.
  T* FrontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }

  // queue must not be empty (checked only by assert, i.e. debug builds).
  // Destroys the front element and frees its slot; consumer thread only.
  void PopFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }

  // Best-effort emptiness check: exact only when called from the
  // consumer thread (a concurrent producer may add items at any time).
  bool IsEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
           writeIndex_.load(std::memory_order_acquire);
  }

  // Best-effort fullness check: exact only when called from the
  // producer thread (a concurrent consumer may remove items at any time).
  bool IsFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      return false;
    }
    // queue is full
    return true;
  }

  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  //
  // NOTE(review): the unsigned subtraction wraps when the writer index
  // has wrapped past the reader; storing it into an int relies on the
  // out-of-range conversion yielding a negative value (implementation-
  // defined before C++20, two's-complement in practice), which the
  // `ret += size_` below then corrects.
  size_t SizeGuess() const {
    int ret = writeIndex_.load(std::memory_order_acquire) -
              readIndex_.load(std::memory_order_acquire);
    if (ret < 0) {
      ret += size_;
    }
    return ret;
  }

  // maximum number of items in the queue.
  size_t capacity() const { return size_ - 1; }

 private:
  using AtomicIndex = std::atomic<unsigned int>;

  // pad0_/pad1_/pad2_ keep the two indices on separate cache lines
  // (and away from neighboring objects) so producer and consumer do
  // not false-share.
  char pad0_[hardware_destructive_interference_size];
  const uint32_t size_;  // number of slots; usable capacity is size_ - 1
  T* const records_;     // raw storage; slots constructed/destroyed on demand

  AtomicIndex readIndex_;  // next slot to read; written only by the consumer
  char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
  AtomicIndex writeIndex_;  // next slot to write; written only by the producer

  char pad2_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};
0215 
0216 }  // namespace folly
0217 }  // namespace arrow_vendored