Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:01

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
0021 #define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
0022 
0023 #include <thrift/protocol/TProtocolDecorator.h>
0024 #include <thrift/TApplicationException.h>
0025 #include <thrift/TProcessor.h>
0026 #include <boost/tokenizer.hpp>
0027 
0028 namespace apache {
0029 namespace thrift {
0030 namespace protocol {
0031 
0032 /**
0033  *  To be able to work with any protocol, we needed
0034  *  to allow them to call readMessageBegin() and get a TMessage in exactly
0035  *  the standard format, without the service name prepended to TMessage.name.
0036  */
0037 class StoredMessageProtocol : public TProtocolDecorator {
0038 public:
0039   StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,
0040                         const std::string& _name,
0041                         const TMessageType _type,
0042                         const int32_t _seqid)
0043     : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {}
0044 
0045   uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override {
0046 
0047     _name = name;
0048     _type = type;
0049     _seqid = seqid;
0050 
0051     return 0; // (Normal TProtocol read functions return number of bytes read)
0052   }
0053 
0054   std::string name;
0055   TMessageType type;
0056   int32_t seqid;
0057 };
0058 } // namespace protocol
0059 
0060 /**
0061  * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
0062  * a single <code>TServer</code> to provide multiple services.
0063  *
0064  * <p>To do so, you instantiate the processor and then register additional
0065  * processors with it, as shown in the following example:</p>
0066  *
0067  * <blockquote><code>
0068  *     std::shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
0069  *
0070  *     processor->registerProcessor(
0071  *         "Calculator",
0072  *         std::shared_ptr<TProcessor>( new CalculatorProcessor(
0073  *             std::shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
0074  *
0075  *     processor->registerProcessor(
0076  *         "WeatherReport",
0077  *         std::shared_ptr<TProcessor>( new WeatherReportProcessor(
0078  *             std::shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
0079  *
0080  *     std::shared_ptr<TServerTransport> transport(new TServerSocket(9090));
0081  *     TSimpleServer server(processor, transport);
0082  *
0083  *     server.serve();
0084  * </code></blockquote>
0085  */
0086 class TMultiplexedProcessor : public TProcessor {
0087 public:
0088   typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;
0089 
0090   /**
0091     * 'Register' a service with this <code>TMultiplexedProcessor</code>.  This
0092     * allows us to broker requests to individual services by using the service
0093     * name to select them at request time.
0094     *
0095     * \param [in] serviceName Name of a service, has to be identical to the name
0096     *                         declared in the Thrift IDL, e.g. "WeatherReport".
0097     * \param [in] processor   Implementation of a service, usually referred to
0098     *                         as "handlers", e.g. WeatherReportHandler,
0099     *                         implementing WeatherReportIf interface.
0100     */
0101   void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
0102     services[serviceName] = processor;
0103   }
0104 
0105   /**
0106    * Register a service to be called to process queries without service name
0107    * \param [in] processor   Implementation of a service.
0108    */
0109   void registerDefault(const std::shared_ptr<TProcessor>& processor) {
0110     defaultProcessor = processor;
0111   }
0112 
0113   /**
0114    * Chew up invalid input and return an exception to throw.
0115    */
0116   TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
0117                             std::shared_ptr<protocol::TProtocol> out,
0118                             const std::string& name, 
0119                             int32_t seqid, 
0120                             const std::string& msg) const {
0121     in->skip(::apache::thrift::protocol::T_STRUCT);
0122     in->readMessageEnd();
0123     in->getTransport()->readEnd();
0124     ::apache::thrift::TApplicationException
0125       x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
0126         "TMultiplexedProcessor: " + msg);
0127     out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
0128     x.write(out.get());
0129     out->writeMessageEnd();
0130     out->getTransport()->writeEnd();
0131     out->getTransport()->flush();
0132     return TException(msg);
0133 }
0134    
0135   /**
0136    * This implementation of <code>process</code> performs the following steps:
0137    *
0138    * <ol>
0139    *     <li>Read the beginning of the message.</li>
0140    *     <li>Extract the service name from the message.</li>
0141    *     <li>Using the service name to locate the appropriate processor.</li>
0142    *     <li>Dispatch to the processor, with a decorated instance of TProtocol
0143    *         that allows readMessageBegin() to return the original TMessage.</li>
0144    * </ol>
0145    *
0146    * \throws TException If the message type is not T_CALL or T_ONEWAY, if
0147    * the service name was not found in the message, or if the service
0148    * name was not found in the service map.
0149    */
0150   bool process(std::shared_ptr<protocol::TProtocol> in,
0151                std::shared_ptr<protocol::TProtocol> out,
0152                void* connectionContext) override {
0153     std::string name;
0154     protocol::TMessageType type;
0155     int32_t seqid;
0156 
0157     // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
0158     // message header.  This pulls the message "off the wire", which we'll
0159     // deal with at the end of this method.
0160     in->readMessageBegin(name, type, seqid);
0161 
0162     if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
0163       // Unexpected message type.
0164       throw protocol_error(in, out, name, seqid, "Unexpected message type");
0165     }
0166 
0167     // Extract the service name
0168     boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
0169 
0170     std::vector<std::string> tokens;
0171     std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
0172 
0173     // A valid message should consist of two tokens: the service
0174     // name and the name of the method to call.
0175     if (tokens.size() == 2) {
0176       // Search for a processor associated with this service name.
0177       auto it = services.find(tokens[0]);
0178 
0179       if (it != services.end()) {
0180         std::shared_ptr<TProcessor> processor = it->second;
0181         // Let the processor registered for this service name
0182         // process the message.
0183         return processor
0184             ->process(std::shared_ptr<protocol::TProtocol>(
0185                           new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
0186                       out,
0187                       connectionContext);
0188       } else {
0189         // Unknown service.
0190         throw protocol_error(in, out, name, seqid, 
0191             "Unknown service: " + tokens[0] +
0192                 ". Did you forget to call registerProcessor()?");
0193       }
0194     } else if (tokens.size() == 1) {
0195       if (defaultProcessor) {
0196         // non-multiplexed client forwards to default processor
0197         return defaultProcessor            
0198             ->process(std::shared_ptr<protocol::TProtocol>(
0199                           new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
0200                       out,
0201                       connectionContext);
0202       } else {
0203         throw protocol_error(in, out, name, seqid,
0204             "Non-multiplexed client request dropped. "
0205             "Did you forget to call defaultProcessor()?");
0206       }
0207     } else {
0208         throw protocol_error(in, out, name, seqid,
0209             "Wrong number of tokens.");
0210     }
0211   }
0212 
0213 private:
0214   /** Map of service processor objects, indexed by service names. */
0215   services_t services;
0216   
0217   //! If a non-multi client requests something, it goes to the
0218   //! default processor (if one is defined) for backwards compatibility.
0219   std::shared_ptr<TProcessor> defaultProcessor;
0220 };
0221 }
0222 }
0223 
0224 #endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_