File indexing completed on 2026-04-17 08:35:01
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
0021 #define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
0022
0023 #include <thrift/protocol/TProtocolDecorator.h>
0024 #include <thrift/TApplicationException.h>
0025 #include <thrift/TProcessor.h>
0026 #include <boost/tokenizer.hpp>
0027
0028 namespace apache {
0029 namespace thrift {
0030 namespace protocol {
0031
0032
0033
0034
0035
0036
0037 class StoredMessageProtocol : public TProtocolDecorator {
0038 public:
0039 StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,
0040 const std::string& _name,
0041 const TMessageType _type,
0042 const int32_t _seqid)
0043 : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {}
0044
0045 uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override {
0046
0047 _name = name;
0048 _type = type;
0049 _seqid = seqid;
0050
0051 return 0;
0052 }
0053
0054 std::string name;
0055 TMessageType type;
0056 int32_t seqid;
0057 };
0058 }
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086 class TMultiplexedProcessor : public TProcessor {
0087 public:
0088 typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101 void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
0102 services[serviceName] = processor;
0103 }
0104
0105
0106
0107
0108
0109 void registerDefault(const std::shared_ptr<TProcessor>& processor) {
0110 defaultProcessor = processor;
0111 }
0112
0113
0114
0115
0116 TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
0117 std::shared_ptr<protocol::TProtocol> out,
0118 const std::string& name,
0119 int32_t seqid,
0120 const std::string& msg) const {
0121 in->skip(::apache::thrift::protocol::T_STRUCT);
0122 in->readMessageEnd();
0123 in->getTransport()->readEnd();
0124 ::apache::thrift::TApplicationException
0125 x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
0126 "TMultiplexedProcessor: " + msg);
0127 out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
0128 x.write(out.get());
0129 out->writeMessageEnd();
0130 out->getTransport()->writeEnd();
0131 out->getTransport()->flush();
0132 return TException(msg);
0133 }
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150 bool process(std::shared_ptr<protocol::TProtocol> in,
0151 std::shared_ptr<protocol::TProtocol> out,
0152 void* connectionContext) override {
0153 std::string name;
0154 protocol::TMessageType type;
0155 int32_t seqid;
0156
0157
0158
0159
0160 in->readMessageBegin(name, type, seqid);
0161
0162 if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
0163
0164 throw protocol_error(in, out, name, seqid, "Unexpected message type");
0165 }
0166
0167
0168 boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
0169
0170 std::vector<std::string> tokens;
0171 std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
0172
0173
0174
0175 if (tokens.size() == 2) {
0176
0177 auto it = services.find(tokens[0]);
0178
0179 if (it != services.end()) {
0180 std::shared_ptr<TProcessor> processor = it->second;
0181
0182
0183 return processor
0184 ->process(std::shared_ptr<protocol::TProtocol>(
0185 new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
0186 out,
0187 connectionContext);
0188 } else {
0189
0190 throw protocol_error(in, out, name, seqid,
0191 "Unknown service: " + tokens[0] +
0192 ". Did you forget to call registerProcessor()?");
0193 }
0194 } else if (tokens.size() == 1) {
0195 if (defaultProcessor) {
0196
0197 return defaultProcessor
0198 ->process(std::shared_ptr<protocol::TProtocol>(
0199 new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
0200 out,
0201 connectionContext);
0202 } else {
0203 throw protocol_error(in, out, name, seqid,
0204 "Non-multiplexed client request dropped. "
0205 "Did you forget to call defaultProcessor()?");
0206 }
0207 } else {
0208 throw protocol_error(in, out, name, seqid,
0209 "Wrong number of tokens.");
0210 }
0211 }
0212
0213 private:
0214
0215 services_t services;
0216
0217
0218
0219 std::shared_ptr<TProcessor> defaultProcessor;
0220 };
0221 }
0222 }
0223
0224 #endif