File indexing completed on 2026-04-17 08:35:03
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
0021 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
0022
#include <thrift/transport/TTransport.h>
#include <thrift/Thrift.h>
#include <thrift/TProcessor.h>

#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <string>
#include <stdio.h>

#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
0035
0036 namespace apache {
0037 namespace thrift {
0038 namespace transport {
0039
0040 using apache::thrift::TProcessor;
0041 using apache::thrift::protocol::TProtocolFactory;
0042 using apache::thrift::concurrency::Mutex;
0043 using apache::thrift::concurrency::Monitor;
0044
0045
/**
 * One event record: a heap buffer together with its size and a position
 * cursor.
 *
 * eventBuff_ is owned by this object and released with delete[] in the
 * destructor, so it must be allocated with new[] (or left as nullptr).
 * Because of that ownership, copy construction and copy assignment are
 * deleted: an implicit member-wise copy would share eventBuff_ and free it
 * twice.
 */
typedef struct eventInfo {
  uint8_t* eventBuff_;     // owned buffer (new[]-allocated) or nullptr
  uint32_t eventSize_;     // presumably the number of valid bytes in eventBuff_
  uint32_t eventBuffPos_;  // presumably the current offset within eventBuff_

  eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0) {}

  // delete[] on nullptr is a no-op, so no null check is needed.
  ~eventInfo() { delete[] eventBuff_; }

  eventInfo(const eventInfo&) = delete;
  eventInfo& operator=(const eventInfo&) = delete;
} eventInfo;
0058
0059
0060 typedef struct readState {
0061 eventInfo* event_;
0062
0063
0064 uint8_t eventSizeBuff_[4];
0065 uint8_t eventSizeBuffPos_;
0066 bool readingSize_;
0067
0068
0069 int32_t bufferPtr_;
0070 int32_t bufferLen_;
0071
0072
0073 int32_t lastDispatchPtr_;
0074
0075 void resetState(uint32_t lastDispatchPtr) {
0076 readingSize_ = true;
0077 eventSizeBuffPos_ = 0;
0078 lastDispatchPtr_ = lastDispatchPtr;
0079 }
0080
0081 void resetAllValues() {
0082 resetState(0);
0083 bufferPtr_ = 0;
0084 bufferLen_ = 0;
0085 if (event_) {
0086 delete (event_);
0087 }
0088 event_ = nullptr;
0089 }
0090
0091 inline uint32_t getEventSize() {
0092 const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
0093 return *reinterpret_cast<const uint32_t*>(buffer);
0094 }
0095
0096 readState() {
0097 event_ = nullptr;
0098 resetAllValues();
0099 }
0100
0101 ~readState() {
0102 if (event_) {
0103 delete (event_);
0104 }
0105 }
0106
0107 } readState;
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123 class TFileTransportBuffer {
0124 public:
0125 TFileTransportBuffer(uint32_t size);
0126 virtual ~TFileTransportBuffer();
0127
0128 bool addEvent(eventInfo* event);
0129 eventInfo* getNext();
0130 void reset();
0131 bool isFull();
0132 bool isEmpty();
0133
0134 private:
0135 TFileTransportBuffer();
0136
0137 enum mode { WRITE, READ };
0138 mode bufferMode_;
0139
0140 uint32_t writePoint_;
0141 uint32_t readPoint_;
0142 uint32_t size_;
0143 eventInfo** buffer_;
0144 };
0145
0146
0147
0148
0149 class TFileReaderTransport : virtual public TTransport {
0150 public:
0151 virtual int32_t getReadTimeout() = 0;
0152 virtual void setReadTimeout(int32_t readTimeout) = 0;
0153
0154 virtual uint32_t getNumChunks() = 0;
0155 virtual uint32_t getCurChunk() = 0;
0156 virtual void seekToChunk(int32_t chunk) = 0;
0157 virtual void seekToEnd() = 0;
0158 };
0159
0160
0161
0162
0163 class TFileWriterTransport : virtual public TTransport {
0164 public:
0165 virtual uint32_t getChunkSize() = 0;
0166 virtual void setChunkSize(uint32_t chunkSize) = 0;
0167 };
0168
0169
0170
0171
0172
0173
0174 class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
0175 public:
0176 TFileTransport(std::string path, bool readOnly = false, std::shared_ptr<TConfiguration> config = nullptr);
0177 ~TFileTransport() override;
0178
0179
0180
0181 bool isOpen() const override { return true; }
0182
0183 void write(const uint8_t* buf, uint32_t len);
0184 void flush() override;
0185
0186 uint32_t readAll(uint8_t* buf, uint32_t len);
0187 uint32_t read(uint8_t* buf, uint32_t len);
0188 bool peek() override;
0189
0190
0191 void seekToChunk(int32_t chunk) override;
0192 void seekToEnd() override;
0193 uint32_t getNumChunks() override;
0194 uint32_t getCurChunk() override;
0195
0196
0197 void resetOutputFile(int fd, std::string filename, off_t offset);
0198
0199
0200 void setReadBuffSize(uint32_t readBuffSize) {
0201 if (readBuffSize) {
0202 readBuffSize_ = readBuffSize;
0203 }
0204 }
0205 uint32_t getReadBuffSize() { return readBuffSize_; }
0206
0207 static const int32_t TAIL_READ_TIMEOUT = -1;
0208 static const int32_t NO_TAIL_READ_TIMEOUT = 0;
0209 void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; }
0210 int32_t getReadTimeout() override { return readTimeout_; }
0211
0212 void setChunkSize(uint32_t chunkSize) override {
0213 if (chunkSize) {
0214 chunkSize_ = chunkSize;
0215 }
0216 }
0217 uint32_t getChunkSize() override { return chunkSize_; }
0218
0219 void setEventBufferSize(uint32_t bufferSize) {
0220 if (bufferAndThreadInitialized_) {
0221 GlobalOutput("Cannot change the buffer size after writer thread started");
0222 return;
0223 }
0224 eventBufferSize_ = bufferSize;
0225 }
0226
0227 uint32_t getEventBufferSize() { return eventBufferSize_; }
0228
0229 void setFlushMaxUs(uint32_t flushMaxUs) {
0230 if (flushMaxUs) {
0231 flushMaxUs_ = flushMaxUs;
0232 }
0233 }
0234 uint32_t getFlushMaxUs() { return flushMaxUs_; }
0235
0236 void setFlushMaxBytes(uint32_t flushMaxBytes) {
0237 if (flushMaxBytes) {
0238 flushMaxBytes_ = flushMaxBytes;
0239 }
0240 }
0241 uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
0242
0243 void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
0244 uint32_t getMaxEventSize() { return maxEventSize_; }
0245
0246 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
0247 maxCorruptedEvents_ = maxCorruptedEvents;
0248 }
0249 uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
0250
0251 void setEofSleepTimeUs(uint32_t eofSleepTime) {
0252 if (eofSleepTime) {
0253 eofSleepTime_ = eofSleepTime;
0254 }
0255 }
0256 uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
0257
0258
0259
0260
0261
0262
0263 uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0264 uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
0265 void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0266
0267 private:
0268
0269 void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
0270 bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
0271 bool initBufferAndWriteThread();
0272
0273
0274 static void* startWriterThread(void* ptr) {
0275 static_cast<TFileTransport*>(ptr)->writerThread();
0276 return nullptr;
0277 }
0278 void writerThread();
0279
0280
0281 eventInfo* readEvent();
0282
0283
0284 bool isEventCorrupted();
0285 void performRecovery();
0286
0287
0288 void openLogFile();
0289 std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
0290
0291
0292 readState readState_;
0293 uint8_t* readBuff_;
0294 eventInfo* currentEvent_;
0295
0296 uint32_t readBuffSize_;
0297 static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
0298
0299 int32_t readTimeout_;
0300 static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
0301
0302
0303 uint32_t chunkSize_;
0304 static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
0305
0306
0307 uint32_t eventBufferSize_;
0308 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
0309
0310
0311 uint32_t flushMaxUs_;
0312 static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
0313
0314
0315 uint32_t flushMaxBytes_;
0316 static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
0317
0318
0319 uint32_t maxEventSize_;
0320 static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
0321
0322
0323 uint32_t maxCorruptedEvents_;
0324 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
0325
0326
0327 uint32_t eofSleepTime_;
0328 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
0329
0330
0331 uint32_t corruptedEventSleepTime_;
0332 static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
0333
0334
0335 uint32_t writerThreadIOErrorSleepTime_;
0336 static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
0337
0338
0339 apache::thrift::concurrency::ThreadFactory threadFactory_;
0340 std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
0341
0342
0343
0344 TFileTransportBuffer* dequeueBuffer_;
0345 TFileTransportBuffer* enqueueBuffer_;
0346
0347
0348 Monitor notFull_, notEmpty_;
0349 std::atomic<bool> closing_;
0350
0351
0352 Monitor flushed_;
0353 std::atomic<bool> forceFlush_;
0354
0355
0356 Mutex mutex_;
0357
0358
0359 std::string filename_;
0360 int fd_;
0361
0362
0363 bool bufferAndThreadInitialized_;
0364
0365
0366 off_t offset_;
0367
0368
0369 uint32_t lastBadChunk_;
0370 uint32_t numCorruptedEventsInChunk_;
0371
0372 bool readOnly_;
0373 };
0374
0375
0376 class TEOFException : public TTransportException {
0377 public:
0378 TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
0379 };
0380
0381
0382 class TFileProcessor {
0383 public:
0384
0385
0386
0387
0388
0389
0390
0391 TFileProcessor(std::shared_ptr<TProcessor> processor,
0392 std::shared_ptr<TProtocolFactory> protocolFactory,
0393 std::shared_ptr<TFileReaderTransport> inputTransport);
0394
0395 TFileProcessor(std::shared_ptr<TProcessor> processor,
0396 std::shared_ptr<TProtocolFactory> inputProtocolFactory,
0397 std::shared_ptr<TProtocolFactory> outputProtocolFactory,
0398 std::shared_ptr<TFileReaderTransport> inputTransport);
0399
0400
0401
0402
0403
0404
0405
0406
0407
0408 TFileProcessor(std::shared_ptr<TProcessor> processor,
0409 std::shared_ptr<TProtocolFactory> protocolFactory,
0410 std::shared_ptr<TFileReaderTransport> inputTransport,
0411 std::shared_ptr<TTransport> outputTransport);
0412
0413
0414
0415
0416
0417
0418
0419 void process(uint32_t numEvents, bool tail);
0420
0421
0422
0423
0424
0425 void processChunk();
0426
0427 private:
0428 std::shared_ptr<TProcessor> processor_;
0429 std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
0430 std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
0431 std::shared_ptr<TFileReaderTransport> inputTransport_;
0432 std::shared_ptr<TTransport> outputTransport_;
0433 };
0434 }
0435 }
0436 }
0437
0438 #endif
0439