Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // Slow stream implementations, mainly for testing and benchmarking
0019 
0020 #pragma once
0021 
0022 #include <cstdint>
0023 #include <memory>
0024 #include <utility>
0025 
0026 #include "arrow/io/interfaces.h"
0027 #include "arrow/util/visibility.h"
0028 
0029 namespace arrow {
0030 
0031 class Buffer;
0032 class Status;
0033 
0034 namespace io {
0035 
0036 class ARROW_EXPORT LatencyGenerator {
0037  public:
0038   virtual ~LatencyGenerator();
0039 
0040   void Sleep();
0041 
0042   virtual double NextLatency() = 0;
0043 
0044   static std::shared_ptr<LatencyGenerator> Make(double average_latency);
0045   static std::shared_ptr<LatencyGenerator> Make(double average_latency, int32_t seed);
0046 };
0047 
0048 // XXX use ConcurrencyWrapper?  It could increase chances of finding a race.
0049 
0050 template <class StreamType>
0051 class SlowInputStreamBase : public StreamType {
0052  public:
0053   SlowInputStreamBase(std::shared_ptr<StreamType> stream,
0054                       std::shared_ptr<LatencyGenerator> latencies)
0055       : stream_(std::move(stream)), latencies_(std::move(latencies)) {}
0056 
0057   SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency)
0058       : stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {}
0059 
0060   SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency,
0061                       int32_t seed)
0062       : stream_(std::move(stream)),
0063         latencies_(LatencyGenerator::Make(average_latency, seed)) {}
0064 
0065  protected:
0066   std::shared_ptr<StreamType> stream_;
0067   std::shared_ptr<LatencyGenerator> latencies_;
0068 };
0069 
0070 /// \brief An InputStream wrapper that makes reads slower.
0071 ///
0072 /// Read() calls are made slower by an average latency (in seconds).
0073 /// Actual latencies form a normal distribution closely centered
0074 /// on the average latency.
0075 /// Other calls are forwarded directly.
0076 class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase<InputStream> {
0077  public:
0078   ~SlowInputStream() override;
0079 
0080   using SlowInputStreamBase<InputStream>::SlowInputStreamBase;
0081 
0082   Status Close() override;
0083   Status Abort() override;
0084   bool closed() const override;
0085 
0086   Result<int64_t> Read(int64_t nbytes, void* out) override;
0087   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
0088   Result<std::string_view> Peek(int64_t nbytes) override;
0089 
0090   Result<int64_t> Tell() const override;
0091 };
0092 
0093 /// \brief A RandomAccessFile wrapper that makes reads slower.
0094 ///
0095 /// Similar to SlowInputStream, but allows random access and seeking.
0096 class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase<RandomAccessFile> {
0097  public:
0098   ~SlowRandomAccessFile() override;
0099 
0100   using SlowInputStreamBase<RandomAccessFile>::SlowInputStreamBase;
0101 
0102   Status Close() override;
0103   Status Abort() override;
0104   bool closed() const override;
0105 
0106   Result<int64_t> Read(int64_t nbytes, void* out) override;
0107   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
0108   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
0109   Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
0110   Result<std::string_view> Peek(int64_t nbytes) override;
0111 
0112   Result<int64_t> GetSize() override;
0113   Status Seek(int64_t position) override;
0114   Result<int64_t> Tell() const override;
0115 };
0116 
0117 }  // namespace io
0118 }  // namespace arrow