File indexing completed on 2025-08-28 08:26:59
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
0022 #include <cstdint>
0023 #include <memory>
0024 #include <string_view>
0025 #include <vector>
0026
0027 #include "arrow/io/concurrency.h"
0028 #include "arrow/io/interfaces.h"
0029 #include "arrow/type_fwd.h"
0030 #include "arrow/util/visibility.h"
0031
0032 namespace arrow {
0033
0034 class Status;
0035
0036 namespace io {
0037
0038 /// \brief OutputStream implementation that holds a ResizableBuffer (buffer_).
0039 class ARROW_EXPORT BufferOutputStream : public OutputStream {
0040 public:
0041 explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
0042
0043 /// \brief Factory for a BufferOutputStream; defined outside this header.
0044 ///
0045 /// \param[in] initial_capacity requested starting capacity (default 4096;
0046 /// presumably in bytes -- TODO confirm against the implementation)
0047 /// \param[in,out] pool MemoryPool passed in (default: default_memory_pool())
0048 /// \return a Result holding the new stream
0049 static Result<std::shared_ptr<BufferOutputStream>> Create(
0050 int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool());
0051
0052 ~BufferOutputStream() override;
0053
0054 // Overrides of the OutputStream interface.
0055
0056 /// NOTE(review): Close() presumably keeps the buffer for Finish() -- confirm.
0057 Status Close() override;
0058 bool closed() const override;
0059 Result<int64_t> Tell() const override;
0060 Status Write(const void* data, int64_t nbytes) override;
0061
0062 /// Keep other OutputStream::Write overloads visible; the override above hides them.
0063 using OutputStream::Write;
0064
0065
0066 /// \return a Result holding a Buffer; presumably the written data -- TODO confirm.
0067 Result<std::shared_ptr<Buffer>> Finish();
0068
0069 /// \brief Re-initialize the stream; defined outside this header.
0070 /// NOTE(review): presumably allocates anew and rewinds position_ -- confirm.
0071 /// \param[in] initial_capacity requested capacity (default 1024, not Create()'s 4096)
0072 /// \param[in,out] pool MemoryPool passed in (default: default_memory_pool())
0073 /// \return Status of the operation
0074 Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
0075 /// \return the current value of capacity_
0076 int64_t capacity() const { return capacity_; }
0077
0078 private:
0079 BufferOutputStream();
0080
0081 // NOTE(review): presumably makes room for nbytes more bytes -- confirm.
0082 Status Reserve(int64_t nbytes);
0083
0084 std::shared_ptr<ResizableBuffer> buffer_;
0085 bool is_open_;
0086 int64_t capacity_;
0087 int64_t position_;
0088 uint8_t* mutable_data_;
0089 };
0090
0091 /// \brief OutputStream whose only state is a byte counter and an open flag.
0092 ///
0093 /// The class declares no buffer member. NOTE(review): Write() presumably
0094 /// only advances extent_bytes_written_ without keeping the data -- confirm
0095 /// against the out-of-line implementation.
0096 class ARROW_EXPORT MockOutputStream : public OutputStream {
0097 public:
0098 MockOutputStream() : extent_bytes_written_(0), is_open_(true) {}
0099
0100 // Overrides of the OutputStream interface.
0101 Status Close() override;
0102 bool closed() const override;
0103 Result<int64_t> Tell() const override;
0104 Status Write(const void* data, int64_t nbytes) override;
0105 /// Keep other Writable::Write overloads visible; the override above hides them.
0106 using Writable::Write;
0107
0108 /// \return the current value of extent_bytes_written_ (0 after construction)
0109 int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
0110
0111 private:
0112 int64_t extent_bytes_written_;
0113 bool is_open_;
0114 };
0115
0116 /// \brief WritableFile implementation constructed over an existing Buffer.
0117 class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile {
0118 public:
0119 /// NOTE(review): the buffer presumably must be mutable -- confirm in the implementation.
0120 explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
0121 ~FixedSizeBufferWriter() override;
0122
0123 Status Close() override;
0124 bool closed() const override;
0125 Status Seek(int64_t position) override;
0126 Result<int64_t> Tell() const override;
0127 Status Write(const void* data, int64_t nbytes) override;
0128 /// Keep other Writable::Write overloads visible; the override above hides them.
0129 using Writable::Write;
0130
0131 /// Overrides a base-class WriteAt taking an explicit position.
0132 Status WriteAt(int64_t position, const void* data, int64_t nbytes) override;
0133 /// NOTE(review): these setters presumably tune a threaded memcpy for writes -- confirm.
0134 void set_memcopy_threads(int num_threads);
0135 void set_memcopy_blocksize(int64_t blocksize);
0136 void set_memcopy_threshold(int64_t threshold);
0137
0138 protected:
0139 class FixedSizeBufferWriterImpl;  // forward declaration; defined outside this header
0140 std::unique_ptr<FixedSizeBufferWriterImpl> impl_;  // the class's only data member
0141 };
0142
0143 /// \brief Reader over a Buffer, tracked through data_, size_ and position_.
0144 /// Public read/seek entry points come from RandomAccessFileConcurrencyWrapper.
0145 class ARROW_EXPORT BufferReader
0146 : public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
0147 public:
0148 /// \brief Construct from a shared Buffer, taken by value.
0149 ///
0150 /// NOTE(review): presumably kept in buffer_ without copying bytes -- confirm.
0151 explicit BufferReader(std::shared_ptr<Buffer> buffer);
0152 ARROW_DEPRECATED(
0153 "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0154 "buffer) instead.")
0155 explicit BufferReader(const Buffer& buffer);
0156 ARROW_DEPRECATED(
0157 "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0158 "buffer) instead.")
0159 BufferReader(const uint8_t* data, int64_t size);
0160
0161 /// \brief Construct from a std::string_view (deprecated, see message below).
0162 /// NOTE(review): a string_view is non-owning, so the reader presumably does
0163 /// not own these bytes -- confirm against the implementation.
0164 ARROW_DEPRECATED(
0165 "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0166 "buffer) instead.")
0167 explicit BufferReader(std::string_view data);
0168
0169 /// \brief Build a reader from a std::string taken by value (presumably owned -- confirm).
0170 static std::unique_ptr<BufferReader> FromString(std::string data);
0171
0172 bool closed() const override;
0173
0174 bool supports_zero_copy() const override;
0175 /// \return a copy of the buffer_ shared pointer
0176 std::shared_ptr<Buffer> buffer() const { return buffer_; }
0177
0178 // Asynchronous-read and prefetch-hint overrides; defined outside this header.
0179 Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
0180 int64_t nbytes) override;
0181 Status WillNeed(const std::vector<ReadRange>& ranges) override;
0182
0183 protected:
0184 friend RandomAccessFileConcurrencyWrapper<BufferReader>;
0185 // NOTE(review): the friend wrapper presumably forwards to these Do* methods -- confirm.
0186 Status DoClose();
0187
0188 Result<int64_t> DoRead(int64_t nbytes, void* buffer);
0189 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
0190 Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
0191 Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
0192 Result<std::string_view> DoPeek(int64_t nbytes) override;
0193
0194 Result<int64_t> DoTell() const;
0195 Status DoSeek(int64_t position);
0196 Result<int64_t> DoGetSize();
0197 /// \return Status::Invalid when is_open_ is false, Status::OK() otherwise
0198 Status CheckClosed() const {
0199 if (!is_open_) {
0200 return Status::Invalid("Operation forbidden on closed BufferReader");
0201 }
0202 return Status::OK();
0203 }
0204
0205 std::shared_ptr<Buffer> buffer_;
0206 const uint8_t* data_;
0207 int64_t size_;
0208 int64_t position_;
0209 bool is_open_;
0210 };
0211
0212 }
0213 }