Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:03

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
0021 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
0022 
#include <thrift/transport/TTransport.h>
#include <thrift/Thrift.h>
#include <thrift/TProcessor.h>

#include <atomic>
#include <chrono>
#include <cstring>
#include <string>
#include <stdio.h>

#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
0035 
0036 namespace apache {
0037 namespace thrift {
0038 namespace transport {
0039 
0040 using apache::thrift::TProcessor;
0041 using apache::thrift::protocol::TProtocolFactory;
0042 using apache::thrift::concurrency::Mutex;
0043 using apache::thrift::concurrency::Monitor;
0044 
// Data pertaining to a single event.
//
// Owns eventBuff_: the destructor releases it with delete[], so it must be
// nullptr or have been allocated with new[]. Copying is disabled because a
// member-wise copy would share eventBuff_ and free it twice.
struct eventInfo {
  uint8_t* eventBuff_;    // owned event payload (new[]-allocated or nullptr)
  uint32_t eventSize_;    // event size (presumably bytes in eventBuff_)
  uint32_t eventBuffPos_; // cursor into eventBuff_ (usage lives in the .cpp)

  eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0) {}
  ~eventInfo() { delete[] eventBuff_; } // delete[] on nullptr is a no-op

  eventInfo(const eventInfo&) = delete;
  eventInfo& operator=(const eventInfo&) = delete;
};
0058 
0059 // information about current read state
0060 typedef struct readState {
0061   eventInfo* event_;
0062 
0063   // keep track of event size
0064   uint8_t eventSizeBuff_[4];
0065   uint8_t eventSizeBuffPos_;
0066   bool readingSize_;
0067 
0068   // read buffer variables
0069   int32_t bufferPtr_;
0070   int32_t bufferLen_;
0071 
0072   // last successful dispatch point
0073   int32_t lastDispatchPtr_;
0074 
0075   void resetState(uint32_t lastDispatchPtr) {
0076     readingSize_ = true;
0077     eventSizeBuffPos_ = 0;
0078     lastDispatchPtr_ = lastDispatchPtr;
0079   }
0080 
0081   void resetAllValues() {
0082     resetState(0);
0083     bufferPtr_ = 0;
0084     bufferLen_ = 0;
0085     if (event_) {
0086       delete (event_);
0087     }
0088     event_ = nullptr;
0089   }
0090 
0091   inline uint32_t getEventSize() {
0092     const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
0093     return *reinterpret_cast<const uint32_t*>(buffer);
0094   }
0095 
0096   readState() {
0097     event_ = nullptr;
0098     resetAllValues();
0099   }
0100 
0101   ~readState() {
0102     if (event_) {
0103       delete (event_);
0104     }
0105   }
0106 
0107 } readState;
0108 
0109 /**
0110  * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
0111  * to be written to disk.  Should be used in the following way:
0112  *  1) Buffer created
0113  *  2) Buffer written to (addEvent)
0114  *  3) Buffer read from (getNext)
0115  *  4) Buffer reset (reset)
0116  *  5) Go back to 2, or destroy buffer
0117  *
0118  * The buffer should never be written to after it is read from, unless it is reset first.
0119  * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
0120  *       which uses the buffer in this way.
0121  *
0122  */
0123 class TFileTransportBuffer {
0124 public:
0125   TFileTransportBuffer(uint32_t size);
0126   virtual ~TFileTransportBuffer();
0127 
0128   bool addEvent(eventInfo* event);
0129   eventInfo* getNext();
0130   void reset();
0131   bool isFull();
0132   bool isEmpty();
0133 
0134 private:
0135   TFileTransportBuffer(); // should not be used
0136 
0137   enum mode { WRITE, READ };
0138   mode bufferMode_;
0139 
0140   uint32_t writePoint_;
0141   uint32_t readPoint_;
0142   uint32_t size_;
0143   eventInfo** buffer_;
0144 };
0145 
0146 /**
0147  * Abstract interface for transports used to read files
0148  */
0149 class TFileReaderTransport : virtual public TTransport {
0150 public:
0151   virtual int32_t getReadTimeout() = 0;
0152   virtual void setReadTimeout(int32_t readTimeout) = 0;
0153 
0154   virtual uint32_t getNumChunks() = 0;
0155   virtual uint32_t getCurChunk() = 0;
0156   virtual void seekToChunk(int32_t chunk) = 0;
0157   virtual void seekToEnd() = 0;
0158 };
0159 
0160 /**
0161  * Abstract interface for transports used to write files
0162  */
0163 class TFileWriterTransport : virtual public TTransport {
0164 public:
0165   virtual uint32_t getChunkSize() = 0;
0166   virtual void setChunkSize(uint32_t chunkSize) = 0;
0167 };
0168 
0169 /**
0170  * File implementation of a transport. Reads and writes are done to a
0171  * file on disk.
0172  *
0173  */
0174 class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
0175 public:
0176   TFileTransport(std::string path, bool readOnly = false, std::shared_ptr<TConfiguration> config = nullptr);
0177   ~TFileTransport() override;
0178 
0179   // TODO: what is the correct behaviour for this?
0180   // the log file is generally always open
0181   bool isOpen() const override { return true; }
0182 
0183   void write(const uint8_t* buf, uint32_t len);
0184   void flush() override;
0185 
0186   uint32_t readAll(uint8_t* buf, uint32_t len);
0187   uint32_t read(uint8_t* buf, uint32_t len);
0188   bool peek() override;
0189 
0190   // log-file specific functions
0191   void seekToChunk(int32_t chunk) override;
0192   void seekToEnd() override;
0193   uint32_t getNumChunks() override;
0194   uint32_t getCurChunk() override;
0195 
0196   // for changing the output file
0197   void resetOutputFile(int fd, std::string filename, off_t offset);
0198 
0199   // Setter/Getter functions for user-controllable options
0200   void setReadBuffSize(uint32_t readBuffSize) {
0201     if (readBuffSize) {
0202       readBuffSize_ = readBuffSize;
0203     }
0204   }
0205   uint32_t getReadBuffSize() { return readBuffSize_; }
0206 
0207   static const int32_t TAIL_READ_TIMEOUT = -1;
0208   static const int32_t NO_TAIL_READ_TIMEOUT = 0;
0209   void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; }
0210   int32_t getReadTimeout() override { return readTimeout_; }
0211 
0212   void setChunkSize(uint32_t chunkSize) override {
0213     if (chunkSize) {
0214       chunkSize_ = chunkSize;
0215     }
0216   }
0217   uint32_t getChunkSize() override { return chunkSize_; }
0218 
0219   void setEventBufferSize(uint32_t bufferSize) {
0220     if (bufferAndThreadInitialized_) {
0221       GlobalOutput("Cannot change the buffer size after writer thread started");
0222       return;
0223     }
0224     eventBufferSize_ = bufferSize;
0225   }
0226 
0227   uint32_t getEventBufferSize() { return eventBufferSize_; }
0228 
0229   void setFlushMaxUs(uint32_t flushMaxUs) {
0230     if (flushMaxUs) {
0231       flushMaxUs_ = flushMaxUs;
0232     }
0233   }
0234   uint32_t getFlushMaxUs() { return flushMaxUs_; }
0235 
0236   void setFlushMaxBytes(uint32_t flushMaxBytes) {
0237     if (flushMaxBytes) {
0238       flushMaxBytes_ = flushMaxBytes;
0239     }
0240   }
0241   uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
0242 
0243   void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
0244   uint32_t getMaxEventSize() { return maxEventSize_; }
0245 
0246   void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
0247     maxCorruptedEvents_ = maxCorruptedEvents;
0248   }
0249   uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
0250 
0251   void setEofSleepTimeUs(uint32_t eofSleepTime) {
0252     if (eofSleepTime) {
0253       eofSleepTime_ = eofSleepTime;
0254     }
0255   }
0256   uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
0257 
0258   /*
0259    * Override TTransport *_virt() functions to invoke our implementations.
0260    * We cannot use TVirtualTransport to provide these, since we need to inherit
0261    * virtually from TTransport.
0262    */
0263   uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
0264   uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
0265   void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
0266 
0267 private:
0268   // helper functions for writing to a file
0269   void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
0270   bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
0271   bool initBufferAndWriteThread();
0272 
0273   // control for writer thread
0274   static void* startWriterThread(void* ptr) {
0275     static_cast<TFileTransport*>(ptr)->writerThread();
0276     return nullptr;
0277   }
0278   void writerThread();
0279 
0280   // helper functions for reading from a file
0281   eventInfo* readEvent();
0282 
0283   // event corruption-related functions
0284   bool isEventCorrupted();
0285   void performRecovery();
0286 
0287   // Utility functions
0288   void openLogFile();
0289   std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
0290 
0291   // Class variables
0292   readState readState_;
0293   uint8_t* readBuff_;
0294   eventInfo* currentEvent_;
0295 
0296   uint32_t readBuffSize_;
0297   static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
0298 
0299   int32_t readTimeout_;
0300   static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
0301 
0302   // size of chunks that file will be split up into
0303   uint32_t chunkSize_;
0304   static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
0305 
0306   // size of event buffers
0307   uint32_t eventBufferSize_;
0308   static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
0309 
0310   // max number of microseconds that can pass without flushing
0311   uint32_t flushMaxUs_;
0312   static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
0313 
0314   // max number of bytes that can be written without flushing
0315   uint32_t flushMaxBytes_;
0316   static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
0317 
0318   // max event size
0319   uint32_t maxEventSize_;
0320   static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
0321 
0322   // max number of corrupted events per chunk
0323   uint32_t maxCorruptedEvents_;
0324   static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
0325 
0326   // sleep duration when EOF is hit
0327   uint32_t eofSleepTime_;
0328   static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
0329 
0330   // sleep duration when a corrupted event is encountered
0331   uint32_t corruptedEventSleepTime_;
0332   static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
0333 
0334   // sleep duration in seconds when an IO error is encountered in the writer thread
0335   uint32_t writerThreadIOErrorSleepTime_;
0336   static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
0337 
0338   // writer thread
0339   apache::thrift::concurrency::ThreadFactory threadFactory_;
0340   std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
0341 
0342   // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
0343   // needs to be written to the file.  The buffers are swapped by the writer thread.
0344   TFileTransportBuffer* dequeueBuffer_;
0345   TFileTransportBuffer* enqueueBuffer_;
0346 
0347   // conditions used to block when the buffer is full or empty
0348   Monitor notFull_, notEmpty_;
0349   std::atomic<bool> closing_;
0350 
0351   // To keep track of whether the buffer has been flushed
0352   Monitor flushed_;
0353   std::atomic<bool> forceFlush_;
0354 
0355   // Mutex that is grabbed when enqueueing and swapping the read/write buffers
0356   Mutex mutex_;
0357 
0358   // File information
0359   std::string filename_;
0360   int fd_;
0361 
0362   // Whether the writer thread and buffers have been initialized
0363   bool bufferAndThreadInitialized_;
0364 
0365   // Offset within the file
0366   off_t offset_;
0367 
0368   // event corruption information
0369   uint32_t lastBadChunk_;
0370   uint32_t numCorruptedEventsInChunk_;
0371 
0372   bool readOnly_;
0373 };
0374 
0375 // Exception thrown when EOF is hit
0376 class TEOFException : public TTransportException {
0377 public:
0378   TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
0379 };
0380 
0381 // wrapper class to process events from a file containing thrift events
0382 class TFileProcessor {
0383 public:
0384   /**
0385    * Constructor that defaults output transport to null transport
0386    *
0387    * @param processor processes log-file events
0388    * @param protocolFactory protocol factory
0389    * @param inputTransport file transport
0390    */
0391   TFileProcessor(std::shared_ptr<TProcessor> processor,
0392                  std::shared_ptr<TProtocolFactory> protocolFactory,
0393                  std::shared_ptr<TFileReaderTransport> inputTransport);
0394 
0395   TFileProcessor(std::shared_ptr<TProcessor> processor,
0396                  std::shared_ptr<TProtocolFactory> inputProtocolFactory,
0397                  std::shared_ptr<TProtocolFactory> outputProtocolFactory,
0398                  std::shared_ptr<TFileReaderTransport> inputTransport);
0399 
0400   /**
0401    * Constructor
0402    *
0403    * @param processor processes log-file events
0404    * @param protocolFactory protocol factory
0405    * @param inputTransport input file transport
0406    * @param output output transport
0407    */
0408   TFileProcessor(std::shared_ptr<TProcessor> processor,
0409                  std::shared_ptr<TProtocolFactory> protocolFactory,
0410                  std::shared_ptr<TFileReaderTransport> inputTransport,
0411                  std::shared_ptr<TTransport> outputTransport);
0412 
0413   /**
0414    * processes events from the file
0415    *
0416    * @param numEvents number of events to process (0 for unlimited)
0417    * @param tail tails the file if true
0418    */
0419   void process(uint32_t numEvents, bool tail);
0420 
0421   /**
0422    * process events until the end of the chunk
0423    *
0424    */
0425   void processChunk();
0426 
0427 private:
0428   std::shared_ptr<TProcessor> processor_;
0429   std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
0430   std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
0431   std::shared_ptr<TFileReaderTransport> inputTransport_;
0432   std::shared_ptr<TTransport> outputTransport_;
0433 };
0434 }
0435 }
0436 } // apache::thrift::transport
0437 
0438 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
0439