File indexing completed on 2025-08-28 08:26:59
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
0022 #include <cstdint>
0023 #include <memory>
0024 #include <utility>
0025
0026 #include "arrow/io/interfaces.h"
0027 #include "arrow/util/visibility.h"
0028
0029 namespace arrow {
0030
0031 class Buffer;
0032 class Status;
0033
0034 namespace io {
0035
0036 class ARROW_EXPORT LatencyGenerator {
0037 public:
0038 virtual ~LatencyGenerator();
0039
0040 void Sleep();
0041
0042 virtual double NextLatency() = 0;
0043
0044 static std::shared_ptr<LatencyGenerator> Make(double average_latency);
0045 static std::shared_ptr<LatencyGenerator> Make(double average_latency, int32_t seed);
0046 };
0047
0048
0049
0050 template <class StreamType>
0051 class SlowInputStreamBase : public StreamType {
0052 public:
0053 SlowInputStreamBase(std::shared_ptr<StreamType> stream,
0054 std::shared_ptr<LatencyGenerator> latencies)
0055 : stream_(std::move(stream)), latencies_(std::move(latencies)) {}
0056
0057 SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency)
0058 : stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {}
0059
0060 SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency,
0061 int32_t seed)
0062 : stream_(std::move(stream)),
0063 latencies_(LatencyGenerator::Make(average_latency, seed)) {}
0064
0065 protected:
0066 std::shared_ptr<StreamType> stream_;
0067 std::shared_ptr<LatencyGenerator> latencies_;
0068 };
0069
0070
0071
0072
0073
0074
0075
0076 class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase<InputStream> {
0077 public:
0078 ~SlowInputStream() override;
0079
0080 using SlowInputStreamBase<InputStream>::SlowInputStreamBase;
0081
0082 Status Close() override;
0083 Status Abort() override;
0084 bool closed() const override;
0085
0086 Result<int64_t> Read(int64_t nbytes, void* out) override;
0087 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
0088 Result<std::string_view> Peek(int64_t nbytes) override;
0089
0090 Result<int64_t> Tell() const override;
0091 };
0092
0093
0094
0095
0096 class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase<RandomAccessFile> {
0097 public:
0098 ~SlowRandomAccessFile() override;
0099
0100 using SlowInputStreamBase<RandomAccessFile>::SlowInputStreamBase;
0101
0102 Status Close() override;
0103 Status Abort() override;
0104 bool closed() const override;
0105
0106 Result<int64_t> Read(int64_t nbytes, void* out) override;
0107 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
0108 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
0109 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
0110 Result<std::string_view> Peek(int64_t nbytes) override;
0111
0112 Result<int64_t> GetSize() override;
0113 Status Seek(int64_t position) override;
0114 Result<int64_t> Tell() const override;
0115 };
0116
0117 }
0118 }