Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // Buffered stream implementations
0019 
0020 #pragma once
0021 
0022 #include <cstdint>
0023 #include <memory>
0024 #include <string_view>
0025 
0026 #include "arrow/io/concurrency.h"
0027 #include "arrow/io/interfaces.h"
0028 #include "arrow/util/visibility.h"
0029 
0030 namespace arrow {
0031 
0032 class Buffer;  // forward declaration; used below through std::shared_ptr<Buffer>
0033 class MemoryPool;  // forward declaration; used below as MemoryPool* (Create() and constructors)
0034 class Status;  // forward declaration; return type of several member functions below
0035 
0036 namespace io {
0037 
0038 class ARROW_EXPORT BufferedOutputStream : public OutputStream {
     /// \class BufferedOutputStream
     /// \brief An OutputStream that collects writes in an internal buffer and
     /// passes them on to a wrapped ("raw") OutputStream
     ///
     /// Instances are obtained through Create(); the constructor is private.
0039  public:
0040   ~BufferedOutputStream() override;
0041 
0042   /// \brief Create a buffered output stream wrapping the given output stream.
0043   /// \param[in] buffer_size the size of the temporary write buffer
0044   /// \param[in] pool a MemoryPool to use for allocations
0045   /// \param[in] raw the raw OutputStream to wrap
0046   /// \return the created BufferedOutputStream, wrapped in a Result
0047   static Result<std::shared_ptr<BufferedOutputStream>> Create(
0048       int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw);
0049 
0050   /// \brief Resize internal buffer
0051   /// \param[in] new_buffer_size the new buffer size
0052   /// \return Status
0053   Status SetBufferSize(int64_t new_buffer_size);
0054 
0055   /// \brief Return the current size of the internal buffer
0056   int64_t buffer_size() const;
0057 
0058   /// \brief Return the number of remaining bytes that have not been flushed to
0059   /// the raw OutputStream
0060   int64_t bytes_buffered() const;
0061 
0062   /// \brief Flush any buffered writes and release the raw
0063   /// OutputStream. Further operations on this object are invalid
0064   /// \return the underlying OutputStream
0065   Result<std::shared_ptr<OutputStream>> Detach();
0066 
0067   // OutputStream interface
       // The members below that are marked `override` implement virtual functions
       // inherited from OutputStream (presumably declared in arrow/io/interfaces.h,
       // which this header includes); their general contracts are defined there.
0068 
0069   /// \brief Close the buffered output stream.  This implicitly closes the
0070   /// underlying raw output stream.
0071   Status Close() override;
0072   Status Abort() override;
0073   bool closed() const override;
0074 
0075   Result<int64_t> Tell() const override;
0076   /// \brief Write nbytes bytes starting at data to the stream. Thread-safe
0077   Status Write(const void* data, int64_t nbytes) override;
       /// \brief Overload of Write() that takes the data as a Buffer
0078   Status Write(const std::shared_ptr<Buffer>& data) override;
0079 
0080   Status Flush() override;
0081 
0082   /// \brief Return the underlying raw output stream.
0083   std::shared_ptr<OutputStream> raw() const;
0084 
0085  private:
       // Private constructor: client code goes through the static Create() factory.
0086   explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* pool);
0087 
       // Impl is only declared here (and not exported); its definition lives outside
       // this header, and this class holds it through impl_.
0088   class ARROW_NO_EXPORT Impl;
0089   std::unique_ptr<Impl> impl_;
0090 };
0091 
0092 /// \class BufferedInputStream
0093 /// \brief An InputStream that performs buffered reads from an unbuffered
0094 /// InputStream, which can mitigate the overhead of many small reads in some
0095 /// cases
     ///
     /// Instances are obtained through Create(); the constructor is private.
0096 class ARROW_EXPORT BufferedInputStream
0097     : public internal::InputStreamConcurrencyWrapper<BufferedInputStream> {
0098  public:
0099   ~BufferedInputStream() override;
0100 
0101   /// \brief Create a BufferedInputStream from a raw InputStream
0102   /// \param[in] buffer_size the size of the temporary read buffer
0103   /// \param[in] pool a MemoryPool to use for allocations
0104   /// \param[in] raw a raw InputStream
0105   /// \param[in] raw_read_bound a bound on the maximum number of bytes
0106   /// to read from the raw input stream. The default -1 indicates that
0107   /// it is unbounded
0108   /// \return the created BufferedInputStream, wrapped in a Result
0109   static Result<std::shared_ptr<BufferedInputStream>> Create(
0110       int64_t buffer_size, MemoryPool* pool, std::shared_ptr<InputStream> raw,
0111       int64_t raw_read_bound = -1);
0112 
0113   /// \brief Resize internal read buffer; calls to Read(...) will read at least
0114   ///        this many bytes from the raw InputStream if possible.
0115   /// \param[in] new_buffer_size the new read buffer size
0116   /// \return Status
0117   Status SetBufferSize(int64_t new_buffer_size);
0118 
0119   /// \brief Return the number of remaining bytes in the read buffer
0120   int64_t bytes_buffered() const;
0121 
0122   /// \brief Return the current size of the internal buffer
0123   int64_t buffer_size() const;
0124 
0125   /// \brief Release the raw InputStream. Any data buffered will be
0126   /// discarded. Further operations on this object are invalid
0127   /// \return the underlying raw InputStream
0128   std::shared_ptr<InputStream> Detach();
0129 
0130   /// \brief Return the unbuffered InputStream
0131   std::shared_ptr<InputStream> raw() const;
0132 
0133   // InputStream APIs
       // The three members below are marked `override`, i.e. they implement virtual
       // functions inherited through the InputStreamConcurrencyWrapper base.
0134 
0135   bool closed() const override;
0136   Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override;
0137   Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
0138       const IOContext& io_context) override;
0139 
0140  private:
       // The concurrency wrapper base class is a friend, which gives it access to
       // the private Do*() members declared below (presumably so that its public
       // entry points can forward to them -- confirm in arrow/io/concurrency.h).
0141   friend InputStreamConcurrencyWrapper<BufferedInputStream>;
0142 
       // Private constructor: client code goes through the static Create() factory.
       // NOTE(review): raw_total_bytes_bound presumably receives the value that
       // Create() names raw_read_bound; the two names differ -- confirm against the
       // implementation file.
0143   explicit BufferedInputStream(std::shared_ptr<InputStream> raw, MemoryPool* pool,
0144                                int64_t raw_total_bytes_bound);
0145 
       // NOTE(review): DoAbort() and DoPeek() are marked `override`, whereas
       // DoClose(), DoTell() and both DoRead() overloads are not. Presumably the
       // latter are not virtual in the wrapper base; check the base class before
       // adding `override` to them.
0146   Status DoClose();
0147   Status DoAbort() override;
0148 
0149   /// \brief Returns the position of the buffered stream, though the position
0150   /// of the unbuffered stream may be further advanced.
0151   Result<int64_t> DoTell() const;
0152 
       /// \brief Read variant that writes into caller-provided memory at out.
       /// \return a Result holding an int64_t (presumably the number of bytes
       /// actually read -- confirm against the implementation)
0153   Result<int64_t> DoRead(int64_t nbytes, void* out);
0154 
0155   /// \brief Read variant that returns the data as a Buffer.
0156   Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
0157 
0158   /// \brief Return a zero-copy string view referencing buffered data,
0159   /// but do not advance the position of the stream. Buffers data and
0160   /// expands the buffer size if necessary
0161   Result<std::string_view> DoPeek(int64_t nbytes) override;
0162 
       // Impl is only declared here (and not exported); its definition lives outside
       // this header, and this class holds it through impl_.
0163   class ARROW_NO_EXPORT Impl;
0164   std::unique_ptr<Impl> impl_;
0165 };
0166 
0167 }  // namespace io
0168 }  // namespace arrow