Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-28 10:10:19

0001 // Protocol Buffers - Google's data interchange format
0002 // Copyright 2008 Google Inc.  All rights reserved.
0003 //
0004 // Use of this source code is governed by a BSD-style
0005 // license that can be found in the LICENSE file or at
0006 // https://developers.google.com/open-source/licenses/bsd
0007 
0008 // Author: kenton@google.com (Kenton Varda)
0009 //  Based on original Protocol Buffers design by
0010 //  Sanjay Ghemawat, Jeff Dean, and others.
0011 //
0012 // This file contains common implementations of the interfaces defined in
0013 // zero_copy_stream.h which are included in the "lite" protobuf library.
0014 // These implementations cover I/O on raw arrays and strings, as well as
0015 // adaptors which make it easy to implement streams based on traditional
0016 // streams.  Of course, many users will probably want to write their own
0017 // implementations of these interfaces specific to the particular I/O
0018 // abstractions they prefer to use, but these should cover the most common
0019 // cases.
0020 
0021 #ifndef GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
0022 #define GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
0023 
0024 #include <iosfwd>
0025 #include <memory>
0026 #include <string>
0027 #include <utility>
0028 
0029 #include "google/protobuf/stubs/callback.h"
0030 #include "google/protobuf/stubs/common.h"
0031 #include "absl/base/attributes.h"
0032 #include "absl/base/macros.h"
0033 #include "absl/strings/cord.h"
0034 #include "absl/strings/cord_buffer.h"
0035 #include "google/protobuf/io/zero_copy_stream.h"
0036 #include "google/protobuf/port.h"
0037 
0038 
0039 // Must be included last.
0040 #include "google/protobuf/port_def.inc"
0041 
0042 namespace google {
0043 namespace protobuf {
0044 namespace io {
0045 
0046 // ===================================================================
0047 
0048 // A ZeroCopyInputStream backed by a caller-owned, in-memory array of bytes.
0049 class PROTOBUF_EXPORT ArrayInputStream final : public ZeroCopyInputStream {
0050  public:
0051   // Creates an InputStream that returns the bytes pointed to by "data",
0052   // an array of "size" bytes.  "data" remains the property of the caller
0053   // but must remain valid until the stream is destroyed.  If a block_size
0054   // is given, calls to Next() will return data blocks no larger than the
0055   // given size.  Otherwise (the default, -1), the first call to Next()
0056   // returns the entire array.  block_size is mainly useful for testing; in
0057   // production you would probably never want to set it.
0058   ArrayInputStream(const void* data, int size, int block_size = -1);
0059   ~ArrayInputStream() override = default;
0060 
0061   // `ArrayInputStream` is neither copyable nor assignable.
0062   ArrayInputStream(const ArrayInputStream&) = delete;
0063   ArrayInputStream& operator=(const ArrayInputStream&) = delete;
0064 
0065   // implements ZeroCopyInputStream ----------------------------------
0066   bool Next(const void** data, int* size) override;
0067   void BackUp(int count) override;
0068   bool Skip(int count) override;
0069   int64_t ByteCount() const override;
0070 
0071 
0072  private:
0073   const uint8_t* const data_;  // The byte array (owned by the caller).
0074   const int size_;           // Total size of the array, in bytes.
0075   const int block_size_;     // Upper bound on bytes returned per Next().
0076 
0077   int position_;  // NOTE(review): presumably the offset into data_ - confirm.
0078   int last_returned_size_;  // Number of bytes returned by the most recent
0079                             // Next() call (used for error checking only).
0080 };
0081 
0082 // ===================================================================
0083 
0084 // A ZeroCopyOutputStream backed by a caller-owned, in-memory array of bytes.
0085 class PROTOBUF_EXPORT ArrayOutputStream final : public ZeroCopyOutputStream {
0086  public:
0087   // Creates an OutputStream that writes to the bytes pointed to by "data",
0088   // an array of "size" bytes.  "data" remains the property of the caller
0089   // but must remain valid until the stream is destroyed.  If a block_size
0090   // is given, calls to Next() will return data blocks no larger than the
0091   // given size.  Otherwise (the default, -1), the first call to Next()
0092   // returns the entire array.  block_size is mainly useful for testing; in
0093   // production you would probably never want to set it.
0094   ArrayOutputStream(void* data, int size, int block_size = -1);
0095   ~ArrayOutputStream() override = default;
0096 
0097   // `ArrayOutputStream` is neither copyable nor assignable.
0098   ArrayOutputStream(const ArrayOutputStream&) = delete;
0099   ArrayOutputStream& operator=(const ArrayOutputStream&) = delete;
0100 
0101   // implements ZeroCopyOutputStream ---------------------------------
0102   bool Next(void** data, int* size) override;
0103   void BackUp(int count) override;
0104   int64_t ByteCount() const override;
0105 
0106  private:
0107   uint8_t* const data_;     // The byte array (owned by the caller).
0108   const int size_;        // Total size of the array, in bytes.
0109   const int block_size_;  // Upper bound on bytes returned per Next().
0110 
0111   int position_;  // NOTE(review): presumably the offset into data_ - confirm.
0112   int last_returned_size_;  // Number of bytes returned by the most recent
0113                             // Next() call (used for error checking only).
0114 };
0115 
0116 // ===================================================================
0117 
0118 // A ZeroCopyOutputStream which appends bytes to a caller-owned string.
0119 class PROTOBUF_EXPORT StringOutputStream final : public ZeroCopyOutputStream {
0120  public:
0121   // Creates a StringOutputStream which appends bytes to the given string.
0122   // The string remains the property of the caller, but the stream mutates
0123   // it in arbitrary ways, so it MUST NOT be accessed in any way until you
0124   // are done with the stream.  Either make sure the stream is no longer
0125   // used, or (safest) destroy the stream before reading the contents.
0126   //
0127   // Hint:  If you call target->reserve(n) before creating the stream,
0128   //   the first call to Next() will return at least n bytes of buffer
0129   //   space.
0130   explicit StringOutputStream(std::string* target);
0131   ~StringOutputStream() override = default;
0132 
0133   // `StringOutputStream` is neither copyable nor assignable.
0134   StringOutputStream(const StringOutputStream&) = delete;
0135   StringOutputStream& operator=(const StringOutputStream&) = delete;
0136 
0137   // implements ZeroCopyOutputStream ---------------------------------
0138   bool Next(void** data, int* size) override;
0139   void BackUp(int count) override;
0140   int64_t ByteCount() const override;
0141 
0142  private:
0143   static constexpr size_t kMinimumSize = 16;  // NOTE(review): presumably the smallest buffer Next() offers - confirm.
0144 
0145   std::string* target_;  // The string being appended to (owned by the caller).
0146 };
0147 
0148 // Note:  There is no StringInputStream.  Instead, just create an
0149 // ArrayInputStream as follows:
0150 //   ArrayInputStream input(str.data(), str.size());
0151 
0152 // ===================================================================
0153 
0154 // A generic traditional (copying) input stream interface.
0155 //
0156 // Lots of traditional input streams (e.g. file descriptors, C stdio
0157 // streams, and C++ iostreams) expose an interface where every read
0158 // involves copying bytes into a buffer.  If you want to take such an
0159 // interface and make a ZeroCopyInputStream based on it, simply implement
0160 // CopyingInputStream and then use CopyingInputStreamAdaptor.
0161 //
0162 // CopyingInputStream implementations should avoid buffering if possible.
0163 // CopyingInputStreamAdaptor does its own buffering and will read data
0164 // in large blocks.
0165 class PROTOBUF_EXPORT CopyingInputStream {
0166  public:
0167   virtual ~CopyingInputStream() {}
0168 
0169   // Reads up to "size" bytes into the given buffer and returns the number
0170   // of bytes read.  Read() waits until at least one byte is available.  It
0171   // returns zero if no bytes will ever become available (EOF), or -1 if a
0172   // permanent read error occurred.
0173   virtual int Read(void* buffer, int size) = 0;
0174 
0175   // Skips the next "count" bytes of input and returns the number of bytes
0176   // actually skipped.  The result is always exactly equal to "count" unless
0177   // EOF was reached or a permanent read error occurred.
0178   //
0179   // The default implementation just repeatedly calls Read() into a scratch
0180   // buffer.
0181   virtual int Skip(int count);
0182 };
0183 
0184 // A ZeroCopyInputStream which reads from a CopyingInputStream.  This is
0185 // useful for implementing ZeroCopyInputStreams that read from traditional
0186 // streams.  Note that this class is not really zero-copy.
0187 //
0188 // If you want to read from file descriptors or C++ istreams, this is
0189 // already implemented for you:  use FileInputStream or IstreamInputStream
0190 // respectively.
0191 class PROTOBUF_EXPORT CopyingInputStreamAdaptor : public ZeroCopyInputStream {
0192  public:
0193   // Creates a stream that reads from the given CopyingInputStream.
0194   // If a block_size is given, it specifies the number of bytes that
0195   // should be read and returned with each call to Next().  Otherwise,
0196   // a reasonable default is used.  The caller retains ownership of
0197   // copying_stream unless SetOwnsCopyingStream(true) is called.
0198   explicit CopyingInputStreamAdaptor(CopyingInputStream* copying_stream,
0199                                      int block_size = -1);
0200   ~CopyingInputStreamAdaptor() override;
0201 
0202   // `CopyingInputStreamAdaptor` is neither copyable nor assignable.
0203   CopyingInputStreamAdaptor(const CopyingInputStreamAdaptor&) = delete;
0204   CopyingInputStreamAdaptor& operator=(const CopyingInputStreamAdaptor&) = delete;
0205 
0206   // Call SetOwnsCopyingStream(true) to tell the CopyingInputStreamAdaptor to
0207   // delete the underlying CopyingInputStream when it is destroyed.
0208   void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
0209 
0210   // implements ZeroCopyInputStream ----------------------------------
0211   bool Next(const void** data, int* size) override;
0212   void BackUp(int count) override;
0213   bool Skip(int count) override;
0214   int64_t ByteCount() const override;
0215 
0216  private:
0217   // Ensures that buffer_ is not null.
0218   void AllocateBufferIfNeeded();
0219   // Frees the buffer and resets buffer_used_.
0220   void FreeBuffer();
0221 
0222   // The underlying copying stream, and whether this adaptor deletes it on
0223   CopyingInputStream* copying_stream_;
0224   bool owns_copying_stream_;  // destruction (see SetOwnsCopyingStream()).
0225 
0226   // True if we have seen a permanent error from the underlying stream.
0227   bool failed_;
0228 
0229   // The current position of copying_stream_, relative to the point where
0230   // we started reading.
0231   int64_t position_;
0232 
0233   // Data is read into this buffer.  It may be null if no buffer is currently
0234   // in use.  Otherwise, it points to an array of size buffer_size_.
0235   std::unique_ptr<uint8_t[]> buffer_;
0236   const int buffer_size_;
0237 
0238   // Number of valid bytes currently in the buffer (i.e. the size last
0239   // returned by Next()).  0 <= buffer_used_ <= buffer_size_.
0240   int buffer_used_;
0241 
0242   // Number of bytes in the buffer which were backed up over by a call to
0243   // BackUp().  These need to be returned again.
0244   // 0 <= backup_bytes_ <= buffer_used_
0245   int backup_bytes_;
0246 };
0247 
0248 // ===================================================================
0249 
0250 // A generic traditional (copying) output stream interface.
0251 //
0252 // Lots of traditional output streams (e.g. file descriptors, C stdio
0253 // streams, and C++ iostreams) expose an interface where every write
0254 // involves copying bytes from a buffer.  If you want to take such an
0255 // interface and make a ZeroCopyOutputStream based on it, simply implement
0256 // CopyingOutputStream and then use CopyingOutputStreamAdaptor.
0257 //
0258 // CopyingOutputStream implementations should avoid buffering if possible.
0259 // CopyingOutputStreamAdaptor does its own buffering and will write data
0260 // in large blocks.
0261 class PROTOBUF_EXPORT CopyingOutputStream {
0262  public:
0263   virtual ~CopyingOutputStream() {}
0264 
0265   // Writes "size" bytes from the given buffer to the output.  Returns true
0266   // if the write succeeded, false if a write error occurred.
0267   virtual bool Write(const void* buffer, int size) = 0;
0268 };
0269 
0270 // A ZeroCopyOutputStream which writes to a CopyingOutputStream.  This is
0271 // useful for implementing ZeroCopyOutputStreams that write to traditional
0272 // streams.  Note that this class is not really zero-copy.
0273 //
0274 // If you want to write to file descriptors or C++ ostreams, this is
0275 // already implemented for you:  use FileOutputStream or OstreamOutputStream
0276 // respectively.
0277 class PROTOBUF_EXPORT CopyingOutputStreamAdaptor : public ZeroCopyOutputStream {
0278  public:
0279   // Creates a stream that writes to the given CopyingOutputStream.  If a
0280   // block_size is given, it specifies the size of the buffers that should
0281   // be returned by Next().  Otherwise, a reasonable default is used.  See
0282   // SetOwnsCopyingStream() for who deletes copying_stream.
0283   explicit CopyingOutputStreamAdaptor(CopyingOutputStream* copying_stream,
0284                                       int block_size = -1);
0285   ~CopyingOutputStreamAdaptor() override;
0286 
0287   // `CopyingOutputStreamAdaptor` is neither copyable nor assignable.
0288   CopyingOutputStreamAdaptor(const CopyingOutputStreamAdaptor&) = delete;
0289   CopyingOutputStreamAdaptor& operator=(const CopyingOutputStreamAdaptor&) = delete;
0290 
0291   // Writes all pending data to the underlying stream.  Returns false if a
0292   // write error occurred on the underlying stream.  (The underlying
0293   // stream itself is not necessarily flushed.)
0294   bool Flush();
0295 
0296   // Call SetOwnsCopyingStream(true) to tell the CopyingOutputStreamAdaptor to
0297   // delete the underlying CopyingOutputStream when it is destroyed.
0298   void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
0299 
0300   // implements ZeroCopyOutputStream ---------------------------------
0301   bool Next(void** data, int* size) override;
0302   void BackUp(int count) override;
0303   int64_t ByteCount() const override;
0304   bool WriteAliasedRaw(const void* data, int size) override;
0305   bool AllowsAliasing() const override { return true; }  // Always reports true.
0306   bool WriteCord(const absl::Cord& cord) override;
0307 
0308  private:
0309   // Writes the current buffer, if it is present.
0310   bool WriteBuffer();
0311   // Ensures that buffer_ is not null.
0312   void AllocateBufferIfNeeded();
0313   // Frees the buffer.
0314   void FreeBuffer();
0315 
0316   // The underlying copying stream, and whether this adaptor deletes it on
0317   CopyingOutputStream* copying_stream_;
0318   bool owns_copying_stream_;  // destruction (see SetOwnsCopyingStream()).
0319 
0320   // True if we have seen a permanent error from the underlying stream.
0321   bool failed_;
0322 
0323   // The current position of copying_stream_, relative to the point where
0324   // we started writing.
0325   int64_t position_;
0326 
0327   // Data is written from this buffer.  It may be null if no buffer is
0328   // currently in use.  Otherwise, it points to an array of size buffer_size_.
0329   std::unique_ptr<uint8_t[]> buffer_;
0330   const int buffer_size_;
0331 
0332   // Number of valid bytes currently in the buffer (i.e. the size last
0333   // returned by Next()).  When BackUp() is called, we just reduce this.
0334   // 0 <= buffer_used_ <= buffer_size_.
0335   int buffer_used_;
0336 };
0337 
0338 // ===================================================================
0339 
0340 // A ZeroCopyInputStream which wraps some other stream ("input") and limits
0341 // it to a particular byte count ("limit").
0342 class PROTOBUF_EXPORT LimitingInputStream final : public ZeroCopyInputStream {
0343  public:
0344   LimitingInputStream(ZeroCopyInputStream* input, int64_t limit);  // NOTE(review): ownership of input is not stated here - confirm.
0345   ~LimitingInputStream() override;
0346 
0347   // `LimitingInputStream` is neither copyable nor assignable.
0348   LimitingInputStream(const LimitingInputStream&) = delete;
0349   LimitingInputStream& operator=(const LimitingInputStream&) = delete;
0350 
0351   // implements ZeroCopyInputStream ----------------------------------
0352   bool Next(const void** data, int* size) override;
0353   void BackUp(int count) override;
0354   bool Skip(int count) override;
0355   int64_t ByteCount() const override;
0356   bool ReadCord(absl::Cord* cord, int count) override;
0357 
0358 
0359  private:
0360   ZeroCopyInputStream* input_;  // The wrapped stream.
0361   int64_t limit_;  // Decreases as we go, becomes negative if we overshoot.
0362   int64_t prior_bytes_read_;  // Bytes read on underlying stream at construction
0363 };
0364 
0365 // ===================================================================
0366 
0367 // A ZeroCopyInputStream backed by a Cord.  This stream implements ReadCord()
0368 // in a way that can share memory between the source and destination cords
0369 // rather than copying.
0370 class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream {
0371  public:
0372   // Creates an InputStream that reads from the given Cord. `cord` must
0373   // not be null and must outlive this CordInputStream instance. `cord` must
0374   // not be modified while this instance is actively being used: any change
0375   // to `cord` will lead to undefined behavior on any subsequent call into
0376   // this instance.
0377   explicit CordInputStream(
0378       const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND);
0379 
0380 
0381   // `CordInputStream` is neither copyable nor assignable.
0382   CordInputStream(const CordInputStream&) = delete;
0383   CordInputStream& operator=(const CordInputStream&) = delete;
0384 
0385   // implements ZeroCopyInputStream ----------------------------------
0386   bool Next(const void** data, int* size) override;
0387   void BackUp(int count) override;
0388   bool Skip(int count) override;
0389   int64_t ByteCount() const override;
0390   bool ReadCord(absl::Cord* cord, int count) override;
0391 
0392 
0393  private:
0394   // Moves `it_` to the next available chunk, skipping `skip` extra bytes,
0395   // and updates the chunk data pointers.
0396   bool NextChunk(size_t skip);
0397 
0398   // Updates the current chunk data context `data_`, `size_` and `available_`.
0399   // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero.
0400   // Returns true if more data is available, false otherwise.
0401   bool LoadChunkData();
0402 
0403   absl::Cord::CharIterator it_;  // Cord position; advanced by NextChunk().
0404   size_t length_;
0405   size_t bytes_remaining_;  // Zero makes LoadChunkData() clear size_/available_.
0406   const char* data_;   // Current chunk context, maintained by
0407   size_t size_;        // LoadChunkData() together with
0408   size_t available_;   // `data_`, `size_` and `available_`.
0409 };
0410 
0411 // ===================================================================
0412 
0413 // A ZeroCopyOutputStream that writes to a Cord.  This stream implements
0414 // WriteCord() in a way that can share memory between the source and
0415 // destination cords rather than copying.
0416 class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream {
0417  public:
0418   // Creates an OutputStream streaming serialized data into a Cord. `size_hint`,
0419   // if given, is the expected total size of the resulting Cord. This is a hint
0420   // only, used for optimization. Callers can obtain the generated Cord value by
0421   // invoking `Consume()`.
0422   explicit CordOutputStream(size_t size_hint = 0);
0423 
0424   // Creates an OutputStream with an initial Cord value. This constructor can be
0425   // used by applications wanting to directly append serialization data to a
0426   // given cord. In such cases, donating the existing value as in:
0427   //
0428   //   CordOutputStream stream(std::move(cord));
0429   //   message.SerializeToZeroCopyStream(&stream);
0430   //   cord = stream.Consume();
0431   //
0432   // is more efficient than appending the serialized cord in application code:
0433   //
0434   //   CordOutputStream stream;
0435   //   message.SerializeToZeroCopyStream(&stream);
0436   //   cord.Append(stream.Consume());
0437   //
0438   // The former allows `CordOutputStream` to utilize pre-existing privately
0439   // owned Cord buffers from the donated cord where the latter does not, which
0440   // may lead to more memory usage when serializing data into existing cords.
0441   explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0);
0442 
0443   // Creates an OutputStream with an initial Cord value and initial buffer.
0444   // This donates both the pre-existing cord in `cord`, as well as any
0445   // pre-existing data and additional capacity in `buffer`.
0446   // This function is mainly intended to be used in internal serialization logic
0447   // using eager buffer initialization in EpsCopyOutputStream.
0448   // The donated buffer can be empty, partially empty or full: the output stream
0449   // will do the right thing in all cases and preserve any pre-existing data.
0450   explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
0451                             size_t size_hint = 0);
0452 
0453   // Creates an OutputStream with an initial buffer.
0454   // This constructor is logically identical to, but more efficient than:
0455   //   `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)`
0456   explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0);
0457 
0458   // `CordOutputStream` is neither copyable nor assignable.
0459   CordOutputStream(const CordOutputStream&) = delete;
0460   CordOutputStream& operator=(const CordOutputStream&) = delete;
0461 
0462   // implements `ZeroCopyOutputStream` ---------------------------------
0463   bool Next(void** data, int* size) final;
0464   void BackUp(int count) final;
0465   int64_t ByteCount() const final;
0466   bool WriteCord(const absl::Cord& cord) final;
0467 
0468   // Consumes the serialized data as a cord value. `Consume()` internally
0469   // flushes any pending state 'as if' BackUp(0) was called. While a final call
0470   // to BackUp() is generally required by the `ZeroCopyOutputStream` contract,
0471   // applications using `CordOutputStream` directly can call `Consume()` without
0472   // a preceding call to `BackUp()`.
0473   //
0474   // While it will rarely be useful in practice (and especially in the presence
0475   // of size hints) an instance is safe to be used after a call to `Consume()`.
0476   // The only logical change in state is that all serialized data is extracted,
0477   // and any new serialization calls will serialize into new cord data.
0478   absl::Cord Consume();
0479 
0480  private:
0481   // State of `buffer_` and `cord_`. As a default CordBuffer instance always has
0482   // inlined capacity, we track state explicitly to avoid returning 'existing
0483   // capacity' from the default or 'moved from' CordBuffer. `kSteal` indicates
0484   // we should (attempt to) steal the next buffer from the cord.
0485   enum class State { kEmpty, kFull, kPartial, kSteal };
0486 
0487   absl::Cord cord_;
0488   size_t size_hint_;
0489   State state_ = State::kEmpty;
0490   absl::CordBuffer buffer_;
0491 };
0492 
0493 
0494 // ===================================================================
0495 
0496 // Returns a pointer to the mutable characters underlying the given string.
0497 // The return value is valid until the next time the string is resized.  We
0498 // trust the caller to treat the return value as an array of length s->size().
0499 inline char* mutable_string_data(std::string* s) {
0500   return &(*s)[0];  // `s` is dereferenced unconditionally; it must not be null.
0501 }
0502 
0503 // Returns the pair (mutable_string_data(s), true).  Conceptually this is
0504 //  ({ char* p = mutable_string_data(s); make_pair(p, p != NULL); })
0505 // but this implementation skips the null check and always reports true
0506 // in the second member.
0507 inline std::pair<char*, bool> as_string_data(std::string* s) {
0508   char* p = mutable_string_data(s);
0509   return std::make_pair(p, true);
0510 }
0511 
0512 }  // namespace io
0513 }  // namespace protobuf
0514 }  // namespace google
0515 
0516 #include "google/protobuf/port_undef.inc"
0517 
0518 #endif  // GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__