![]() |
|
|||
File indexing completed on 2025-02-28 10:10:19
0001 // Protocol Buffers - Google's data interchange format 0002 // Copyright 2008 Google Inc. All rights reserved. 0003 // 0004 // Use of this source code is governed by a BSD-style 0005 // license that can be found in the LICENSE file or at 0006 // https://developers.google.com/open-source/licenses/bsd 0007 0008 // Author: kenton@google.com (Kenton Varda) 0009 // Based on original Protocol Buffers design by 0010 // Sanjay Ghemawat, Jeff Dean, and others. 0011 // 0012 // This file contains common implementations of the interfaces defined in 0013 // zero_copy_stream.h which are included in the "lite" protobuf library. 0014 // These implementations cover I/O on raw arrays and strings, as well as 0015 // adaptors which make it easy to implement streams based on traditional 0016 // streams. Of course, many users will probably want to write their own 0017 // implementations of these interfaces specific to the particular I/O 0018 // abstractions they prefer to use, but these should cover the most common 0019 // cases. 0020 0021 #ifndef GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__ 0022 #define GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__ 0023 0024 #include <iosfwd> 0025 #include <memory> 0026 #include <string> 0027 #include <utility> 0028 0029 #include "google/protobuf/stubs/callback.h" 0030 #include "google/protobuf/stubs/common.h" 0031 #include "absl/base/attributes.h" 0032 #include "absl/base/macros.h" 0033 #include "absl/strings/cord.h" 0034 #include "absl/strings/cord_buffer.h" 0035 #include "google/protobuf/io/zero_copy_stream.h" 0036 #include "google/protobuf/port.h" 0037 0038 0039 // Must be included last. 0040 #include "google/protobuf/port_def.inc" 0041 0042 namespace google { 0043 namespace protobuf { 0044 namespace io { 0045 0046 // =================================================================== 0047 0048 // A ZeroCopyInputStream backed by an in-memory array of bytes. 0049 class PROTOBUF_EXPORT ArrayInputStream final : public ZeroCopyInputStream { 0050 public: 0051 // Create an InputStream that returns the bytes pointed to by "data". 0052 // "data" remains the property of the caller but must remain valid until 0053 // the stream is destroyed. If a block_size is given, calls to Next() 0054 // will return data blocks no larger than the given size. Otherwise, the 0055 // first call to Next() returns the entire array. block_size is mainly 0056 // useful for testing; in production you would probably never want to set 0057 // it. 0058 ArrayInputStream(const void* data, int size, int block_size = -1); 0059 ~ArrayInputStream() override = default; 0060 0061 // `ArrayInputStream` is neither copiable nor assignable 0062 ArrayInputStream(const ArrayInputStream&) = delete; 0063 ArrayInputStream& operator=(const ArrayInputStream&) = delete; 0064 0065 // implements ZeroCopyInputStream ---------------------------------- 0066 bool Next(const void** data, int* size) override; 0067 void BackUp(int count) override; 0068 bool Skip(int count) override; 0069 int64_t ByteCount() const override; 0070 0071 0072 private: 0073 const uint8_t* const data_; // The byte array. 0074 const int size_; // Total size of the array. 0075 const int block_size_; // How many bytes to return at a time. 0076 0077 int position_; 0078 int last_returned_size_; // How many bytes we returned last time Next() 0079 // was called (used for error checking only). 0080 }; 0081 0082 // =================================================================== 0083 0084 // A ZeroCopyOutputStream backed by an in-memory array of bytes. 0085 class PROTOBUF_EXPORT ArrayOutputStream final : public ZeroCopyOutputStream { 0086 public: 0087 // Create an OutputStream that writes to the bytes pointed to by "data". 0088 // "data" remains the property of the caller but must remain valid until 0089 // the stream is destroyed. If a block_size is given, calls to Next() 0090 // will return data blocks no larger than the given size. Otherwise, the 0091 // first call to Next() returns the entire array. block_size is mainly 0092 // useful for testing; in production you would probably never want to set 0093 // it. 0094 ArrayOutputStream(void* data, int size, int block_size = -1); 0095 ~ArrayOutputStream() override = default; 0096 0097 // `ArrayOutputStream` is neither copiable nor assignable 0098 ArrayOutputStream(const ArrayOutputStream&) = delete; 0099 ArrayOutputStream& operator=(const ArrayOutputStream&) = delete; 0100 0101 // implements ZeroCopyOutputStream --------------------------------- 0102 bool Next(void** data, int* size) override; 0103 void BackUp(int count) override; 0104 int64_t ByteCount() const override; 0105 0106 private: 0107 uint8_t* const data_; // The byte array. 0108 const int size_; // Total size of the array. 0109 const int block_size_; // How many bytes to return at a time. 0110 0111 int position_; 0112 int last_returned_size_; // How many bytes we returned last time Next() 0113 // was called (used for error checking only). 0114 }; 0115 0116 // =================================================================== 0117 0118 // A ZeroCopyOutputStream which appends bytes to a string. 0119 class PROTOBUF_EXPORT StringOutputStream final : public ZeroCopyOutputStream { 0120 public: 0121 // Create a StringOutputStream which appends bytes to the given string. 0122 // The string remains property of the caller, but it is mutated in arbitrary 0123 // ways and MUST NOT be accessed in any way until you're done with the 0124 // stream. Either be sure there's no further usage, or (safest) destroy the 0125 // stream before using the contents. 0126 // 0127 // Hint: If you call target->reserve(n) before creating the stream, 0128 // the first call to Next() will return at least n bytes of buffer 0129 // space. 0130 explicit StringOutputStream(std::string* target); 0131 ~StringOutputStream() override = default; 0132 0133 // `StringOutputStream` is neither copiable nor assignable 0134 StringOutputStream(const StringOutputStream&) = delete; 0135 StringOutputStream& operator=(const StringOutputStream&) = delete; 0136 0137 // implements ZeroCopyOutputStream --------------------------------- 0138 bool Next(void** data, int* size) override; 0139 void BackUp(int count) override; 0140 int64_t ByteCount() const override; 0141 0142 private: 0143 static constexpr size_t kMinimumSize = 16; 0144 0145 std::string* target_; 0146 }; 0147 0148 // Note: There is no StringInputStream. Instead, just create an 0149 // ArrayInputStream as follows: 0150 // ArrayInputStream input(str.data(), str.size()); 0151 0152 // =================================================================== 0153 0154 // A generic traditional input stream interface. 0155 // 0156 // Lots of traditional input streams (e.g. file descriptors, C stdio 0157 // streams, and C++ iostreams) expose an interface where every read 0158 // involves copying bytes into a buffer. If you want to take such an 0159 // interface and make a ZeroCopyInputStream based on it, simply implement 0160 // CopyingInputStream and then use CopyingInputStreamAdaptor. 0161 // 0162 // CopyingInputStream implementations should avoid buffering if possible. 0163 // CopyingInputStreamAdaptor does its own buffering and will read data 0164 // in large blocks. 0165 class PROTOBUF_EXPORT CopyingInputStream { 0166 public: 0167 virtual ~CopyingInputStream() {} 0168 0169 // Reads up to "size" bytes into the given buffer. Returns the number of 0170 // bytes read. Read() waits until at least one byte is available, or 0171 // returns zero if no bytes will ever become available (EOF), or -1 if a 0172 // permanent read error occurred. 0173 virtual int Read(void* buffer, int size) = 0; 0174 0175 // Skips the next "count" bytes of input. Returns the number of bytes 0176 // actually skipped. This will always be exactly equal to "count" unless 0177 // EOF was reached or a permanent read error occurred. 0178 // 0179 // The default implementation just repeatedly calls Read() into a scratch 0180 // buffer. 0181 virtual int Skip(int count); 0182 }; 0183 0184 // A ZeroCopyInputStream which reads from a CopyingInputStream. This is 0185 // useful for implementing ZeroCopyInputStreams that read from traditional 0186 // streams. Note that this class is not really zero-copy. 0187 // 0188 // If you want to read from file descriptors or C++ istreams, this is 0189 // already implemented for you: use FileInputStream or IstreamInputStream 0190 // respectively. 0191 class PROTOBUF_EXPORT CopyingInputStreamAdaptor : public ZeroCopyInputStream { 0192 public: 0193 // Creates a stream that reads from the given CopyingInputStream. 0194 // If a block_size is given, it specifies the number of bytes that 0195 // should be read and returned with each call to Next(). Otherwise, 0196 // a reasonable default is used. The caller retains ownership of 0197 // copying_stream unless SetOwnsCopyingStream(true) is called. 0198 explicit CopyingInputStreamAdaptor(CopyingInputStream* copying_stream, 0199 int block_size = -1); 0200 ~CopyingInputStreamAdaptor() override; 0201 0202 // `CopyingInputStreamAdaptor` is neither copiable nor assignable 0203 CopyingInputStreamAdaptor(const CopyingInputStreamAdaptor&) = delete; 0204 CopyingInputStreamAdaptor& operator=(const CopyingInputStreamAdaptor&) = delete; 0205 0206 // Call SetOwnsCopyingStream(true) to tell the CopyingInputStreamAdaptor to 0207 // delete the underlying CopyingInputStream when it is destroyed. 0208 void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; } 0209 0210 // implements ZeroCopyInputStream ---------------------------------- 0211 bool Next(const void** data, int* size) override; 0212 void BackUp(int count) override; 0213 bool Skip(int count) override; 0214 int64_t ByteCount() const override; 0215 0216 private: 0217 // Insures that buffer_ is not NULL. 0218 void AllocateBufferIfNeeded(); 0219 // Frees the buffer and resets buffer_used_. 0220 void FreeBuffer(); 0221 0222 // The underlying copying stream. 0223 CopyingInputStream* copying_stream_; 0224 bool owns_copying_stream_; 0225 0226 // True if we have seen a permanent error from the underlying stream. 0227 bool failed_; 0228 0229 // The current position of copying_stream_, relative to the point where 0230 // we started reading. 0231 int64_t position_; 0232 0233 // Data is read into this buffer. It may be NULL if no buffer is currently 0234 // in use. Otherwise, it points to an array of size buffer_size_. 0235 std::unique_ptr<uint8_t[]> buffer_; 0236 const int buffer_size_; 0237 0238 // Number of valid bytes currently in the buffer (i.e. the size last 0239 // returned by Next()). 0 <= buffer_used_ <= buffer_size_. 0240 int buffer_used_; 0241 0242 // Number of bytes in the buffer which were backed up over by a call to 0243 // BackUp(). These need to be returned again. 0244 // 0 <= backup_bytes_ <= buffer_used_ 0245 int backup_bytes_; 0246 }; 0247 0248 // =================================================================== 0249 0250 // A generic traditional output stream interface. 0251 // 0252 // Lots of traditional output streams (e.g. file descriptors, C stdio 0253 // streams, and C++ iostreams) expose an interface where every write 0254 // involves copying bytes from a buffer. If you want to take such an 0255 // interface and make a ZeroCopyOutputStream based on it, simply implement 0256 // CopyingOutputStream and then use CopyingOutputStreamAdaptor. 0257 // 0258 // CopyingOutputStream implementations should avoid buffering if possible. 0259 // CopyingOutputStreamAdaptor does its own buffering and will write data 0260 // in large blocks. 0261 class PROTOBUF_EXPORT CopyingOutputStream { 0262 public: 0263 virtual ~CopyingOutputStream() {} 0264 0265 // Writes "size" bytes from the given buffer to the output. Returns true 0266 // if successful, false on a write error. 0267 virtual bool Write(const void* buffer, int size) = 0; 0268 }; 0269 0270 // A ZeroCopyOutputStream which writes to a CopyingOutputStream. This is 0271 // useful for implementing ZeroCopyOutputStreams that write to traditional 0272 // streams. Note that this class is not really zero-copy. 0273 // 0274 // If you want to write to file descriptors or C++ ostreams, this is 0275 // already implemented for you: use FileOutputStream or OstreamOutputStream 0276 // respectively. 0277 class PROTOBUF_EXPORT CopyingOutputStreamAdaptor : public ZeroCopyOutputStream { 0278 public: 0279 // Creates a stream that writes to the given Unix file descriptor. 0280 // If a block_size is given, it specifies the size of the buffers 0281 // that should be returned by Next(). Otherwise, a reasonable default 0282 // is used. 0283 explicit CopyingOutputStreamAdaptor(CopyingOutputStream* copying_stream, 0284 int block_size = -1); 0285 ~CopyingOutputStreamAdaptor() override; 0286 0287 // `CopyingOutputStreamAdaptor` is neither copiable nor assignable 0288 CopyingOutputStreamAdaptor(const CopyingOutputStreamAdaptor&) = delete; 0289 CopyingOutputStreamAdaptor& operator=(const CopyingOutputStreamAdaptor&) = delete; 0290 0291 // Writes all pending data to the underlying stream. Returns false if a 0292 // write error occurred on the underlying stream. (The underlying 0293 // stream itself is not necessarily flushed.) 0294 bool Flush(); 0295 0296 // Call SetOwnsCopyingStream(true) to tell the CopyingOutputStreamAdaptor to 0297 // delete the underlying CopyingOutputStream when it is destroyed. 0298 void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; } 0299 0300 // implements ZeroCopyOutputStream --------------------------------- 0301 bool Next(void** data, int* size) override; 0302 void BackUp(int count) override; 0303 int64_t ByteCount() const override; 0304 bool WriteAliasedRaw(const void* data, int size) override; 0305 bool AllowsAliasing() const override { return true; } 0306 bool WriteCord(const absl::Cord& cord) override; 0307 0308 private: 0309 // Write the current buffer, if it is present. 0310 bool WriteBuffer(); 0311 // Insures that buffer_ is not NULL. 0312 void AllocateBufferIfNeeded(); 0313 // Frees the buffer. 0314 void FreeBuffer(); 0315 0316 // The underlying copying stream. 0317 CopyingOutputStream* copying_stream_; 0318 bool owns_copying_stream_; 0319 0320 // True if we have seen a permanent error from the underlying stream. 0321 bool failed_; 0322 0323 // The current position of copying_stream_, relative to the point where 0324 // we started writing. 0325 int64_t position_; 0326 0327 // Data is written from this buffer. It may be NULL if no buffer is 0328 // currently in use. Otherwise, it points to an array of size buffer_size_. 0329 std::unique_ptr<uint8_t[]> buffer_; 0330 const int buffer_size_; 0331 0332 // Number of valid bytes currently in the buffer (i.e. the size last 0333 // returned by Next()). When BackUp() is called, we just reduce this. 0334 // 0 <= buffer_used_ <= buffer_size_. 0335 int buffer_used_; 0336 }; 0337 0338 // =================================================================== 0339 0340 // A ZeroCopyInputStream which wraps some other stream and limits it to 0341 // a particular byte count. 0342 class PROTOBUF_EXPORT LimitingInputStream final : public ZeroCopyInputStream { 0343 public: 0344 LimitingInputStream(ZeroCopyInputStream* input, int64_t limit); 0345 ~LimitingInputStream() override; 0346 0347 // `LimitingInputStream` is neither copiable nor assignable 0348 LimitingInputStream(const LimitingInputStream&) = delete; 0349 LimitingInputStream& operator=(const LimitingInputStream&) = delete; 0350 0351 // implements ZeroCopyInputStream ---------------------------------- 0352 bool Next(const void** data, int* size) override; 0353 void BackUp(int count) override; 0354 bool Skip(int count) override; 0355 int64_t ByteCount() const override; 0356 bool ReadCord(absl::Cord* cord, int count) override; 0357 0358 0359 private: 0360 ZeroCopyInputStream* input_; 0361 int64_t limit_; // Decreases as we go, becomes negative if we overshoot. 0362 int64_t prior_bytes_read_; // Bytes read on underlying stream at construction 0363 }; 0364 0365 // =================================================================== 0366 0367 // A ZeroCopyInputStream backed by a Cord. This stream implements ReadCord() 0368 // in a way that can share memory between the source and destination cords 0369 // rather than copying. 0370 class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream { 0371 public: 0372 // Creates an InputStream that reads from the given Cord. `cord` must 0373 // not be null and must outlive this CordInputStream instance. `cord` must 0374 // not be modified while this instance is actively being used: any change 0375 // to `cord` will lead to undefined behavior on any subsequent call into 0376 // this instance. 0377 explicit CordInputStream( 0378 const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND); 0379 0380 0381 // `CordInputStream` is neither copiable nor assignable 0382 CordInputStream(const CordInputStream&) = delete; 0383 CordInputStream& operator=(const CordInputStream&) = delete; 0384 0385 // implements ZeroCopyInputStream ---------------------------------- 0386 bool Next(const void** data, int* size) override; 0387 void BackUp(int count) override; 0388 bool Skip(int count) override; 0389 int64_t ByteCount() const override; 0390 bool ReadCord(absl::Cord* cord, int count) override; 0391 0392 0393 private: 0394 // Moves `it_` to the next available chunk skipping `skip` extra bytes 0395 // and updates the chunk data pointers. 0396 bool NextChunk(size_t skip); 0397 0398 // Updates the current chunk data context `data_`, `size_` and `available_`. 0399 // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero. 0400 // Returns true if more data is available, false otherwise. 0401 bool LoadChunkData(); 0402 0403 absl::Cord::CharIterator it_; 0404 size_t length_; 0405 size_t bytes_remaining_; 0406 const char* data_; 0407 size_t size_; 0408 size_t available_; 0409 }; 0410 0411 // =================================================================== 0412 0413 // A ZeroCopyOutputStream that writes to a Cord. This stream implements 0414 // WriteCord() in a way that can share memory between the source and 0415 // destination cords rather than copying. 0416 class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream { 0417 public: 0418 // Creates an OutputStream streaming serialized data into a Cord. `size_hint`, 0419 // if given, is the expected total size of the resulting Cord. This is a hint 0420 // only, used for optimization. Callers can obtain the generated Cord value by 0421 // invoking `Consume()`. 0422 explicit CordOutputStream(size_t size_hint = 0); 0423 0424 // Creates an OutputStream with an initial Cord value. This constructor can be 0425 // used by applications wanting to directly append serialization data to a 0426 // given cord. In such cases, donating the existing value as in: 0427 // 0428 // CordOutputStream stream(std::move(cord)); 0429 // message.SerializeToZeroCopyStream(&stream); 0430 // cord = std::move(stream.Consume()); 0431 // 0432 // is more efficient then appending the serialized cord in application code: 0433 // 0434 // CordOutputStream stream; 0435 // message.SerializeToZeroCopyStream(&stream); 0436 // cord.Append(stream.Consume()); 0437 // 0438 // The former allows `CordOutputStream` to utilize pre-existing privately 0439 // owned Cord buffers from the donated cord where the latter does not, which 0440 // may lead to more memory usage when serialuzing data into existing cords. 0441 explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0); 0442 0443 // Creates an OutputStream with an initial Cord value and initial buffer. 0444 // This donates both the preexisting cord in `cord`, as well as any 0445 // pre-existing data and additional capacity in `buffer`. 0446 // This function is mainly intended to be used in internal serialization logic 0447 // using eager buffer initialization in EpsCopyOutputStream. 0448 // The donated buffer can be empty, partially empty or full: the outputstream 0449 // will DTRT in all cases and preserve any pre-existing data. 0450 explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer, 0451 size_t size_hint = 0); 0452 0453 // Creates an OutputStream with an initial buffer. 0454 // This method is logically identical to, but more efficient than: 0455 // `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)` 0456 explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0); 0457 0458 // `CordOutputStream` is neither copiable nor assignable 0459 CordOutputStream(const CordOutputStream&) = delete; 0460 CordOutputStream& operator=(const CordOutputStream&) = delete; 0461 0462 // implements `ZeroCopyOutputStream` --------------------------------- 0463 bool Next(void** data, int* size) final; 0464 void BackUp(int count) final; 0465 int64_t ByteCount() const final; 0466 bool WriteCord(const absl::Cord& cord) final; 0467 0468 // Consumes the serialized data as a cord value. `Consume()` internally 0469 // flushes any pending state 'as if' BackUp(0) was called. While a final call 0470 // to BackUp() is generally required by the `ZeroCopyOutputStream` contract, 0471 // applications using `CordOutputStream` directly can call `Consume()` without 0472 // a preceding call to `BackUp()`. 0473 // 0474 // While it will rarely be useful in practice (and especially in the presence 0475 // of size hints) an instance is safe to be used after a call to `Consume()`. 0476 // The only logical change in state is that all serialized data is extracted, 0477 // and any new serialization calls will serialize into new cord data. 0478 absl::Cord Consume(); 0479 0480 private: 0481 // State of `buffer_` and 'cord_. As a default CordBuffer instance always has 0482 // inlined capacity, we track state explicitly to avoid returning 'existing 0483 // capacity' from the default or 'moved from' CordBuffer. 'kSteal' indicates 0484 // we should (attempt to) steal the next buffer from the cord. 0485 enum class State { kEmpty, kFull, kPartial, kSteal }; 0486 0487 absl::Cord cord_; 0488 size_t size_hint_; 0489 State state_ = State::kEmpty; 0490 absl::CordBuffer buffer_; 0491 }; 0492 0493 0494 // =================================================================== 0495 0496 // Return a pointer to mutable characters underlying the given string. The 0497 // return value is valid until the next time the string is resized. We 0498 // trust the caller to treat the return value as an array of length s->size(). 0499 inline char* mutable_string_data(std::string* s) { 0500 return &(*s)[0]; 0501 } 0502 0503 // as_string_data(s) is equivalent to 0504 // ({ char* p = mutable_string_data(s); make_pair(p, p != NULL); }) 0505 // Sometimes it's faster: in some scenarios p cannot be NULL, and then the 0506 // code can avoid that check. 0507 inline std::pair<char*, bool> as_string_data(std::string* s) { 0508 char* p = mutable_string_data(s); 0509 return std::make_pair(p, true); 0510 } 0511 0512 } // namespace io 0513 } // namespace protobuf 0514 } // namespace google 0515 0516 #include "google/protobuf/port_undef.inc" 0517 0518 #endif // GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |