![]() |
|
|||
File indexing completed on 2025-08-28 08:26:59
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 // Buffered stream implementations 0019 0020 #pragma once 0021 0022 #include <cstdint> 0023 #include <memory> 0024 #include <string_view> 0025 0026 #include "arrow/io/concurrency.h" 0027 #include "arrow/io/interfaces.h" 0028 #include "arrow/util/visibility.h" 0029 0030 namespace arrow { 0031 0032 class Buffer; 0033 class MemoryPool; 0034 class Status; 0035 0036 namespace io { 0037 0038 class ARROW_EXPORT BufferedOutputStream : public OutputStream { 0039 public: 0040 ~BufferedOutputStream() override; 0041 0042 /// \brief Create a buffered output stream wrapping the given output stream. 0043 /// \param[in] buffer_size the size of the temporary write buffer 0044 /// \param[in] pool a MemoryPool to use for allocations 0045 /// \param[in] raw another OutputStream 0046 /// \return the created BufferedOutputStream 0047 static Result<std::shared_ptr<BufferedOutputStream>> Create( 0048 int64_t buffer_size, MemoryPool* pool, std::shared_ptr<OutputStream> raw); 0049 0050 /// \brief Resize internal buffer 0051 /// \param[in] new_buffer_size the new buffer size 0052 /// \return Status 0053 Status SetBufferSize(int64_t new_buffer_size); 0054 0055 /// \brief Return the current size of the internal buffer 0056 int64_t buffer_size() const; 0057 0058 /// \brief Return the number of remaining bytes that have not been flushed to 0059 /// the raw OutputStream 0060 int64_t bytes_buffered() const; 0061 0062 /// \brief Flush any buffered writes and release the raw 0063 /// OutputStream. Further operations on this object are invalid 0064 /// \return the underlying OutputStream 0065 Result<std::shared_ptr<OutputStream>> Detach(); 0066 0067 // OutputStream interface 0068 0069 /// \brief Close the buffered output stream. This implicitly closes the 0070 /// underlying raw output stream. 0071 Status Close() override; 0072 Status Abort() override; 0073 bool closed() const override; 0074 0075 Result<int64_t> Tell() const override; 0076 // Write bytes to the stream. Thread-safe 0077 Status Write(const void* data, int64_t nbytes) override; 0078 Status Write(const std::shared_ptr<Buffer>& data) override; 0079 0080 Status Flush() override; 0081 0082 /// \brief Return the underlying raw output stream. 0083 std::shared_ptr<OutputStream> raw() const; 0084 0085 private: 0086 explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* pool); 0087 0088 class ARROW_NO_EXPORT Impl; 0089 std::unique_ptr<Impl> impl_; 0090 }; 0091 0092 /// \class BufferedInputStream 0093 /// \brief An InputStream that performs buffered reads from an unbuffered 0094 /// InputStream, which can mitigate the overhead of many small reads in some 0095 /// cases 0096 class ARROW_EXPORT BufferedInputStream 0097 : public internal::InputStreamConcurrencyWrapper<BufferedInputStream> { 0098 public: 0099 ~BufferedInputStream() override; 0100 0101 /// \brief Create a BufferedInputStream from a raw InputStream 0102 /// \param[in] buffer_size the size of the temporary read buffer 0103 /// \param[in] pool a MemoryPool to use for allocations 0104 /// \param[in] raw a raw InputStream 0105 /// \param[in] raw_read_bound a bound on the maximum number of bytes 0106 /// to read from the raw input stream. The default -1 indicates that 0107 /// it is unbounded 0108 /// \return the created BufferedInputStream 0109 static Result<std::shared_ptr<BufferedInputStream>> Create( 0110 int64_t buffer_size, MemoryPool* pool, std::shared_ptr<InputStream> raw, 0111 int64_t raw_read_bound = -1); 0112 0113 /// \brief Resize internal read buffer; calls to Read(...) will read at least 0114 /// this many bytes from the raw InputStream if possible. 0115 /// \param[in] new_buffer_size the new read buffer size 0116 /// \return Status 0117 Status SetBufferSize(int64_t new_buffer_size); 0118 0119 /// \brief Return the number of remaining bytes in the read buffer 0120 int64_t bytes_buffered() const; 0121 0122 /// \brief Return the current size of the internal buffer 0123 int64_t buffer_size() const; 0124 0125 /// \brief Release the raw InputStream. Any data buffered will be 0126 /// discarded. Further operations on this object are invalid 0127 /// \return raw the underlying InputStream 0128 std::shared_ptr<InputStream> Detach(); 0129 0130 /// \brief Return the unbuffered InputStream 0131 std::shared_ptr<InputStream> raw() const; 0132 0133 // InputStream APIs 0134 0135 bool closed() const override; 0136 Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override; 0137 Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync( 0138 const IOContext& io_context) override; 0139 0140 private: 0141 friend InputStreamConcurrencyWrapper<BufferedInputStream>; 0142 0143 explicit BufferedInputStream(std::shared_ptr<InputStream> raw, MemoryPool* pool, 0144 int64_t raw_total_bytes_bound); 0145 0146 Status DoClose(); 0147 Status DoAbort() override; 0148 0149 /// \brief Returns the position of the buffered stream, though the position 0150 /// of the unbuffered stream may be further advanced. 0151 Result<int64_t> DoTell() const; 0152 0153 Result<int64_t> DoRead(int64_t nbytes, void* out); 0154 0155 /// \brief Read into buffer. 0156 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); 0157 0158 /// \brief Return a zero-copy string view referencing buffered data, 0159 /// but do not advance the position of the stream. Buffers data and 0160 /// expands the buffer size if necessary 0161 Result<std::string_view> DoPeek(int64_t nbytes) override; 0162 0163 class ARROW_NO_EXPORT Impl; 0164 std::unique_ptr<Impl> impl_; 0165 }; 0166 0167 } // namespace io 0168 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |