Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:00

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // Implement Arrow streaming binary format
0019 
0020 #pragma once
0021 
0022 #include <cstdint>
0023 #include <memory>
0024 #include <vector>
0025 
0026 #include "arrow/ipc/dictionary.h"  // IWYU pragma: export
0027 #include "arrow/ipc/message.h"
0028 #include "arrow/ipc/options.h"
0029 #include "arrow/result.h"
0030 #include "arrow/util/macros.h"
0031 #include "arrow/util/visibility.h"
0032 
0033 namespace arrow {
0034 
0035 class Array;
0036 class Buffer;
0037 class MemoryManager;
0038 class MemoryPool;
0039 class RecordBatch;
0040 class Schema;
0041 class Status;
0042 class Table;
0043 class Tensor;
0044 class SparseTensor;
0045 
0046 namespace io {
0047 
0048 class OutputStream;
0049 
0050 }  // namespace io
0051 
0052 namespace ipc {
0053 
0054 /// \brief Intermediate data structure with metadata header, and zero
0055 /// or more buffers for the message body.
     ///
     /// The Get*Payload functions declared later in this header fill one in
     /// through an IpcPayload* out-parameter; WriteIpcPayload and GetPayloadSize
     /// take one by const reference.
0056 struct IpcPayload {
       /// Kind of IPC message this payload represents; defaults to MessageType::NONE.
0057   MessageType type = MessageType::NONE;
       /// Buffer holding the metadata header.
0058   std::shared_ptr<Buffer> metadata;
       /// Zero or more buffers making up the message body.
0059   std::vector<std::shared_ptr<Buffer>> body_buffers;
       /// NOTE(review): presumably the counts of variadic data buffers per array;
       /// not described in this header -- confirm against the IPC format docs.
0060   std::vector<int64_t> variadic_buffer_counts;
0061   int64_t body_length = 0;      // serialized body length (padded, maybe compressed)
0062   int64_t raw_body_length = 0;  // initial uncompressed body length
0063 };
0064 
0065 struct WriteStats {
       // Counters returned by RecordBatchWriter::stats(); every field is
       // initialized to 0.
0066   /// Number of IPC messages written.
0067   int64_t num_messages = 0;
0068   /// Number of record batches written.
0069   int64_t num_record_batches = 0;
0070   /// Number of dictionary batches written.
0071   ///
0072   /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries
0073   int64_t num_dictionary_batches = 0;
0074 
0075   /// Number of dictionary deltas written.
0076   int64_t num_dictionary_deltas = 0;
0077   /// Number of replaced dictionaries (i.e. where a dictionary batch replaces
0078   /// an existing dictionary with an unrelated new dictionary).
0079   int64_t num_replaced_dictionaries = 0;
0080 
0081   /// Total size in bytes of record batches emitted.
0082   /// The "raw" size counts the original buffer sizes, while the "serialized" size
0083   /// includes padding and (optionally) compression.
0084   int64_t total_raw_body_size = 0;
       /// Serialized counterpart of total_raw_body_size: includes padding and
       /// (optionally) compression.
0085   int64_t total_serialized_body_size = 0;
0086 };
0087 
0088 /// \class RecordBatchWriter
0089 /// \brief Abstract interface for writing a stream of record batches
     ///
     /// WriteRecordBatch(const RecordBatch&), Close() and stats() are pure
     /// virtual and must be provided by concrete writers. The other members are
     /// only declared here; their definitions live outside this header.
0090 class ARROW_EXPORT RecordBatchWriter {
0091  public:
0092   virtual ~RecordBatchWriter();
0093 
0094   /// \brief Write a record batch to the stream
0095   ///
0096   /// \param[in] batch the record batch to write to the stream
0097   /// \return Status
0098   virtual Status WriteRecordBatch(const RecordBatch& batch) = 0;
0099 
0100   /// \brief Write a record batch with custom metadata to the stream
0101   ///
       /// Virtual but not pure, so subclasses are not required to override it.
       ///
0102   /// \param[in] batch the record batch to write to the stream
0103   /// \param[in] custom_metadata the record batch's custom metadata to write to the stream
0104   /// \return Status
0105   virtual Status WriteRecordBatch(
0106       const RecordBatch& batch,
0107       const std::shared_ptr<const KeyValueMetadata>& custom_metadata);
0108 
0109   /// \brief Write possibly-chunked table by creating sequence of record batches
       ///
       /// Non-virtual overload that takes no chunk-size argument.
       ///
0110   /// \param[in] table table to write
0111   /// \return Status
0112   Status WriteTable(const Table& table);
0113 
0114   /// \brief Write Table with a particular chunksize
0115   /// \param[in] table table to write
0116   /// \param[in] max_chunksize maximum number of rows for table chunks. To
0117   /// indicate that no maximum should be enforced, pass -1.
0118   /// \return Status
0119   virtual Status WriteTable(const Table& table, int64_t max_chunksize);
0120 
0121   /// \brief Perform any logic necessary to finish the stream
0122   ///
0123   /// \return Status
0124   virtual Status Close() = 0;
0125 
0126   /// \brief Return current write statistics
       ///
       /// \return a WriteStats value, returned by copy
0127   virtual WriteStats stats() const = 0;
0128 };
0129 
0130 /// \defgroup record-batch-writer-factories Functions for creating RecordBatchWriter
0131 /// instances
0132 ///
0133 /// @{
0134 
0135 /// \brief Create a new IPC stream writer from stream sink and schema.
     ///
0136 /// User is responsible for closing the actual OutputStream.
0137 ///
0138 /// \param[in] sink output stream to write to, passed as a raw pointer
0139 /// \param[in] schema the schema of the record batches to be written
0140 /// \param[in] options options for serialization; defaults to
     /// IpcWriteOptions::Defaults()
0141 /// \return Result<std::shared_ptr<RecordBatchWriter>>
0142 ARROW_EXPORT
0143 Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(
0144     io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
0145     const IpcWriteOptions& options = IpcWriteOptions::Defaults());
0146 
0147 /// \brief Create a new IPC stream writer from stream sink and schema.
     ///
0148 /// User is responsible for closing the actual OutputStream.
0149 ///
0150 /// \param[in] sink output stream to write to, passed as a std::shared_ptr
0151 /// \param[in] schema the schema of the record batches to be written
0152 /// \param[in] options options for serialization; defaults to
     /// IpcWriteOptions::Defaults()
0153 /// \return Result<std::shared_ptr<RecordBatchWriter>>
0154 ARROW_EXPORT
0155 Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter(
0156     std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema,
0157     const IpcWriteOptions& options = IpcWriteOptions::Defaults());
0158 
0159 /// \brief Create a new IPC file writer from stream sink and schema
0160 ///
0161 /// \param[in] sink output stream to write to, passed as a raw pointer
0162 /// \param[in] schema the schema of the record batches to be written
0163 /// \param[in] options options for serialization, optional (defaults to
     /// IpcWriteOptions::Defaults())
0164 /// \param[in] metadata custom metadata for File Footer, optional (defaults to
     /// NULLPTR)
0165 /// \return Result<std::shared_ptr<RecordBatchWriter>>
0166 ARROW_EXPORT
0167 Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(
0168     io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
0169     const IpcWriteOptions& options = IpcWriteOptions::Defaults(),
0170     const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR);
0171 
0172 /// \brief Create a new IPC file writer from stream sink and schema
0173 ///
0174 /// \param[in] sink output stream to write to, passed as a std::shared_ptr
0175 /// \param[in] schema the schema of the record batches to be written
0176 /// \param[in] options options for serialization, optional (defaults to
     /// IpcWriteOptions::Defaults())
0177 /// \param[in] metadata custom metadata for File Footer, optional (defaults to
     /// NULLPTR)
0178 /// \return Result<std::shared_ptr<RecordBatchWriter>>
0179 ARROW_EXPORT
0180 Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter(
0181     std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema,
0182     const IpcWriteOptions& options = IpcWriteOptions::Defaults(),
0183     const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR);
0184 
0185 /// @}
0186 
0187 /// \brief Low-level API for writing a record batch (without schema)
0188 /// to an OutputStream as encapsulated IPC message. See Arrow format
0189 /// documentation for more detail.
0190 ///
0191 /// \param[in] batch the record batch to write
0192 /// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
0193 /// generally should be 0
0194 /// \param[in] dst an OutputStream
0195 /// \param[out] metadata_length the size of the length-prefixed flatbuffer
0196 /// including padding to a 64-byte boundary
0197 /// \param[out] body_length the size of the contiguous buffer block plus padding bytes
0198 /// \param[in] options options for serialization
0199 /// \return Status
0200 ARROW_EXPORT
0201 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
0202                         io::OutputStream* dst, int32_t* metadata_length,
0203                         int64_t* body_length, const IpcWriteOptions& options);
0204 
0205 /// \brief Serialize record batch as encapsulated IPC message in a new buffer
0206 ///
0207 /// \param[in] batch the record batch
0208 /// \param[in] options the IpcWriteOptions to use for serialization
0209 /// \return the serialized message
0210 ARROW_EXPORT
0211 Result<std::shared_ptr<Buffer>> SerializeRecordBatch(const RecordBatch& batch,
0212                                                      const IpcWriteOptions& options);
0213 
0214 /// \brief Serialize record batch as encapsulated IPC message in a new buffer
0215 ///
     /// This overload takes a MemoryManager and no IpcWriteOptions argument.
     ///
0216 /// \param[in] batch the record batch
0217 /// \param[in] mm a MemoryManager to allocate memory from
0218 /// \return the serialized message
0219 ARROW_EXPORT
0220 Result<std::shared_ptr<Buffer>> SerializeRecordBatch(const RecordBatch& batch,
0221                                                      std::shared_ptr<MemoryManager> mm);
0222 
0223 /// \brief Write record batch to OutputStream
0224 ///
     /// Unlike the overloads that return a Buffer, this one writes to \p out and
     /// returns only a Status.
     ///
0225 /// \param[in] batch the record batch to write
0226 /// \param[in] options the IpcWriteOptions to use for serialization
0227 /// \param[in] out the OutputStream to write the output to
0228 /// \return Status
0229 ///
0230 /// If writing to pre-allocated memory, you can use
0231 /// arrow::ipc::GetRecordBatchSize to compute how much space is required.
0232 ARROW_EXPORT
0233 Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options,
0234                             io::OutputStream* out);
0235 
0236 /// \brief Serialize schema as encapsulated IPC message
0237 ///
0238 /// \param[in] schema the schema to write
0239 /// \param[in] pool a MemoryPool to allocate memory from; defaults to
     /// default_memory_pool()
0240 /// \return the serialized schema
0241 ARROW_EXPORT
0242 Result<std::shared_ptr<Buffer>> SerializeSchema(const Schema& schema,
0243                                                 MemoryPool* pool = default_memory_pool());
0244 
0245 /// \brief Write multiple record batches to OutputStream, including schema
0246 /// \param[in] batches a vector of batches. Must all have same schema
0247 /// \param[in] options options for serialization
0248 /// \param[in] dst the OutputStream to write to
0249 /// \return Status
0250 ARROW_EXPORT
0251 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
0252                               const IpcWriteOptions& options, io::OutputStream* dst);
0253 
0254 /// \brief Compute the number of bytes needed to write an IPC payload
0255 ///     including metadata
0256 ///
0257 /// \param[in] payload the IPC payload to write
0258 /// \param[in] options write options; defaults to IpcWriteOptions::Defaults()
0259 /// \return the size in bytes of the complete encapsulated message
0260 ARROW_EXPORT
0261 int64_t GetPayloadSize(const IpcPayload& payload,
0262                        const IpcWriteOptions& options = IpcWriteOptions::Defaults());
0263 
0264 /// \brief Compute the number of bytes needed to write a record batch including metadata
0265 ///
     /// This overload takes no IpcWriteOptions argument.
     /// NOTE(review): presumably the default options are used -- confirm in the
     /// implementation.
     ///
0266 /// \param[in] batch the record batch to write
0267 /// \param[out] size the size in bytes of the complete encapsulated message
0268 /// \return Status
0269 ARROW_EXPORT
0270 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
0271 
0272 /// \brief Compute the number of bytes needed to write a record batch including metadata
0273 ///
0274 /// \param[in] batch the record batch to write
0275 /// \param[in] options options for serialization
0276 /// \param[out] size the size in bytes of the complete encapsulated message
0277 /// \return Status
0278 ARROW_EXPORT
0279 Status GetRecordBatchSize(const RecordBatch& batch, const IpcWriteOptions& options,
0280                           int64_t* size);
0281 
0282 /// \brief Compute the number of bytes needed to write a tensor including metadata
0283 ///
0284 /// \param[in] tensor the tensor to write
0285 /// \param[out] size the size in bytes of the complete encapsulated message
0286 /// \return Status
0287 ARROW_EXPORT
0288 Status GetTensorSize(const Tensor& tensor, int64_t* size);
0289 
0290 /// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory
0291 /// allocation
0292 ///
0293 /// \param[in] tensor the Tensor to write
0294 /// \param[in] pool MemoryPool to allocate space for metadata
0295 /// \return the resulting Message, held in a std::unique_ptr
0296 ARROW_EXPORT
0297 Result<std::unique_ptr<Message>> GetTensorMessage(const Tensor& tensor, MemoryPool* pool);
0298 
0299 /// \brief Write arrow::Tensor as a contiguous message.
0300 ///
0301 /// The metadata and body are written assuming 64-byte alignment. It is the
0302 /// user's responsibility to ensure that the OutputStream has been aligned
0303 /// to a 64-byte multiple before writing the message.
0304 ///
0305 /// The message is written out as follows:
0306 /// \code
0307 /// <metadata size> <metadata> <tensor data>
0308 /// \endcode
0309 ///
0310 /// \param[in] tensor the Tensor to write
0311 /// \param[in] dst the OutputStream to write to
0312 /// \param[out] metadata_length the actual metadata length, including padding
0313 /// \param[out] body_length the actual message body length
0314 /// \return Status
0315 ARROW_EXPORT
0316 Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
0317                    int64_t* body_length);
0318 
0319 /// \brief EXPERIMENTAL: Convert arrow::SparseTensor to a Message with minimal memory
0320 /// allocation
0321 ///
0322 /// The message is written out as follows:
0323 /// \code
0324 /// <metadata size> <metadata> <sparse index> <sparse tensor body>
0325 /// \endcode
0326 ///
0327 /// \param[in] sparse_tensor the SparseTensor to write
0328 /// \param[in] pool MemoryPool to allocate space for metadata
0329 /// \return the resulting Message, held in a std::unique_ptr
0330 ARROW_EXPORT
0331 Result<std::unique_ptr<Message>> GetSparseTensorMessage(const SparseTensor& sparse_tensor,
0332                                                         MemoryPool* pool);
0333 
0334 /// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message.
     ///
0335 /// The metadata, sparse index, and body are written assuming 64-byte alignment.
0336 /// It is the user's responsibility to ensure that the OutputStream has been aligned
0337 /// to a 64-byte multiple before writing the message.
0338 ///
0339 /// \param[in] sparse_tensor the SparseTensor to write
0340 /// \param[in] dst the OutputStream to write to
0341 /// \param[out] metadata_length the actual metadata length, including padding
0342 /// \param[out] body_length the actual message body length
0343 /// \return Status
0344 ARROW_EXPORT
0345 Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst,
0346                          int32_t* metadata_length, int64_t* body_length);
0347 
0348 /// \brief Compute IpcPayload for the given schema
0349 /// \param[in] schema the Schema that is being serialized
0350 /// \param[in] options options for serialization
0351 /// \param[in] mapper object mapping dictionary fields to dictionary ids
0352 /// \param[out] out the returned IpcPayload
0353 /// \return Status
0354 ARROW_EXPORT
0355 Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options,
0356                         const DictionaryFieldMapper& mapper, IpcPayload* out);
0357 
0358 /// \brief Compute IpcPayload for a dictionary
     ///
     /// This overload has no is_delta parameter.
     /// NOTE(review): presumably equivalent to is_delta = false -- confirm in the
     /// implementation.
     ///
0359 /// \param[in] id the dictionary id
0360 /// \param[in] dictionary the dictionary values
0361 /// \param[in] options options for serialization
0362 /// \param[out] payload the output IpcPayload
0363 /// \return Status
0364 ARROW_EXPORT
0365 Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
0366                             const IpcWriteOptions& options, IpcPayload* payload);
0367 
0368 /// \brief Compute IpcPayload for a dictionary, stating whether it is a delta
0369 /// \param[in] id the dictionary id
0370 /// \param[in] is_delta whether the dictionary is a delta dictionary
0371 /// \param[in] dictionary the dictionary values
0372 /// \param[in] options options for serialization
0373 /// \param[out] payload the output IpcPayload
0374 /// \return Status
0375 ARROW_EXPORT
0376 Status GetDictionaryPayload(int64_t id, bool is_delta,
0377                             const std::shared_ptr<Array>& dictionary,
0378                             const IpcWriteOptions& options, IpcPayload* payload);
0379 
0380 /// \brief Compute IpcPayload for the given record batch
0381 /// \param[in] batch the RecordBatch that is being serialized
0382 /// \param[in] options options for serialization
0383 /// \param[out] out the returned IpcPayload
0384 /// \return Status
0385 ARROW_EXPORT
0386 Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options,
0387                              IpcPayload* out);
0388 
0389 /// \brief Compute IpcPayload for the given record batch and custom metadata
     ///
     /// Same as the overload without \p custom_metadata, except that the extra
     /// argument sits between \p batch and \p options in the parameter list.
     ///
0390 /// \param[in] batch the RecordBatch that is being serialized
0391 /// \param[in] custom_metadata the custom metadata to be serialized with the record batch
0392 /// \param[in] options options for serialization
0393 /// \param[out] out the returned IpcPayload
0394 /// \return Status
0395 ARROW_EXPORT
0396 Status GetRecordBatchPayload(
0397     const RecordBatch& batch,
0398     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
0399     const IpcWriteOptions& options, IpcPayload* out);
0400 
0401 /// \brief Write an IPC payload to the given stream.
0402 /// \param[in] payload the payload to write
0403 /// \param[in] options options for serialization
0404 /// \param[in] dst the stream to write the payload to
0405 /// \param[out] metadata_length the length of the serialized metadata
0406 /// \return Status
0407 ARROW_EXPORT
0408 Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options,
0409                        io::OutputStream* dst, int32_t* metadata_length);
0410 
0411 /// \brief Compute IpcPayload for the given sparse tensor
0412 /// \param[in] sparse_tensor the SparseTensor that is being serialized
0413 /// \param[in,out] pool MemoryPool used for any required temporary memory allocations
0414 /// \param[out] out the returned IpcPayload
0415 /// \return Status
0416 ARROW_EXPORT
0417 Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* pool,
0418                               IpcPayload* out);
0419 
0420 namespace internal {
0421 
0422 // These internal APIs may change without warning or deprecation
0423 
0424 class ARROW_EXPORT IpcPayloadWriter {
       // Abstract interface that accepts IpcPayload objects one call at a time.
       // WritePayload() and Close() are pure virtual; Start() is virtual with a
       // default implementation.
0425  public:
0426   virtual ~IpcPayloadWriter();
0427 
0428   // Default implementation is a no-op
0429   virtual Status Start();
0430 
       // Write a single payload. Pure virtual: subclasses must implement it.
0431   virtual Status WritePayload(const IpcPayload& payload) = 0;
0432 
       // Close the writer. Pure virtual: subclasses must implement it.
0433   virtual Status Close() = 0;
0434 };
0435 
0436 /// \brief Create a new IPC payload stream writer from stream sink.
     ///
0437 /// User is responsible for closing the actual OutputStream.
0438 ///
0439 /// \param[in] sink output stream to write to
0440 /// \param[in] options options for serialization; defaults to
     /// IpcWriteOptions::Defaults()
0441 /// \return Result<std::unique_ptr<IpcPayloadWriter>>
0442 ARROW_EXPORT
0443 Result<std::unique_ptr<IpcPayloadWriter>> MakePayloadStreamWriter(
0444     io::OutputStream* sink, const IpcWriteOptions& options = IpcWriteOptions::Defaults());
0445 
0446 /// \brief Create a new IPC payload file writer from stream sink.
0447 ///
0448 /// \param[in] sink output stream to write to
0449 /// \param[in] schema the schema of the record batches to be written
0450 /// \param[in] options options for serialization, optional
0451 /// \param[in] metadata custom metadata for File Footer, optional
0452 /// \return Result<std::unique_ptr<IpcPayloadWriter>>
0453 ARROW_EXPORT
0454 Result<std::unique_ptr<IpcPayloadWriter>> MakePayloadFileWriter(
0455     io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
0456     const IpcWriteOptions& options = IpcWriteOptions::Defaults(),
0457     const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR);
0458 
0459 /// \brief Create a new RecordBatchWriter from IpcPayloadWriter and schema.
0460 ///
0461 /// The format is implicitly the IPC stream format (allowing dictionary
0462 /// replacement and deltas).
0463 ///
0464 /// \param[in] sink the IpcPayloadWriter to write to; taken by value as a
     /// std::unique_ptr, so the caller hands over ownership
0465 /// \param[in] schema the schema of the record batches to be written
0466 /// \param[in] options options for serialization; defaults to
     /// IpcWriteOptions::Defaults()
0467 /// \return Result<std::unique_ptr<RecordBatchWriter>>
0468 ARROW_EXPORT
0469 Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter(
0470     std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema,
0471     const IpcWriteOptions& options = IpcWriteOptions::Defaults());
0472 
0473 }  // namespace internal
0474 }  // namespace ipc
0475 }  // namespace arrow