![]() |
|
|||
File indexing completed on 2025-08-28 08:27:00
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 // C++ object model and user API for interprocess schema messaging 0019 0020 #pragma once 0021 0022 #include <cstdint> 0023 #include <functional> 0024 #include <memory> 0025 #include <string> 0026 #include <utility> 0027 0028 #include "arrow/io/type_fwd.h" 0029 #include "arrow/ipc/type_fwd.h" 0030 #include "arrow/result.h" 0031 #include "arrow/status.h" 0032 #include "arrow/type_fwd.h" 0033 #include "arrow/util/macros.h" 0034 #include "arrow/util/visibility.h" 0035 0036 namespace arrow { 0037 namespace ipc { 0038 0039 struct IpcWriteOptions; 0040 0041 // Read interface classes. We do not fully deserialize the flatbuffers so that 0042 // individual fields metadata can be retrieved from very large schema without 0043 // 0044 0045 /// \class Message 0046 /// \brief An IPC message including metadata and body 0047 class ARROW_EXPORT Message { 0048 public: 0049 /// \brief Construct message, but do not validate 0050 /// 0051 /// Use at your own risk; Message::Open has more metadata validation 0052 Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body); 0053 0054 ~Message(); 0055 0056 /// \brief Create and validate a Message instance from two buffers 0057 /// 0058 /// \param[in] metadata a buffer containing the Flatbuffer metadata 0059 /// \param[in] body a buffer containing the message body, which may be null 0060 /// \return the created message 0061 static Result<std::unique_ptr<Message>> Open(std::shared_ptr<Buffer> metadata, 0062 std::shared_ptr<Buffer> body); 0063 0064 /// \brief Read message body and create Message given Flatbuffer metadata 0065 /// \param[in] metadata containing a serialized Message flatbuffer 0066 /// \param[in] stream an InputStream 0067 /// \return the created Message 0068 /// 0069 /// \note If stream supports zero-copy, this is zero-copy 0070 static Result<std::unique_ptr<Message>> ReadFrom(std::shared_ptr<Buffer> metadata, 0071 io::InputStream* stream); 0072 0073 /// \brief Read message body from position in file, and create Message given 0074 /// the Flatbuffer metadata 0075 /// \param[in] offset the position in the file where the message body starts. 0076 /// \param[in] metadata containing a serialized Message flatbuffer 0077 /// \param[in] file the seekable file interface to read from 0078 /// \return the created Message 0079 /// 0080 /// \note If file supports zero-copy, this is zero-copy 0081 static Result<std::unique_ptr<Message>> ReadFrom(const int64_t offset, 0082 std::shared_ptr<Buffer> metadata, 0083 io::RandomAccessFile* file); 0084 0085 /// \brief Return true if message type and contents are equal 0086 /// 0087 /// \param other another message 0088 /// \return true if contents equal 0089 bool Equals(const Message& other) const; 0090 0091 /// \brief the Message metadata 0092 /// 0093 /// \return buffer 0094 std::shared_ptr<Buffer> metadata() const; 0095 0096 /// \brief Custom metadata serialized in metadata Flatbuffer. Returns nullptr 0097 /// when none set 0098 const std::shared_ptr<const KeyValueMetadata>& custom_metadata() const; 0099 0100 /// \brief the Message body, if any 0101 /// 0102 /// \return buffer is null if no body 0103 std::shared_ptr<Buffer> body() const; 0104 0105 /// \brief The expected body length according to the metadata, for 0106 /// verification purposes 0107 int64_t body_length() const; 0108 0109 /// \brief The Message type 0110 MessageType type() const; 0111 0112 /// \brief The Message metadata version 0113 MetadataVersion metadata_version() const; 0114 0115 const void* header() const; 0116 0117 /// \brief Write length-prefixed metadata and body to output stream 0118 /// 0119 /// \param[in] file output stream to write to 0120 /// \param[in] options IPC writing options including alignment 0121 /// \param[out] output_length the number of bytes written 0122 /// \return Status 0123 Status SerializeTo(io::OutputStream* file, const IpcWriteOptions& options, 0124 int64_t* output_length) const; 0125 0126 /// \brief Return true if the Message metadata passes Flatbuffer validation 0127 bool Verify() const; 0128 0129 /// \brief Whether a given message type needs a body. 0130 static bool HasBody(MessageType type) { 0131 return type != MessageType::NONE && type != MessageType::SCHEMA; 0132 } 0133 0134 private: 0135 // Hide serialization details from user API 0136 class MessageImpl; 0137 std::unique_ptr<MessageImpl> impl_; 0138 0139 ARROW_DISALLOW_COPY_AND_ASSIGN(Message); 0140 }; 0141 0142 ARROW_EXPORT std::string FormatMessageType(MessageType type); 0143 0144 /// \class MessageDecoderListener 0145 /// \brief An abstract class to listen events from MessageDecoder. 0146 /// 0147 /// This API is EXPERIMENTAL. 0148 /// 0149 /// \since 0.17.0 0150 class ARROW_EXPORT MessageDecoderListener { 0151 public: 0152 virtual ~MessageDecoderListener() = default; 0153 0154 /// \brief Called when a message is decoded. 0155 /// 0156 /// MessageDecoder calls this method when it decodes a message. This 0157 /// method is called multiple times when the target stream has 0158 /// multiple messages. 0159 /// 0160 /// \param[in] message a decoded message 0161 /// \return Status 0162 virtual Status OnMessageDecoded(std::unique_ptr<Message> message) = 0; 0163 0164 /// \brief Called when the decoder state is changed to 0165 /// MessageDecoder::State::INITIAL. 0166 /// 0167 /// The default implementation just returns arrow::Status::OK(). 0168 /// 0169 /// \return Status 0170 virtual Status OnInitial(); 0171 0172 /// \brief Called when the decoder state is changed to 0173 /// MessageDecoder::State::METADATA_LENGTH. 0174 /// 0175 /// The default implementation just returns arrow::Status::OK(). 0176 /// 0177 /// \return Status 0178 virtual Status OnMetadataLength(); 0179 0180 /// \brief Called when the decoder state is changed to 0181 /// MessageDecoder::State::METADATA. 0182 /// 0183 /// The default implementation just returns arrow::Status::OK(). 0184 /// 0185 /// \return Status 0186 virtual Status OnMetadata(); 0187 0188 /// \brief Called when the decoder state is changed to 0189 /// MessageDecoder::State::BODY. 0190 /// 0191 /// The default implementation just returns arrow::Status::OK(). 0192 /// 0193 /// \return Status 0194 virtual Status OnBody(); 0195 0196 /// \brief Called when the decoder state is changed to 0197 /// MessageDecoder::State::EOS. 0198 /// 0199 /// The default implementation just returns arrow::Status::OK(). 0200 /// 0201 /// \return Status 0202 virtual Status OnEOS(); 0203 }; 0204 0205 /// \class AssignMessageDecoderListener 0206 /// \brief Assign a message decoded by MessageDecoder. 0207 /// 0208 /// This API is EXPERIMENTAL. 0209 /// 0210 /// \since 0.17.0 0211 class ARROW_EXPORT AssignMessageDecoderListener : public MessageDecoderListener { 0212 public: 0213 /// \brief Construct a listener that assigns a decoded message to the 0214 /// specified location. 0215 /// 0216 /// \param[in] message a location to store the received message 0217 explicit AssignMessageDecoderListener(std::unique_ptr<Message>* message) 0218 : message_(message) {} 0219 0220 virtual ~AssignMessageDecoderListener() = default; 0221 0222 Status OnMessageDecoded(std::unique_ptr<Message> message) override { 0223 *message_ = std::move(message); 0224 return Status::OK(); 0225 } 0226 0227 private: 0228 std::unique_ptr<Message>* message_; 0229 0230 ARROW_DISALLOW_COPY_AND_ASSIGN(AssignMessageDecoderListener); 0231 }; 0232 0233 /// \class MessageDecoder 0234 /// \brief Push style message decoder that receives data from user. 0235 /// 0236 /// This API is EXPERIMENTAL. 0237 /// 0238 /// \since 0.17.0 0239 class ARROW_EXPORT MessageDecoder { 0240 public: 0241 /// \brief State for reading a message 0242 enum State { 0243 /// The initial state. It requires one of the followings as the next data: 0244 /// 0245 /// * int32_t continuation token 0246 /// * int32_t end-of-stream mark (== 0) 0247 /// * int32_t metadata length (backward compatibility for 0248 /// reading old IPC messages produced prior to version 0.15.0 0249 INITIAL, 0250 0251 /// It requires int32_t metadata length. 0252 METADATA_LENGTH, 0253 0254 /// It requires metadata. 0255 METADATA, 0256 0257 /// It requires message body. 0258 BODY, 0259 0260 /// The end-of-stream state. No more data is processed. 0261 EOS, 0262 }; 0263 0264 /// \brief Construct a message decoder. 0265 /// 0266 /// \param[in] listener a MessageDecoderListener that responds events from 0267 /// the decoder 0268 /// \param[in] pool an optional MemoryPool to copy metadata on the 0269 /// \param[in] skip_body if true the body will be skipped even if the message has a body 0270 /// CPU, if required 0271 explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, 0272 MemoryPool* pool = default_memory_pool(), 0273 bool skip_body = false); 0274 0275 /// \brief Construct a message decoder with the specified state. 0276 /// 0277 /// This is a construct for advanced users that know how to decode 0278 /// Message. 0279 /// 0280 /// \param[in] listener a MessageDecoderListener that responds events from 0281 /// the decoder 0282 /// \param[in] initial_state an initial state of the decode 0283 /// \param[in] initial_next_required_size the number of bytes needed 0284 /// to run the next action 0285 /// \param[in] pool an optional MemoryPool to copy metadata on the 0286 /// CPU, if required 0287 /// \param[in] skip_body if true the body will be skipped even if the message has a body 0288 MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state, 0289 int64_t initial_next_required_size, 0290 MemoryPool* pool = default_memory_pool(), bool skip_body = false); 0291 0292 virtual ~MessageDecoder(); 0293 0294 /// \brief Feed data to the decoder as a raw data. 0295 /// 0296 /// If the decoder can decode one or more messages by the data, the 0297 /// decoder calls listener->OnMessageDecoded() with a decoded 0298 /// message multiple times. 0299 /// 0300 /// If the state of the decoder is changed, corresponding callbacks 0301 /// on listener is called: 0302 /// 0303 /// * MessageDecoder::State::INITIAL: listener->OnInitial() 0304 /// * MessageDecoder::State::METADATA_LENGTH: listener->OnMetadataLength() 0305 /// * MessageDecoder::State::METADATA: listener->OnMetadata() 0306 /// * MessageDecoder::State::BODY: listener->OnBody() 0307 /// * MessageDecoder::State::EOS: listener->OnEOS() 0308 /// 0309 /// \param[in] data a raw data to be processed. This data isn't 0310 /// copied. The passed memory must be kept alive through message 0311 /// processing. 0312 /// \param[in] size raw data size. 0313 /// \return Status 0314 Status Consume(const uint8_t* data, int64_t size); 0315 0316 /// \brief Feed data to the decoder as a Buffer. 0317 /// 0318 /// If the decoder can decode one or more messages by the Buffer, 0319 /// the decoder calls listener->OnMessageDecoded() with a decoded 0320 /// message multiple times. 0321 /// 0322 /// \param[in] buffer a Buffer to be processed. 0323 /// \return Status 0324 Status Consume(std::shared_ptr<Buffer> buffer); 0325 0326 /// \brief Return the number of bytes needed to advance the state of 0327 /// the decoder. 0328 /// 0329 /// This method is provided for users who want to optimize performance. 0330 /// Normal users don't need to use this method. 0331 /// 0332 /// Here is an example usage for normal users: 0333 /// 0334 /// ~~~{.cpp} 0335 /// decoder.Consume(buffer1); 0336 /// decoder.Consume(buffer2); 0337 /// decoder.Consume(buffer3); 0338 /// ~~~ 0339 /// 0340 /// Decoder has internal buffer. If consumed data isn't enough to 0341 /// advance the state of the decoder, consumed data is buffered to 0342 /// the internal buffer. It causes performance overhead. 0343 /// 0344 /// If you pass next_required_size() size data to each Consume() 0345 /// call, the decoder doesn't use its internal buffer. It improves 0346 /// performance. 0347 /// 0348 /// Here is an example usage to avoid using internal buffer: 0349 /// 0350 /// ~~~{.cpp} 0351 /// buffer1 = get_data(decoder.next_required_size()); 0352 /// decoder.Consume(buffer1); 0353 /// buffer2 = get_data(decoder.next_required_size()); 0354 /// decoder.Consume(buffer2); 0355 /// ~~~ 0356 /// 0357 /// Users can use this method to avoid creating small 0358 /// chunks. Message body must be contiguous data. If users pass 0359 /// small chunks to the decoder, the decoder needs concatenate small 0360 /// chunks internally. It causes performance overhead. 0361 /// 0362 /// Here is an example usage to reduce small chunks: 0363 /// 0364 /// ~~~{.cpp} 0365 /// buffer = AllocateResizableBuffer(); 0366 /// while ((small_chunk = get_data(&small_chunk_size))) { 0367 /// auto current_buffer_size = buffer->size(); 0368 /// buffer->Resize(current_buffer_size + small_chunk_size); 0369 /// memcpy(buffer->mutable_data() + current_buffer_size, 0370 /// small_chunk, 0371 /// small_chunk_size); 0372 /// if (buffer->size() < decoder.next_required_size()) { 0373 /// continue; 0374 /// } 0375 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); 0376 /// decoder.Consume(chunk); 0377 /// buffer = AllocateResizableBuffer(); 0378 /// } 0379 /// if (buffer->size() > 0) { 0380 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); 0381 /// decoder.Consume(chunk); 0382 /// } 0383 /// ~~~ 0384 /// 0385 /// \return the number of bytes needed to advance the state of the 0386 /// decoder 0387 int64_t next_required_size() const; 0388 0389 /// \brief Return the current state of the decoder. 0390 /// 0391 /// This method is provided for users who want to optimize performance. 0392 /// Normal users don't need to use this method. 0393 /// 0394 /// Decoder doesn't need Buffer to process data on the 0395 /// MessageDecoder::State::INITIAL state and the 0396 /// MessageDecoder::State::METADATA_LENGTH. Creating Buffer has 0397 /// performance overhead. Advanced users can avoid creating Buffer 0398 /// by checking the current state of the decoder: 0399 /// 0400 /// ~~~{.cpp} 0401 /// switch (decoder.state()) { 0402 /// MessageDecoder::State::INITIAL: 0403 /// MessageDecoder::State::METADATA_LENGTH: 0404 /// { 0405 /// uint8_t data[sizeof(int32_t)]; 0406 /// auto data_size = input->Read(decoder.next_required_size(), data); 0407 /// decoder.Consume(data, data_size); 0408 /// } 0409 /// break; 0410 /// default: 0411 /// { 0412 /// auto buffer = input->Read(decoder.next_required_size()); 0413 /// decoder.Consume(buffer); 0414 /// } 0415 /// break; 0416 /// } 0417 /// ~~~ 0418 /// 0419 /// \return the current state 0420 State state() const; 0421 0422 private: 0423 class MessageDecoderImpl; 0424 std::unique_ptr<MessageDecoderImpl> impl_; 0425 0426 ARROW_DISALLOW_COPY_AND_ASSIGN(MessageDecoder); 0427 }; 0428 0429 /// \brief Abstract interface for a sequence of messages 0430 /// \since 0.5.0 0431 class ARROW_EXPORT MessageReader { 0432 public: 0433 virtual ~MessageReader() = default; 0434 0435 /// \brief Create MessageReader that reads from InputStream 0436 static std::unique_ptr<MessageReader> Open(io::InputStream* stream); 0437 0438 /// \brief Create MessageReader that reads from owned InputStream 0439 static std::unique_ptr<MessageReader> Open( 0440 const std::shared_ptr<io::InputStream>& owned_stream); 0441 0442 /// \brief Read next Message from the interface 0443 /// 0444 /// \return an arrow::ipc::Message instance 0445 virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0; 0446 }; 0447 0448 // the first parameter of the function should be a pointer to metadata (aka. 0449 // org::apache::arrow::flatbuf::RecordBatch*) 0450 using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessFile*)>; 0451 0452 /// \brief Read encapsulated RPC message from position in file 0453 /// 0454 /// Read a length-prefixed message flatbuffer starting at the indicated file 0455 /// offset. If the message has a body with non-zero length, it will also be 0456 /// read 0457 /// 0458 /// The metadata_length includes at least the length prefix and the flatbuffer 0459 /// 0460 /// \param[in] offset the position in the file where the message starts. The 0461 /// first 4 bytes after the offset are the message length 0462 /// \param[in] metadata_length the total number of bytes to read from file 0463 /// \param[in] file the seekable file interface to read from 0464 /// \param[in] fields_loader the function for loading subset of fields from the given file 0465 /// \return the message read 0466 0467 ARROW_EXPORT 0468 Result<std::unique_ptr<Message>> ReadMessage( 0469 const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, 0470 const FieldsLoaderFunction& fields_loader = {}); 0471 0472 /// \brief Read encapsulated RPC message from cached buffers 0473 /// 0474 /// The buffers should contain an entire message. Partial reads are not handled. 0475 /// 0476 /// This method can be used to read just the metadata by passing in a nullptr for the 0477 /// body. The body will then be skipped and the body size will not be validated. 0478 /// 0479 /// If the body buffer is provided then it must be the complete body buffer 0480 /// 0481 /// This is similar to Message::Open but performs slightly more validation (e.g. checks 0482 /// to see that the metadata length is correct and that the body is the size the metadata 0483 /// expected) 0484 /// 0485 /// \param metadata The bytes for the metadata 0486 /// \param body The bytes for the body 0487 /// \return The message represented by the buffers 0488 ARROW_EXPORT Result<std::unique_ptr<Message>> ReadMessage( 0489 std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body); 0490 0491 ARROW_EXPORT 0492 Future<std::shared_ptr<Message>> ReadMessageAsync( 0493 const int64_t offset, const int32_t metadata_length, const int64_t body_length, 0494 io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context()); 0495 0496 /// \brief Advance stream to an 8-byte offset if its position is not a multiple 0497 /// of 8 already 0498 /// \param[in] stream an input stream 0499 /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 0500 /// or 64, to ensure the body starts on a multiple of that alignment 0501 /// \return Status 0502 ARROW_EXPORT 0503 Status AlignStream(io::InputStream* stream, int32_t alignment = 8); 0504 0505 /// \brief Advance stream to an 8-byte offset if its position is not a multiple 0506 /// of 8 already 0507 /// \param[in] stream an output stream 0508 /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 0509 /// or 64, to ensure the body starts on a multiple of that alignment 0510 /// \return Status 0511 ARROW_EXPORT 0512 Status AlignStream(io::OutputStream* stream, int32_t alignment = 8); 0513 0514 /// \brief Return error Status if file position is not a multiple of the 0515 /// indicated alignment 0516 ARROW_EXPORT 0517 Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8); 0518 0519 /// \brief Read encapsulated IPC message (metadata and body) from InputStream 0520 /// 0521 /// Returns null if there are not enough bytes available or the 0522 /// message length is 0 (e.g. EOS in a stream) 0523 /// 0524 /// \param[in] stream an input stream 0525 /// \param[in] pool an optional MemoryPool to copy metadata on the CPU, if required 0526 /// \return Message 0527 ARROW_EXPORT 0528 Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* stream, 0529 MemoryPool* pool = default_memory_pool()); 0530 0531 /// \brief Feed data from InputStream to MessageDecoder to decode an 0532 /// encapsulated IPC message (metadata and body) 0533 /// 0534 /// This API is EXPERIMENTAL. 0535 /// 0536 /// \param[in] decoder a decoder 0537 /// \param[in] stream an input stream 0538 /// \return Status 0539 /// 0540 /// \since 0.17.0 0541 ARROW_EXPORT 0542 Status DecodeMessage(MessageDecoder* decoder, io::InputStream* stream); 0543 0544 /// Write encapsulated IPC message Does not make assumptions about 0545 /// whether the stream is aligned already. Can write legacy (pre 0546 /// version 0.15.0) IPC message if option set 0547 /// 0548 /// continuation: 0xFFFFFFFF 0549 /// message_size: int32 0550 /// message: const void* 0551 /// padding 0552 /// 0553 /// 0554 /// \param[in] message a buffer containing the metadata to write 0555 /// \param[in] options IPC writing options, including alignment and 0556 /// legacy message support 0557 /// \param[in,out] file the OutputStream to write to 0558 /// \param[out] message_length the total size of the payload written including 0559 /// padding 0560 /// \return Status 0561 Status WriteMessage(const Buffer& message, const IpcWriteOptions& options, 0562 io::OutputStream* file, int32_t* message_length); 0563 0564 } // namespace ipc 0565 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |