![]() |
|
|||
File indexing completed on 2025-08-27 08:47:19
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "arrow/compare.h"
#include "arrow/device.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

namespace arrow {

/// \class RecordBatch
/// \brief Collection of equal-length arrays matching a particular Schema
///
/// A record batch is a table-like data structure that is semantically a
/// sequence of fields, each a contiguous Arrow array
class ARROW_EXPORT RecordBatch {
 public:
  virtual ~RecordBatch() = default;

  /// \brief Construct record batch from a vector of arrays
  ///
  /// \param[in] schema The record batch schema
  /// \param[in] num_rows length of fields in the record batch. Each array
  /// should have the same length as num_rows
  /// \param[in] columns the record batch fields as vector of arrays
  /// \param[in] sync_event optional synchronization event for non-CPU device
  /// memory used by buffers
  static std::shared_ptr<RecordBatch> Make(
      std::shared_ptr<Schema> schema, int64_t num_rows,
      std::vector<std::shared_ptr<Array>> columns,
      std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);

  /// \brief Construct record batch from vector of internal data structures
  /// \since 0.5.0
  ///
  /// This function is intended for internal use, or advanced users.
  ///
  /// \param schema the record batch schema
  /// \param num_rows the number of semantic rows in the record batch. This
  /// should be equal to the length of each field
  /// \param columns the data for the batch's columns
  /// \param device_type the type of the device that the Arrow columns are
  /// allocated on
  /// \param sync_event optional synchronization event for non-CPU device
  /// memory used by buffers
  static std::shared_ptr<RecordBatch> Make(
      std::shared_ptr<Schema> schema, int64_t num_rows,
      std::vector<std::shared_ptr<ArrayData>> columns,
      DeviceAllocationType device_type = DeviceAllocationType::kCPU,
      std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);

  /// \brief Create an empty RecordBatch of a given schema
  ///
  /// The output RecordBatch will be created with DataTypes from
  /// the given schema.
  ///
  /// \param[in] schema the schema of the empty RecordBatch
  /// \param[in] pool the memory pool to allocate memory from
  /// \return the resulting RecordBatch
  static Result<std::shared_ptr<RecordBatch>> MakeEmpty(
      std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool());

  /// \brief Convert record batch to struct array
  ///
  /// Create a struct array whose child arrays are the record batch's columns.
  /// Note that the record batch's top-level field metadata cannot be reflected
  /// in the resulting struct array.
  Result<std::shared_ptr<StructArray>> ToStructArray() const;

  /// \brief Convert record batch with one data type to Tensor
  ///
  /// Create a Tensor object with shape (number of rows, number of columns).
  /// The layout is selected by \p row_major: when false, the Tensor is
  /// column-major with strides (type size in bytes, type size in bytes *
  /// number of rows); when true (the default), the Tensor is row-major.
  ///
  /// \param[in] null_to_nan if true, convert nulls to NaN
  /// \param[in] row_major if true, create row-major Tensor else column-major Tensor
  /// \param[in] pool the memory pool to allocate the tensor buffer
  /// \return the resulting Tensor
  Result<std::shared_ptr<Tensor>> ToTensor(
      bool null_to_nan = false, bool row_major = true,
      MemoryPool* pool = default_memory_pool()) const;

  /// \brief Construct record batch from struct array
  ///
  /// This constructs a record batch using the child arrays of the given
  /// array, which must be a struct array.
  ///
  /// \param[in] array the source array, must be a StructArray
  /// \param[in] pool the memory pool to allocate new validity bitmaps
  ///
  /// This operation will usually be zero-copy. However, if the struct array has an
  /// offset or a validity bitmap then these will need to be pushed into the child arrays.
  /// Pushing the offset is zero-copy but pushing the validity bitmap is not.
  static Result<std::shared_ptr<RecordBatch>> FromStructArray(
      const std::shared_ptr<Array>& array, MemoryPool* pool = default_memory_pool());

  /// \brief Determine if two record batches are exactly equal
  ///
  /// \param[in] other the RecordBatch to compare with
  /// \param[in] check_metadata if true, check that Schema metadata is the same
  /// \param[in] opts the options for equality comparisons
  /// \return true if batches are equal
  bool Equals(const RecordBatch& other, bool check_metadata = false,
              const EqualOptions& opts = EqualOptions::Defaults()) const;

  /// \brief Determine if two record batches are approximately equal
  ///
  /// \param[in] other the RecordBatch to compare with
  /// \param[in] opts the options for equality comparisons
  /// \return true if batches are approximately equal
  bool ApproxEquals(const RecordBatch& other,
                    const EqualOptions& opts = EqualOptions::Defaults()) const;

  /// \return the record batch's schema
  const std::shared_ptr<Schema>& schema() const { return schema_; }

  /// \brief Replace the schema with another schema with the same types, but potentially
  /// different field names and/or metadata.
  Result<std::shared_ptr<RecordBatch>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const;

  /// \brief Retrieve all columns at once
  virtual const std::vector<std::shared_ptr<Array>>& columns() const = 0;

  /// \brief Retrieve an array from the record batch
  /// \param[in] i field index, does not boundscheck
  /// \return an Array object
  virtual std::shared_ptr<Array> column(int i) const = 0;

  /// \brief Retrieve an array from the record batch
  /// \param[in] name field name
  /// \return an Array or null if no field was found
  std::shared_ptr<Array> GetColumnByName(const std::string& name) const;

  /// \brief Retrieve an array's internal data from the record batch
  /// \param[in] i field index, does not boundscheck
  /// \return an internal ArrayData object
  virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;

  /// \brief Retrieve all arrays' internal data from the record batch.
  virtual const ArrayDataVector& column_data() const = 0;

  /// \brief Add column to the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, which will be boundschecked
  /// \param[in] field field to be added
  /// \param[in] column column to be added
  virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
      int i, const std::shared_ptr<Field>& field,
      const std::shared_ptr<Array>& column) const = 0;

  /// \brief Add new nullable column to the record batch, producing a new
  /// RecordBatch.
  ///
  /// For non-nullable columns, use the Field-based version of this method.
  ///
  /// \param[in] i field index, which will be boundschecked
  /// \param[in] field_name name of field to be added
  /// \param[in] column column to be added
  virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
      int i, std::string field_name, const std::shared_ptr<Array>& column) const;

  /// \brief Replace a column in the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, does boundscheck
  /// \param[in] field field to be replaced
  /// \param[in] column column to be replaced
  virtual Result<std::shared_ptr<RecordBatch>> SetColumn(
      int i, const std::shared_ptr<Field>& field,
      const std::shared_ptr<Array>& column) const = 0;

  /// \brief Remove column from the record batch, producing a new RecordBatch
  ///
  /// \param[in] i field index, does boundscheck
  virtual Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const = 0;

  // NOTE(review): undocumented in this header; presumably returns a batch whose
  // schema carries `metadata` in place of the current schema metadata -- confirm
  // against the implementations before documenting this as fact.
  virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;

  /// \brief Name of the i-th column
  const std::string& column_name(int i) const;

  /// \return the number of columns in the record batch
  int num_columns() const;

  /// \return the number of rows (the corresponding length of each column)
  int64_t num_rows() const { return num_rows_; }

  /// \brief Copy the entire RecordBatch to destination MemoryManager
  ///
  /// This uses Array::CopyTo on each column of the record batch to create
  /// a new record batch where all underlying buffers for the columns have
  /// been copied to the destination MemoryManager. This uses
  /// MemoryManager::CopyBuffer under the hood.
  Result<std::shared_ptr<RecordBatch>> CopyTo(
      const std::shared_ptr<MemoryManager>& to) const;

  /// \brief View or Copy the entire RecordBatch to destination MemoryManager
  ///
  /// This uses Array::ViewOrCopyTo on each column of the record batch to create
  /// a new record batch where all underlying buffers for the columns have
  /// been zero-copy viewed on the destination MemoryManager, falling back
  /// to performing a copy if it can't be viewed as a zero-copy buffer. This uses
  /// Buffer::ViewOrCopy under the hood.
  Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(
      const std::shared_ptr<MemoryManager>& to) const;

  /// \brief Slice each of the arrays in the record batch
  /// \param[in] offset the starting offset to slice, through end of batch
  /// \return new record batch
  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;

  /// \brief Slice each of the arrays in the record batch
  /// \param[in] offset the starting offset to slice
  /// \param[in] length the number of elements to slice from offset
  /// \return new record batch
  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0;

  /// \return PrettyPrint representation suitable for debugging
  std::string ToString() const;

  /// \brief Return names of all columns
  std::vector<std::string> ColumnNames() const;

  /// \brief Rename columns with provided names
  Result<std::shared_ptr<RecordBatch>> RenameColumns(
      const std::vector<std::string>& names) const;

  /// \brief Return new record batch with specified columns
  Result<std::shared_ptr<RecordBatch>> SelectColumns(
      const std::vector<int>& indices) const;

  /// \brief Perform cheap validation checks to determine obvious inconsistencies
  /// within the record batch's schema and internal data.
  ///
  /// This is O(k) where k is the total number of fields and array descendants.
  ///
  /// \return Status
  virtual Status Validate() const;

  /// \brief Perform extensive validation checks to determine inconsistencies
  /// within the record batch's schema and internal data.
  ///
  /// This is potentially O(k*n) where n is the number of rows.
  ///
  /// \return Status
  virtual Status ValidateFull() const;

  /// \brief EXPERIMENTAL: Return a top-level sync event object for this record batch
  ///
  /// If all of the data for this record batch is in CPU memory, then this
  /// will return null. If the data for this batch is
  /// on a device, then if synchronization is needed before accessing the
  /// data the returned sync event will allow for it.
  ///
  /// \return null or a Device::SyncEvent
  virtual const std::shared_ptr<Device::SyncEvent>& GetSyncEvent() const = 0;

  /// \return the DeviceAllocationType reported for this record batch
  virtual DeviceAllocationType device_type() const = 0;

  /// \brief Create a statistics array of this record batch
  ///
  /// The created array follows the C data interface statistics
  /// specification. See
  /// https://arrow.apache.org/docs/format/CDataInterfaceStatistics.html
  /// for details.
  ///
  /// \param[in] pool the memory pool to allocate memory from
  /// \return the statistics array of this record batch
  Result<std::shared_ptr<Array>> MakeStatisticsArray(
      MemoryPool* pool = default_memory_pool()) const;

 protected:
  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);

  // Returned by schema() and num_rows() respectively.
  std::shared_ptr<Schema> schema_;
  int64_t num_rows_;

 private:
  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
};

/// \brief A record batch paired with custom key-value metadata
struct ARROW_EXPORT RecordBatchWithMetadata {
  std::shared_ptr<RecordBatch> batch;
  std::shared_ptr<KeyValueMetadata> custom_metadata;
};

/// \brief End-of-iteration sentinel for RecordBatchWithMetadata
///
/// The end value has a null batch and null metadata; a value is treated as
/// the end marker whenever its batch is null (the metadata is not inspected).
template <>
struct IterationTraits<RecordBatchWithMetadata> {
  static RecordBatchWithMetadata End() { return {NULLPTR, NULLPTR}; }
  static bool IsEnd(const RecordBatchWithMetadata& val) { return val.batch == NULLPTR; }
};

/// \brief Abstract interface for reading stream of record batches
class ARROW_EXPORT RecordBatchReader {
 public:
  using ValueType = std::shared_ptr<RecordBatch>;

  virtual ~RecordBatchReader();

  /// \return the shared schema of the record batches in the stream
  virtual std::shared_ptr<Schema> schema() const = 0;

  /// \brief Read the next record batch in the stream. Return null for batch
  /// when reaching end of stream
  ///
  /// Example:
  ///
  /// ```
  /// while (true) {
  ///   std::shared_ptr<RecordBatch> batch;
  ///   ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  ///   if (!batch) {
  ///     break;
  ///   }
  ///   // handling the `batch`, the `batch->num_rows()`
  ///   // might be 0.
  /// }
  /// ```
  ///
  /// \param[out] batch the next loaded batch, null at end of stream. Returning
  /// an empty batch doesn't mean the end of stream because it is valid data.
  /// \return Status
  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;

  /// \brief Read the next record batch together with its custom metadata
  ///
  /// The default implementation returns Status::NotImplemented; readers that
  /// can supply custom metadata are expected to override it.
  virtual Result<RecordBatchWithMetadata> ReadNext() {
    return Status::NotImplemented("ReadNext with custom metadata");
  }

  /// \brief Iterator interface
  ///
  /// Forwards to ReadNext(std::shared_ptr<RecordBatch>*) and returns the batch
  /// it produced, or the error status if the read failed.
  Result<std::shared_ptr<RecordBatch>> Next() {
    std::shared_ptr<RecordBatch> batch;
    ARROW_RETURN_NOT_OK(ReadNext(&batch));
    return batch;
  }

  /// \brief finalize reader
  ///
  /// The default implementation does nothing and returns Status::OK().
  virtual Status Close() { return Status::OK(); }

  /// \brief EXPERIMENTAL: Get the device type for record batches this reader produces
  ///
  /// default implementation is to return DeviceAllocationType::kCPU
  virtual DeviceAllocationType device_type() const { return DeviceAllocationType::kCPU; }

  /// \brief Input iterator over the batches produced by a RecordBatchReader
  ///
  /// Dereferencing yields a Result, so a read error surfaces when the
  /// iterator is dereferenced rather than when it is advanced.
  class RecordBatchReaderIterator {
   public:
    using iterator_category = std::input_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = std::shared_ptr<RecordBatch>;
    using pointer = value_type const*;
    using reference = value_type const&;

    /// End iterator: holds a null batch and no reader.
    RecordBatchReaderIterator() : batch_(RecordBatchEnd()), reader_(NULLPTR) {}

    /// Begin iterator: reads the first batch from `reader` immediately.
    explicit RecordBatchReaderIterator(RecordBatchReader* reader)
        : batch_(RecordBatchEnd()), reader_(reader) {
      Next();
    }

    /// Iterators compare equal when their stored batch results compare equal;
    /// the reader pointer does not take part in the comparison.
    bool operator==(const RecordBatchReaderIterator& other) const {
      return batch_ == other.batch_;
    }

    bool operator!=(const RecordBatchReaderIterator& other) const {
      return !(*this == other);
    }

    /// Propagates a non-OK status from the last read; otherwise yields the
    /// stored batch.
    Result<std::shared_ptr<RecordBatch>> operator*() {
      ARROW_RETURN_NOT_OK(batch_.status());

      return batch_;
    }

    /// Pre-increment: read the next batch and return this iterator.
    RecordBatchReaderIterator& operator++() {
      Next();
      return *this;
    }

    /// Post-increment: read the next batch and return a copy of the iterator
    /// as it was before advancing.
    RecordBatchReaderIterator operator++(int) {
      RecordBatchReaderIterator tmp(*this);
      Next();
      return tmp;
    }

   private:
    // The null batch used as the end-of-stream marker.
    std::shared_ptr<RecordBatch> RecordBatchEnd() {
      return std::shared_ptr<RecordBatch>(NULLPTR);
    }

    // Store the reader's next result; with no reader attached the iterator
    // is reset to the end marker.
    void Next() {
      if (reader_ == NULLPTR) {
        batch_ = RecordBatchEnd();
        return;
      }
      batch_ = reader_->Next();
    }

    Result<std::shared_ptr<RecordBatch>> batch_;
    RecordBatchReader* reader_;
  };
  /// \brief Return an iterator to the first record batch in the stream
  RecordBatchReaderIterator begin() { return RecordBatchReaderIterator(this); }

  /// \brief Return an iterator to the end of the stream
  RecordBatchReaderIterator end() { return RecordBatchReaderIterator(); }

  /// \brief Consume entire stream as a vector of record batches
  Result<RecordBatchVector> ToRecordBatches();

  /// \brief Read all batches and concatenate as arrow::Table
  Result<std::shared_ptr<Table>> ToTable();

  /// \brief Create a RecordBatchReader from a vector of RecordBatch.
  ///
  /// \param[in] batches the vector of RecordBatch to read from
  /// \param[in] schema schema to conform to. Will be inferred from the first
  /// element if not provided.
  /// \param[in] device_type the type of device that the batches are allocated on
  static Result<std::shared_ptr<RecordBatchReader>> Make(
      RecordBatchVector batches, std::shared_ptr<Schema> schema = NULLPTR,
      DeviceAllocationType device_type = DeviceAllocationType::kCPU);

  /// \brief Create a RecordBatchReader from an Iterator of RecordBatch.
  ///
  /// \param[in] batches an iterator of RecordBatch to read from.
  /// \param[in] schema schema that each record batch in iterator will conform to.
  /// \param[in] device_type the type of device that the batches are allocated on
  static Result<std::shared_ptr<RecordBatchReader>> MakeFromIterator(
      Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema,
      DeviceAllocationType device_type = DeviceAllocationType::kCPU);
};

/// \brief Concatenate record batches
///
/// The columns of the new batch are formed by concatenating the same columns of
/// each input batch. Concatenating multiple batches into a new batch requires
/// that their schemas be consistent. Merging batches without columns (only a
/// length, for scenarios such as count(*)) is supported.
///
/// \param[in] batches a vector of record batches to be concatenated
/// \param[in] pool memory to store the result will be allocated from this memory pool
/// \return the concatenated record batch
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
    const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());

}  // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |