Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstdint>
0021 #include <memory>
0022 #include <string>
0023 #include <string_view>
0024 #include <vector>
0025 
0026 #include "arrow/io/type_fwd.h"
0027 #include "arrow/type_fwd.h"
0028 #include "arrow/util/cancel.h"
0029 #include "arrow/util/macros.h"
0030 #include "arrow/util/type_fwd.h"
0031 #include "arrow/util/visibility.h"
0032 
0033 namespace arrow {
0034 namespace io {
0035 
/// A contiguous span of bytes in a file: `length` bytes starting at `offset`.
struct ReadRange {
  int64_t offset;
  int64_t length;

  /// Two ranges are equal when both their start offset and their length match.
  friend bool operator==(const ReadRange& lhs, const ReadRange& rhs) {
    return lhs.offset == rhs.offset && lhs.length == rhs.length;
  }

  /// Two ranges differ when either the start offset or the length differs.
  friend bool operator!=(const ReadRange& lhs, const ReadRange& rhs) {
    return lhs.offset != rhs.offset || lhs.length != rhs.length;
  }

  /// Return true if `other` starts no earlier and ends no later than this range.
  ///
  /// End positions are computed as `offset + length`; these sums are only
  /// evaluated once the start-offset check has passed.
  bool Contains(const ReadRange& other) const {
    if (other.offset < offset) {
      return false;  // `other` begins before this range does
    }
    return other.offset + other.length <= offset + length;
  }
};
0051 
0052 /// EXPERIMENTAL: options provider for IO tasks
0053 ///
0054 /// Includes an Executor (which will be used to execute asynchronous reads),
0055 /// a MemoryPool (which will be used to allocate buffers when zero copy reads
0056 /// are not possible), and an external id (in case the executor receives tasks from
0057 /// multiple sources and must distinguish tasks associated with this IOContext).
0058 struct ARROW_EXPORT IOContext {
0059   // No specified executor: will use a global IO thread pool
0060   IOContext() : IOContext(default_memory_pool(), StopToken::Unstoppable()) {}
0061 
0062   explicit IOContext(StopToken stop_token)
0063       : IOContext(default_memory_pool(), std::move(stop_token)) {}
0064 
0065   explicit IOContext(MemoryPool* pool, StopToken stop_token = StopToken::Unstoppable());
0066 
0067   explicit IOContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
0068                      StopToken stop_token = StopToken::Unstoppable(),
0069                      int64_t external_id = -1)
0070       : pool_(pool),
0071         executor_(executor),
0072         external_id_(external_id),
0073         stop_token_(std::move(stop_token)) {}
0074 
0075   explicit IOContext(::arrow::internal::Executor* executor,
0076                      StopToken stop_token = StopToken::Unstoppable(),
0077                      int64_t external_id = -1)
0078       : pool_(default_memory_pool()),
0079         executor_(executor),
0080         external_id_(external_id),
0081         stop_token_(std::move(stop_token)) {}
0082 
0083   MemoryPool* pool() const { return pool_; }
0084 
0085   ::arrow::internal::Executor* executor() const { return executor_; }
0086 
0087   // An application-specific ID, forwarded to executor task submissions
0088   int64_t external_id() const { return external_id_; }
0089 
0090   StopToken stop_token() const { return stop_token_; }
0091 
0092  private:
0093   MemoryPool* pool_;
0094   ::arrow::internal::Executor* executor_;
0095   int64_t external_id_;
0096   StopToken stop_token_;
0097 };
0098 
/// \brief Common base of all Arrow IO streams and files
///
/// Declares closing (Close / CloseAsync / Abort), position reporting (Tell),
/// the closed() query and the access mode. Because it derives from
/// std::enable_shared_from_this, an instance owned by a std::shared_ptr can
/// obtain a shared_ptr to itself.
class ARROW_EXPORT FileInterface : public std::enable_shared_from_this<FileInterface> {
 public:
  // Pure virtual destructor: makes the class abstract. It still needs an
  // out-of-line definition (not visible here) because derived destructors call it.
  virtual ~FileInterface() = 0;

  /// \brief Close the stream cleanly
  ///
  /// For writable streams, this will attempt to flush any pending data
  /// before releasing the underlying resource.
  ///
  /// After Close() is called, closed() returns true and the stream is not
  /// available for further operations.
  virtual Status Close() = 0;

  /// \brief Close the stream asynchronously
  ///
  /// By default, this will just submit the synchronous Close() to the
  /// default I/O thread pool. Subclasses may implement this in a more
  /// efficient manner.
  virtual Future<> CloseAsync();

  /// \brief Close the stream abruptly
  ///
  /// This method does not guarantee that any pending data is flushed.
  /// It merely releases any underlying resource used by the stream for
  /// its operation.
  ///
  /// After Abort() is called, closed() returns true and the stream is not
  /// available for further operations.
  virtual Status Abort();

  /// \brief Return the position in this stream
  virtual Result<int64_t> Tell() const = 0;

  /// \brief Return whether the stream is closed
  virtual bool closed() const = 0;

  /// \brief Return the access mode of this file or stream
  ///
  /// The protected constructor initializes it to FileMode::READ; subclasses
  /// change it with set_mode().
  FileMode::type mode() const { return mode_; }

 protected:
  FileInterface() : mode_(FileMode::READ) {}
  FileMode::type mode_;
  // Lets subclasses record their access mode (e.g. READWRITE).
  void set_mode(FileMode::type mode) { mode_ = mode; }

 private:
  // Streams own underlying resources; copying and assignment are disabled.
  ARROW_DISALLOW_COPY_AND_ASSIGN(FileInterface);
};
0145 
/// \brief Mix-in interface for streams whose current position can be changed
class ARROW_EXPORT Seekable {
 public:
  virtual ~Seekable() = default;
  /// \brief Move the current stream position to `position`
  ///
  /// NOTE(review): presumably an absolute byte offset from the start of the
  /// file; confirm against implementations.
  virtual Status Seek(int64_t position) = 0;
};
0151 
/// \brief Mix-in interface for streams that accept written bytes
class ARROW_EXPORT Writable {
 public:
  virtual ~Writable() = default;

  /// \brief Write the given data to the stream
  ///
  /// This method always processes the bytes in full.  Depending on the
  /// semantics of the stream, the data may be written out immediately,
  /// held in a buffer, or written asynchronously.  In the case where
  /// the stream buffers the data, it will be copied.  To avoid potentially
  /// large copies, use the Write variant that takes an owned Buffer.
  virtual Status Write(const void* data, int64_t nbytes) = 0;

  /// \brief Write the given data to the stream
  ///
  /// Since the Buffer owns its memory, this method can avoid a copy if
  /// buffering is required.  See Write(const void*, int64_t) for details.
  virtual Status Write(const std::shared_ptr<Buffer>& data);

  /// \brief Flush buffered bytes, if any
  ///
  /// Not pure virtual: a base implementation is defined out of line.
  virtual Status Flush();

  /// \brief Write the bytes viewed by `data` to the stream
  ///
  /// Non-virtual convenience overload. NOTE(review): presumably forwards to
  /// Write(const void*, int64_t); confirm in the implementation file.
  /// A subclass that declares its own Write() hides this overload unless it
  /// also declares `using Writable::Write;`.
  Status Write(std::string_view data);
};
0176 
/// \brief Mix-in interface for streams that can be read from
class ARROW_EXPORT Readable {
 public:
  virtual ~Readable() = default;

  /// \brief Read data from current file position.
  ///
  /// Read at most `nbytes` from the current file position into `out`.
  /// The number of bytes read is returned.
  ///
  /// \param[in] nbytes the maximum number of bytes to read
  /// \param[out] out destination memory (NOTE(review): assumed to have room
  /// for at least `nbytes` bytes)
  /// \return the number of bytes actually read, or an error
  virtual Result<int64_t> Read(int64_t nbytes, void* out) = 0;

  /// \brief Read data from current file position.
  ///
  /// Read at most `nbytes` from the current file position. Fewer bytes may
  /// be read if EOF is reached. This method updates the current file position.
  ///
  /// In some cases (e.g. a memory-mapped file), this method may avoid a
  /// memory copy.
  ///
  /// \param[in] nbytes the maximum number of bytes to read
  /// \return a buffer holding the bytes read, or an error
  virtual Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) = 0;

  /// EXPERIMENTAL: The IOContext associated with this file.
  ///
  /// By default, this is the same as default_io_context(), but it may be
  /// overridden by subclasses.
  virtual const IOContext& io_context() const;
};
0202 
/// \brief Base class for output streams: a FileInterface that is also Writable
///
/// FileInterface is a virtual base, so classes combining several stream
/// interfaces share a single FileInterface subobject.
class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable {
 protected:
  // Only constructible by subclasses.
  OutputStream() = default;
};
0207 
/// \brief Base class for input streams: a FileInterface that is also Readable
///
/// Both bases are inherited virtually, so classes combining several stream
/// interfaces share a single subobject of each.
class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable {
 public:
  /// \brief Advance or skip stream indicated number of bytes
  /// \param[in] nbytes the number to move forward
  /// \return Status
  Status Advance(int64_t nbytes);

  /// \brief Return zero-copy string_view to upcoming bytes.
  ///
  /// Does not modify the stream position.  The view becomes invalid after
  /// any operation on the stream.  May trigger buffering if the requested
  /// size is larger than the number of buffered bytes.
  ///
  /// May return NotImplemented on streams that don't support it.
  ///
  /// \param[in] nbytes the maximum number of bytes to see
  virtual Result<std::string_view> Peek(int64_t nbytes);

  /// \brief Return true if InputStream is capable of zero copy Buffer reads
  ///
  /// Zero copy reads imply the use of Buffer-returning Read() overloads.
  virtual bool supports_zero_copy() const;

  /// \brief Read and return stream metadata
  ///
  /// If the stream implementation doesn't support metadata, empty metadata
  /// is returned.  Note that it is allowed to return a null pointer rather
  /// than an allocated empty metadata.
  virtual Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata();

  /// \brief Read stream metadata asynchronously
  ///
  /// \param[in] io_context the IOContext to run the read with
  virtual Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
      const IOContext& io_context);
  /// \brief Read stream metadata asynchronously (non-virtual convenience overload)
  ///
  /// NOTE(review): presumably uses this stream's io_context(); confirm in the
  /// implementation file.
  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync();

 protected:
  // Only constructible by subclasses.
  InputStream() = default;
};
0246 
/// \brief An InputStream that also supports seeking and positional reads
class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
 public:
  /// Declared here and defined out of line because this class holds a
  /// std::unique_ptr to the incomplete type Impl (see interface_impl_).
  ~RandomAccessFile() override;

  /// \brief Create an isolated InputStream that reads a segment of a
  /// RandomAccessFile. Multiple such streams can be created and used
  /// independently without interference
  /// \param[in] file a file instance
  /// \param[in] file_offset the starting position in the file
  /// \param[in] nbytes the extent of bytes to read. The file should have
  /// sufficient bytes available
  static Result<std::shared_ptr<InputStream>> GetStream(
      std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes);

  /// \brief Return the total file size in bytes.
  ///
  /// This method does not read or move the current file position, so is safe
  /// to call concurrently with e.g. ReadAt().
  virtual Result<int64_t> GetSize() = 0;

  /// \brief Read data from given file position.
  ///
  /// At most `nbytes` bytes are read.  The number of bytes read is returned
  /// (it can be less than `nbytes` if EOF is reached).
  ///
  /// This method can be safely called from multiple threads concurrently.
  /// It is unspecified whether this method updates the file position or not.
  ///
  /// The default RandomAccessFile-provided implementation uses Seek() and Read(),
  /// but subclasses may override it with a more efficient implementation
  /// that doesn't depend on implicit file positioning.
  ///
  /// \param[in] position Where to read bytes from
  /// \param[in] nbytes The number of bytes to read
  /// \param[out] out The buffer to read bytes into
  /// \return The number of bytes read, or an error
  virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out);

  /// \brief Read data from given file position.
  ///
  /// At most `nbytes` bytes are read, but it can be less if EOF is reached.
  ///
  /// \param[in] position Where to read bytes from
  /// \param[in] nbytes The number of bytes to read
  /// \return A buffer containing the bytes read, or an error
  virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes);

  /// EXPERIMENTAL: Read data asynchronously.
  ///
  /// The first (unnamed) argument is the IOContext to use for the read;
  /// `position` and `nbytes` have the same meaning as in ReadAt().
  virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
                                                    int64_t nbytes);

  /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
  Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

  /// EXPERIMENTAL: Explicit multi-read.
  /// \brief Request multiple reads at once
  ///
  /// The underlying filesystem may optimize these reads by coalescing small reads into
  /// large reads or by breaking up large reads into multiple parallel smaller reads.  The
  /// reads should be issued in parallel if it makes sense for the filesystem.
  ///
  /// One future will be returned for each input read range.  Multiple returned futures
  /// may correspond to a single read.  Or, a single returned future may be a combined
  /// result of several individual reads.
  ///
  /// The first (unnamed) argument is the IOContext to use for the reads.
  ///
  /// \param[in] ranges The ranges to read
  /// \return One future per entry of `ranges`, each completing with the data for
  /// that range once it is available
  virtual std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
      const IOContext&, const std::vector<ReadRange>& ranges);

  /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext.
  std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
      const std::vector<ReadRange>& ranges);

  /// EXPERIMENTAL: Inform that the given ranges may be read soon.
  ///
  /// Some implementations might arrange to prefetch some of the data.
  /// However, no guarantee is made and the default implementation does nothing.
  /// For robust prefetching, use ReadAt() or ReadAsync().
  virtual Status WillNeed(const std::vector<ReadRange>& ranges);

 protected:
  // Only constructible by subclasses; defined out of line (sets up interface_impl_).
  RandomAccessFile();

 private:
  // Pimpl: private implementation state, opaque to this header.
  struct ARROW_NO_EXPORT Impl;
  std::unique_ptr<Impl> interface_impl_;
};
0337 
/// \brief An OutputStream that also supports seeking and positional writes
class ARROW_EXPORT WritableFile : public OutputStream, public Seekable {
 public:
  /// \brief Write `nbytes` bytes from `data` at the given `position`
  ///
  /// NOTE(review): whether this also moves the current stream position is not
  /// specified here; check the concrete implementation.
  virtual Status WriteAt(int64_t position, const void* data, int64_t nbytes) = 0;

 protected:
  // Only constructible by subclasses.
  WritableFile() = default;
};
0345 
0346 class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile, public WritableFile {
0347  protected:
0348   ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE); }
0349 };
0350 
/// \brief Return an iterator on an input stream
///
/// The iterator yields a fixed-size block on each Next() call, except the
/// last block in the stream which may be smaller.
/// Once the end of stream is reached, Next() returns nullptr
/// (unlike InputStream::Read() which returns an empty buffer).
///
/// \param[in] stream the input stream to read blocks from
/// \param[in] block_size the size in bytes of each yielded block (except possibly the last)
/// \return the iterator, or an error
ARROW_EXPORT
Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
    std::shared_ptr<InputStream> stream, int64_t block_size);
0360 
0361 }  // namespace io
0362 }  // namespace arrow