Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:04

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
0021 #define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
0022 
0023 #include <thrift/transport/TTransport.h>
0024 #include <thrift/transport/TVirtualTransport.h>
0025 #include <thrift/TToString.h>
0026 #include <zlib.h>
0027 
0028 struct z_stream_s;
0029 
0030 namespace apache {
0031 namespace thrift {
0032 namespace transport {
0033 
0034 class TZlibTransportException : public TTransportException {
0035 public:
0036   TZlibTransportException(int status, const char* msg)
0037     : TTransportException(TTransportException::INTERNAL_ERROR, errorMessage(status, msg)),
0038       zlib_status_(status),
0039       zlib_msg_(msg == nullptr ? "(null)" : msg) {}
0040 
0041   ~TZlibTransportException() noexcept override = default;
0042 
0043   int getZlibStatus() { return zlib_status_; }
0044   std::string getZlibMessage() { return zlib_msg_; }
0045 
0046   static std::string errorMessage(int status, const char* msg) {
0047     std::string rv = "zlib error: ";
0048     if (msg) {
0049       rv += msg;
0050     } else {
0051       rv += "(no message)";
0052     }
0053     rv += " (status = ";
0054     rv += to_string(status);
0055     rv += ")";
0056     return rv;
0057   }
0058 
0059   int zlib_status_;
0060   std::string zlib_msg_;
0061 };
0062 
0063 /**
0064  * This transport uses zlib to compress on write and decompress on read
0065  *
0066  * TODO(dreiss): Don't do an extra copy of the compressed data if
0067  *               the underlying transport is TBuffered or TMemory.
0068  *
0069  */
0070 class TZlibTransport : public TVirtualTransport<TZlibTransport> {
0071 public:
0072   /**
0073    * @param transport    The transport to read compressed data from
0074    *                     and write compressed data to.
0075    * @param urbuf_size   Uncompressed buffer size for reading.
0076    * @param crbuf_size   Compressed buffer size for reading.
0077    * @param uwbuf_size   Uncompressed buffer size for writing.
0078    * @param cwbuf_size   Compressed buffer size for writing.
0079    * @param comp_level   Compression level (0=none[fast], 6=default, 9=max[slow]).
0080    */
0081   TZlibTransport(std::shared_ptr<TTransport> transport,
0082                  int urbuf_size = DEFAULT_URBUF_SIZE,
0083                  int crbuf_size = DEFAULT_CRBUF_SIZE,
0084                  int uwbuf_size = DEFAULT_UWBUF_SIZE,
0085                  int cwbuf_size = DEFAULT_CWBUF_SIZE,
0086                  int16_t comp_level = Z_DEFAULT_COMPRESSION,
0087                  std::shared_ptr<TConfiguration> config = nullptr)
0088     : TVirtualTransport(config),
0089       transport_(transport),
0090       urpos_(0),
0091       uwpos_(0),
0092       input_ended_(false),
0093       output_finished_(false),
0094       urbuf_size_(urbuf_size),
0095       crbuf_size_(crbuf_size),
0096       uwbuf_size_(uwbuf_size),
0097       cwbuf_size_(cwbuf_size),
0098       urbuf_(nullptr),
0099       crbuf_(nullptr),
0100       uwbuf_(nullptr),
0101       cwbuf_(nullptr),
0102       rstream_(nullptr),
0103       wstream_(nullptr),
0104       comp_level_(comp_level) {
0105     if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
0106       // Have to copy this into a local because of a linking issue.
0107       int minimum = MIN_DIRECT_DEFLATE_SIZE;
0108       throw TTransportException(TTransportException::BAD_ARGS,
0109                                 "TZLibTransport: uncompressed write buffer must be at least"
0110                                 + to_string(minimum) + ".");
0111     }
0112 
0113     try {
0114       urbuf_ = new uint8_t[urbuf_size];
0115       crbuf_ = new uint8_t[crbuf_size];
0116       uwbuf_ = new uint8_t[uwbuf_size];
0117       cwbuf_ = new uint8_t[cwbuf_size];
0118 
0119       // Don't call this outside of the constructor.
0120       initZlib();
0121 
0122     } catch (...) {
0123       delete[] urbuf_;
0124       delete[] crbuf_;
0125       delete[] uwbuf_;
0126       delete[] cwbuf_;
0127       throw;
0128     }
0129   }
0130 
0131   // Don't call this outside of the constructor.
0132   void initZlib();
0133 
0134   /**
0135    * TZlibTransport destructor.
0136    *
0137    * Warning: Destroying a TZlibTransport object may discard any written but
0138    * unflushed data.  You must explicitly call flush() or finish() to ensure
0139    * that data is actually written and flushed to the underlying transport.
0140    */
0141   ~TZlibTransport() override;
0142 
0143   bool isOpen() const override;
0144   bool peek() override;
0145 
0146   void open() override { transport_->open(); }
0147 
0148   void close() override { transport_->close(); }
0149 
0150   uint32_t read(uint8_t* buf, uint32_t len);
0151 
0152   void write(const uint8_t* buf, uint32_t len);
0153 
0154   void flush() override;
0155 
0156   /**
0157    * Finalize the zlib stream.
0158    *
0159    * This causes zlib to flush any pending write data and write end-of-stream
0160    * information, including the checksum.  Once finish() has been called, no
0161    * new data can be written to the stream.
0162    */
0163   void finish();
0164 
0165   const uint8_t* borrow(uint8_t* buf, uint32_t* len);
0166 
0167   void consume(uint32_t len);
0168 
0169   /**
0170    * Verify the checksum at the end of the zlib stream.
0171    *
0172    * This may only be called after all data has been read.
0173    * It verifies the checksum that was written by the finish() call.
0174    */
0175   void verifyChecksum();
0176 
0177   /**
0178    * TODO(someone_smart): Choose smart defaults.
0179    */
0180   static const int DEFAULT_URBUF_SIZE = 128;
0181   static const int DEFAULT_CRBUF_SIZE = 1024;
0182   static const int DEFAULT_UWBUF_SIZE = 128;
0183   static const int DEFAULT_CWBUF_SIZE = 1024;
0184 
0185   std::shared_ptr<TTransport> getUnderlyingTransport() const { return transport_; }
0186 
0187 protected:
0188   inline void checkZlibRv(int status, const char* msg);
0189   inline void checkZlibRvNothrow(int status, const char* msg);
0190   inline int readAvail() const;
0191   void flushToTransport(int flush);
0192   void flushToZlib(const uint8_t* buf, int len, int flush);
0193   bool readFromZlib();
0194 
0195 protected:
0196   // Writes smaller than this are buffered up.
0197   // Larger (or equal) writes are dumped straight to zlib.
0198   static const uint32_t MIN_DIRECT_DEFLATE_SIZE = 32;
0199 
0200   std::shared_ptr<TTransport> transport_;
0201 
0202   int urpos_;
0203   int uwpos_;
0204 
0205   /// True iff zlib has reached the end of the input stream.
0206   bool input_ended_;
0207   /// True iff we have finished the output stream.
0208   bool output_finished_;
0209 
0210   uint32_t urbuf_size_;
0211   uint32_t crbuf_size_;
0212   uint32_t uwbuf_size_;
0213   uint32_t cwbuf_size_;
0214 
0215   uint8_t* urbuf_;
0216   uint8_t* crbuf_;
0217   uint8_t* uwbuf_;
0218   uint8_t* cwbuf_;
0219 
0220   struct z_stream_s* rstream_;
0221   struct z_stream_s* wstream_;
0222 
0223   const int comp_level_;
0224 };
0225 
0226 /**
0227  * Wraps a transport into a zlibbed one.
0228  *
0229  */
0230 class TZlibTransportFactory : public TTransportFactory {
0231 public:
0232   TZlibTransportFactory() = default;
0233 
0234   /**
0235    * Wraps a transport factory into a zlibbed one.
0236    */
0237   TZlibTransportFactory(std::shared_ptr<TTransportFactory> transportFactory);
0238 
0239   ~TZlibTransportFactory() override = default;
0240 
0241   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override;
0242 
0243 protected:
0244   std::shared_ptr<TTransportFactory> transportFactory_;
0245 };
0246 
0247 }
0248 }
0249 } // apache::thrift::transport
0250 
0251 #endif // #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_