Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:03

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
0021 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
0022 
0023 #include <cstdlib>
0024 #include <cstring>
0025 #include <string>
0026 #include <algorithm>
0027 #include <thrift/transport/TTransport.h>
0028 // Include the buffered transports that used to be defined here.
0029 #include <thrift/transport/TBufferTransports.h>
0030 #include <thrift/transport/TFileTransport.h>
0031 
0032 namespace apache {
0033 namespace thrift {
0034 namespace transport {
0035 
0036 /**
0037  * The null transport is a dummy transport that doesn't actually do anything.
0038  * It's sort of an analogy to /dev/null, you can never read anything from it
0039  * and it will let you write anything you want to it, though it won't actually
0040  * go anywhere.
0041  *
0042  */
0043 class TNullTransport : public TVirtualTransport<TNullTransport> {
0044 public:
0045   TNullTransport() = default;
0046 
0047   ~TNullTransport() override = default;
0048 
0049   bool isOpen() const override { return true; }
0050 
0051   void open() override {}
0052 
0053   void write(const uint8_t* /* buf */, uint32_t /* len */) { return; }
0054 };
0055 
0056 /**
0057  * TPipedTransport. This transport allows piping of a request from one
0058  * transport to another when either readEnd() or writeEnd() is called. The
0059  * typical use case for this is to log a request or a reply to disk.
0060  * The underlying buffer expands to keep a copy of the entire
0061  * request/response.
0062  *
0063  */
0064 class TPipedTransport : virtual public TTransport {
0065 public:
0066   TPipedTransport(std::shared_ptr<TTransport> srcTrans, std::shared_ptr<TTransport> dstTrans,
0067                  std::shared_ptr<TConfiguration> config = nullptr)
0068     : TTransport(config),
0069       srcTrans_(srcTrans),
0070       dstTrans_(dstTrans),
0071       rBufSize_(512),
0072       rPos_(0),
0073       rLen_(0),
0074       wBufSize_(512),
0075       wLen_(0) {
0076 
0077     // default is to to pipe the request when readEnd() is called
0078     pipeOnRead_ = true;
0079     pipeOnWrite_ = false;
0080 
0081     rBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_);
0082     if (rBuf_ == nullptr) {
0083       throw std::bad_alloc();
0084     }
0085     wBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_);
0086     if (wBuf_ == nullptr) {
0087       throw std::bad_alloc();
0088     }
0089   }
0090 
0091   TPipedTransport(std::shared_ptr<TTransport> srcTrans,
0092                   std::shared_ptr<TTransport> dstTrans,
0093                   uint32_t sz,
0094                   std::shared_ptr<TConfiguration> config = nullptr)
0095     : TTransport(config),
0096       srcTrans_(srcTrans),
0097       dstTrans_(dstTrans),
0098       rBufSize_(512),
0099       rPos_(0),
0100       rLen_(0),
0101       wBufSize_(sz),
0102       wLen_(0) {
0103 
0104     rBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * rBufSize_);
0105     if (rBuf_ == nullptr) {
0106       throw std::bad_alloc();
0107     }
0108     wBuf_ = (uint8_t*)std::malloc(sizeof(uint8_t) * wBufSize_);
0109     if (wBuf_ == nullptr) {
0110       throw std::bad_alloc();
0111     }
0112   }
0113 
0114   ~TPipedTransport() override {
0115     std::free(rBuf_);
0116     std::free(wBuf_);
0117   }
0118 
0119   bool isOpen() const override { return srcTrans_->isOpen(); }
0120 
0121   bool peek() override {
0122     if (rPos_ >= rLen_) {
0123       // Double the size of the underlying buffer if it is full
0124       if (rLen_ == rBufSize_) {
0125         rBufSize_ *= 2;
0126         auto * tmpBuf = (uint8_t*)std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
0127     if (tmpBuf == nullptr) {
0128       throw std::bad_alloc();
0129     }
0130     rBuf_ = tmpBuf;
0131       }
0132 
0133       // try to fill up the buffer
0134       rLen_ += srcTrans_->read(rBuf_ + rPos_, rBufSize_ - rPos_);
0135     }
0136     return (rLen_ > rPos_);
0137   }
0138 
0139   void open() override { srcTrans_->open(); }
0140 
0141   void close() override { srcTrans_->close(); }
0142 
0143   void setPipeOnRead(bool pipeVal) { pipeOnRead_ = pipeVal; }
0144 
0145   void setPipeOnWrite(bool pipeVal) { pipeOnWrite_ = pipeVal; }
0146 
0147   uint32_t read(uint8_t* buf, uint32_t len);
0148 
0149   uint32_t readEnd() override {
0150 
0151     if (pipeOnRead_) {
0152       dstTrans_->write(rBuf_, rPos_);
0153       dstTrans_->flush();
0154     }
0155 
0156     srcTrans_->readEnd();
0157 
0158     // If requests are being pipelined, copy down our read-ahead data,
0159     // then reset our state.
0160     int read_ahead = rLen_ - rPos_;
0161     uint32_t bytes = rPos_;
0162     memcpy(rBuf_, rBuf_ + rPos_, read_ahead);
0163     rPos_ = 0;
0164     rLen_ = read_ahead;
0165 
0166     return bytes;
0167   }
0168 
0169   void write(const uint8_t* buf, uint32_t len);
0170 
0171   uint32_t writeEnd() override {
0172     if (pipeOnWrite_) {
0173       dstTrans_->write(wBuf_, wLen_);
0174       dstTrans_->flush();
0175     }
0176     return wLen_;
0177   }
0178 
0179   void flush() override;
0180 
0181   std::shared_ptr<TTransport> getTargetTransport() { return dstTrans_; }
0182 
0183   /*
0184    * Override TTransport *_virt() functions to invoke our implementations.
0185    * We cannot use TVirtualTransport to provide these, since we need to inherit
0186    * virtually from TTransport.
0187    */
0188   uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0189   void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0190 
0191 protected:
0192   std::shared_ptr<TTransport> srcTrans_;
0193   std::shared_ptr<TTransport> dstTrans_;
0194 
0195   uint8_t* rBuf_;
0196   uint32_t rBufSize_;
0197   uint32_t rPos_;
0198   uint32_t rLen_;
0199 
0200   uint8_t* wBuf_;
0201   uint32_t wBufSize_;
0202   uint32_t wLen_;
0203 
0204   bool pipeOnRead_;
0205   bool pipeOnWrite_;
0206 };
0207 
0208 /**
0209  * Wraps a transport into a pipedTransport instance.
0210  *
0211  */
0212 class TPipedTransportFactory : public TTransportFactory {
0213 public:
0214   TPipedTransportFactory() = default;
0215   TPipedTransportFactory(std::shared_ptr<TTransport> dstTrans) {
0216     initializeTargetTransport(dstTrans);
0217   }
0218   ~TPipedTransportFactory() override = default;
0219 
0220   /**
0221    * Wraps the base transport into a piped transport.
0222    */
0223   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> srcTrans) override {
0224     return std::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
0225   }
0226 
0227   virtual void initializeTargetTransport(std::shared_ptr<TTransport> dstTrans) {
0228     if (dstTrans_.get() == nullptr) {
0229       dstTrans_ = dstTrans;
0230     } else {
0231       throw TException("Target transport already initialized");
0232     }
0233   }
0234 
0235 protected:
0236   std::shared_ptr<TTransport> dstTrans_;
0237 };
0238 
0239 /**
0240  * TPipedFileReaderTransport. This is just like a TPipedTransport, except that
0241  * it also derives from TFileReaderTransport, so that clients who rely on that
0242  * specific TTransport interface can still use it through the piped wrapper.
0243  *
0244  */
0245 class TPipedFileReaderTransport : public TPipedTransport, public TFileReaderTransport {
0246 public:
0247   TPipedFileReaderTransport(std::shared_ptr<TFileReaderTransport> srcTrans,
0248                             std::shared_ptr<TTransport> dstTrans,
0249                             std::shared_ptr<TConfiguration> config = nullptr);
0250 
0251   ~TPipedFileReaderTransport() override;
0252 
0253   // TTransport functions
0254   bool isOpen() const override;
0255   bool peek() override;
0256   void open() override;
0257   void close() override;
0258   uint32_t read(uint8_t* buf, uint32_t len);
0259   uint32_t readAll(uint8_t* buf, uint32_t len);
0260   uint32_t readEnd() override;
0261   void write(const uint8_t* buf, uint32_t len);
0262   uint32_t writeEnd() override;
0263   void flush() override;
0264 
0265   // TFileReaderTransport functions
0266   int32_t getReadTimeout() override;
0267   void setReadTimeout(int32_t readTimeout) override;
0268   uint32_t getNumChunks() override;
0269   uint32_t getCurChunk() override;
0270   void seekToChunk(int32_t chunk) override;
0271   void seekToEnd() override;
0272 
0273   /*
0274    * Override TTransport *_virt() functions to invoke our implementations.
0275    * We cannot use TVirtualTransport to provide these, since we need to inherit
0276    * virtually from TTransport.
0277    */
0278   uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0279   uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
0280   void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0281 
0282 protected:
0283   // shouldn't be used
0284   TPipedFileReaderTransport();
0285   std::shared_ptr<TFileReaderTransport> srcTrans_;
0286 };
0287 
0288 /**
0289  * Creates a TPipedFileReaderTransport from a filepath and a destination transport
0290  *
0291  */
0292 class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
0293 public:
0294   TPipedFileReaderTransportFactory() = default;
0295   TPipedFileReaderTransportFactory(std::shared_ptr<TTransport> dstTrans)
0296     : TPipedTransportFactory(dstTrans) {}
0297   ~TPipedFileReaderTransportFactory() override = default;
0298 
0299   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> srcTrans) override {
0300     std::shared_ptr<TFileReaderTransport> pFileReaderTransport
0301         = std::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
0302     if (pFileReaderTransport.get() != nullptr) {
0303       return getFileReaderTransport(pFileReaderTransport);
0304     } else {
0305       return std::shared_ptr<TTransport>();
0306     }
0307   }
0308 
0309   std::shared_ptr<TFileReaderTransport> getFileReaderTransport(
0310       std::shared_ptr<TFileReaderTransport> srcTrans) {
0311     return std::shared_ptr<TFileReaderTransport>(
0312         new TPipedFileReaderTransport(srcTrans, dstTrans_));
0313   }
0314 };
0315 }
0316 }
0317 } // apache::thrift::transport
0318 
0319 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_