Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:02

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
0021 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
0022 
0023 #include <thrift/Thrift.h>
0024 #include <memory>
0025 #include <thrift/server/TServer.h>
0026 #include <thrift/transport/PlatformSocket.h>
0027 #include <thrift/transport/TBufferTransports.h>
0028 #include <thrift/transport/TSocket.h>
0029 #include <thrift/transport/TNonblockingServerTransport.h>
0030 #include <thrift/concurrency/ThreadManager.h>
0031 #include <climits>
0032 #include <thrift/concurrency/Thread.h>
0033 #include <thrift/concurrency/ThreadFactory.h>
0034 #include <thrift/concurrency/Mutex.h>
0035 #include <stack>
0036 #include <vector>
0037 #include <string>
0038 #include <cstdlib>
0039 #include <unordered_set>
0040 #ifdef HAVE_UNISTD_H
0041 #include <unistd.h>
0042 #endif
0043 #include <event.h>
0044 #include <event2/event_compat.h>
0045 #include <event2/event_struct.h>
0046 
0047 namespace apache {
0048 namespace thrift {
0049 namespace server {
0050 
0051 using apache::thrift::transport::TMemoryBuffer;
0052 using apache::thrift::transport::TSocket;
0053 using apache::thrift::transport::TNonblockingServerTransport;
0054 using apache::thrift::protocol::TProtocol;
0055 using apache::thrift::concurrency::Runnable;
0056 using apache::thrift::concurrency::ThreadManager;
0057 using apache::thrift::concurrency::ThreadFactory;
0058 using apache::thrift::concurrency::Thread;
0059 using apache::thrift::concurrency::Mutex;
0060 using apache::thrift::concurrency::Guard;
0061 
0062 #ifdef LIBEVENT_VERSION_NUMBER
0063 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
0064 #define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
0065 #define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
0066 #else
0067 // assume latest version 1 series
0068 #define LIBEVENT_VERSION_MAJOR 1
0069 #define LIBEVENT_VERSION_MINOR 14
0070 #define LIBEVENT_VERSION_REL 13
0071 #define LIBEVENT_VERSION_NUMBER                                                                    \
0072   ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
0073 #endif
0074 
0075 #if LIBEVENT_VERSION_NUMBER < 0x02000000
0076 typedef THRIFT_SOCKET evutil_socket_t;
0077 #endif
0078 
0079 #ifndef SOCKOPT_CAST_T
0080 #ifndef _WIN32
0081 #define SOCKOPT_CAST_T void
0082 #else
0083 #define SOCKOPT_CAST_T char
0084 #endif // _WIN32
0085 #endif
0086 
0087 template <class T>
0088 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
0089   return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
0090 }
0091 
0092 template <class T>
0093 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
0094   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
0095 }
0096 
0097 /**
0098  * This is a non-blocking server in C++ for high performance that
0099  * operates a set of IO threads (by default only one). It assumes that
0100  * all incoming requests are framed with a 4 byte length indicator and
0101  * writes out responses using the same framing.
0102  */
0103 
0104 /// Overload condition actions.
0105 enum TOverloadAction {
0106   T_OVERLOAD_NO_ACTION,       ///< Don't handle overload */
0107   T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
0108   T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
0109 };
0110 
0111 class TNonblockingIOThread;
0112 
0113 class TNonblockingServer : public TServer {
0114 private:
0115   class TConnection;
0116 
0117   friend class TNonblockingIOThread;
0118 
0119 private:
0120   /// Listen backlog
0121   static const int LISTEN_BACKLOG = 1024;
0122 
0123   /// Default limit on size of idle connection pool
0124   static const size_t CONNECTION_STACK_LIMIT = 1024;
0125 
0126   /// Default limit on frame size
0127   static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
0128 
0129   /// Default limit on total number of connected sockets
0130   static const int MAX_CONNECTIONS = INT_MAX;
0131 
0132   /// Default limit on connections in handler/task processing
0133   static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
0134 
0135   /// Default size of write buffer
0136   static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
0137 
0138   /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
0139   static const int IDLE_READ_BUFFER_LIMIT = 1024;
0140 
0141   /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
0142   static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
0143 
0144   /// # of calls before resizing oversized buffers (0 = check only on close)
0145   static const int RESIZE_BUFFER_EVERY_N = 512;
0146 
0147   /// # of IO threads to use by default
0148   static const int DEFAULT_IO_THREADS = 1;
0149 
0150   /// # of IO threads this server will use
0151   size_t numIOThreads_;
0152 
0153   /// Whether to set high scheduling priority for IO threads
0154   bool useHighPriorityIOThreads_;
0155 
0156   /// Server socket file descriptor
0157   THRIFT_SOCKET serverSocket_;
0158 
0159   /// The optional user-provided event-base (for single-thread servers)
0160   event_base* userEventBase_;
0161 
0162   /// For processing via thread pool, may be nullptr
0163   std::shared_ptr<ThreadManager> threadManager_;
0164 
0165   /// Is thread pool processing?
0166   bool threadPoolProcessing_;
0167 
0168   // Factory to create the IO threads
0169   std::shared_ptr<ThreadFactory> ioThreadFactory_;
0170 
0171   // Vector of IOThread objects that will handle our IO
0172   std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;
0173 
0174   // Index of next IO Thread to be used (for round-robin)
0175   uint32_t nextIOThread_;
0176 
0177   // Synchronizes access to connection stack and similar data
0178   Mutex connMutex_;
0179 
0180   /// Number of TConnection object we've created
0181   size_t numTConnections_;
0182 
0183   /// Number of Connections processing or waiting to process
0184   size_t numActiveProcessors_;
0185 
0186   /// Limit for how many TConnection objects to cache
0187   size_t connectionStackLimit_;
0188 
0189   /// Limit for number of connections processing or waiting to process
0190   size_t maxActiveProcessors_;
0191 
0192   /// Limit for number of open connections
0193   size_t maxConnections_;
0194 
0195   /// Limit for frame size
0196   size_t maxFrameSize_;
0197 
0198   /// Time in milliseconds before an unperformed task expires (0 == infinite).
0199   int64_t taskExpireTime_;
0200 
0201   /**
0202    * Hysteresis for overload state.  This is the fraction of the overload
0203    * value that needs to be reached before the overload state is cleared;
0204    * must be <= 1.0.
0205    */
0206   double overloadHysteresis_;
0207 
0208   /// Action to take when we're overloaded.
0209   TOverloadAction overloadAction_;
0210 
0211   /**
0212    * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
0213    * and found to be exceeded, reinitialized) to this size.
0214    */
0215   size_t writeBufferDefaultSize_;
0216 
0217   /**
0218    * Max read buffer size for an idle TConnection.  When we place an idle
0219    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
0220    * we will free the buffer (such that it will be reinitialized by the next
0221    * received frame) if it has exceeded this limit.  0 disables this check.
0222    */
0223   size_t idleReadBufferLimit_;
0224 
0225   /**
0226    * Max write buffer size for an idle connection.  When we place an idle
0227    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
0228    * we insure that its write buffer is <= to this size; otherwise we
0229    * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
0230    * idle connections don't hog memory. 0 disables this check.
0231    */
0232   size_t idleWriteBufferLimit_;
0233 
0234   /**
0235    * Every N calls we check the buffer size limits on a connected TConnection.
0236    * 0 disables (i.e. the checks are only done when a connection closes).
0237    */
0238   int32_t resizeBufferEveryN_;
0239 
0240   /// Set if we are currently in an overloaded state.
0241   bool overloaded_;
0242 
0243   /// Count of connections dropped since overload started
0244   uint32_t nConnectionsDropped_;
0245 
0246   /// Count of connections dropped on overload since server started
0247   uint64_t nTotalConnectionsDropped_;
0248 
0249   /**
0250    * This is a stack of all the objects that have been created but that
0251    * are NOT currently in use. When we close a connection, we place it on this
0252    * stack so that the object can be reused later, rather than freeing the
0253    * memory and reallocating a new object later.
0254    */
0255   std::stack<TConnection*> connectionStack_;
0256 
0257   /**
0258    * This container holds pointers to all active connections. This container
0259    * allows the server to clean up unlcosed connection objects at destruction,
0260    * which in turn allows their transports, protocols, processors and handlers
0261    * to deallocate and clean up correctly.
0262    */
0263   std::unordered_set<TConnection*> activeConnections_;
0264 
0265   /*
0266   */
0267   std::shared_ptr<TNonblockingServerTransport> serverTransport_;
0268 
0269   /**
0270    * Called when server socket had something happen.  We accept all waiting
0271    * client connections on listen socket fd and assign TConnection objects
0272    * to handle those requests.
0273    *
0274    * @param which the event flag that triggered the handler.
0275    */
0276   void handleEvent(THRIFT_SOCKET fd, short which);
0277 
0278   void init() {
0279     serverSocket_ = THRIFT_INVALID_SOCKET;
0280     numIOThreads_ = DEFAULT_IO_THREADS;
0281     nextIOThread_ = 0;
0282     useHighPriorityIOThreads_ = false;
0283     userEventBase_ = nullptr;
0284     threadPoolProcessing_ = false;
0285     numTConnections_ = 0;
0286     numActiveProcessors_ = 0;
0287     connectionStackLimit_ = CONNECTION_STACK_LIMIT;
0288     maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
0289     maxConnections_ = MAX_CONNECTIONS;
0290     maxFrameSize_ = MAX_FRAME_SIZE;
0291     taskExpireTime_ = 0;
0292     overloadHysteresis_ = 0.8;
0293     overloadAction_ = T_OVERLOAD_NO_ACTION;
0294     writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
0295     idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
0296     idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
0297     resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
0298     overloaded_ = false;
0299     nConnectionsDropped_ = 0;
0300     nTotalConnectionsDropped_ = 0;
0301   }
0302 
0303 public:
0304   TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0305                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
0306     : TServer(processorFactory), serverTransport_(serverTransport) {
0307     init();
0308   }
0309 
0310   TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0311                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
0312     : TServer(processor), serverTransport_(serverTransport) {
0313     init();
0314   }
0315 
0316 
0317   TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0318                      const std::shared_ptr<TProtocolFactory>& protocolFactory,
0319                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0320                      const std::shared_ptr<ThreadManager>& threadManager
0321                      = std::shared_ptr<ThreadManager>())
0322     : TServer(processorFactory), serverTransport_(serverTransport) {
0323     init();
0324 
0325     setInputProtocolFactory(protocolFactory);
0326     setOutputProtocolFactory(protocolFactory);
0327     setThreadManager(threadManager);
0328   }
0329 
0330   TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0331                      const std::shared_ptr<TProtocolFactory>& protocolFactory,
0332                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0333                      const std::shared_ptr<ThreadManager>& threadManager
0334                      = std::shared_ptr<ThreadManager>())
0335     : TServer(processor), serverTransport_(serverTransport) {
0336     init();
0337 
0338     setInputProtocolFactory(protocolFactory);
0339     setOutputProtocolFactory(protocolFactory);
0340     setThreadManager(threadManager);
0341   }
0342 
0343   TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0344                      const std::shared_ptr<TTransportFactory>& inputTransportFactory,
0345                      const std::shared_ptr<TTransportFactory>& outputTransportFactory,
0346                      const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
0347                      const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
0348                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0349                      const std::shared_ptr<ThreadManager>& threadManager
0350                      = std::shared_ptr<ThreadManager>())
0351     : TServer(processorFactory), serverTransport_(serverTransport) {
0352     init();
0353 
0354     setInputTransportFactory(inputTransportFactory);
0355     setOutputTransportFactory(outputTransportFactory);
0356     setInputProtocolFactory(inputProtocolFactory);
0357     setOutputProtocolFactory(outputProtocolFactory);
0358     setThreadManager(threadManager);
0359   }
0360 
0361   TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0362                      const std::shared_ptr<TTransportFactory>& inputTransportFactory,
0363                      const std::shared_ptr<TTransportFactory>& outputTransportFactory,
0364                      const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
0365                      const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
0366                      const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0367                      const std::shared_ptr<ThreadManager>& threadManager
0368                      = std::shared_ptr<ThreadManager>())
0369     : TServer(processor), serverTransport_(serverTransport) {
0370     init();
0371 
0372     setInputTransportFactory(inputTransportFactory);
0373     setOutputTransportFactory(outputTransportFactory);
0374     setInputProtocolFactory(inputProtocolFactory);
0375     setOutputProtocolFactory(outputProtocolFactory);
0376     setThreadManager(threadManager);
0377   }
0378 
0379   ~TNonblockingServer() override;
0380 
0381   void setThreadManager(std::shared_ptr<ThreadManager> threadManager);
0382 
0383   int getListenPort() { return serverTransport_->getListenPort(); }
0384 
0385   std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
0386 
0387   /**
0388    * Sets the number of IO threads used by this server. Can only be used before
0389    * the call to serve() and has no effect afterwards.
0390    */
0391   void setNumIOThreads(size_t numThreads) {
0392     numIOThreads_ = numThreads;
0393     // User-provided event-base doesn't works for multi-threaded servers
0394     assert(numIOThreads_ <= 1 || !userEventBase_);
0395   }
0396 
0397   /** Return whether the IO threads will get high scheduling priority */
0398   bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
0399 
0400   /** Set whether the IO threads will get high scheduling priority. */
0401   void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }
0402 
0403   /** Return the number of IO threads used by this server. */
0404   size_t getNumIOThreads() const { return numIOThreads_; }
0405 
0406   /**
0407    * Get the maximum number of unused TConnection we will hold in reserve.
0408    *
0409    * @return the current limit on TConnection pool size.
0410    */
0411   size_t getConnectionStackLimit() const { return connectionStackLimit_; }
0412 
0413   /**
0414    * Set the maximum number of unused TConnection we will hold in reserve.
0415    *
0416    * @param sz the new limit for TConnection pool size.
0417    */
0418   void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }
0419 
0420   bool isThreadPoolProcessing() const { return threadPoolProcessing_; }
0421 
0422   void addTask(std::shared_ptr<Runnable> task) {
0423     threadManager_->add(task, 0LL, taskExpireTime_);
0424   }
0425 
0426   /**
0427    * Return the count of sockets currently connected to.
0428    *
0429    * @return count of connected sockets.
0430    */
0431   size_t getNumConnections() const { return numTConnections_; }
0432 
0433   /**
0434    * Return the count of sockets currently connected to.
0435    *
0436    * @return count of connected sockets.
0437    */
0438   size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }
0439 
0440   /**
0441    * Return the count of connection objects allocated but not in use.
0442    *
0443    * @return count of idle connection objects.
0444    */
0445   size_t getNumIdleConnections() const { return connectionStack_.size(); }
0446 
0447   /**
0448    * Return count of number of connections which are currently processing.
0449    * This is defined as a connection where all data has been received and
0450    * either assigned a task (when threading) or passed to a handler (when
0451    * not threading), and where the handler has not yet returned.
0452    *
0453    * @return # of connections currently processing.
0454    */
0455   size_t getNumActiveProcessors() const { return numActiveProcessors_; }
0456 
0457   /// Increment the count of connections currently processing.
0458   void incrementActiveProcessors() {
0459     Guard g(connMutex_);
0460     ++numActiveProcessors_;
0461   }
0462 
0463   /// Decrement the count of connections currently processing.
0464   void decrementActiveProcessors() {
0465     Guard g(connMutex_);
0466     if (numActiveProcessors_ > 0) {
0467       --numActiveProcessors_;
0468     }
0469   }
0470 
0471   /**
0472    * Get the maximum # of connections allowed before overload.
0473    *
0474    * @return current setting.
0475    */
0476   size_t getMaxConnections() const { return maxConnections_; }
0477 
0478   /**
0479    * Set the maximum # of connections allowed before overload.
0480    *
0481    * @param maxConnections new setting for maximum # of connections.
0482    */
0483   void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }
0484 
0485   /**
0486    * Get the maximum # of connections waiting in handler/task before overload.
0487    *
0488    * @return current setting.
0489    */
0490   size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }
0491 
0492   /**
0493    * Set the maximum # of connections waiting in handler/task before overload.
0494    *
0495    * @param maxActiveProcessors new setting for maximum # of active processes.
0496    */
0497   void setMaxActiveProcessors(size_t maxActiveProcessors) {
0498     maxActiveProcessors_ = maxActiveProcessors;
0499   }
0500 
0501   /**
0502    * Get the maximum allowed frame size.
0503    *
0504    * If a client tries to send a message larger than this limit,
0505    * its connection will be closed.
0506    *
0507    * @return Maxium frame size, in bytes.
0508    */
0509   size_t getMaxFrameSize() const { return maxFrameSize_; }
0510 
0511   /**
0512    * Set the maximum allowed frame size.
0513    *
0514    * @param maxFrameSize The new maximum frame size.
0515    */
0516   void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
0517 
0518   /**
0519    * Get fraction of maximum limits before an overload condition is cleared.
0520    *
0521    * @return hysteresis fraction
0522    */
0523   double getOverloadHysteresis() const { return overloadHysteresis_; }
0524 
0525   /**
0526    * Set fraction of maximum limits before an overload condition is cleared.
0527    * A good value would probably be between 0.5 and 0.9.
0528    *
0529    * @param hysteresisFraction fraction <= 1.0.
0530    */
0531   void setOverloadHysteresis(double hysteresisFraction) {
0532     if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
0533       overloadHysteresis_ = hysteresisFraction;
0534     }
0535   }
0536 
0537   /**
0538    * Get the action the server will take on overload.
0539    *
0540    * @return a TOverloadAction enum value for the currently set action.
0541    */
0542   TOverloadAction getOverloadAction() const { return overloadAction_; }
0543 
0544   /**
0545    * Set the action the server is to take on overload.
0546    *
0547    * @param overloadAction a TOverloadAction enum value for the action.
0548    */
0549   void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
0550 
0551   /**
0552    * Get the time in milliseconds after which a task expires (0 == infinite).
0553    *
0554    * @return a 64-bit time in milliseconds.
0555    */
0556   int64_t getTaskExpireTime() const { return taskExpireTime_; }
0557 
0558   /**
0559    * Set the time in milliseconds after which a task expires (0 == infinite).
0560    *
0561    * @param taskExpireTime a 64-bit time in milliseconds.
0562    */
0563   void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }
0564 
0565   /**
0566    * Determine if the server is currently overloaded.
0567    * This function checks the maximums for open connections and connections
0568    * currently in processing, and sets an overload condition if they are
0569    * exceeded.  The overload will persist until both values are below the
0570    * current hysteresis fraction of their maximums.
0571    *
0572    * @return true if an overload condition exists, false if not.
0573    */
0574   bool serverOverloaded();
0575 
0576   /** Pop and discard next task on threadpool wait queue.
0577    *
0578    * @return true if a task was discarded, false if the wait queue was empty.
0579    */
0580   bool drainPendingTask();
0581 
0582   /**
0583    * Get the starting size of a TConnection object's write buffer.
0584    *
0585    * @return # bytes we initialize a TConnection object's write buffer to.
0586    */
0587   size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }
0588 
0589   /**
0590    * Set the starting size of a TConnection object's write buffer.
0591    *
0592    * @param size # bytes we initialize a TConnection object's write buffer to.
0593    */
0594   void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }
0595 
0596   /**
0597    * Get the maximum size of read buffer allocated to idle TConnection objects.
0598    *
0599    * @return # bytes beyond which we will dealloc idle buffer.
0600    */
0601   size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }
0602 
0603   /**
0604    * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
0605    * Get the maximum size of read buffer allocated to idle TConnection objects.
0606    *
0607    * @return # bytes beyond which we will dealloc idle buffer.
0608    */
0609   size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }
0610 
0611   /**
0612    * Set the maximum size read buffer allocated to idle TConnection objects.
0613    * If a TConnection object is found (either on connection close or between
0614    * calls when resizeBufferEveryN_ is set) with more than this much memory
0615    * allocated to its read buffer, we free it and allow it to be reinitialized
0616    * on the next received frame.
0617    *
0618    * @param limit of bytes beyond which we will shrink buffers when checked.
0619    */
0620   void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }
0621 
0622   /**
0623    * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
0624    * Set the maximum size read buffer allocated to idle TConnection objects.
0625    * If a TConnection object is found (either on connection close or between
0626    * calls when resizeBufferEveryN_ is set) with more than this much memory
0627    * allocated to its read buffer, we free it and allow it to be reinitialized
0628    * on the next received frame.
0629    *
0630    * @param limit of bytes beyond which we will shrink buffers when checked.
0631    */
0632   void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }
0633 
0634   /**
0635    * Get the maximum size of write buffer allocated to idle TConnection objects.
0636    *
0637    * @return # bytes beyond which we will reallocate buffers when checked.
0638    */
0639   size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }
0640 
0641   /**
0642    * Set the maximum size write buffer allocated to idle TConnection objects.
0643    * If a TConnection object is found (either on connection close or between
0644    * calls when resizeBufferEveryN_ is set) with more than this much memory
0645    * allocated to its write buffer, we destroy and construct that buffer with
0646    * writeBufferDefaultSize_ bytes.
0647    *
0648    * @param limit of bytes beyond which we will shrink buffers when idle.
0649    */
0650   void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }
0651 
0652   /**
0653    * Get # of calls made between buffer size checks.  0 means disabled.
0654    *
0655    * @return # of calls between buffer size checks.
0656    */
0657   int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }
0658 
0659   /**
0660    * Check buffer sizes every "count" calls.  This allows buffer limits
0661    * to be enforced for persistent connections with a controllable degree
0662    * of overhead. 0 disables checks except at connection close.
0663    *
0664    * @param count the number of calls between checks, or 0 to disable
0665    */
0666   void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
0667 
0668   /**
0669    * Main workhorse function, starts up the server listening on a port and
0670    * loops over the libevent handler.
0671    */
0672   void serve() override;
0673 
0674   /**
0675    * Causes the server to terminate gracefully (can be called from any thread).
0676    */
0677   void stop() override;
0678 
0679   /// Creates a socket to listen on and binds it to the local port.
0680   void createAndListenOnSocket();
0681 
0682   /**
0683    * Register the optional user-provided event-base (for single-thread servers)
0684    *
0685    * This method should be used when the server is running in a single-thread
0686    * mode, and the event base is provided by the user (i.e., the caller).
0687    *
0688    * @param user_event_base the user-provided event-base. The user is
0689    * responsible for freeing the event base memory.
0690    */
0691   void registerEvents(event_base* user_event_base);
0692 
0693   /**
0694    * Returns the optional user-provided event-base (for single-thread servers).
0695    */
0696   event_base* getUserEventBase() const { return userEventBase_; }
0697 
0698   /** Some transports, like THeaderTransport, require passing through
0699    * the framing size instead of stripping it.
0700    */
0701   bool getHeaderTransport();
0702 
0703 private:
0704   /**
0705    * Callback function that the threadmanager calls when a task reaches
0706    * its expiration time.  It is needed to clean up the expired connection.
0707    *
0708    * @param task the runnable associated with the expired task.
0709    */
0710   void expireClose(std::shared_ptr<Runnable> task);
0711 
0712   /**
0713    * Return an initialized connection object.  Creates or recovers from
0714    * pool a TConnection and initializes it with the provided socket FD
0715    * and flags.
0716    *
0717    * @param socket FD of socket associated with this connection.
0718    * @param addr the sockaddr of the client
0719    * @param addrLen the length of addr
0720    * @return pointer to initialized TConnection object.
0721    */
0722   TConnection* createConnection(std::shared_ptr<TSocket> socket);
0723 
0724   /**
0725    * Returns a connection to pool or deletion.  If the connection pool
0726    * (a stack) isn't full, place the connection object on it, otherwise
0727    * just delete it.
0728    *
0729    * @param connection the TConection being returned.
0730    */
0731   void returnConnection(TConnection* connection);
0732 };
0733 
0734 class TNonblockingIOThread : public Runnable {
0735 public:
0736   // Creates an IO thread and sets up the event base.  The listenSocket should
0737   // be a valid FD on which listen() has already been called.  If the
0738   // listenSocket is < 0, accepting will not be done.
0739   TNonblockingIOThread(TNonblockingServer* server,
0740                        int number,
0741                        THRIFT_SOCKET listenSocket,
0742                        bool useHighPriority);
0743 
0744   ~TNonblockingIOThread() override;
0745 
0746   // Returns the event-base for this thread.
0747   event_base* getEventBase() const { return eventBase_; }
0748 
0749   // Returns the server for this thread.
0750   TNonblockingServer* getServer() const { return server_; }
0751 
0752   // Returns the number of this IO thread.
0753   int getThreadNumber() const { return number_; }
0754 
0755   // Returns the thread id associated with this object.  This should
0756   // only be called after the thread has been started.
0757   Thread::id_t getThreadId() const { return threadId_; }
0758 
0759   // Returns the send-fd for task complete notifications.
0760   evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
0761 
0762   // Returns the read-fd for task complete notifications.
0763   evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
0764 
0765   // Returns the actual thread object associated with this IO thread.
0766   std::shared_ptr<Thread> getThread() const { return thread_; }
0767 
0768   // Sets the actual thread object associated with this IO thread.
0769   void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }
0770 
0771   // Used by TConnection objects to indicate processing has finished.
0772   bool notify(TNonblockingServer::TConnection* conn);
0773 
0774   // Enters the event loop and does not return until a call to stop().
0775   void run() override;
0776 
0777   // Exits the event loop as soon as possible.
0778   void stop();
0779 
0780   // Ensures that the event-loop thread is fully finished and shut down.
0781   void join();
0782 
0783   /// Registers the events for the notification & listen sockets
0784   void registerEvents();
0785 
0786 private:
0787   /**
0788    * C-callable event handler for signaling task completion.  Provides a
0789    * callback that libevent can understand that will read a connection
0790    * object's address from a pipe and call connection->transition() for
0791    * that object.
0792    *
0793    * @param fd the descriptor the event occurred on.
0794    */
0795   static void notifyHandler(evutil_socket_t fd, short which, void* v);
0796 
0797   /**
0798    * C-callable event handler for listener events.  Provides a callback
0799    * that libevent can understand which invokes server->handleEvent().
0800    *
0801    * @param fd the descriptor the event occurred on.
0802    * @param which the flags associated with the event.
0803    * @param v void* callback arg where we placed TNonblockingServer's "this".
0804    */
0805   static void listenHandler(evutil_socket_t fd, short which, void* v) {
0806     ((TNonblockingServer*)v)->handleEvent(fd, which);
0807   }
0808 
0809   /// Exits the loop ASAP in case of shutdown or error.
0810   void breakLoop(bool error);
0811 
0812   /// Create the pipe used to notify I/O process of task completion.
0813   void createNotificationPipe();
0814 
0815   /// Unregisters our events for notification and listen sockets.
0816   void cleanupEvents();
0817 
0818   /// Sets (or clears) high priority scheduling status for the current thread.
0819   void setCurrentThreadHighPriority(bool value);
0820 
0821 private:
0822   /// associated server
0823   TNonblockingServer* server_;
0824 
0825   /// thread number (for debugging).
0826   const int number_;
0827 
0828   /// The actual physical thread id.
0829   Thread::id_t threadId_;
0830 
0831   /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
0832   THRIFT_SOCKET listenSocket_;
0833 
0834   /// Sets a high scheduling priority when running
0835   bool useHighPriority_;
0836 
0837   /// pointer to eventbase to be used for looping
0838   event_base* eventBase_;
0839 
0840   /// Set to true if this class is responsible for freeing the event base
0841   /// memory.
0842   bool ownEventBase_;
0843 
0844   /// Used with eventBase_ for connection events (only in listener thread)
0845   struct event serverEvent_;
0846 
0847   /// Used with eventBase_ for task completion notification
0848   struct event notificationEvent_;
0849 
0850   /// File descriptors for pipe used for task completion notification.
0851   evutil_socket_t notificationPipeFDs_[2];
0852 
0853   /// Actual IO Thread
0854   std::shared_ptr<Thread> thread_;
0855 };
0856 }
0857 }
0858 } // apache::thrift::server
0859 
0860 #endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_