File indexing completed on 2026-04-17 08:35:01
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 #ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_
0020 #define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1
0021
0022 #include <thrift/protocol/TProtocol.h>
0023 #include <thrift/concurrency/Mutex.h>
0024 #include <thrift/concurrency/Monitor.h>
0025 #include <memory>
0026 #include <vector>
0027 #include <string>
0028 #include <map>
0029
0030 namespace apache {
0031 namespace thrift {
0032 namespace async {
0033
0034 class TConcurrentClientSyncInfo;
0035
0036 class TConcurrentSendSentry {
0037 public:
0038 explicit TConcurrentSendSentry(TConcurrentClientSyncInfo* sync);
0039 virtual ~TConcurrentSendSentry();
0040
0041 void commit();
0042
0043 private:
0044 TConcurrentClientSyncInfo& sync_;
0045 bool committed_;
0046 };
0047
0048 class TConcurrentRecvSentry {
0049 public:
0050 TConcurrentRecvSentry(TConcurrentClientSyncInfo* sync, int32_t seqid);
0051 virtual ~TConcurrentRecvSentry();
0052
0053 void commit();
0054
0055 private:
0056 TConcurrentClientSyncInfo& sync_;
0057 int32_t seqid_;
0058 bool committed_;
0059 };
0060
0061 class TConcurrentClientSyncInfo {
0062 private:
0063 typedef std::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr;
0064 typedef std::map<int32_t, MonitorPtr> MonitorMap;
0065
0066 public:
0067 TConcurrentClientSyncInfo();
0068
0069 int32_t generateSeqId();
0070
0071 bool getPending(std::string& fname,
0072 ::apache::thrift::protocol::TMessageType& mtype,
0073 int32_t& rseqid);
0074
0075 void updatePending(const std::string& fname,
0076 ::apache::thrift::protocol::TMessageType mtype,
0077 int32_t rseqid);
0078
0079 void waitForWork(int32_t seqid);
0080
0081 ::apache::thrift::concurrency::Mutex& getReadMutex() { return readMutex_; }
0082 ::apache::thrift::concurrency::Mutex& getWriteMutex() { return writeMutex_; }
0083
0084 private:
0085 enum { MONITOR_CACHE_SIZE = 10 };
0086
0087 private:
0088 MonitorPtr newMonitor_(
0089 const ::apache::thrift::concurrency::Guard& seqidGuard);
0090 void deleteMonitor_(const ::apache::thrift::concurrency::Guard& seqidGuard, MonitorPtr& m);
0091
0092 void wakeupAnyone_(
0093 const ::apache::thrift::concurrency::Guard& seqidGuard);
0094 void markBad_(const ::apache::thrift::concurrency::Guard& seqidGuard);
0095 void throwBadSeqId_();
0096 void throwDeadConnection_();
0097
0098 private:
0099 volatile bool stop_;
0100
0101 ::apache::thrift::concurrency::Mutex seqidMutex_;
0102
0103 int32_t nextseqid_;
0104 MonitorMap seqidToMonitorMap_;
0105 std::vector<MonitorPtr> freeMonitors_;
0106
0107
0108 ::apache::thrift::concurrency::Mutex writeMutex_;
0109
0110 ::apache::thrift::concurrency::Mutex readMutex_;
0111
0112 bool recvPending_;
0113 bool wakeupSomeone_;
0114 int32_t seqidPending_;
0115 std::string fnamePending_;
0116 ::apache::thrift::protocol::TMessageType mtypePending_;
0117
0118
0119 friend class TConcurrentSendSentry;
0120 friend class TConcurrentRecvSentry;
0121 };
0122 }
0123 }
0124 }
0125
0126 #endif