Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:00

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // C++ object model and user API for interprocess schema messaging
0019 
0020 #pragma once
0021 
0022 #include <cstdint>
0023 #include <functional>
0024 #include <memory>
0025 #include <string>
0026 #include <utility>
0027 
0028 #include "arrow/io/type_fwd.h"
0029 #include "arrow/ipc/type_fwd.h"
0030 #include "arrow/result.h"
0031 #include "arrow/status.h"
0032 #include "arrow/type_fwd.h"
0033 #include "arrow/util/macros.h"
0034 #include "arrow/util/visibility.h"
0035 
0036 namespace arrow {
0037 namespace ipc {
0038 
0039 struct IpcWriteOptions;
0040 
0041 // Read interface classes. We do not fully deserialize the flatbuffers so that
0042 // individual fields' metadata can be retrieved from a very large schema without
0043 // paying the cost of deserializing the entire schema.
0044 
0045 /// \class Message
0046 /// \brief An IPC message consisting of Flatbuffer metadata and an optional body
0047 class ARROW_EXPORT Message {
0048  public:
0049   /// \brief Construct message, but do not validate
0050   ///
0051   /// Use at your own risk; Message::Open has more metadata validation
       ///
       /// \param[in] metadata a buffer containing the Flatbuffer metadata
       /// \param[in] body a buffer containing the message body
0052   Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);
0053 
       // Declared here and defined out of line: impl_ is a std::unique_ptr to the
       // incomplete type MessageImpl, so the destructor cannot be generated inline.
0054   ~Message();
0055 
0056   /// \brief Create and validate a Message instance from two buffers
0057   ///
0058   /// \param[in] metadata a buffer containing the Flatbuffer metadata
0059   /// \param[in] body a buffer containing the message body, which may be null
0060   /// \return the created message
0061   static Result<std::unique_ptr<Message>> Open(std::shared_ptr<Buffer> metadata,
0062                                                std::shared_ptr<Buffer> body);
0063 
0064   /// \brief Read message body and create Message given Flatbuffer metadata
0065   /// \param[in] metadata a buffer containing a serialized Message flatbuffer
0066   /// \param[in] stream an InputStream to read the body from
0067   /// \return the created Message
0068   ///
0069   /// \note If stream supports zero-copy, this is zero-copy
0070   static Result<std::unique_ptr<Message>> ReadFrom(std::shared_ptr<Buffer> metadata,
0071                                                    io::InputStream* stream);
0072 
0073   /// \brief Read message body from position in file, and create Message given
0074   /// the Flatbuffer metadata
0075   /// \param[in] offset the position in the file where the message body starts.
0076   /// \param[in] metadata a buffer containing a serialized Message flatbuffer
0077   /// \param[in] file the seekable file interface to read from
0078   /// \return the created Message
0079   ///
0080   /// \note If file supports zero-copy, this is zero-copy
0081   static Result<std::unique_ptr<Message>> ReadFrom(const int64_t offset,
0082                                                    std::shared_ptr<Buffer> metadata,
0083                                                    io::RandomAccessFile* file);
0084 
0085   /// \brief Return true if message type and contents are equal
0086   ///
0087   /// \param[in] other another message
0088   /// \return true if contents equal
0089   bool Equals(const Message& other) const;
0090 
0091   /// \brief the Message metadata
0092   ///
0093   /// \return buffer
0094   std::shared_ptr<Buffer> metadata() const;
0095 
0096   /// \brief Custom metadata serialized in metadata Flatbuffer. Returns nullptr
0097   /// when none set
0098   const std::shared_ptr<const KeyValueMetadata>& custom_metadata() const;
0099 
0100   /// \brief the Message body, if any
0101   ///
0102   /// \return buffer is null if no body
0103   std::shared_ptr<Buffer> body() const;
0104 
0105   /// \brief The expected body length according to the metadata, for
0106   /// verification purposes
0107   int64_t body_length() const;
0108 
0109   /// \brief The Message type
0110   MessageType type() const;
0111 
0112   /// \brief The Message metadata version
0113   MetadataVersion metadata_version() const;
0114 
       /// \brief Untyped pointer to the message header
       ///
       /// NOTE(review): presumably the type-specific Flatbuffer header table that
       /// matches type(); confirm against the implementation before casting.
0115   const void* header() const;
0116 
0117   /// \brief Write length-prefixed metadata and body to output stream
0118   ///
0119   /// \param[in] file output stream to write to
0120   /// \param[in] options IPC writing options including alignment
0121   /// \param[out] output_length the number of bytes written
0122   /// \return Status
0123   Status SerializeTo(io::OutputStream* file, const IpcWriteOptions& options,
0124                      int64_t* output_length) const;
0125 
0126   /// \brief Return true if the Message metadata passes Flatbuffer validation
0127   bool Verify() const;
0128 
0129   /// \brief Whether a given message type needs a body.
       ///
       /// Every type except MessageType::NONE and MessageType::SCHEMA is reported
       /// as needing a body.
0130   static bool HasBody(MessageType type) {
0131     return type != MessageType::NONE && type != MessageType::SCHEMA;
0132   }
0133 
0134  private:
0135   // Hide serialization details from user API
0136   class MessageImpl;
0137   std::unique_ptr<MessageImpl> impl_;
0138 
0139   ARROW_DISALLOW_COPY_AND_ASSIGN(Message);
0140 };
0141 
     /// \brief Format a MessageType as a std::string
     ///
     /// \param[in] type the message type to format
     /// \return a string for the given type (exact text is defined by the
     /// implementation, which is not visible in this header)
0142 ARROW_EXPORT std::string FormatMessageType(MessageType type);
0143 
0144 /// \class MessageDecoderListener
0145 /// \brief An abstract class to listen to events from MessageDecoder.
0146 ///
0147 /// This API is EXPERIMENTAL.
0148 ///
0149 /// \since 0.17.0
0150 class ARROW_EXPORT MessageDecoderListener {
0151  public:
0152   virtual ~MessageDecoderListener() = default;
0153 
0154   /// \brief Called when a message is decoded.
0155   ///
0156   /// MessageDecoder calls this method when it decodes a message. This
0157   /// method is called multiple times when the target stream has
0158   /// multiple messages.
0159   ///
0160   /// \param[in] message a decoded message; passed as a std::unique_ptr, so
       /// ownership is transferred to the listener
0161   /// \return Status
0162   virtual Status OnMessageDecoded(std::unique_ptr<Message> message) = 0;
0163 
0164   /// \brief Called when the decoder state is changed to
0165   /// MessageDecoder::State::INITIAL.
0166   ///
0167   /// The default implementation just returns arrow::Status::OK().
0168   ///
0169   /// \return Status
0170   virtual Status OnInitial();
0171 
0172   /// \brief Called when the decoder state is changed to
0173   /// MessageDecoder::State::METADATA_LENGTH.
0174   ///
0175   /// The default implementation just returns arrow::Status::OK().
0176   ///
0177   /// \return Status
0178   virtual Status OnMetadataLength();
0179 
0180   /// \brief Called when the decoder state is changed to
0181   /// MessageDecoder::State::METADATA.
0182   ///
0183   /// The default implementation just returns arrow::Status::OK().
0184   ///
0185   /// \return Status
0186   virtual Status OnMetadata();
0187 
0188   /// \brief Called when the decoder state is changed to
0189   /// MessageDecoder::State::BODY.
0190   ///
0191   /// The default implementation just returns arrow::Status::OK().
0192   ///
0193   /// \return Status
0194   virtual Status OnBody();
0195 
0196   /// \brief Called when the decoder state is changed to
0197   /// MessageDecoder::State::EOS.
0198   ///
0199   /// The default implementation just returns arrow::Status::OK().
0200   ///
0201   /// \return Status
0202   virtual Status OnEOS();
0203 };
0204 
0205 /// \class AssignMessageDecoderListener
0206 /// \brief Assign a message decoded by MessageDecoder.
0207 ///
0208 /// This API is EXPERIMENTAL.
0209 ///
0210 /// \since 0.17.0
0211 class ARROW_EXPORT AssignMessageDecoderListener : public MessageDecoderListener {
0212  public:
0213   /// \brief Construct a listener that assigns a decoded message to the
0214   /// specified location.
0215   ///
0216   /// \param[in] message a location to store the received message. Only the
       /// pointer is stored; it is dereferenced without a null check on every
       /// decoded message, so it must be non-null and must stay valid for as
       /// long as the listener can receive messages.
0217   explicit AssignMessageDecoderListener(std::unique_ptr<Message>* message)
0218       : message_(message) {}
0219 
0220   virtual ~AssignMessageDecoderListener() = default;
0221 
       /// \brief Move the decoded message into the location given at construction.
       ///
       /// Any message previously stored at that location is replaced.
       ///
       /// \param[in] message a decoded message
       /// \return always Status::OK()
0222   Status OnMessageDecoded(std::unique_ptr<Message> message) override {
0223     *message_ = std::move(message);
0224     return Status::OK();
0225   }
0226 
0227  private:
       // Non-owning pointer to the caller-provided destination.
0228   std::unique_ptr<Message>* message_;
0229 
0230   ARROW_DISALLOW_COPY_AND_ASSIGN(AssignMessageDecoderListener);
0231 };
0232 
0233 /// \class MessageDecoder
0234 /// \brief Push style message decoder that receives data from user.
0235 ///
0236 /// This API is EXPERIMENTAL.
0237 ///
0238 /// \since 0.17.0
0239 class ARROW_EXPORT MessageDecoder {
0240  public:
0241   /// \brief State for reading a message
0242   enum State {
0243     /// The initial state. It requires one of the following as the next data:
0244     ///
0245     ///   * int32_t continuation token
0246     ///   * int32_t end-of-stream mark (== 0)
0247     ///   * int32_t metadata length (backward compatibility for
0248     ///     reading old IPC messages produced prior to version 0.15.0)
0249     INITIAL,
0250 
0251     /// It requires int32_t metadata length.
0252     METADATA_LENGTH,
0253 
0254     /// It requires metadata.
0255     METADATA,
0256 
0257     /// It requires message body.
0258     BODY,
0259 
0260     /// The end-of-stream state. No more data is processed.
0261     EOS,
0262   };
0263 
0264   /// \brief Construct a message decoder.
0265   ///
0266   /// \param[in] listener a MessageDecoderListener that responds to events from
0267   /// the decoder
0268   /// \param[in] pool an optional MemoryPool to copy metadata on the
0269   /// CPU, if required
0270   /// \param[in] skip_body if true the body will be skipped even if the message has a body
0271   explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
0272                           MemoryPool* pool = default_memory_pool(),
0273                           bool skip_body = false);
0274 
0275   /// \brief Construct a message decoder with the specified state.
0276   ///
0277   /// This is a constructor for advanced users that know how to decode
0278   /// Message.
0279   ///
0280   /// \param[in] listener a MessageDecoderListener that responds to events from
0281   /// the decoder
0282   /// \param[in] initial_state an initial state of the decoder
0283   /// \param[in] initial_next_required_size the number of bytes needed
0284   /// to run the next action
0285   /// \param[in] pool an optional MemoryPool to copy metadata on the
0286   /// CPU, if required
0287   /// \param[in] skip_body if true the body will be skipped even if the message has a body
0288   MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state,
0289                  int64_t initial_next_required_size,
0290                  MemoryPool* pool = default_memory_pool(), bool skip_body = false);
0291 
0292   virtual ~MessageDecoder();
0293 
0294   /// \brief Feed raw data to the decoder.
0295   ///
0296   /// If the decoder can decode one or more messages from the data, the
0297   /// decoder calls listener->OnMessageDecoded() once per decoded
0298   /// message, possibly multiple times.
0299   ///
0300   /// If the state of the decoder is changed, the corresponding callback
0301   /// on the listener is called:
0302   ///
0303   /// * MessageDecoder::State::INITIAL: listener->OnInitial()
0304   /// * MessageDecoder::State::METADATA_LENGTH: listener->OnMetadataLength()
0305   /// * MessageDecoder::State::METADATA: listener->OnMetadata()
0306   /// * MessageDecoder::State::BODY: listener->OnBody()
0307   /// * MessageDecoder::State::EOS: listener->OnEOS()
0308   ///
0309   /// \param[in] data the raw data to be processed. This data isn't
0310   /// copied. The passed memory must be kept alive through message
0311   /// processing.
0312   /// \param[in] size raw data size.
0313   /// \return Status
0314   Status Consume(const uint8_t* data, int64_t size);
0315 
0316   /// \brief Feed data to the decoder as a Buffer.
0317   ///
0318   /// If the decoder can decode one or more messages from the Buffer,
0319   /// the decoder calls listener->OnMessageDecoded() once per decoded
0320   /// message, possibly multiple times.
0321   ///
0322   /// \param[in] buffer a Buffer to be processed.
0323   /// \return Status
0324   Status Consume(std::shared_ptr<Buffer> buffer);
0325 
0326   /// \brief Return the number of bytes needed to advance the state of
0327   /// the decoder.
0328   ///
0329   /// This method is provided for users who want to optimize performance.
0330   /// Normal users don't need to use this method.
0331   ///
0332   /// Here is an example usage for normal users:
0333   ///
0334   /// ~~~{.cpp}
0335   /// decoder.Consume(buffer1);
0336   /// decoder.Consume(buffer2);
0337   /// decoder.Consume(buffer3);
0338   /// ~~~
0339   ///
0340   /// The decoder has an internal buffer. If the consumed data isn't enough
0341   /// to advance the state of the decoder, the consumed data is buffered in
0342   /// the internal buffer. This causes performance overhead.
0343   ///
0344   /// If you pass next_required_size() bytes of data to each Consume()
0345   /// call, the decoder doesn't use its internal buffer. This improves
0346   /// performance.
0347   ///
0348   /// Here is an example usage to avoid using internal buffer:
0349   ///
0350   /// ~~~{.cpp}
0351   /// buffer1 = get_data(decoder.next_required_size());
0352   /// decoder.Consume(buffer1);
0353   /// buffer2 = get_data(decoder.next_required_size());
0354   /// decoder.Consume(buffer2);
0355   /// ~~~
0356   ///
0357   /// Users can use this method to avoid creating small
0358   /// chunks. Message body must be contiguous data. If users pass
0359   /// small chunks to the decoder, the decoder needs to concatenate the
0360   /// small chunks internally. This causes performance overhead.
0361   ///
0362   /// Here is an example usage to reduce small chunks:
0363   ///
0364   /// ~~~{.cpp}
0365   /// buffer = AllocateResizableBuffer();
0366   /// while ((small_chunk = get_data(&small_chunk_size))) {
0367   ///   auto current_buffer_size = buffer->size();
0368   ///   buffer->Resize(current_buffer_size + small_chunk_size);
0369   ///   memcpy(buffer->mutable_data() + current_buffer_size,
0370   ///          small_chunk,
0371   ///          small_chunk_size);
0372   ///   if (buffer->size() < decoder.next_required_size()) {
0373   ///     continue;
0374   ///   }
0375   ///   std::shared_ptr<arrow::Buffer> chunk(buffer.release());
0376   ///   decoder.Consume(chunk);
0377   ///   buffer = AllocateResizableBuffer();
0378   /// }
0379   /// if (buffer->size() > 0) {
0380   ///   std::shared_ptr<arrow::Buffer> chunk(buffer.release());
0381   ///   decoder.Consume(chunk);
0382   /// }
0383   /// ~~~
0384   ///
0385   /// \return the number of bytes needed to advance the state of the
0386   /// decoder
0387   int64_t next_required_size() const;
0388 
0389   /// \brief Return the current state of the decoder.
0390   ///
0391   /// This method is provided for users who want to optimize performance.
0392   /// Normal users don't need to use this method.
0393   ///
0394   /// The decoder doesn't need a Buffer to process data in the
0395   /// MessageDecoder::State::INITIAL state and the
0396   /// MessageDecoder::State::METADATA_LENGTH state. Creating a Buffer has
0397   /// performance overhead. Advanced users can avoid creating a Buffer
0398   /// by checking the current state of the decoder:
0399   ///
0400   /// ~~~{.cpp}
0401   /// switch (decoder.state()) {
0402   ///   case MessageDecoder::State::INITIAL:
0403   ///   case MessageDecoder::State::METADATA_LENGTH:
0404   ///     {
0405   ///       uint8_t data[sizeof(int32_t)];
0406   ///       auto data_size = input->Read(decoder.next_required_size(), data);
0407   ///       decoder.Consume(data, data_size);
0408   ///     }
0409   ///     break;
0410   ///   default:
0411   ///     {
0412   ///       auto buffer = input->Read(decoder.next_required_size());
0413   ///       decoder.Consume(buffer);
0414   ///     }
0415   ///     break;
0416   /// }
0417   /// ~~~
0418   ///
0419   /// \return the current state
0420   State state() const;
0421 
0422  private:
       // Implementation details are kept behind an opaque, separately defined class.
0423   class MessageDecoderImpl;
0424   std::unique_ptr<MessageDecoderImpl> impl_;
0425 
0426   ARROW_DISALLOW_COPY_AND_ASSIGN(MessageDecoder);
0427 };
0428 
0429 /// \brief Abstract interface for a sequence of messages
0430 /// \since 0.5.0
0431 class ARROW_EXPORT MessageReader {
0432  public:
0433   virtual ~MessageReader() = default;
0434 
0435   /// \brief Create MessageReader that reads from InputStream
       ///
       /// \param[in] stream the input stream to read messages from. It is passed
       /// as a raw pointer; NOTE(review): presumably the caller must keep it alive
       /// while the reader is in use — confirm against the implementation.
       /// \return the created MessageReader
0436   static std::unique_ptr<MessageReader> Open(io::InputStream* stream);
0437 
0438   /// \brief Create MessageReader that reads from owned InputStream
       ///
       /// \param[in] owned_stream the input stream to read messages from, shared
       /// with the reader through a std::shared_ptr
       /// \return the created MessageReader
0439   static std::unique_ptr<MessageReader> Open(
0440       const std::shared_ptr<io::InputStream>& owned_stream);
0441 
0442   /// \brief Read next Message from the interface
0443   ///
0444   /// \return an arrow::ipc::Message instance
0445   virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0;
0446 };
0447 
0448 // Callback type used by ReadMessage to load a subset of fields from a file.
     // The first parameter of the function should be a pointer to metadata (aka.
0449 // org::apache::arrow::flatbuf::RecordBatch*); the second is the file to read from.
0450 using FieldsLoaderFunction = std::function<Status(const void*, io::RandomAccessFile*)>;
0451 
0452 /// \brief Read encapsulated IPC message from position in file
0453 ///
0454 /// Read a length-prefixed message flatbuffer starting at the indicated file
0455 /// offset. If the message has a body with non-zero length, it will also be
0456 /// read
0457 ///
0458 /// The metadata_length includes at least the length prefix and the flatbuffer
0459 ///
0460 /// \param[in] offset the position in the file where the message starts. The
0461 /// first 4 bytes after the offset are the message length
0462 /// \param[in] metadata_length the total number of bytes to read from file
0463 /// \param[in] file the seekable file interface to read from
0464 /// \param[in] fields_loader the function for loading subset of fields from the given file
     /// (defaults to an empty function)
0465 /// \return the message read
0466 
0467 ARROW_EXPORT
0468 Result<std::unique_ptr<Message>> ReadMessage(
0469     const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
0470     const FieldsLoaderFunction& fields_loader = {});
0471 
0472 /// \brief Read encapsulated IPC message from cached buffers
0473 ///
0474 /// The buffers should contain an entire message.  Partial reads are not handled.
0475 ///
0476 /// This method can be used to read just the metadata by passing in a nullptr for the
0477 /// body.  The body will then be skipped and the body size will not be validated.
0478 ///
0479 /// If the body buffer is provided then it must be the complete body buffer
0480 ///
0481 /// This is similar to Message::Open but performs slightly more validation (e.g. checks
0482 /// to see that the metadata length is correct and that the body is the size the metadata
0483 /// expected)
0484 ///
0485 /// \param[in] metadata The bytes for the metadata
0486 /// \param[in] body The bytes for the body, or nullptr to read only the metadata
0487 /// \return The message represented by the buffers
0488 ARROW_EXPORT Result<std::unique_ptr<Message>> ReadMessage(
0489     std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);
0490 
     /// \brief Asynchronous counterpart of reading a message from a position in a file
     ///
     /// Note that the result is delivered as a std::shared_ptr<Message> wrapped in a
     /// Future, unlike the synchronous ReadMessage overloads which return a
     /// std::unique_ptr<Message>.
     ///
     /// \param[in] offset the position in the file where the message starts
     /// \param[in] metadata_length the length of the metadata section
     /// (NOTE(review): presumably including the length prefix, as for the
     /// synchronous file-based ReadMessage — confirm)
     /// \param[in] body_length the length of the message body
     /// \param[in] file the seekable file interface to read from
     /// \param[in] context the IOContext to use, io::default_io_context() by default
     /// \return a Future for the message read
0491 ARROW_EXPORT
0492 Future<std::shared_ptr<Message>> ReadMessageAsync(
0493     const int64_t offset, const int32_t metadata_length, const int64_t body_length,
0494     io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context());
0495 
0496 /// \brief Advance stream to a multiple of the given alignment (8 bytes by
0497 /// default) if its position is not such a multiple already
0498 /// \param[in] stream an input stream
0499 /// \param[in] alignment the byte multiple for the metadata prefix, usually 8
0500 /// or 64, to ensure the body starts on a multiple of that alignment
0501 /// \return Status
0502 ARROW_EXPORT
0503 Status AlignStream(io::InputStream* stream, int32_t alignment = 8);
0504 
0505 /// \brief Advance stream to a multiple of the given alignment (8 bytes by
0506 /// default) if its position is not such a multiple already
0507 /// \param[in] stream an output stream
0508 /// \param[in] alignment the byte multiple for the metadata prefix, usually 8
0509 /// or 64, to ensure the body starts on a multiple of that alignment
0510 /// \return Status
0511 ARROW_EXPORT
0512 Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
0513 
0514 /// \brief Return error Status if file position is not a multiple of the
0515 /// indicated alignment
     /// \param[in] stream the file interface whose current position is checked
     /// \param[in] alignment the required byte multiple, 8 by default
     /// \return Status
0516 ARROW_EXPORT
0517 Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
0518 
0519 /// \brief Read encapsulated IPC message (metadata and body) from InputStream
0520 ///
0521 /// Returns null if there are not enough bytes available or the
0522 /// message length is 0 (e.g. EOS in a stream)
0523 ///
0524 /// \param[in] stream an input stream
0525 /// \param[in] pool an optional MemoryPool to copy metadata on the CPU, if required
0526 /// \return the Message read, or null in the cases described above
0527 ARROW_EXPORT
0528 Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* stream,
0529                                              MemoryPool* pool = default_memory_pool());
0530 
0531 /// \brief Feed data from InputStream to MessageDecoder to decode an
0532 /// encapsulated IPC message (metadata and body)
0533 ///
0534 /// This API is EXPERIMENTAL.
0535 ///
0536 /// \param[in] decoder the decoder to feed
0537 /// \param[in] stream the input stream to read the data from
0538 /// \return Status
0539 ///
0540 /// \since 0.17.0
0541 ARROW_EXPORT
0542 Status DecodeMessage(MessageDecoder* decoder, io::InputStream* stream);
0543 
0544 /// \brief Write encapsulated IPC message. Does not make assumptions about
0545 /// whether the stream is aligned already. Can write legacy (pre
0546 /// version 0.15.0) IPC message if option set
0547 ///
0548 /// continuation: 0xFFFFFFFF
0549 /// message_size: int32
0550 /// message: const void*
0551 /// padding
0552 ///
0553 ///
0554 /// \param[in] message a buffer containing the metadata to write
0555 /// \param[in] options IPC writing options, including alignment and
0556 /// legacy message support
0557 /// \param[in,out] file the OutputStream to write to
0558 /// \param[out] message_length the total size of the payload written including
0559 /// padding
0560 /// \return Status
     //
     // NOTE(review): unlike the other free functions declared in this header, this
     // declaration carries no ARROW_EXPORT — confirm that this is intentional.
0561 Status WriteMessage(const Buffer& message, const IpcWriteOptions& options,
0562                     io::OutputStream* file, int32_t* message_length);
0563 
0564 }  // namespace ipc
0565 }  // namespace arrow