![]() |
|
|||
File indexing completed on 2025-08-28 08:26:59
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <cstdint> 0021 #include <memory> 0022 #include <string> 0023 #include <string_view> 0024 #include <vector> 0025 0026 #include "arrow/io/type_fwd.h" 0027 #include "arrow/type_fwd.h" 0028 #include "arrow/util/cancel.h" 0029 #include "arrow/util/macros.h" 0030 #include "arrow/util/type_fwd.h" 0031 #include "arrow/util/visibility.h" 0032 0033 namespace arrow { 0034 namespace io { 0035 0036 struct ReadRange { 0037 int64_t offset; 0038 int64_t length; 0039 0040 friend bool operator==(const ReadRange& left, const ReadRange& right) { 0041 return (left.offset == right.offset && left.length == right.length); 0042 } 0043 friend bool operator!=(const ReadRange& left, const ReadRange& right) { 0044 return !(left == right); 0045 } 0046 0047 bool Contains(const ReadRange& other) const { 0048 return (offset <= other.offset && offset + length >= other.offset + other.length); 0049 } 0050 }; 0051 0052 /// EXPERIMENTAL: options provider for IO tasks 0053 /// 0054 /// Includes an Executor (which will be used to execute asynchronous reads), 0055 /// a MemoryPool (which will be used to allocate buffers when zero copy reads 0056 /// are not possible), and an external id (in case the executor receives tasks from 0057 /// multiple sources and must distinguish tasks associated with this IOContext). 0058 struct ARROW_EXPORT IOContext { 0059 // No specified executor: will use a global IO thread pool 0060 IOContext() : IOContext(default_memory_pool(), StopToken::Unstoppable()) {} 0061 0062 explicit IOContext(StopToken stop_token) 0063 : IOContext(default_memory_pool(), std::move(stop_token)) {} 0064 0065 explicit IOContext(MemoryPool* pool, StopToken stop_token = StopToken::Unstoppable()); 0066 0067 explicit IOContext(MemoryPool* pool, ::arrow::internal::Executor* executor, 0068 StopToken stop_token = StopToken::Unstoppable(), 0069 int64_t external_id = -1) 0070 : pool_(pool), 0071 executor_(executor), 0072 external_id_(external_id), 0073 stop_token_(std::move(stop_token)) {} 0074 0075 explicit IOContext(::arrow::internal::Executor* executor, 0076 StopToken stop_token = StopToken::Unstoppable(), 0077 int64_t external_id = -1) 0078 : pool_(default_memory_pool()), 0079 executor_(executor), 0080 external_id_(external_id), 0081 stop_token_(std::move(stop_token)) {} 0082 0083 MemoryPool* pool() const { return pool_; } 0084 0085 ::arrow::internal::Executor* executor() const { return executor_; } 0086 0087 // An application-specific ID, forwarded to executor task submissions 0088 int64_t external_id() const { return external_id_; } 0089 0090 StopToken stop_token() const { return stop_token_; } 0091 0092 private: 0093 MemoryPool* pool_; 0094 ::arrow::internal::Executor* executor_; 0095 int64_t external_id_; 0096 StopToken stop_token_; 0097 }; 0098 0099 class ARROW_EXPORT FileInterface : public std::enable_shared_from_this<FileInterface> { 0100 public: 0101 virtual ~FileInterface() = 0; 0102 0103 /// \brief Close the stream cleanly 0104 /// 0105 /// For writable streams, this will attempt to flush any pending data 0106 /// before releasing the underlying resource. 0107 /// 0108 /// After Close() is called, closed() returns true and the stream is not 0109 /// available for further operations. 0110 virtual Status Close() = 0; 0111 0112 /// \brief Close the stream asynchronously 0113 /// 0114 /// By default, this will just submit the synchronous Close() to the 0115 /// default I/O thread pool. Subclasses may implement this in a more 0116 /// efficient manner. 0117 virtual Future<> CloseAsync(); 0118 0119 /// \brief Close the stream abruptly 0120 /// 0121 /// This method does not guarantee that any pending data is flushed. 0122 /// It merely releases any underlying resource used by the stream for 0123 /// its operation. 0124 /// 0125 /// After Abort() is called, closed() returns true and the stream is not 0126 /// available for further operations. 0127 virtual Status Abort(); 0128 0129 /// \brief Return the position in this stream 0130 virtual Result<int64_t> Tell() const = 0; 0131 0132 /// \brief Return whether the stream is closed 0133 virtual bool closed() const = 0; 0134 0135 FileMode::type mode() const { return mode_; } 0136 0137 protected: 0138 FileInterface() : mode_(FileMode::READ) {} 0139 FileMode::type mode_; 0140 void set_mode(FileMode::type mode) { mode_ = mode; } 0141 0142 private: 0143 ARROW_DISALLOW_COPY_AND_ASSIGN(FileInterface); 0144 }; 0145 0146 class ARROW_EXPORT Seekable { 0147 public: 0148 virtual ~Seekable() = default; 0149 virtual Status Seek(int64_t position) = 0; 0150 }; 0151 0152 class ARROW_EXPORT Writable { 0153 public: 0154 virtual ~Writable() = default; 0155 0156 /// \brief Write the given data to the stream 0157 /// 0158 /// This method always processes the bytes in full. Depending on the 0159 /// semantics of the stream, the data may be written out immediately, 0160 /// held in a buffer, or written asynchronously. In the case where 0161 /// the stream buffers the data, it will be copied. To avoid potentially 0162 /// large copies, use the Write variant that takes an owned Buffer. 0163 virtual Status Write(const void* data, int64_t nbytes) = 0; 0164 0165 /// \brief Write the given data to the stream 0166 /// 0167 /// Since the Buffer owns its memory, this method can avoid a copy if 0168 /// buffering is required. See Write(const void*, int64_t) for details. 0169 virtual Status Write(const std::shared_ptr<Buffer>& data); 0170 0171 /// \brief Flush buffered bytes, if any 0172 virtual Status Flush(); 0173 0174 Status Write(std::string_view data); 0175 }; 0176 0177 class ARROW_EXPORT Readable { 0178 public: 0179 virtual ~Readable() = default; 0180 0181 /// \brief Read data from current file position. 0182 /// 0183 /// Read at most `nbytes` from the current file position into `out`. 0184 /// The number of bytes read is returned. 0185 virtual Result<int64_t> Read(int64_t nbytes, void* out) = 0; 0186 0187 /// \brief Read data from current file position. 0188 /// 0189 /// Read at most `nbytes` from the current file position. Less bytes may 0190 /// be read if EOF is reached. This method updates the current file position. 0191 /// 0192 /// In some cases (e.g. a memory-mapped file), this method may avoid a 0193 /// memory copy. 0194 virtual Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) = 0; 0195 0196 /// EXPERIMENTAL: The IOContext associated with this file. 0197 /// 0198 /// By default, this is the same as default_io_context(), but it may be 0199 /// overridden by subclasses. 0200 virtual const IOContext& io_context() const; 0201 }; 0202 0203 class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable { 0204 protected: 0205 OutputStream() = default; 0206 }; 0207 0208 class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable { 0209 public: 0210 /// \brief Advance or skip stream indicated number of bytes 0211 /// \param[in] nbytes the number to move forward 0212 /// \return Status 0213 Status Advance(int64_t nbytes); 0214 0215 /// \brief Return zero-copy string_view to upcoming bytes. 0216 /// 0217 /// Do not modify the stream position. The view becomes invalid after 0218 /// any operation on the stream. May trigger buffering if the requested 0219 /// size is larger than the number of buffered bytes. 0220 /// 0221 /// May return NotImplemented on streams that don't support it. 0222 /// 0223 /// \param[in] nbytes the maximum number of bytes to see 0224 virtual Result<std::string_view> Peek(int64_t nbytes); 0225 0226 /// \brief Return true if InputStream is capable of zero copy Buffer reads 0227 /// 0228 /// Zero copy reads imply the use of Buffer-returning Read() overloads. 0229 virtual bool supports_zero_copy() const; 0230 0231 /// \brief Read and return stream metadata 0232 /// 0233 /// If the stream implementation doesn't support metadata, empty metadata 0234 /// is returned. Note that it is allowed to return a null pointer rather 0235 /// than an allocated empty metadata. 0236 virtual Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata(); 0237 0238 /// \brief Read stream metadata asynchronously 0239 virtual Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync( 0240 const IOContext& io_context); 0241 Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(); 0242 0243 protected: 0244 InputStream() = default; 0245 }; 0246 0247 class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { 0248 public: 0249 /// Necessary because we hold a std::unique_ptr 0250 ~RandomAccessFile() override; 0251 0252 /// \brief Create an isolated InputStream that reads a segment of a 0253 /// RandomAccessFile. Multiple such stream can be created and used 0254 /// independently without interference 0255 /// \param[in] file a file instance 0256 /// \param[in] file_offset the starting position in the file 0257 /// \param[in] nbytes the extent of bytes to read. The file should have 0258 /// sufficient bytes available 0259 static Result<std::shared_ptr<InputStream>> GetStream( 0260 std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes); 0261 0262 /// \brief Return the total file size in bytes. 0263 /// 0264 /// This method does not read or move the current file position, so is safe 0265 /// to call concurrently with e.g. ReadAt(). 0266 virtual Result<int64_t> GetSize() = 0; 0267 0268 /// \brief Read data from given file position. 0269 /// 0270 /// At most `nbytes` bytes are read. The number of bytes read is returned 0271 /// (it can be less than `nbytes` if EOF is reached). 0272 /// 0273 /// This method can be safely called from multiple threads concurrently. 0274 /// It is unspecified whether this method updates the file position or not. 0275 /// 0276 /// The default RandomAccessFile-provided implementation uses Seek() and Read(), 0277 /// but subclasses may override it with a more efficient implementation 0278 /// that doesn't depend on implicit file positioning. 0279 /// 0280 /// \param[in] position Where to read bytes from 0281 /// \param[in] nbytes The number of bytes to read 0282 /// \param[out] out The buffer to read bytes into 0283 /// \return The number of bytes read, or an error 0284 virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out); 0285 0286 /// \brief Read data from given file position. 0287 /// 0288 /// At most `nbytes` bytes are read, but it can be less if EOF is reached. 0289 /// 0290 /// \param[in] position Where to read bytes from 0291 /// \param[in] nbytes The number of bytes to read 0292 /// \return A buffer containing the bytes read, or an error 0293 virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes); 0294 0295 /// EXPERIMENTAL: Read data asynchronously. 0296 virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position, 0297 int64_t nbytes); 0298 0299 /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. 0300 Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes); 0301 0302 /// EXPERIMENTAL: Explicit multi-read. 0303 /// \brief Request multiple reads at once 0304 /// 0305 /// The underlying filesystem may optimize these reads by coalescing small reads into 0306 /// large reads or by breaking up large reads into multiple parallel smaller reads. The 0307 /// reads should be issued in parallel if it makes sense for the filesystem. 0308 /// 0309 /// One future will be returned for each input read range. Multiple returned futures 0310 /// may correspond to a single read. Or, a single returned future may be a combined 0311 /// result of several individual reads. 0312 /// 0313 /// \param[in] ranges The ranges to read 0314 /// \return A future that will complete with the data from the requested range is 0315 /// available 0316 virtual std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync( 0317 const IOContext&, const std::vector<ReadRange>& ranges); 0318 0319 /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext. 0320 std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync( 0321 const std::vector<ReadRange>& ranges); 0322 0323 /// EXPERIMENTAL: Inform that the given ranges may be read soon. 0324 /// 0325 /// Some implementations might arrange to prefetch some of the data. 0326 /// However, no guarantee is made and the default implementation does nothing. 0327 /// For robust prefetching, use ReadAt() or ReadAsync(). 0328 virtual Status WillNeed(const std::vector<ReadRange>& ranges); 0329 0330 protected: 0331 RandomAccessFile(); 0332 0333 private: 0334 struct ARROW_NO_EXPORT Impl; 0335 std::unique_ptr<Impl> interface_impl_; 0336 }; 0337 0338 class ARROW_EXPORT WritableFile : public OutputStream, public Seekable { 0339 public: 0340 virtual Status WriteAt(int64_t position, const void* data, int64_t nbytes) = 0; 0341 0342 protected: 0343 WritableFile() = default; 0344 }; 0345 0346 class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile, public WritableFile { 0347 protected: 0348 ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE); } 0349 }; 0350 0351 /// \brief Return an iterator on an input stream 0352 /// 0353 /// The iterator yields a fixed-size block on each Next() call, except the 0354 /// last block in the stream which may be smaller. 0355 /// Once the end of stream is reached, Next() returns nullptr 0356 /// (unlike InputStream::Read() which returns an empty buffer). 0357 ARROW_EXPORT 0358 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator( 0359 std::shared_ptr<InputStream> stream, int64_t block_size); 0360 0361 } // namespace io 0362 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |