![]() |
|
|||
File indexing completed on 2025-08-28 08:27:00
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 // Implement Arrow streaming binary format 0019 0020 #pragma once 0021 0022 #include <cstdint> 0023 #include <memory> 0024 #include <vector> 0025 0026 #include "arrow/ipc/dictionary.h" // IWYU pragma: export 0027 #include "arrow/ipc/message.h" 0028 #include "arrow/ipc/options.h" 0029 #include "arrow/result.h" 0030 #include "arrow/util/macros.h" 0031 #include "arrow/util/visibility.h" 0032 0033 namespace arrow { 0034 0035 class Array; 0036 class Buffer; 0037 class MemoryManager; 0038 class MemoryPool; 0039 class RecordBatch; 0040 class Schema; 0041 class Status; 0042 class Table; 0043 class Tensor; 0044 class SparseTensor; 0045 0046 namespace io { 0047 0048 class OutputStream; 0049 0050 } // namespace io 0051 0052 namespace ipc { 0053 0054 /// \brief Intermediate data structure with metadata header, and zero 0055 /// or more buffers for the message body. 
0056 struct IpcPayload { 0057 MessageType type = MessageType::NONE; 0058 std::shared_ptr<Buffer> metadata; 0059 std::vector<std::shared_ptr<Buffer>> body_buffers; 0060 std::vector<int64_t> variadic_buffer_counts; 0061 int64_t body_length = 0; // serialized body length (padded, maybe compressed) 0062 int64_t raw_body_length = 0; // initial uncompressed body length 0063 }; 0064 /// \brief Write statistics, as returned by RecordBatchWriter::stats() 0065 struct WriteStats { 0066 /// Number of IPC messages written. 0067 int64_t num_messages = 0; 0068 /// Number of record batches written. 0069 int64_t num_record_batches = 0; 0070 /// Number of dictionary batches written. 0071 /// 0072 /// Note: num_dictionary_batches >= num_dictionary_deltas + num_replaced_dictionaries 0073 int64_t num_dictionary_batches = 0; 0074 0075 /// Number of dictionary deltas written. 0076 int64_t num_dictionary_deltas = 0; 0077 /// Number of replaced dictionaries (i.e. where a dictionary batch replaces 0078 /// an existing dictionary with an unrelated new dictionary). 0079 int64_t num_replaced_dictionaries = 0; 0080 0081 /// Total size in bytes of record batches emitted. 0082 /// The "raw" size counts the original buffer sizes, while the "serialized" size 0083 /// includes padding and (optionally) compression. 
0084 int64_t total_raw_body_size = 0; 0085 int64_t total_serialized_body_size = 0; 0086 }; 0087 0088 /// \class RecordBatchWriter 0089 /// \brief Abstract interface for writing a stream of record batches 0090 class ARROW_EXPORT RecordBatchWriter { 0091 public: 0092 virtual ~RecordBatchWriter(); 0093 0094 /// \brief Write a record batch to the stream 0095 /// 0096 /// \param[in] batch the record batch to write to the stream 0097 /// \return Status 0098 virtual Status WriteRecordBatch(const RecordBatch& batch) = 0; 0099 0100 /// \brief Write a record batch with custom metadata to the stream 0101 /// 0102 /// \param[in] batch the record batch to write to the stream 0103 /// \param[in] custom_metadata the record batch's custom metadata to write to the stream 0104 /// \return Status 0105 virtual Status WriteRecordBatch( 0106 const RecordBatch& batch, 0107 const std::shared_ptr<const KeyValueMetadata>& custom_metadata); 0108 0109 /// \brief Write possibly-chunked table by creating a sequence of record batches 0110 /// \param[in] table table to write 0111 /// \return Status 0112 Status WriteTable(const Table& table); 0113 0114 /// \brief Write Table with a particular chunksize 0115 /// \param[in] table table to write 0116 /// \param[in] max_chunksize maximum number of rows for table chunks. To 0117 /// indicate that no maximum should be enforced, pass -1. 0118 /// \return Status 0119 virtual Status WriteTable(const Table& table, int64_t max_chunksize); 0120 0121 /// \brief Perform any logic necessary to finish the stream 0122 /// 0123 /// \return Status 0124 virtual Status Close() = 0; 0125 0126 /// \brief Return current write statistics 0127 virtual WriteStats stats() const = 0; 0128 }; 0129 0130 /// \defgroup record-batch-writer-factories Functions for creating RecordBatchWriter 0131 /// instances 0132 /// 0133 /// @{ 0134 0135 /// Create a new IPC stream writer from stream sink and schema. User is 0136 /// responsible for closing the actual OutputStream. 
0137 /// 0138 /// \param[in] sink output stream to write to 0139 /// \param[in] schema the schema of the record batches to be written 0140 /// \param[in] options options for serialization 0141 /// \return Result<std::shared_ptr<RecordBatchWriter>> 0142 ARROW_EXPORT 0143 Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter( 0144 io::OutputStream* sink, const std::shared_ptr<Schema>& schema, 0145 const IpcWriteOptions& options = IpcWriteOptions::Defaults()); 0146 0147 /// Create a new IPC stream writer from stream sink and schema. User is 0148 /// responsible for closing the actual OutputStream. 0149 /// 0150 /// \param[in] sink output stream to write to 0151 /// \param[in] schema the schema of the record batches to be written 0152 /// \param[in] options options for serialization 0153 /// \return Result<std::shared_ptr<RecordBatchWriter>> 0154 ARROW_EXPORT 0155 Result<std::shared_ptr<RecordBatchWriter>> MakeStreamWriter( 0156 std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema, 0157 const IpcWriteOptions& options = IpcWriteOptions::Defaults()); 0158 0159 /// Create a new IPC file writer from stream sink and schema 0160 /// 0161 /// \param[in] sink output stream to write to 0162 /// \param[in] schema the schema of the record batches to be written 0163 /// \param[in] options options for serialization, optional 0164 /// \param[in] metadata custom metadata for File Footer, optional 0165 /// \return Result<std::shared_ptr<RecordBatchWriter>> 0166 ARROW_EXPORT 0167 Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter( 0168 io::OutputStream* sink, const std::shared_ptr<Schema>& schema, 0169 const IpcWriteOptions& options = IpcWriteOptions::Defaults(), 0170 const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR); 0171 0172 /// Create a new IPC file writer from stream sink and schema 0173 /// 0174 /// \param[in] sink output stream to write to 0175 /// \param[in] schema the schema of the record batches to be written 0176 /// 
\param[in] options options for serialization, optional 0177 /// \param[in] metadata custom metadata for File Footer, optional 0178 /// \return Result<std::shared_ptr<RecordBatchWriter>> 0179 ARROW_EXPORT 0180 Result<std::shared_ptr<RecordBatchWriter>> MakeFileWriter( 0181 std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema, 0182 const IpcWriteOptions& options = IpcWriteOptions::Defaults(), 0183 const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR); 0184 0185 /// @} 0186 0187 /// \brief Low-level API for writing a record batch (without schema) 0188 /// to an OutputStream as encapsulated IPC message. See Arrow format 0189 /// documentation for more detail. 0190 /// 0191 /// \param[in] batch the record batch to write 0192 /// \param[in] buffer_start_offset the start offset to use in the buffer metadata, 0193 /// generally should be 0 0194 /// \param[in] dst an OutputStream 0195 /// \param[out] metadata_length the size of the length-prefixed flatbuffer 0196 /// including padding to a 64-byte boundary 0197 /// \param[out] body_length the size of the contiguous buffer block plus padding bytes 0198 /// \param[in] options options for serialization 0199 /// \return Status 0200 ARROW_EXPORT 0201 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, 0202 io::OutputStream* dst, int32_t* metadata_length, 0203 int64_t* body_length, const IpcWriteOptions& options); 0204 0205 /// \brief Serialize record batch as encapsulated IPC message in a new buffer 0206 /// 0207 /// \param[in] batch the record batch 0208 /// \param[in] options the IpcWriteOptions to use for serialization 0209 /// \return the serialized message 0210 ARROW_EXPORT 0211 Result<std::shared_ptr<Buffer>> SerializeRecordBatch(const RecordBatch& batch, 0212 const IpcWriteOptions& options); 0213 0214 /// \brief Serialize record batch as encapsulated IPC message in a new buffer 0215 /// 0216 /// \param[in] batch the record batch 0217 /// \param[in] mm a MemoryManager to 
allocate memory from 0218 /// \return the serialized message 0219 ARROW_EXPORT 0220 Result<std::shared_ptr<Buffer>> SerializeRecordBatch(const RecordBatch& batch, 0221 std::shared_ptr<MemoryManager> mm); 0222 0223 /// \brief Write record batch to OutputStream 0224 /// 0225 /// \param[in] batch the record batch to write 0226 /// \param[in] options the IpcWriteOptions to use for serialization 0227 /// \param[in] out the OutputStream to write the output to 0228 /// \return Status 0229 /// 0230 /// If writing to pre-allocated memory, you can use 0231 /// arrow::ipc::GetRecordBatchSize to compute how much space is required 0232 ARROW_EXPORT 0233 Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, 0234 io::OutputStream* out); 0235 0236 /// \brief Serialize schema as encapsulated IPC message 0237 /// 0238 /// \param[in] schema the schema to write 0239 /// \param[in] pool a MemoryPool to allocate memory from 0240 /// \return the serialized schema 0241 ARROW_EXPORT 0242 Result<std::shared_ptr<Buffer>> SerializeSchema(const Schema& schema, 0243 MemoryPool* pool = default_memory_pool()); 0244 0245 /// \brief Write multiple record batches to OutputStream, including schema 0246 /// \param[in] batches a vector of batches. 
Must all have same schema 0247 /// \param[in] options options for serialization 0248 /// \param[in] dst the OutputStream to write to 0249 /// \return Status 0250 ARROW_EXPORT 0251 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches, 0252 const IpcWriteOptions& options, io::OutputStream* dst); 0253 0254 /// \brief Compute the number of bytes needed to write an IPC payload 0255 /// including metadata 0256 /// 0257 /// \param[in] payload the IPC payload to write 0258 /// \param[in] options write options 0259 /// \return the size of the complete encapsulated message 0260 ARROW_EXPORT 0261 int64_t GetPayloadSize(const IpcPayload& payload, 0262 const IpcWriteOptions& options = IpcWriteOptions::Defaults()); 0263 0264 /// \brief Compute the number of bytes needed to write a record batch including metadata 0265 /// 0266 /// \param[in] batch the record batch to write 0267 /// \param[out] size the size of the complete encapsulated message 0268 /// \return Status 0269 ARROW_EXPORT 0270 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); 0271 0272 /// \brief Compute the number of bytes needed to write a record batch including metadata 0273 /// 0274 /// \param[in] batch the record batch to write 0275 /// \param[in] options options for serialization 0276 /// \param[out] size the size of the complete encapsulated message 0277 /// \return Status 0278 ARROW_EXPORT 0279 Status GetRecordBatchSize(const RecordBatch& batch, const IpcWriteOptions& options, 0280 int64_t* size); 0281 0282 /// \brief Compute the number of bytes needed to write a tensor including metadata 0283 /// 0284 /// \param[in] tensor the tensor to write 0285 /// \param[out] size the size of the complete encapsulated message 0286 /// \return Status 0287 ARROW_EXPORT 0288 Status GetTensorSize(const Tensor& tensor, int64_t* size); 0289 0290 /// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory 0291 /// allocation 0292 /// 0293 /// \param[in] tensor the 
Tensor to write 0294 /// \param[in] pool MemoryPool to allocate space for metadata 0295 /// \return the resulting Message 0296 ARROW_EXPORT 0297 Result<std::unique_ptr<Message>> GetTensorMessage(const Tensor& tensor, MemoryPool* pool); 0298 0299 /// \brief Write arrow::Tensor as a contiguous message. 0300 /// 0301 /// The metadata and body are written assuming 64-byte alignment. It is the 0302 /// user's responsibility to ensure that the OutputStream has been aligned 0303 /// to a 64-byte multiple before writing the message. 0304 /// 0305 /// The message is written out as follows: 0306 /// \code 0307 /// <metadata size> <metadata> <tensor data> 0308 /// \endcode 0309 /// 0310 /// \param[in] tensor the Tensor to write 0311 /// \param[in] dst the OutputStream to write to 0312 /// \param[out] metadata_length the actual metadata length, including padding 0313 /// \param[out] body_length the actual message body length 0314 /// \return Status 0315 ARROW_EXPORT 0316 Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, 0317 int64_t* body_length); 0318 0319 /// \brief EXPERIMENTAL: Convert arrow::SparseTensor to a Message with minimal memory 0320 /// allocation 0321 /// 0322 /// The message is written out as follows: 0323 /// \code 0324 /// <metadata size> <metadata> <sparse index> <sparse tensor body> 0325 /// \endcode 0326 /// 0327 /// \param[in] sparse_tensor the SparseTensor to write 0328 /// \param[in] pool MemoryPool to allocate space for metadata 0329 /// \return the resulting Message 0330 ARROW_EXPORT 0331 Result<std::unique_ptr<Message>> GetSparseTensorMessage(const SparseTensor& sparse_tensor, 0332 MemoryPool* pool); 0333 0334 /// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message. The metadata, 0335 /// sparse index, and body are written assuming 64-byte alignment. 
It is the 0336 /// user's responsibility to ensure that the OutputStream has been aligned 0337 /// to a 64-byte multiple before writing the message. 0338 /// 0339 /// \param[in] sparse_tensor the SparseTensor to write 0340 /// \param[in] dst the OutputStream to write to 0341 /// \param[out] metadata_length the actual metadata length, including padding 0342 /// \param[out] body_length the actual message body length 0343 /// \return Status 0344 ARROW_EXPORT 0345 Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, 0346 int32_t* metadata_length, int64_t* body_length); 0347 0348 /// \brief Compute IpcPayload for the given schema 0349 /// \param[in] schema the Schema that is being serialized 0350 /// \param[in] options options for serialization 0351 /// \param[in] mapper object mapping dictionary fields to dictionary ids 0352 /// \param[out] out the returned IpcPayload 0353 /// \return Status 0354 ARROW_EXPORT 0355 Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, 0356 const DictionaryFieldMapper& mapper, IpcPayload* out); 0357 0358 /// \brief Compute IpcPayload for a dictionary 0359 /// \param[in] id the dictionary id 0360 /// \param[in] dictionary the dictionary values 0361 /// \param[in] options options for serialization 0362 /// \param[out] payload the output IpcPayload 0363 /// \return Status 0364 ARROW_EXPORT 0365 Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary, 0366 const IpcWriteOptions& options, IpcPayload* payload); 0367 0368 /// \brief Compute IpcPayload for a dictionary 0369 /// \param[in] id the dictionary id 0370 /// \param[in] is_delta whether the dictionary is a delta dictionary 0371 /// \param[in] dictionary the dictionary values 0372 /// \param[in] options options for serialization 0373 /// \param[out] payload the output IpcPayload 0374 /// \return Status 0375 ARROW_EXPORT 0376 Status GetDictionaryPayload(int64_t id, bool is_delta, 0377 const 
std::shared_ptr<Array>& dictionary, 0378 const IpcWriteOptions& options, IpcPayload* payload); 0379 0380 /// \brief Compute IpcPayload for the given record batch 0381 /// \param[in] batch the RecordBatch that is being serialized 0382 /// \param[in] options options for serialization 0383 /// \param[out] out the returned IpcPayload 0384 /// \return Status 0385 ARROW_EXPORT 0386 Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options, 0387 IpcPayload* out); 0388 0389 /// \brief Compute IpcPayload for the given record batch and custom metadata 0390 /// \param[in] batch the RecordBatch that is being serialized 0391 /// \param[in] custom_metadata the custom metadata to be serialized with the record batch 0392 /// \param[in] options options for serialization 0393 /// \param[out] out the returned IpcPayload 0394 /// \return Status 0395 ARROW_EXPORT 0396 Status GetRecordBatchPayload( 0397 const RecordBatch& batch, 0398 const std::shared_ptr<const KeyValueMetadata>& custom_metadata, 0399 const IpcWriteOptions& options, IpcPayload* out); 0400 0401 /// \brief Write an IPC payload to the given stream. 0402 /// \param[in] payload the payload to write 0403 /// \param[in] options options for serialization 0404 /// \param[in] dst The stream to write the payload to. 
0405 /// \param[out] metadata_length the length of the serialized metadata 0406 /// \return Status 0407 ARROW_EXPORT 0408 Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options, 0409 io::OutputStream* dst, int32_t* metadata_length); 0410 0411 /// \brief Compute IpcPayload for the given sparse tensor 0412 /// \param[in] sparse_tensor the SparseTensor that is being serialized 0413 /// \param[in,out] pool for any required temporary memory allocations 0414 /// \param[out] out the returned IpcPayload 0415 /// \return Status 0416 ARROW_EXPORT 0417 Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* pool, 0418 IpcPayload* out); 0419 0420 namespace internal { 0421 0422 // These internal APIs may change without warning or deprecation 0423 /// \brief Abstract interface for writing a sequence of IpcPayloads 0424 class ARROW_EXPORT IpcPayloadWriter { 0425 public: 0426 virtual ~IpcPayloadWriter(); 0427 0428 // Default implementation is a no-op 0429 virtual Status Start(); 0430 0431 virtual Status WritePayload(const IpcPayload& payload) = 0; 0432 0433 virtual Status Close() = 0; 0434 }; 0435 0436 /// Create a new IPC payload stream writer from stream sink. User is 0437 /// responsible for closing the actual OutputStream. 0438 /// 0439 /// \param[in] sink output stream to write to 0440 /// \param[in] options options for serialization 0441 /// \return Result<std::unique_ptr<IpcPayloadWriter>> 0442 ARROW_EXPORT 0443 Result<std::unique_ptr<IpcPayloadWriter>> MakePayloadStreamWriter( 0444 io::OutputStream* sink, const IpcWriteOptions& options = IpcWriteOptions::Defaults()); 0445 0446 /// Create a new IPC payload file writer from stream sink. 
0447 /// 0448 /// \param[in] sink output stream to write to 0449 /// \param[in] schema the schema of the record batches to be written 0450 /// \param[in] options options for serialization, optional 0451 /// \param[in] metadata custom metadata for File Footer, optional 0452 /// \return Result<std::unique_ptr<IpcPayloadWriter>> 0453 ARROW_EXPORT 0454 Result<std::unique_ptr<IpcPayloadWriter>> MakePayloadFileWriter( 0455 io::OutputStream* sink, const std::shared_ptr<Schema>& schema, 0456 const IpcWriteOptions& options = IpcWriteOptions::Defaults(), 0457 const std::shared_ptr<const KeyValueMetadata>& metadata = NULLPTR); 0458 0459 /// Create a new RecordBatchWriter from IpcPayloadWriter and schema. 0460 /// 0461 /// The format is implicitly the IPC stream format (allowing dictionary 0462 /// replacement and deltas). 0463 /// 0464 /// \param[in] sink the IpcPayloadWriter to write to 0465 /// \param[in] schema the schema of the record batches to be written 0466 /// \param[in] options options for serialization 0467 /// \return Result<std::unique_ptr<RecordBatchWriter>> 0468 ARROW_EXPORT 0469 Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter( 0470 std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema, 0471 const IpcWriteOptions& options = IpcWriteOptions::Defaults()); 0472 0473 } // namespace internal 0474 } // namespace ipc 0475 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |