|
|
|||
File indexing completed on 2026-04-17 08:35:03
0001 /* 0002 * Licensed to the Apache Software Foundation (ASF) under one 0003 * or more contributor license agreements. See the NOTICE file 0004 * distributed with this work for additional information 0005 * regarding copyright ownership. The ASF licenses this file 0006 * to you under the Apache License, Version 2.0 (the 0007 * "License"); you may not use this file except in compliance 0008 * with the License. You may obtain a copy of the License at 0009 * 0010 * http://www.apache.org/licenses/LICENSE-2.0 0011 * 0012 * Unless required by applicable law or agreed to in writing, 0013 * software distributed under the License is distributed on an 0014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0015 * KIND, either express or implied. See the License for the 0016 * specific language governing permissions and limitations 0017 * under the License. 0018 */ 0019 0020 #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ 0021 #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 0022 0023 #include <thrift/Thrift.h> 0024 #include <thrift/TConfiguration.h> 0025 #include <thrift/transport/TTransportException.h> 0026 #include <memory> 0027 #include <string> 0028 0029 namespace apache { 0030 namespace thrift { 0031 namespace transport { 0032 0033 /** 0034 * Helper template to hoist readAll implementation out of TTransport 0035 */ 0036 template <class Transport_> 0037 uint32_t readAll(Transport_& trans, uint8_t* buf, uint32_t len) { 0038 uint32_t have = 0; 0039 uint32_t get = 0; 0040 0041 while (have < len) { 0042 get = trans.read(buf + have, len - have); 0043 if (get <= 0) { 0044 throw TTransportException(TTransportException::END_OF_FILE, "No more data to read."); 0045 } 0046 have += get; 0047 } 0048 0049 return have; 0050 } 0051 0052 /** 0053 * Generic interface for a method of transporting data. A TTransport may be 0054 * capable of either reading or writing, but not necessarily both. 0055 * 0056 */ 0057 class TTransport { 0058 public: 0059 TTransport(std::shared_ptr<TConfiguration> config = nullptr) { 0060 if(config == nullptr) { 0061 configuration_ = std::shared_ptr<TConfiguration> (new TConfiguration()); 0062 } else { 0063 configuration_ = config; 0064 } 0065 resetConsumedMessageSize(); 0066 } 0067 0068 /** 0069 * Virtual deconstructor. 0070 */ 0071 virtual ~TTransport() = default; 0072 0073 /** 0074 * Whether this transport is open. 0075 */ 0076 virtual bool isOpen() const { return false; } 0077 0078 /** 0079 * Tests whether there is more data to read or if the remote side is 0080 * still open. By default this is true whenever the transport is open, 0081 * but implementations should add logic to test for this condition where 0082 * possible (i.e. on a socket). 0083 * This is used by a server to check if it should listen for another 0084 * request. 0085 */ 0086 virtual bool peek() { return isOpen(); } 0087 0088 /** 0089 * Opens the transport for communications. 0090 * 0091 * @return bool Whether the transport was successfully opened 0092 * @throws TTransportException if opening failed 0093 */ 0094 virtual void open() { 0095 throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport."); 0096 } 0097 0098 /** 0099 * Closes the transport. 0100 */ 0101 virtual void close() { 0102 throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport."); 0103 } 0104 0105 /** 0106 * Attempt to read up to the specified number of bytes into the string. 0107 * 0108 * @param buf Reference to the location to write the data 0109 * @param len How many bytes to read 0110 * @return How many bytes were actually read 0111 * @throws TTransportException If an error occurs 0112 */ 0113 uint32_t read(uint8_t* buf, uint32_t len) { 0114 T_VIRTUAL_CALL(); 0115 return read_virt(buf, len); 0116 } 0117 virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) { 0118 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read."); 0119 } 0120 0121 /** 0122 * Reads the given amount of data in its entirety no matter what. 0123 * 0124 * @param s Reference to location for read data 0125 * @param len How many bytes to read 0126 * @return How many bytes read, which must be equal to size 0127 * @throws TTransportException If insufficient data was read 0128 */ 0129 uint32_t readAll(uint8_t* buf, uint32_t len) { 0130 T_VIRTUAL_CALL(); 0131 return readAll_virt(buf, len); 0132 } 0133 virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { 0134 return apache::thrift::transport::readAll(*this, buf, len); 0135 } 0136 0137 /** 0138 * Called when read is completed. 0139 * This can be over-ridden to perform a transport-specific action 0140 * e.g. logging the request to a file 0141 * 0142 * @return number of bytes read if available, 0 otherwise. 0143 */ 0144 virtual uint32_t readEnd() { 0145 // default behaviour is to do nothing 0146 return 0; 0147 } 0148 0149 /** 0150 * Writes the string in its entirety to the buffer. 0151 * 0152 * Note: You must call flush() to ensure the data is actually written, 0153 * and available to be read back in the future. Destroying a TTransport 0154 * object does not automatically flush pending data--if you destroy a 0155 * TTransport object with written but unflushed data, that data may be 0156 * discarded. 0157 * 0158 * @param buf The data to write out 0159 * @throws TTransportException if an error occurs 0160 */ 0161 void write(const uint8_t* buf, uint32_t len) { 0162 T_VIRTUAL_CALL(); 0163 write_virt(buf, len); 0164 } 0165 virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) { 0166 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write."); 0167 } 0168 0169 /** 0170 * Called when write is completed. 0171 * This can be over-ridden to perform a transport-specific action 0172 * at the end of a request. 0173 * 0174 * @return number of bytes written if available, 0 otherwise 0175 */ 0176 virtual uint32_t writeEnd() { 0177 // default behaviour is to do nothing 0178 return 0; 0179 } 0180 0181 /** 0182 * Flushes any pending data to be written. Typically used with buffered 0183 * transport mechanisms. 0184 * 0185 * @throws TTransportException if an error occurs 0186 */ 0187 virtual void flush() { 0188 // default behaviour is to do nothing 0189 } 0190 0191 /** 0192 * Attempts to return a pointer to \c len bytes, possibly copied into \c buf. 0193 * Does not consume the bytes read (i.e.: a later read will return the same 0194 * data). This method is meant to support protocols that need to read 0195 * variable-length fields. They can attempt to borrow the maximum amount of 0196 * data that they will need, then consume (see next method) what they 0197 * actually use. Some transports will not support this method and others 0198 * will fail occasionally, so protocols must be prepared to use read if 0199 * borrow fails. 0200 * 0201 * @oaram buf A buffer where the data can be stored if needed. 0202 * If borrow doesn't return buf, then the contents of 0203 * buf after the call are undefined. This parameter may be 0204 * nullptr to indicate that the caller is not supplying storage, 0205 * but would like a pointer into an internal buffer, if 0206 * available. 0207 * @param len *len should initially contain the number of bytes to borrow. 0208 * If borrow succeeds, *len will contain the number of bytes 0209 * available in the returned pointer. This will be at least 0210 * what was requested, but may be more if borrow returns 0211 * a pointer to an internal buffer, rather than buf. 0212 * If borrow fails, the contents of *len are undefined. 0213 * @return If the borrow succeeds, return a pointer to the borrowed data. 0214 * This might be equal to \c buf, or it might be a pointer into 0215 * the transport's internal buffers. 0216 * @throws TTransportException if an error occurs 0217 */ 0218 const uint8_t* borrow(uint8_t* buf, uint32_t* len) { 0219 T_VIRTUAL_CALL(); 0220 return borrow_virt(buf, len); 0221 } 0222 virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { return nullptr; } 0223 0224 /** 0225 * Remove len bytes from the transport. This should always follow a borrow 0226 * of at least len bytes, and should always succeed. 0227 * TODO(dreiss): Is there any transport that could borrow but fail to 0228 * consume, or that would require a buffer to dump the consumed data? 0229 * 0230 * @param len How many bytes to consume 0231 * @throws TTransportException If an error occurs 0232 */ 0233 void consume(uint32_t len) { 0234 T_VIRTUAL_CALL(); 0235 consume_virt(len); 0236 } 0237 virtual void consume_virt(uint32_t /* len */) { 0238 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume."); 0239 } 0240 0241 /** 0242 * Returns the origin of the transports call. The value depends on the 0243 * transport used. An IP based transport for example will return the 0244 * IP address of the client making the request. 0245 * If the transport doesn't know the origin Unknown is returned. 0246 * 0247 * The returned value can be used in a log message for example 0248 */ 0249 virtual const std::string getOrigin() const { return "Unknown"; } 0250 0251 std::shared_ptr<TConfiguration> getConfiguration() { return configuration_; } 0252 0253 void setConfiguration(std::shared_ptr<TConfiguration> config) { 0254 if (config != nullptr) configuration_ = config; 0255 } 0256 0257 /** 0258 * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport). 0259 * Will throw if we already consumed too many bytes or if the new size is larger than allowed. 0260 * 0261 * @param size real message size 0262 */ 0263 void updateKnownMessageSize(long int size) 0264 { 0265 long int consumed = knownMessageSize_ - remainingMessageSize_; 0266 resetConsumedMessageSize(size); 0267 countConsumedMessageBytes(consumed); 0268 } 0269 0270 /** 0271 * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data 0272 * 0273 * @param numBytes numBytes bytes of data 0274 */ 0275 void checkReadBytesAvailable(long int numBytes) 0276 { 0277 if (remainingMessageSize_ < numBytes) 0278 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached"); 0279 } 0280 0281 protected: 0282 std::shared_ptr<TConfiguration> configuration_; 0283 long int remainingMessageSize_; 0284 long int knownMessageSize_; 0285 0286 inline long int getRemainingMessageSize() { return remainingMessageSize_; } 0287 inline void setRemainingMessageSize(long int remainingMessageSize) { remainingMessageSize_ = remainingMessageSize; } 0288 inline int getMaxMessageSize() { return configuration_->getMaxMessageSize(); } 0289 inline long int getKnownMessageSize() { return knownMessageSize_; } 0290 void setKnownMessageSize(long int knownMessageSize) { knownMessageSize_ = knownMessageSize; } 0291 0292 /** 0293 * Resets RemainingMessageSize to the configured maximum 0294 * 0295 * @param newSize configured size 0296 */ 0297 void resetConsumedMessageSize(long newSize = -1) 0298 { 0299 // full reset 0300 if (newSize < 0) 0301 { 0302 knownMessageSize_ = getMaxMessageSize(); 0303 remainingMessageSize_ = getMaxMessageSize(); 0304 return; 0305 } 0306 0307 // update only: message size can shrink, but not grow 0308 if (newSize > knownMessageSize_) 0309 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached"); 0310 0311 knownMessageSize_ = newSize; 0312 remainingMessageSize_ = newSize; 0313 } 0314 0315 /** 0316 * Consumes numBytes from the RemainingMessageSize. 0317 * 0318 * @param numBytes Consumes numBytes 0319 */ 0320 void countConsumedMessageBytes(long int numBytes) 0321 { 0322 if (remainingMessageSize_ >= numBytes) 0323 { 0324 remainingMessageSize_ -= numBytes; 0325 } 0326 else 0327 { 0328 remainingMessageSize_ = 0; 0329 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached"); 0330 } 0331 } 0332 }; 0333 0334 /** 0335 * Generic factory class to make an input and output transport out of a 0336 * source transport. Commonly used inside servers to make input and output 0337 * streams out of raw clients. 0338 * 0339 */ 0340 class TTransportFactory { 0341 public: 0342 TTransportFactory() = default; 0343 0344 virtual ~TTransportFactory() = default; 0345 0346 /** 0347 * Default implementation does nothing, just returns the transport given. 0348 */ 0349 virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) { 0350 return trans; 0351 } 0352 }; 0353 } 0354 } 0355 } // apache::thrift::transport 0356 0357 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|