File indexing completed on 2026-04-17 08:35:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
0021 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
0022
0023 #include <cstdlib>
0024 #include <cstring>
0025 #include <string>
0026 #include <algorithm>
0027 #include <thrift/transport/TTransport.h>
0028
0029 #include <thrift/transport/TBufferTransports.h>
0030 #include <thrift/transport/TFileTransport.h>
0031
0032 namespace apache {
0033 namespace thrift {
0034 namespace transport {
0035
0036
0037
0038
0039
0040
0041
0042
0043 class TNullTransport : public TVirtualTransport<TNullTransport> {
0044 public:
0045 TNullTransport() = default;
0046
0047 ~TNullTransport() override = default;
0048
0049 bool isOpen() const override { return true; }
0050
0051 void open() override {}
0052
0053 void write(const uint8_t* , uint32_t ) { return; }
0054 };
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064 class TPipedTransport : virtual public TTransport {
0065 public:
0066 TPipedTransport(std::shared_ptr<TTransport> srcTrans, std::shared_ptr<TTransport> dstTrans,
0067 std::shared_ptr<TConfiguration> config = nullptr)
0068 : TTransport(config),
0069 srcTrans_(srcTrans),
0070 dstTrans_(dstTrans),
0071 rBufSize_(512),
0072 rPos_(0),
0073 rLen_(0),
0074 wBufSize_(512),
0075 wLen_(0) {
0076
0077
0078 pipeOnRead_ = true;
0079 pipeOnWrite_ = false;
0080
0081 rBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_);
0082 if (rBuf_ == nullptr) {
0083 throw std::bad_alloc();
0084 }
0085 wBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_);
0086 if (wBuf_ == nullptr) {
0087 throw std::bad_alloc();
0088 }
0089 }
0090
0091 TPipedTransport(std::shared_ptr<TTransport> srcTrans,
0092 std::shared_ptr<TTransport> dstTrans,
0093 uint32_t sz,
0094 std::shared_ptr<TConfiguration> config = nullptr)
0095 : TTransport(config),
0096 srcTrans_(srcTrans),
0097 dstTrans_(dstTrans),
0098 rBufSize_(512),
0099 rPos_(0),
0100 rLen_(0),
0101 wBufSize_(sz),
0102 wLen_(0) {
0103
0104 rBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_);
0105 if (rBuf_ == nullptr) {
0106 throw std::bad_alloc();
0107 }
0108 wBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_);
0109 if (wBuf_ == nullptr) {
0110 throw std::bad_alloc();
0111 }
0112 }
0113
0114 ~TPipedTransport() override {
0115 std::free(rBuf_);
0116 std::free(wBuf_);
0117 }
0118
0119 bool isOpen() const override { return srcTrans_->isOpen(); }
0120
0121 bool peek() override {
0122 if (rPos_ >= rLen_) {
0123
0124 if (rLen_ == rBufSize_) {
0125 rBufSize_ *= 2;
0126 auto * tmpBuf = (uint8_t*)std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
0127 if (tmpBuf == nullptr) {
0128 throw std::bad_alloc();
0129 }
0130 rBuf_ = tmpBuf;
0131 }
0132
0133
0134 rLen_ += srcTrans_->read(rBuf_ + rPos_, rBufSize_ - rPos_);
0135 }
0136 return (rLen_ > rPos_);
0137 }
0138
0139 void open() override { srcTrans_->open(); }
0140
0141 void close() override { srcTrans_->close(); }
0142
0143 void setPipeOnRead(bool pipeVal) { pipeOnRead_ = pipeVal; }
0144
0145 void setPipeOnWrite(bool pipeVal) { pipeOnWrite_ = pipeVal; }
0146
0147 uint32_t read(uint8_t* buf, uint32_t len);
0148
0149 uint32_t readEnd() override {
0150
0151 if (pipeOnRead_) {
0152 dstTrans_->write(rBuf_, rPos_);
0153 dstTrans_->flush();
0154 }
0155
0156 srcTrans_->readEnd();
0157
0158
0159
0160 int read_ahead = rLen_ - rPos_;
0161 uint32_t bytes = rPos_;
0162 memcpy(rBuf_, rBuf_ + rPos_, read_ahead);
0163 rPos_ = 0;
0164 rLen_ = read_ahead;
0165
0166 return bytes;
0167 }
0168
0169 void write(const uint8_t* buf, uint32_t len);
0170
0171 uint32_t writeEnd() override {
0172 if (pipeOnWrite_) {
0173 dstTrans_->write(wBuf_, wLen_);
0174 dstTrans_->flush();
0175 }
0176 return wLen_;
0177 }
0178
0179 void flush() override;
0180
0181 std::shared_ptr<TTransport> getTargetTransport() { return dstTrans_; }
0182
0183
0184
0185
0186
0187
0188 uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0189 void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0190
0191 protected:
0192 std::shared_ptr<TTransport> srcTrans_;
0193 std::shared_ptr<TTransport> dstTrans_;
0194
0195 uint8_t* rBuf_;
0196 uint32_t rBufSize_;
0197 uint32_t rPos_;
0198 uint32_t rLen_;
0199
0200 uint8_t* wBuf_;
0201 uint32_t wBufSize_;
0202 uint32_t wLen_;
0203
0204 bool pipeOnRead_;
0205 bool pipeOnWrite_;
0206 };
0207
0208
0209
0210
0211
0212 class TPipedTransportFactory : public TTransportFactory {
0213 public:
0214 TPipedTransportFactory() = default;
0215 TPipedTransportFactory(std::shared_ptr<TTransport> dstTrans) {
0216 initializeTargetTransport(dstTrans);
0217 }
0218 ~TPipedTransportFactory() override = default;
0219
0220
0221
0222
0223 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> srcTrans) override {
0224 return std::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
0225 }
0226
0227 virtual void initializeTargetTransport(std::shared_ptr<TTransport> dstTrans) {
0228 if (dstTrans_.get() == nullptr) {
0229 dstTrans_ = dstTrans;
0230 } else {
0231 throw TException("Target transport already initialized");
0232 }
0233 }
0234
0235 protected:
0236 std::shared_ptr<TTransport> dstTrans_;
0237 };
0238
0239
0240
0241
0242
0243
0244
0245 class TPipedFileReaderTransport : public TPipedTransport, public TFileReaderTransport {
0246 public:
0247 TPipedFileReaderTransport(std::shared_ptr<TFileReaderTransport> srcTrans,
0248 std::shared_ptr<TTransport> dstTrans,
0249 std::shared_ptr<TConfiguration> config = nullptr);
0250
0251 ~TPipedFileReaderTransport() override;
0252
0253
0254 bool isOpen() const override;
0255 bool peek() override;
0256 void open() override;
0257 void close() override;
0258 uint32_t read(uint8_t* buf, uint32_t len);
0259 uint32_t readAll(uint8_t* buf, uint32_t len);
0260 uint32_t readEnd() override;
0261 void write(const uint8_t* buf, uint32_t len);
0262 uint32_t writeEnd() override;
0263 void flush() override;
0264
0265
0266 int32_t getReadTimeout() override;
0267 void setReadTimeout(int32_t readTimeout) override;
0268 uint32_t getNumChunks() override;
0269 uint32_t getCurChunk() override;
0270 void seekToChunk(int32_t chunk) override;
0271 void seekToEnd() override;
0272
0273
0274
0275
0276
0277
0278 uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0279 uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
0280 void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0281
0282 protected:
0283
0284 TPipedFileReaderTransport();
0285 std::shared_ptr<TFileReaderTransport> srcTrans_;
0286 };
0287
0288
0289
0290
0291
0292 class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
0293 public:
0294 TPipedFileReaderTransportFactory() = default;
0295 TPipedFileReaderTransportFactory(std::shared_ptr<TTransport> dstTrans)
0296 : TPipedTransportFactory(dstTrans) {}
0297 ~TPipedFileReaderTransportFactory() override = default;
0298
0299 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> srcTrans) override {
0300 std::shared_ptr<TFileReaderTransport> pFileReaderTransport
0301 = std::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
0302 if (pFileReaderTransport.get() != nullptr) {
0303 return getFileReaderTransport(pFileReaderTransport);
0304 } else {
0305 return std::shared_ptr<TTransport>();
0306 }
0307 }
0308
0309 std::shared_ptr<TFileReaderTransport> getFileReaderTransport(
0310 std::shared_ptr<TFileReaderTransport> srcTrans) {
0311 return std::shared_ptr<TFileReaderTransport>(
0312 new TPipedFileReaderTransport(srcTrans, dstTrans_));
0313 }
0314 };
0315 }
0316 }
0317 }
0318
0319 #endif