File indexing completed on 2026-04-17 08:35:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
0021 #define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
0022
0023 #include <thrift/transport/TTransport.h>
0024 #include <thrift/transport/TVirtualTransport.h>
0025 #include <thrift/TToString.h>
0026 #include <zlib.h>
0027
0028 struct z_stream_s;
0029
0030 namespace apache {
0031 namespace thrift {
0032 namespace transport {
0033
0034 class TZlibTransportException : public TTransportException {
0035 public:
0036 TZlibTransportException(int status, const char* msg)
0037 : TTransportException(TTransportException::INTERNAL_ERROR, errorMessage(status, msg)),
0038 zlib_status_(status),
0039 zlib_msg_(msg == nullptr ? "(null)" : msg) {}
0040
0041 ~TZlibTransportException() noexcept override = default;
0042
0043 int getZlibStatus() { return zlib_status_; }
0044 std::string getZlibMessage() { return zlib_msg_; }
0045
0046 static std::string errorMessage(int status, const char* msg) {
0047 std::string rv = "zlib error: ";
0048 if (msg) {
0049 rv += msg;
0050 } else {
0051 rv += "(no message)";
0052 }
0053 rv += " (status = ";
0054 rv += to_string(status);
0055 rv += ")";
0056 return rv;
0057 }
0058
0059 int zlib_status_;
0060 std::string zlib_msg_;
0061 };
0062
0063
0064
0065
0066
0067
0068
0069
0070 class TZlibTransport : public TVirtualTransport<TZlibTransport> {
0071 public:
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081 TZlibTransport(std::shared_ptr<TTransport> transport,
0082 int urbuf_size = DEFAULT_URBUF_SIZE,
0083 int crbuf_size = DEFAULT_CRBUF_SIZE,
0084 int uwbuf_size = DEFAULT_UWBUF_SIZE,
0085 int cwbuf_size = DEFAULT_CWBUF_SIZE,
0086 int16_t comp_level = Z_DEFAULT_COMPRESSION,
0087 std::shared_ptr<TConfiguration> config = nullptr)
0088 : TVirtualTransport(config),
0089 transport_(transport),
0090 urpos_(0),
0091 uwpos_(0),
0092 input_ended_(false),
0093 output_finished_(false),
0094 urbuf_size_(urbuf_size),
0095 crbuf_size_(crbuf_size),
0096 uwbuf_size_(uwbuf_size),
0097 cwbuf_size_(cwbuf_size),
0098 urbuf_(nullptr),
0099 crbuf_(nullptr),
0100 uwbuf_(nullptr),
0101 cwbuf_(nullptr),
0102 rstream_(nullptr),
0103 wstream_(nullptr),
0104 comp_level_(comp_level) {
0105 if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
0106
0107 int minimum = MIN_DIRECT_DEFLATE_SIZE;
0108 throw TTransportException(TTransportException::BAD_ARGS,
0109 "TZLibTransport: uncompressed write buffer must be at least"
0110 + to_string(minimum) + ".");
0111 }
0112
0113 try {
0114 urbuf_ = new uint8_t[urbuf_size];
0115 crbuf_ = new uint8_t[crbuf_size];
0116 uwbuf_ = new uint8_t[uwbuf_size];
0117 cwbuf_ = new uint8_t[cwbuf_size];
0118
0119
0120 initZlib();
0121
0122 } catch (...) {
0123 delete[] urbuf_;
0124 delete[] crbuf_;
0125 delete[] uwbuf_;
0126 delete[] cwbuf_;
0127 throw;
0128 }
0129 }
0130
0131
0132 void initZlib();
0133
0134
0135
0136
0137
0138
0139
0140
0141 ~TZlibTransport() override;
0142
0143 bool isOpen() const override;
0144 bool peek() override;
0145
0146 void open() override { transport_->open(); }
0147
0148 void close() override { transport_->close(); }
0149
0150 uint32_t read(uint8_t* buf, uint32_t len);
0151
0152 void write(const uint8_t* buf, uint32_t len);
0153
0154 void flush() override;
0155
0156
0157
0158
0159
0160
0161
0162
0163 void finish();
0164
0165 const uint8_t* borrow(uint8_t* buf, uint32_t* len);
0166
0167 void consume(uint32_t len);
0168
0169
0170
0171
0172
0173
0174
0175 void verifyChecksum();
0176
0177
0178
0179
0180 static const int DEFAULT_URBUF_SIZE = 128;
0181 static const int DEFAULT_CRBUF_SIZE = 1024;
0182 static const int DEFAULT_UWBUF_SIZE = 128;
0183 static const int DEFAULT_CWBUF_SIZE = 1024;
0184
0185 std::shared_ptr<TTransport> getUnderlyingTransport() const { return transport_; }
0186
0187 protected:
0188 inline void checkZlibRv(int status, const char* msg);
0189 inline void checkZlibRvNothrow(int status, const char* msg);
0190 inline int readAvail() const;
0191 void flushToTransport(int flush);
0192 void flushToZlib(const uint8_t* buf, int len, int flush);
0193 bool readFromZlib();
0194
0195 protected:
0196
0197
0198 static const uint32_t MIN_DIRECT_DEFLATE_SIZE = 32;
0199
0200 std::shared_ptr<TTransport> transport_;
0201
0202 int urpos_;
0203 int uwpos_;
0204
0205
0206 bool input_ended_;
0207
0208 bool output_finished_;
0209
0210 uint32_t urbuf_size_;
0211 uint32_t crbuf_size_;
0212 uint32_t uwbuf_size_;
0213 uint32_t cwbuf_size_;
0214
0215 uint8_t* urbuf_;
0216 uint8_t* crbuf_;
0217 uint8_t* uwbuf_;
0218 uint8_t* cwbuf_;
0219
0220 struct z_stream_s* rstream_;
0221 struct z_stream_s* wstream_;
0222
0223 const int comp_level_;
0224 };
0225
0226
0227
0228
0229
0230 class TZlibTransportFactory : public TTransportFactory {
0231 public:
0232 TZlibTransportFactory() = default;
0233
0234
0235
0236
0237 TZlibTransportFactory(std::shared_ptr<TTransportFactory> transportFactory);
0238
0239 ~TZlibTransportFactory() override = default;
0240
0241 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override;
0242
0243 protected:
0244 std::shared_ptr<TTransportFactory> transportFactory_;
0245 };
0246
0247 }
0248 }
0249 }
0250
0251 #endif