File indexing completed on 2026-04-17 08:35:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
0021 #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
0022
0023 #include <cstdlib>
0024 #include <cstring>
0025 #include <limits>
0026
0027 #include <thrift/transport/TTransport.h>
0028 #include <thrift/transport/TVirtualTransport.h>
0029
0030 #ifdef __GNUC__
0031 #define TDB_LIKELY(val) (__builtin_expect((val), 1))
0032 #define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
0033 #else
0034 #define TDB_LIKELY(val) (val)
0035 #define TDB_UNLIKELY(val) (val)
0036 #endif
0037
0038 namespace apache {
0039 namespace thrift {
0040 namespace transport {
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052 class TBufferBase : public TVirtualTransport<TBufferBase> {
0053
0054 public:
0055
0056
0057
0058
0059
0060
0061
0062
0063 uint32_t read(uint8_t* buf, uint32_t len) {
0064 checkReadBytesAvailable(len);
0065 uint8_t* new_rBase = rBase_ + len;
0066 if (TDB_LIKELY(new_rBase <= rBound_)) {
0067 std::memcpy(buf, rBase_, len);
0068 rBase_ = new_rBase;
0069 return len;
0070 }
0071 return readSlow(buf, len);
0072 }
0073
0074
0075
0076
0077 uint32_t readAll(uint8_t* buf, uint32_t len) {
0078 uint8_t* new_rBase = rBase_ + len;
0079 if (TDB_LIKELY(new_rBase <= rBound_)) {
0080 std::memcpy(buf, rBase_, len);
0081 rBase_ = new_rBase;
0082 return len;
0083 }
0084 return apache::thrift::transport::readAll(*this, buf, len);
0085 }
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096 void write(const uint8_t* buf, uint32_t len) {
0097 uint8_t* new_wBase = wBase_ + len;
0098 if (TDB_LIKELY(new_wBase <= wBound_)) {
0099 std::memcpy(wBase_, buf, len);
0100 wBase_ = new_wBase;
0101 return;
0102 }
0103 writeSlow(buf, len);
0104 }
0105
0106
0107
0108
0109 const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
0110 if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
0111
0112
0113 *len = static_cast<uint32_t>(rBound_ - rBase_);
0114 return rBase_;
0115 }
0116 return borrowSlow(buf, len);
0117 }
0118
0119
0120
0121
0122 void consume(uint32_t len) {
0123 countConsumedMessageBytes(len);
0124 if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
0125 rBase_ += len;
0126 } else {
0127 throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow.");
0128 }
0129 }
0130
0131 protected:
0132
0133 virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
0134
0135
0136 virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
0137
0138
0139
0140
0141
0142
0143 virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
0144
0145
0146
0147
0148
0149
0150
0151
0152 TBufferBase(std::shared_ptr<TConfiguration> config = nullptr)
0153 : TVirtualTransport(config), rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {}
0154
0155
0156 void setReadBuffer(uint8_t* buf, uint32_t len) {
0157 rBase_ = buf;
0158 rBound_ = buf + len;
0159 }
0160
0161
0162 void setWriteBuffer(uint8_t* buf, uint32_t len) {
0163 wBase_ = buf;
0164 wBound_ = buf + len;
0165 }
0166
0167 ~TBufferBase() override = default;
0168
0169
0170 uint8_t* rBase_;
0171
0172 uint8_t* rBound_;
0173
0174
0175 uint8_t* wBase_;
0176
0177 uint8_t* wBound_;
0178 };
0179
0180
0181
0182
0183
0184
0185
0186 class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> {
0187 public:
0188 static const int DEFAULT_BUFFER_SIZE = 512;
0189
0190
0191 TBufferedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr)
0192 : TVirtualTransport(config),
0193 transport_(transport),
0194 rBufSize_(DEFAULT_BUFFER_SIZE),
0195 wBufSize_(DEFAULT_BUFFER_SIZE),
0196 rBuf_(new uint8_t[rBufSize_]),
0197 wBuf_(new uint8_t[wBufSize_]) {
0198 initPointers();
0199 }
0200
0201
0202 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr)
0203 : TVirtualTransport(config),
0204 transport_(transport),
0205 rBufSize_(sz),
0206 wBufSize_(sz),
0207 rBuf_(new uint8_t[rBufSize_]),
0208 wBuf_(new uint8_t[wBufSize_]) {
0209 initPointers();
0210 }
0211
0212
0213 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz,
0214 std::shared_ptr<TConfiguration> config = nullptr)
0215 : TVirtualTransport(config),
0216 transport_(transport),
0217 rBufSize_(rsz),
0218 wBufSize_(wsz),
0219 rBuf_(new uint8_t[rBufSize_]),
0220 wBuf_(new uint8_t[wBufSize_]) {
0221 initPointers();
0222 }
0223
0224 void open() override { transport_->open(); }
0225
0226 bool isOpen() const override { return transport_->isOpen(); }
0227
0228 bool peek() override {
0229 if (rBase_ == rBound_) {
0230 setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
0231 }
0232 return (rBound_ > rBase_);
0233 }
0234
0235 void close() override {
0236 flush();
0237 transport_->close();
0238 }
0239
0240 uint32_t readSlow(uint8_t* buf, uint32_t len) override;
0241
0242 void writeSlow(const uint8_t* buf, uint32_t len) override;
0243
0244 void flush() override;
0245
0246
0247
0248
0249 const std::string getOrigin() const override { return transport_->getOrigin(); }
0250
0251
0252
0253
0254
0255
0256
0257
0258
0259
0260
0261
0262 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
0263
0264 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
0265
0266
0267
0268
0269
0270 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
0271
0272 uint32_t readEnd() override {
0273 resetConsumedMessageSize();
0274 return 0;
0275 }
0276
0277 protected:
0278 void initPointers() {
0279 setReadBuffer(rBuf_.get(), 0);
0280 setWriteBuffer(wBuf_.get(), wBufSize_);
0281
0282 }
0283
0284 std::shared_ptr<TTransport> transport_;
0285
0286 uint32_t rBufSize_;
0287 uint32_t wBufSize_;
0288 std::unique_ptr<uint8_t[]> rBuf_;
0289 std::unique_ptr<uint8_t[]> wBuf_;
0290 };
0291
0292
0293
0294
0295
0296 class TBufferedTransportFactory : public TTransportFactory {
0297 public:
0298 TBufferedTransportFactory() = default;
0299
0300 ~TBufferedTransportFactory() override = default;
0301
0302
0303
0304
0305 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
0306 return std::shared_ptr<TTransport>(new TBufferedTransport(trans));
0307 }
0308 };
0309
0310
0311
0312
0313
0314
0315
0316
0317 class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> {
0318 public:
0319 static const int DEFAULT_BUFFER_SIZE = 512;
0320 static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;
0321
0322
0323 TFramedTransport(std::shared_ptr<TConfiguration> config = nullptr)
0324 : TVirtualTransport(config),
0325 transport_(),
0326 rBufSize_(0),
0327 wBufSize_(DEFAULT_BUFFER_SIZE),
0328 rBuf_(),
0329 wBuf_(new uint8_t[wBufSize_]),
0330 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) {
0331 initPointers();
0332 }
0333
0334 TFramedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr)
0335 : TVirtualTransport(config),
0336 transport_(transport),
0337 rBufSize_(0),
0338 wBufSize_(DEFAULT_BUFFER_SIZE),
0339 rBuf_(),
0340 wBuf_(new uint8_t[wBufSize_]),
0341 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()),
0342 maxFrameSize_(configuration_->getMaxFrameSize()) {
0343 initPointers();
0344 }
0345
0346 TFramedTransport(std::shared_ptr<TTransport> transport,
0347 uint32_t sz,
0348 uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)(),
0349 std::shared_ptr<TConfiguration> config = nullptr)
0350 : TVirtualTransport(config),
0351 transport_(transport),
0352 rBufSize_(0),
0353 wBufSize_(sz),
0354 rBuf_(),
0355 wBuf_(new uint8_t[wBufSize_]),
0356 bufReclaimThresh_(bufReclaimThresh),
0357 maxFrameSize_(configuration_->getMaxFrameSize()) {
0358 initPointers();
0359 }
0360
0361 void open() override { transport_->open(); }
0362
0363 bool isOpen() const override { return transport_->isOpen(); }
0364
0365 bool peek() override { return (rBase_ < rBound_) || transport_->peek(); }
0366
0367 void close() override {
0368 flush();
0369 transport_->close();
0370 }
0371
0372 uint32_t readSlow(uint8_t* buf, uint32_t len) override;
0373
0374 void writeSlow(const uint8_t* buf, uint32_t len) override;
0375
0376 void flush() override;
0377
0378 uint32_t readEnd() override;
0379
0380 uint32_t writeEnd() override;
0381
0382 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
0383
0384 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
0385
0386
0387
0388
0389
0390 using TBufferBase::readAll;
0391
0392
0393
0394
0395 const std::string getOrigin() const override { return transport_->getOrigin(); }
0396
0397
0398
0399
0400 void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
0401
0402
0403
0404
0405 uint32_t getMaxFrameSize() { return maxFrameSize_; }
0406
0407 protected:
0408
0409
0410
0411
0412
0413
0414 virtual bool readFrame();
0415
0416 void initPointers() {
0417 setReadBuffer(nullptr, 0);
0418 setWriteBuffer(wBuf_.get(), wBufSize_);
0419
0420
0421 int32_t pad = 0;
0422 this->write((uint8_t*)&pad, sizeof(pad));
0423 }
0424
0425 std::shared_ptr<TTransport> transport_;
0426
0427 uint32_t rBufSize_;
0428 uint32_t wBufSize_;
0429 std::unique_ptr<uint8_t[]> rBuf_;
0430 std::unique_ptr<uint8_t[]> wBuf_;
0431 uint32_t bufReclaimThresh_;
0432 uint32_t maxFrameSize_;
0433 };
0434
0435
0436
0437
0438
0439 class TFramedTransportFactory : public TTransportFactory {
0440 public:
0441 TFramedTransportFactory() = default;
0442
0443 ~TFramedTransportFactory() override = default;
0444
0445
0446
0447
0448 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
0449 return std::shared_ptr<TTransport>(new TFramedTransport(trans));
0450 }
0451 };
0452
0453
0454
0455
0456
0457
0458
0459
0460
0461
0462 class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
0463 private:
0464
0465 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
0466
0467 maxBufferSize_ = (std::numeric_limits<uint32_t>::max)();
0468
0469 if (buf == nullptr && size != 0) {
0470 assert(owner);
0471 buf = (uint8_t*)std::malloc(size);
0472 if (buf == nullptr) {
0473 throw std::bad_alloc();
0474 }
0475 }
0476
0477 buffer_ = buf;
0478 bufferSize_ = size;
0479
0480 rBase_ = buffer_;
0481 rBound_ = buffer_ + wPos;
0482
0483 wBase_ = buffer_ + wPos;
0484 wBound_ = buffer_ + bufferSize_;
0485
0486 owner_ = owner;
0487
0488
0489
0490 }
0491
0492 public:
0493 static const uint32_t defaultSize = 1024;
0494
0495
0496
0497
0498
0499
0500
0501
0502
0503
0504
0505
0506
0507
0508
0509
0510
0511
0512
0513
0514
0515 enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 };
0516
0517
0518
0519
0520
0521 TMemoryBuffer(std::shared_ptr<TConfiguration> config = nullptr)
0522 : TVirtualTransport(config) {
0523 initCommon(nullptr, defaultSize, true, 0);
0524 }
0525
0526
0527
0528
0529
0530
0531
0532 TMemoryBuffer(uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr)
0533 : TVirtualTransport(config) {
0534 initCommon(nullptr, sz, true, 0);
0535 }
0536
0537
0538
0539
0540
0541
0542
0543
0544
0545
0546
0547 TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE, std::shared_ptr<TConfiguration> config = nullptr)
0548 : TVirtualTransport(config) {
0549 if (buf == nullptr && sz != 0) {
0550 throw TTransportException(TTransportException::BAD_ARGS,
0551 "TMemoryBuffer given null buffer with non-zero size.");
0552 }
0553
0554 switch (policy) {
0555 case OBSERVE:
0556 case TAKE_OWNERSHIP:
0557 initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
0558 break;
0559 case COPY:
0560 initCommon(nullptr, sz, true, 0);
0561 this->write(buf, sz);
0562 break;
0563 default:
0564 throw TTransportException(TTransportException::BAD_ARGS,
0565 "Invalid MemoryPolicy for TMemoryBuffer");
0566 }
0567 }
0568
0569 ~TMemoryBuffer() override {
0570 if (owner_) {
0571 std::free(buffer_);
0572 }
0573 }
0574
0575 bool isOpen() const override { return true; }
0576
0577 bool peek() override { return (rBase_ < wBase_); }
0578
0579 void open() override {}
0580
0581 void close() override {}
0582
0583
0584 void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
0585 *bufPtr = rBase_;
0586 *sz = static_cast<uint32_t>(wBase_ - rBase_);
0587 }
0588
0589 std::string getBufferAsString() {
0590 if (buffer_ == nullptr) {
0591 return "";
0592 }
0593 uint8_t* buf;
0594 uint32_t sz;
0595 getBuffer(&buf, &sz);
0596 return std::string((char*)buf, (std::string::size_type)sz);
0597 }
0598
0599 void appendBufferToString(std::string& str) {
0600 if (buffer_ == nullptr) {
0601 return;
0602 }
0603 uint8_t* buf;
0604 uint32_t sz;
0605 getBuffer(&buf, &sz);
0606 str.append((char*)buf, sz);
0607 }
0608
0609 void resetBuffer() {
0610 rBase_ = buffer_;
0611 rBound_ = buffer_;
0612 wBase_ = buffer_;
0613
0614 if (!owner_) {
0615 wBound_ = wBase_;
0616 bufferSize_ = 0;
0617 }
0618 }
0619
0620
0621 void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
0622
0623
0624
0625
0626
0627
0628
0629
0630
0631
0632
0633
0634
0635 TMemoryBuffer new_buffer(buf, sz, policy);
0636
0637 this->swap(new_buffer);
0638
0639 }
0640
0641
0642 void resetBuffer(uint32_t sz) {
0643
0644 TMemoryBuffer new_buffer(sz);
0645
0646 this->swap(new_buffer);
0647
0648 }
0649
0650 std::string readAsString(uint32_t len) {
0651 std::string str;
0652 (void)readAppendToString(str, len);
0653 return str;
0654 }
0655
0656 uint32_t readAppendToString(std::string& str, uint32_t len);
0657
0658
0659 uint32_t readEnd() override {
0660
0661 auto bytes = static_cast<uint32_t>(rBase_ - buffer_);
0662 if (rBase_ == wBase_) {
0663 resetBuffer();
0664 }
0665 resetConsumedMessageSize();
0666 return bytes;
0667 }
0668
0669
0670 uint32_t writeEnd() override {
0671
0672 return static_cast<uint32_t>(wBase_ - buffer_);
0673 }
0674
0675 uint32_t available_read() const {
0676
0677 return static_cast<uint32_t>(wBase_ - rBase_);
0678 }
0679
0680 uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); }
0681
0682
0683
0684
0685
0686
0687 uint8_t* getWritePtr(uint32_t len) {
0688 ensureCanWrite(len);
0689 return wBase_;
0690 }
0691
0692
0693
0694 void wroteBytes(uint32_t len);
0695
0696
0697
0698
0699
0700 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
0701
0702
0703
0704 uint32_t getBufferSize() const {
0705 return bufferSize_;
0706 }
0707
0708
0709
0710 uint32_t getMaxBufferSize() const {
0711 return maxBufferSize_;
0712 }
0713
0714
0715
0716
0717 void setMaxBufferSize(uint32_t maxSize) {
0718 if (maxSize < bufferSize_) {
0719 throw TTransportException(TTransportException::BAD_ARGS,
0720 "Maximum buffer size would be less than current buffer size");
0721 }
0722 maxBufferSize_ = maxSize;
0723 }
0724
0725 protected:
0726 void swap(TMemoryBuffer& that) {
0727 using std::swap;
0728 swap(buffer_, that.buffer_);
0729 swap(bufferSize_, that.bufferSize_);
0730
0731 swap(rBase_, that.rBase_);
0732 swap(rBound_, that.rBound_);
0733 swap(wBase_, that.wBase_);
0734 swap(wBound_, that.wBound_);
0735
0736 swap(owner_, that.owner_);
0737 }
0738
0739
0740 void ensureCanWrite(uint32_t len);
0741
0742
0743 void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
0744
0745 uint32_t readSlow(uint8_t* buf, uint32_t len) override;
0746
0747 void writeSlow(const uint8_t* buf, uint32_t len) override;
0748
0749 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
0750
0751
0752 uint8_t* buffer_;
0753
0754
0755 uint32_t bufferSize_;
0756
0757
0758 uint32_t maxBufferSize_;
0759
0760
0761 bool owner_;
0762
0763
0764
0765 };
0766 }
0767 }
0768 }
0769
0770 #endif