Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // Public API for different memory sharing / IO mechanisms
0019 
0020 #pragma once
0021 
0022 #include <cstdint>
0023 #include <memory>
0024 #include <string_view>
0025 #include <vector>
0026 
0027 #include "arrow/io/concurrency.h"
0028 #include "arrow/io/interfaces.h"
0029 #include "arrow/type_fwd.h"
0030 #include "arrow/util/visibility.h"
0031 
0032 namespace arrow {
0033 
0034 class Status;
0035 
0036 namespace io {
0037 
0038 /// \brief An output stream that writes to a resizable buffer
     ///
     /// The class holds a shared ResizableBuffer together with an open flag, a
     /// capacity, a write position and a raw mutable data pointer (see the private
     /// members). How these are maintained is defined in the implementation file,
     /// which is not part of this header.
0039 class ARROW_EXPORT BufferOutputStream : public OutputStream {
0040  public:
       /// \brief Construct a stream on top of an existing resizable buffer
       /// \param[in] buffer the resizable buffer the stream writes to
0041   explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
0042 
0043   /// \brief Create in-memory output stream with indicated capacity using a
0044   /// memory pool
0045   /// \param[in] initial_capacity the initial allocated internal capacity of
0046   /// the OutputStream (defaults to 4096)
0047   /// \param[in,out] pool a MemoryPool to use for allocations (defaults to
     /// default_memory_pool())
0048   /// \return the created stream, wrapped in a Result
0049   static Result<std::shared_ptr<BufferOutputStream>> Create(
0050       int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool());
0051 
0052   ~BufferOutputStream() override;
0053 
0054   // Implement the OutputStream interface
0055 
0056   /// Close the stream, preserving the buffer (retrieve it with Finish()).
0057   Status Close() override;
0058   bool closed() const override;
0059   Result<int64_t> Tell() const override;
       /// \param[in] data pointer to the bytes to write
       /// \param[in] nbytes number of bytes to write
       /// NOTE(review): presumably grows the buffer through Reserve() when the
       /// write does not fit -- confirm in the implementation file.
0060   Status Write(const void* data, int64_t nbytes) override;
0061 
       // The using-declaration keeps the other OutputStream::Write overloads
       // visible; without it they would be hidden by the override declared above.
0062   /// \cond FALSE
0063   using OutputStream::Write;
0064   /// \endcond
0065 
0066   /// Close the stream and return the buffer
0067   Result<std::shared_ptr<Buffer>> Finish();
0068 
0069   /// \brief Initialize state of OutputStream with newly allocated memory and
0070   /// set position to 0
0071   /// \param[in] initial_capacity the starting allocated capacity (defaults to
     /// 1024)
0072   /// \param[in,out] pool the memory pool to use for allocations (defaults to
     /// default_memory_pool())
0073   /// \return Status
       /// NOTE(review): the default capacity here (1024) differs from the default
       /// used by Create() (4096) -- confirm that this is intentional.
0074   Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
0075 
       /// \brief Return the current value of capacity_
0076   int64_t capacity() const { return capacity_; }
0077 
0078  private:
       // Private default constructor; presumably used by Create() -- confirm in
       // the implementation file.
0079   BufferOutputStream();
0080 
0081   // Ensures there is sufficient space available to write nbytes
0082   Status Reserve(int64_t nbytes);
0083 
0084   std::shared_ptr<ResizableBuffer> buffer_;  ///< resizable buffer the stream writes to
0085   bool is_open_;  ///< open flag; presumably cleared by Close() -- TODO confirm
0086   int64_t capacity_;  ///< value reported by capacity()
0087   int64_t position_;  ///< presumably the write offset reported by Tell() -- TODO confirm
0088   uint8_t* mutable_data_;  ///< raw writable pointer; presumably into buffer_ -- TODO confirm
0089 };
0090 
0091 /// \brief A helper class to track the size of allocations
0092 ///
0093 /// Writes to this stream do not copy or retain any data, they just bump
0094 /// a size counter that can be later used to know exactly which data size
0095 /// needs to be allocated for actual writing.
0096 class ARROW_EXPORT MockOutputStream : public OutputStream {
0097  public:
       /// \brief Construct a stream with the size counter at zero and the open
       /// flag set
0098   MockOutputStream() : extent_bytes_written_(0), is_open_(true) {}
0099 
0100   // Implement the OutputStream interface
0101   Status Close() override;
0102   bool closed() const override;
0103   Result<int64_t> Tell() const override;
       /// \param[in] data pointer to the bytes "written" (per the class
       /// description, the data is neither copied nor retained)
       /// \param[in] nbytes number of bytes to account for
0104   Status Write(const void* data, int64_t nbytes) override;
       // The using-declaration keeps the other Writable::Write overloads visible;
       // without it they would be hidden by the override declared above.
0105   /// \cond FALSE
0106   using Writable::Write;
0107   /// \endcond
0108 
       /// \brief Return the current value of the size counter
       /// (extent_bytes_written_)
0109   int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
0110 
0111  private:
0112   int64_t extent_bytes_written_;  ///< size counter; starts at 0
0113   bool is_open_;  ///< open flag; starts as true
0114 };
0115 
0116 /// \brief An output stream that writes into a fixed-size mutable buffer
     ///
     /// All state lives behind impl_, a pointer to FixedSizeBufferWriterImpl, which
     /// is only forward-declared in this header.
0117 class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile {
0118  public:
0119   /// Input buffer must be mutable, will abort if not
0120   explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
       // Declared here and defined out of line: impl_ is a unique_ptr to a type
       // that is incomplete in this header, so the destructor has to be defined
       // where FixedSizeBufferWriterImpl is complete.
0121   ~FixedSizeBufferWriter() override;
0122 
0123   Status Close() override;
0124   bool closed() const override;
0125   Status Seek(int64_t position) override;
0126   Result<int64_t> Tell() const override;
0127   Status Write(const void* data, int64_t nbytes) override;
       // The using-declaration keeps the other Writable::Write overloads visible;
       // without it they would be hidden by the override declared above.
0128   /// \cond FALSE
0129   using Writable::Write;
0130   /// \endcond
0131 
       /// \param[in] position offset at which to write
       /// \param[in] data pointer to the bytes to write
       /// \param[in] nbytes number of bytes to write
0132   Status WriteAt(int64_t position, const void* data, int64_t nbytes) override;
0133 
       // NOTE(review): the three setters below presumably tune a multi-threaded
       // memory copy used for writes (thread count, block size, and the size
       // threshold above which it is used). Their semantics are defined in
       // FixedSizeBufferWriterImpl, not visible here -- confirm before relying on
       // this description.
0134   void set_memcopy_threads(int num_threads);
0135   void set_memcopy_blocksize(int64_t blocksize);
0136   void set_memcopy_threshold(int64_t threshold);
0137 
0138  protected:
0139   class FixedSizeBufferWriterImpl;  ///< forward declaration only
0140   std::unique_ptr<FixedSizeBufferWriterImpl> impl_;  ///< owned implementation object
0141 };
0142 
0143 /// \class BufferReader
0144 /// \brief Random access zero-copy reads on an arrow::Buffer
     ///
     /// The class derives from
     /// internal::RandomAccessFileConcurrencyWrapper<BufferReader> and declares that
     /// base as a friend, giving it access to the protected Do* methods below.
     /// NOTE(review): presumably the wrapper forwards the public file operations
     /// to those Do* methods -- confirm in arrow/io/concurrency.h.
0145 class ARROW_EXPORT BufferReader
0146     : public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
0147  public:
0148   /// \brief Instantiate from std::shared_ptr<Buffer>.
0149   ///
0150   /// This is a zero-copy constructor.
0151   explicit BufferReader(std::shared_ptr<Buffer> buffer);
       /// \brief Instantiate from a reference to a Buffer
       /// \deprecated Deprecated in 14.0.0. Use FromString or
       /// BufferReader(std::shared_ptr<Buffer> buffer) instead.
0152   ARROW_DEPRECATED(
0153       "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0154       "buffer) instead.")
0155   explicit BufferReader(const Buffer& buffer);
       /// \brief Instantiate from a raw pointer and a size
       /// \deprecated Deprecated in 14.0.0. Use FromString or
       /// BufferReader(std::shared_ptr<Buffer> buffer) instead.
0156   ARROW_DEPRECATED(
0157       "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0158       "buffer) instead.")
0159   BufferReader(const uint8_t* data, int64_t size);
0160 
0161   /// \brief Instantiate from std::string_view. Does not own data
0162   /// \deprecated Deprecated in 14.0.0. Use FromString or
0163   /// BufferReader(std::shared_ptr<Buffer> buffer) instead.
0164   ARROW_DEPRECATED(
0165       "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
0166       "buffer) instead.")
0167   explicit BufferReader(std::string_view data);
0168 
0169   /// \brief Instantiate from std::string. Owns data.
     ///
     /// The string is taken by value and the reader is returned in a
     /// std::unique_ptr.
0170   static std::unique_ptr<BufferReader> FromString(std::string data);
0171 
0172   bool closed() const override;
0173 
0174   bool supports_zero_copy() const override;
0175 
       /// \brief Return a copy of the shared pointer stored in buffer_
0176   std::shared_ptr<Buffer> buffer() const { return buffer_; }
0177 
0178   // Synchronous ReadAsync override
0179   Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
0180                                             int64_t nbytes) override;
0181   Status WillNeed(const std::vector<ReadRange>& ranges) override;
0182 
0183  protected:
       // Grants the base wrapper template access to the protected members
       // declared below.
0184   friend RandomAccessFileConcurrencyWrapper<BufferReader>;
0185 
0186   Status DoClose();
0187 
       // Two read flavours are declared for both DoRead and DoReadAt: one taking a
       // caller-provided destination pointer and returning a byte count, and one
       // returning a Buffer.
0188   Result<int64_t> DoRead(int64_t nbytes, void* buffer);
0189   Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
0190   Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
0191   Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
       // DoPeek is the only Do* method declared with `override` in this class.
0192   Result<std::string_view> DoPeek(int64_t nbytes) override;
0193 
0194   Result<int64_t> DoTell() const;
0195   Status DoSeek(int64_t position);
0196   Result<int64_t> DoGetSize();
0197 
       /// \brief Check that the reader has not been closed
       /// \return Status::Invalid with a fixed message when is_open_ is false,
       /// Status::OK() otherwise
0198   Status CheckClosed() const {
0199     if (!is_open_) {
0200       return Status::Invalid("Operation forbidden on closed BufferReader");
0201     }
0202     return Status::OK();
0203   }
0204 
0205   std::shared_ptr<Buffer> buffer_;  ///< shared pointer returned by buffer()
0206   const uint8_t* data_;  ///< presumably the start of the readable bytes -- TODO confirm
0207   int64_t size_;  ///< presumably the number of readable bytes -- TODO confirm
0208   int64_t position_;  ///< presumably the current read offset -- TODO confirm
0209   bool is_open_;  ///< open flag tested by CheckClosed()
0210 };
0211 
0212 }  // namespace io
0213 }  // namespace arrow