File indexing completed on 2026-04-17 08:35:02
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
0021 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
0022
0023 #include <thrift/Thrift.h>
0024 #include <memory>
0025 #include <thrift/server/TServer.h>
0026 #include <thrift/transport/PlatformSocket.h>
0027 #include <thrift/transport/TBufferTransports.h>
0028 #include <thrift/transport/TSocket.h>
0029 #include <thrift/transport/TNonblockingServerTransport.h>
0030 #include <thrift/concurrency/ThreadManager.h>
0031 #include <climits>
0032 #include <thrift/concurrency/Thread.h>
0033 #include <thrift/concurrency/ThreadFactory.h>
0034 #include <thrift/concurrency/Mutex.h>
0035 #include <stack>
0036 #include <vector>
0037 #include <string>
0038 #include <cstdlib>
0039 #include <unordered_set>
0040 #ifdef HAVE_UNISTD_H
0041 #include <unistd.h>
0042 #endif
0043 #include <event.h>
0044 #include <event2/event_compat.h>
0045 #include <event2/event_struct.h>
0046
0047 namespace apache {
0048 namespace thrift {
0049 namespace server {
0050
0051 using apache::thrift::transport::TMemoryBuffer;
0052 using apache::thrift::transport::TSocket;
0053 using apache::thrift::transport::TNonblockingServerTransport;
0054 using apache::thrift::protocol::TProtocol;
0055 using apache::thrift::concurrency::Runnable;
0056 using apache::thrift::concurrency::ThreadManager;
0057 using apache::thrift::concurrency::ThreadFactory;
0058 using apache::thrift::concurrency::Thread;
0059 using apache::thrift::concurrency::Mutex;
0060 using apache::thrift::concurrency::Guard;
0061
// Libevent version helpers.
//
// When the libevent headers do not provide LIBEVENT_VERSION_NUMBER, fall back
// to the fixed version 1.14.13 and synthesize the packed number from it.
// Otherwise split the provided packed number into its components
// (major in bits 24+, minor in bits 16-23, release in bits 8-15).
#ifndef LIBEVENT_VERSION_NUMBER

#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER                                                                    \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))

#else

#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)

#endif
0074
0075 #if LIBEVENT_VERSION_NUMBER < 0x02000000
0076 typedef THRIFT_SOCKET evutil_socket_t;
0077 #endif
0078
// Pointee type produced by the sockopt cast helpers declared in this header:
// char on _WIN32, void everywhere else. A definition supplied before this
// point is left untouched.
// NOTE(review): presumably mirrors the option-value parameter type of
// setsockopt()/getsockopt() on each platform - confirm.
#if !defined(SOCKOPT_CAST_T)
#if defined(_WIN32)
#define SOCKOPT_CAST_T char
#else
#define SOCKOPT_CAST_T void
#endif
#endif
0086
0087 template <class T>
0088 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
0089 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
0090 }
0091
0092 template <class T>
0093 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
0094 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
0095 }
0096
0097
0098
0099
0100
0101
0102
0103
0104
/**
 * Selects what the server does when it is overloaded (stored in
 * TNonblockingServer::overloadAction_). The handling itself lives outside
 * this header; the per-value notes below are inferred from the names.
 */
enum TOverloadAction {
  /// Default action: take no special action (presumably).
  T_OVERLOAD_NO_ACTION = 0,
  /// Presumably: close new connections as they are accepted - confirm.
  T_OVERLOAD_CLOSE_ON_ACCEPT = 1,
  /// Presumably: discard pending tasks from the task queue - confirm.
  T_OVERLOAD_DRAIN_TASK_QUEUE = 2
};
0110
0111 class TNonblockingIOThread;
0112
0113 class TNonblockingServer : public TServer {
0114 private:
0115 class TConnection;
0116
0117 friend class TNonblockingIOThread;
0118
0119 private:
0120
0121 static const int LISTEN_BACKLOG = 1024;
0122
0123
0124 static const size_t CONNECTION_STACK_LIMIT = 1024;
0125
0126
0127 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
0128
0129
0130 static const int MAX_CONNECTIONS = INT_MAX;
0131
0132
0133 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
0134
0135
0136 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
0137
0138
0139 static const int IDLE_READ_BUFFER_LIMIT = 1024;
0140
0141
0142 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
0143
0144
0145 static const int RESIZE_BUFFER_EVERY_N = 512;
0146
0147
0148 static const int DEFAULT_IO_THREADS = 1;
0149
0150
0151 size_t numIOThreads_;
0152
0153
0154 bool useHighPriorityIOThreads_;
0155
0156
0157 THRIFT_SOCKET serverSocket_;
0158
0159
0160 event_base* userEventBase_;
0161
0162
0163 std::shared_ptr<ThreadManager> threadManager_;
0164
0165
0166 bool threadPoolProcessing_;
0167
0168
0169 std::shared_ptr<ThreadFactory> ioThreadFactory_;
0170
0171
0172 std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;
0173
0174
0175 uint32_t nextIOThread_;
0176
0177
0178 Mutex connMutex_;
0179
0180
0181 size_t numTConnections_;
0182
0183
0184 size_t numActiveProcessors_;
0185
0186
0187 size_t connectionStackLimit_;
0188
0189
0190 size_t maxActiveProcessors_;
0191
0192
0193 size_t maxConnections_;
0194
0195
0196 size_t maxFrameSize_;
0197
0198
0199 int64_t taskExpireTime_;
0200
0201
0202
0203
0204
0205
0206 double overloadHysteresis_;
0207
0208
0209 TOverloadAction overloadAction_;
0210
0211
0212
0213
0214
0215 size_t writeBufferDefaultSize_;
0216
0217
0218
0219
0220
0221
0222
0223 size_t idleReadBufferLimit_;
0224
0225
0226
0227
0228
0229
0230
0231
0232 size_t idleWriteBufferLimit_;
0233
0234
0235
0236
0237
0238 int32_t resizeBufferEveryN_;
0239
0240
0241 bool overloaded_;
0242
0243
0244 uint32_t nConnectionsDropped_;
0245
0246
0247 uint64_t nTotalConnectionsDropped_;
0248
0249
0250
0251
0252
0253
0254
0255 std::stack<TConnection*> connectionStack_;
0256
0257
0258
0259
0260
0261
0262
0263 std::unordered_set<TConnection*> activeConnections_;
0264
0265
0266
0267 std::shared_ptr<TNonblockingServerTransport> serverTransport_;
0268
0269
0270
0271
0272
0273
0274
0275
0276 void handleEvent(THRIFT_SOCKET fd, short which);
0277
0278 void init() {
0279 serverSocket_ = THRIFT_INVALID_SOCKET;
0280 numIOThreads_ = DEFAULT_IO_THREADS;
0281 nextIOThread_ = 0;
0282 useHighPriorityIOThreads_ = false;
0283 userEventBase_ = nullptr;
0284 threadPoolProcessing_ = false;
0285 numTConnections_ = 0;
0286 numActiveProcessors_ = 0;
0287 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
0288 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
0289 maxConnections_ = MAX_CONNECTIONS;
0290 maxFrameSize_ = MAX_FRAME_SIZE;
0291 taskExpireTime_ = 0;
0292 overloadHysteresis_ = 0.8;
0293 overloadAction_ = T_OVERLOAD_NO_ACTION;
0294 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
0295 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
0296 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
0297 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
0298 overloaded_ = false;
0299 nConnectionsDropped_ = 0;
0300 nTotalConnectionsDropped_ = 0;
0301 }
0302
0303 public:
0304 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0305 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
0306 : TServer(processorFactory), serverTransport_(serverTransport) {
0307 init();
0308 }
0309
0310 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0311 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
0312 : TServer(processor), serverTransport_(serverTransport) {
0313 init();
0314 }
0315
0316
0317 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0318 const std::shared_ptr<TProtocolFactory>& protocolFactory,
0319 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0320 const std::shared_ptr<ThreadManager>& threadManager
0321 = std::shared_ptr<ThreadManager>())
0322 : TServer(processorFactory), serverTransport_(serverTransport) {
0323 init();
0324
0325 setInputProtocolFactory(protocolFactory);
0326 setOutputProtocolFactory(protocolFactory);
0327 setThreadManager(threadManager);
0328 }
0329
0330 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0331 const std::shared_ptr<TProtocolFactory>& protocolFactory,
0332 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0333 const std::shared_ptr<ThreadManager>& threadManager
0334 = std::shared_ptr<ThreadManager>())
0335 : TServer(processor), serverTransport_(serverTransport) {
0336 init();
0337
0338 setInputProtocolFactory(protocolFactory);
0339 setOutputProtocolFactory(protocolFactory);
0340 setThreadManager(threadManager);
0341 }
0342
0343 TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
0344 const std::shared_ptr<TTransportFactory>& inputTransportFactory,
0345 const std::shared_ptr<TTransportFactory>& outputTransportFactory,
0346 const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
0347 const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
0348 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0349 const std::shared_ptr<ThreadManager>& threadManager
0350 = std::shared_ptr<ThreadManager>())
0351 : TServer(processorFactory), serverTransport_(serverTransport) {
0352 init();
0353
0354 setInputTransportFactory(inputTransportFactory);
0355 setOutputTransportFactory(outputTransportFactory);
0356 setInputProtocolFactory(inputProtocolFactory);
0357 setOutputProtocolFactory(outputProtocolFactory);
0358 setThreadManager(threadManager);
0359 }
0360
0361 TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
0362 const std::shared_ptr<TTransportFactory>& inputTransportFactory,
0363 const std::shared_ptr<TTransportFactory>& outputTransportFactory,
0364 const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
0365 const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
0366 const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
0367 const std::shared_ptr<ThreadManager>& threadManager
0368 = std::shared_ptr<ThreadManager>())
0369 : TServer(processor), serverTransport_(serverTransport) {
0370 init();
0371
0372 setInputTransportFactory(inputTransportFactory);
0373 setOutputTransportFactory(outputTransportFactory);
0374 setInputProtocolFactory(inputProtocolFactory);
0375 setOutputProtocolFactory(outputProtocolFactory);
0376 setThreadManager(threadManager);
0377 }
0378
0379 ~TNonblockingServer() override;
0380
0381 void setThreadManager(std::shared_ptr<ThreadManager> threadManager);
0382
0383 int getListenPort() { return serverTransport_->getListenPort(); }
0384
0385 std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }
0386
0387
0388
0389
0390
0391 void setNumIOThreads(size_t numThreads) {
0392 numIOThreads_ = numThreads;
0393
0394 assert(numIOThreads_ <= 1 || !userEventBase_);
0395 }
0396
0397
0398 bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }
0399
0400
0401 void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }
0402
0403
0404 size_t getNumIOThreads() const { return numIOThreads_; }
0405
0406
0407
0408
0409
0410
0411 size_t getConnectionStackLimit() const { return connectionStackLimit_; }
0412
0413
0414
0415
0416
0417
0418 void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }
0419
0420 bool isThreadPoolProcessing() const { return threadPoolProcessing_; }
0421
0422 void addTask(std::shared_ptr<Runnable> task) {
0423 threadManager_->add(task, 0LL, taskExpireTime_);
0424 }
0425
0426
0427
0428
0429
0430
0431 size_t getNumConnections() const { return numTConnections_; }
0432
0433
0434
0435
0436
0437
0438 size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }
0439
0440
0441
0442
0443
0444
0445 size_t getNumIdleConnections() const { return connectionStack_.size(); }
0446
0447
0448
0449
0450
0451
0452
0453
0454
0455 size_t getNumActiveProcessors() const { return numActiveProcessors_; }
0456
0457
0458 void incrementActiveProcessors() {
0459 Guard g(connMutex_);
0460 ++numActiveProcessors_;
0461 }
0462
0463
0464 void decrementActiveProcessors() {
0465 Guard g(connMutex_);
0466 if (numActiveProcessors_ > 0) {
0467 --numActiveProcessors_;
0468 }
0469 }
0470
0471
0472
0473
0474
0475
0476 size_t getMaxConnections() const { return maxConnections_; }
0477
0478
0479
0480
0481
0482
0483 void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }
0484
0485
0486
0487
0488
0489
0490 size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }
0491
0492
0493
0494
0495
0496
0497 void setMaxActiveProcessors(size_t maxActiveProcessors) {
0498 maxActiveProcessors_ = maxActiveProcessors;
0499 }
0500
0501
0502
0503
0504
0505
0506
0507
0508
0509 size_t getMaxFrameSize() const { return maxFrameSize_; }
0510
0511
0512
0513
0514
0515
0516 void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
0517
0518
0519
0520
0521
0522
0523 double getOverloadHysteresis() const { return overloadHysteresis_; }
0524
0525
0526
0527
0528
0529
0530
0531 void setOverloadHysteresis(double hysteresisFraction) {
0532 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
0533 overloadHysteresis_ = hysteresisFraction;
0534 }
0535 }
0536
0537
0538
0539
0540
0541
0542 TOverloadAction getOverloadAction() const { return overloadAction_; }
0543
0544
0545
0546
0547
0548
0549 void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }
0550
0551
0552
0553
0554
0555
0556 int64_t getTaskExpireTime() const { return taskExpireTime_; }
0557
0558
0559
0560
0561
0562
0563 void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }
0564
0565
0566
0567
0568
0569
0570
0571
0572
0573
0574 bool serverOverloaded();
0575
0576
0577
0578
0579
0580 bool drainPendingTask();
0581
0582
0583
0584
0585
0586
0587 size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }
0588
0589
0590
0591
0592
0593
0594 void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }
0595
0596
0597
0598
0599
0600
0601 size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }
0602
0603
0604
0605
0606
0607
0608
0609 size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }
0610
0611
0612
0613
0614
0615
0616
0617
0618
0619
0620 void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }
0621
0622
0623
0624
0625
0626
0627
0628
0629
0630
0631
0632 void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }
0633
0634
0635
0636
0637
0638
0639 size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }
0640
0641
0642
0643
0644
0645
0646
0647
0648
0649
0650 void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }
0651
0652
0653
0654
0655
0656
0657 int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }
0658
0659
0660
0661
0662
0663
0664
0665
0666 void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }
0667
0668
0669
0670
0671
0672 void serve() override;
0673
0674
0675
0676
0677 void stop() override;
0678
0679
0680 void createAndListenOnSocket();
0681
0682
0683
0684
0685
0686
0687
0688
0689
0690
0691 void registerEvents(event_base* user_event_base);
0692
0693
0694
0695
0696 event_base* getUserEventBase() const { return userEventBase_; }
0697
0698
0699
0700
0701 bool getHeaderTransport();
0702
0703 private:
0704
0705
0706
0707
0708
0709
0710 void expireClose(std::shared_ptr<Runnable> task);
0711
0712
0713
0714
0715
0716
0717
0718
0719
0720
0721
0722 TConnection* createConnection(std::shared_ptr<TSocket> socket);
0723
0724
0725
0726
0727
0728
0729
0730
0731 void returnConnection(TConnection* connection);
0732 };
0733
0734 class TNonblockingIOThread : public Runnable {
0735 public:
0736
0737
0738
0739 TNonblockingIOThread(TNonblockingServer* server,
0740 int number,
0741 THRIFT_SOCKET listenSocket,
0742 bool useHighPriority);
0743
0744 ~TNonblockingIOThread() override;
0745
0746
0747 event_base* getEventBase() const { return eventBase_; }
0748
0749
0750 TNonblockingServer* getServer() const { return server_; }
0751
0752
0753 int getThreadNumber() const { return number_; }
0754
0755
0756
0757 Thread::id_t getThreadId() const { return threadId_; }
0758
0759
0760 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
0761
0762
0763 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
0764
0765
0766 std::shared_ptr<Thread> getThread() const { return thread_; }
0767
0768
0769 void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }
0770
0771
0772 bool notify(TNonblockingServer::TConnection* conn);
0773
0774
0775 void run() override;
0776
0777
0778 void stop();
0779
0780
0781 void join();
0782
0783
0784 void registerEvents();
0785
0786 private:
0787
0788
0789
0790
0791
0792
0793
0794
0795 static void notifyHandler(evutil_socket_t fd, short which, void* v);
0796
0797
0798
0799
0800
0801
0802
0803
0804
0805 static void listenHandler(evutil_socket_t fd, short which, void* v) {
0806 ((TNonblockingServer*)v)->handleEvent(fd, which);
0807 }
0808
0809
0810 void breakLoop(bool error);
0811
0812
0813 void createNotificationPipe();
0814
0815
0816 void cleanupEvents();
0817
0818
0819 void setCurrentThreadHighPriority(bool value);
0820
0821 private:
0822
0823 TNonblockingServer* server_;
0824
0825
0826 const int number_;
0827
0828
0829 Thread::id_t threadId_;
0830
0831
0832 THRIFT_SOCKET listenSocket_;
0833
0834
0835 bool useHighPriority_;
0836
0837
0838 event_base* eventBase_;
0839
0840
0841
0842 bool ownEventBase_;
0843
0844
0845 struct event serverEvent_;
0846
0847
0848 struct event notificationEvent_;
0849
0850
0851 evutil_socket_t notificationPipeFDs_[2];
0852
0853
0854 std::shared_ptr<Thread> thread_;
0855 };
0856 }
0857 }
0858 }
0859
0860 #endif