Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-27 08:47:19

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstdint>
0021 #include <memory>
0022 #include <string>
0023 #include <vector>
0024 
0025 #include "arrow/compare.h"
0026 #include "arrow/device.h"
0027 #include "arrow/result.h"
0028 #include "arrow/status.h"
0029 #include "arrow/type_fwd.h"
0030 #include "arrow/util/iterator.h"
0031 #include "arrow/util/macros.h"
0032 #include "arrow/util/visibility.h"
0033 
0034 namespace arrow {
0035 
0036 /// \class RecordBatch
0037 /// \brief Collection of equal-length arrays matching a particular Schema
0038 ///
0039 /// A record batch is a table-like data structure that is semantically a sequence
0040 /// of fields, each a contiguous Arrow array
0041 class ARROW_EXPORT RecordBatch {
0042  public:
0043   virtual ~RecordBatch() = default;
0044 
       /// \brief Construct record batch from vector of arrays
       ///
0045   /// \param[in] schema The record batch schema
0046   /// \param[in] num_rows length of fields in the record batch. Each array
0047   /// should have the same length as num_rows
0048   /// \param[in] columns the record batch fields as vector of arrays
0049   /// \param[in] sync_event optional synchronization event for non-CPU device
0050   /// memory used by buffers
0051   static std::shared_ptr<RecordBatch> Make(
0052       std::shared_ptr<Schema> schema, int64_t num_rows,
0053       std::vector<std::shared_ptr<Array>> columns,
0054       std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);
0055 
0056   /// \brief Construct record batch from vector of internal data structures
0057   /// \since 0.5.0
0058   ///
0059   /// This function is intended for internal use, or advanced users.
0060   ///
0061   /// \param schema the record batch schema
0062   /// \param num_rows the number of semantic rows in the record batch. This
0063   /// should be equal to the length of each field
0064   /// \param columns the data for the batch's columns
0065   /// \param device_type the type of the device that the Arrow columns are
0066   /// allocated on
0067   /// \param sync_event optional synchronization event for non-CPU device
0068   /// memory used by buffers
0069   static std::shared_ptr<RecordBatch> Make(
0070       std::shared_ptr<Schema> schema, int64_t num_rows,
0071       std::vector<std::shared_ptr<ArrayData>> columns,
0072       DeviceAllocationType device_type = DeviceAllocationType::kCPU,
0073       std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR);
0074 
0075   /// \brief Create an empty RecordBatch of a given schema
0076   ///
0077   /// The output RecordBatch will be created with DataTypes from
0078   /// the given schema.
0079   ///
0080   /// \param[in] schema the schema of the empty RecordBatch
0081   /// \param[in] pool the memory pool to allocate memory from
0082   /// \return the resulting RecordBatch
0083   static Result<std::shared_ptr<RecordBatch>> MakeEmpty(
0084       std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool());
0085 
0086   /// \brief Convert record batch to struct array
0087   ///
0088   /// Create a struct array whose child arrays are the record batch's columns.
0089   /// Note that the record batch's top-level field metadata cannot be reflected
0090   /// in the resulting struct array.
0091   Result<std::shared_ptr<StructArray>> ToStructArray() const;
0092 
0093   /// \brief Convert record batch with one data type to Tensor
0094   ///
0095   /// Create a Tensor object with shape (number of rows, number of columns).
0096   /// With row_major = false the strides are (type size in bytes, type size in
0097   /// bytes * number of rows), i.e. column-major layout; the default is row-major.
0098   ///
0099   /// \param[in] null_to_nan if true, convert nulls to NaN
0100   /// \param[in] row_major if true, create row-major Tensor else column-major Tensor
0101   /// \param[in] pool the memory pool to allocate the tensor buffer
0102   /// \return the resulting Tensor
0103   Result<std::shared_ptr<Tensor>> ToTensor(
0104       bool null_to_nan = false, bool row_major = true,
0105       MemoryPool* pool = default_memory_pool()) const;
0106 
0107   /// \brief Construct record batch from struct array
0108   ///
0109   /// This constructs a record batch using the child arrays of the given
0110   /// array, which must be a struct array.
0111   ///
0112   /// \param[in] array the source array, must be a StructArray
0113   /// \param[in] pool the memory pool to allocate new validity bitmaps
0114   ///
0115   /// This operation will usually be zero-copy.  However, if the struct array has an
0116   /// offset or a validity bitmap then these will need to be pushed into the child arrays.
0117   /// Pushing the offset is zero-copy but pushing the validity bitmap is not.
0118   static Result<std::shared_ptr<RecordBatch>> FromStructArray(
0119       const std::shared_ptr<Array>& array, MemoryPool* pool = default_memory_pool());
0120 
0121   /// \brief Determine if two record batches are exactly equal
0122   ///
0123   /// \param[in] other the RecordBatch to compare with
0124   /// \param[in] check_metadata if true, check that Schema metadata is the same
0125   /// \param[in] opts the options for equality comparisons
0126   /// \return true if batches are equal
0127   bool Equals(const RecordBatch& other, bool check_metadata = false,
0128               const EqualOptions& opts = EqualOptions::Defaults()) const;
0129 
0130   /// \brief Determine if two record batches are approximately equal
0131   ///
0132   /// \param[in] other the RecordBatch to compare with
0133   /// \param[in] opts the options for equality comparisons
0134   /// \return true if batches are approximately equal
0135   bool ApproxEquals(const RecordBatch& other,
0136                     const EqualOptions& opts = EqualOptions::Defaults()) const;
0137 
0138   /// \return the record batch's schema
0139   const std::shared_ptr<Schema>& schema() const { return schema_; }
0140 
0141   /// \brief Replace the schema with another schema with the same types, but potentially
0142   /// different field names and/or metadata.
       ///
       /// \param[in] schema the replacement schema
       /// \return a new RecordBatch using the given schema, or an error Status
0143   Result<std::shared_ptr<RecordBatch>> ReplaceSchema(
0144       std::shared_ptr<Schema> schema) const;
0145 
0146   /// \brief Retrieve all columns at once
0147   virtual const std::vector<std::shared_ptr<Array>>& columns() const = 0;
0148 
0149   /// \brief Retrieve an array from the record batch
0150   /// \param[in] i field index, does not boundscheck
0151   /// \return an Array object
0152   virtual std::shared_ptr<Array> column(int i) const = 0;
0153 
0154   /// \brief Retrieve an array from the record batch
0155   /// \param[in] name field name
0156   /// \return an Array or null if no field was found
0157   std::shared_ptr<Array> GetColumnByName(const std::string& name) const;
0158 
0159   /// \brief Retrieve an array's internal data from the record batch
0160   /// \param[in] i field index, does not boundscheck
0161   /// \return an internal ArrayData object
0162   virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;
0163 
0164   /// \brief Retrieve all arrays' internal data from the record batch.
0165   virtual const ArrayDataVector& column_data() const = 0;
0166 
0167   /// \brief Add column to the record batch, producing a new RecordBatch
0168   ///
0169   /// \param[in] i field index, which will be boundschecked
0170   /// \param[in] field field to be added
0171   /// \param[in] column column to be added
0172   virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
0173       int i, const std::shared_ptr<Field>& field,
0174       const std::shared_ptr<Array>& column) const = 0;
0175 
0176   /// \brief Add new nullable column to the record batch, producing a new
0177   /// RecordBatch.
0178   ///
0179   /// For non-nullable columns, use the Field-based version of this method.
0180   ///
0181   /// \param[in] i field index, which will be boundschecked
0182   /// \param[in] field_name name of field to be added
0183   /// \param[in] column column to be added
0184   virtual Result<std::shared_ptr<RecordBatch>> AddColumn(
0185       int i, std::string field_name, const std::shared_ptr<Array>& column) const;
0186 
0187   /// \brief Replace a column in the record batch, producing a new RecordBatch
0188   ///
0189   /// \param[in] i field index, does boundscheck
0190   /// \param[in] field the replacement field
0191   /// \param[in] column the replacement column
0192   virtual Result<std::shared_ptr<RecordBatch>> SetColumn(
0193       int i, const std::shared_ptr<Field>& field,
0194       const std::shared_ptr<Array>& column) const = 0;
0195 
0196   /// \brief Remove column from the record batch, producing a new RecordBatch
0197   ///
0198   /// \param[in] i field index, does boundscheck
0199   virtual Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const = 0;
0200 
       /// \brief Return a RecordBatch whose schema metadata is replaced by the given
       /// key-value metadata (as the method name indicates; the implementation is
       /// provided by subclasses and is not visible in this header)
       ///
       /// \param[in] metadata the new schema-level metadata
0201   virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
0202       const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
0203 
0204   /// \brief Name of the i-th column
0205   const std::string& column_name(int i) const;
0206 
0207   /// \return the number of columns in the record batch
0208   int num_columns() const;
0209 
0210   /// \return the number of rows (the corresponding length of each column)
0211   int64_t num_rows() const { return num_rows_; }
0212 
0213   /// \brief Copy the entire RecordBatch to destination MemoryManager
0214   ///
0215   /// This uses Array::CopyTo on each column of the record batch to create
0216   /// a new record batch where all underlying buffers for the columns have
0217   /// been copied to the destination MemoryManager. This uses
0218   /// MemoryManager::CopyBuffer under the hood.
0219   Result<std::shared_ptr<RecordBatch>> CopyTo(
0220       const std::shared_ptr<MemoryManager>& to) const;
0221 
0222   /// \brief View or Copy the entire RecordBatch to destination MemoryManager
0223   ///
0224   /// This uses Array::ViewOrCopyTo on each column of the record batch to create
0225   /// a new record batch where all underlying buffers for the columns have
0226   /// been zero-copy viewed on the destination MemoryManager, falling back
0227   /// to performing a copy if it can't be viewed as a zero-copy buffer. This uses
0228   /// Buffer::ViewOrCopy under the hood.
0229   Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(
0230       const std::shared_ptr<MemoryManager>& to) const;
0231 
0232   /// \brief Slice each of the arrays in the record batch
0233   /// \param[in] offset the starting offset to slice, through end of batch
0234   /// \return new record batch
0235   virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
0236 
0237   /// \brief Slice each of the arrays in the record batch
0238   /// \param[in] offset the starting offset to slice
0239   /// \param[in] length the number of elements to slice from offset
0240   /// \return new record batch
0241   virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0;
0242 
0243   /// \return PrettyPrint representation suitable for debugging
0244   std::string ToString() const;
0245 
0246   /// \brief Return names of all columns
0247   std::vector<std::string> ColumnNames() const;
0248 
0249   /// \brief Rename columns with provided names
0250   Result<std::shared_ptr<RecordBatch>> RenameColumns(
0251       const std::vector<std::string>& names) const;
0252 
0253   /// \brief Return new record batch with specified columns
0254   Result<std::shared_ptr<RecordBatch>> SelectColumns(
0255       const std::vector<int>& indices) const;
0256 
0257   /// \brief Perform cheap validation checks to determine obvious inconsistencies
0258   /// within the record batch's schema and internal data.
0259   ///
0260   /// This is O(k) where k is the total number of fields and array descendants.
0261   ///
0262   /// \return Status
0263   virtual Status Validate() const;
0264 
0265   /// \brief Perform extensive validation checks to determine inconsistencies
0266   /// within the record batch's schema and internal data.
0267   ///
0268   /// This is potentially O(k*n) where n is the number of rows.
0269   ///
0270   /// \return Status
0271   virtual Status ValidateFull() const;
0272 
0273   /// \brief EXPERIMENTAL: Return a top-level sync event object for this record batch
0274   ///
0275   /// If all of the data for this record batch is in CPU memory, then this
0276   /// will return null. If the data for this batch is
0277   /// on a device, then if synchronization is needed before accessing the
0278   /// data the returned sync event will allow for it.
0279   ///
0280   /// \return null or a Device::SyncEvent
0281   virtual const std::shared_ptr<Device::SyncEvent>& GetSyncEvent() const = 0;
0282 
       /// \return the type of the device that this record batch's columns are
       /// allocated on (see the device_type parameter of Make)
0283   virtual DeviceAllocationType device_type() const = 0;
0284 
0285   /// \brief Create a statistics array of this record batch
0286   ///
0287   /// The created array follows the C data interface statistics
0288   /// specification. See
0289   /// https://arrow.apache.org/docs/format/CDataInterfaceStatistics.html
0290   /// for details.
0291   ///
0292   /// \param[in] pool the memory pool to allocate memory from
0293   /// \return the statistics array of this record batch
0294   Result<std::shared_ptr<Array>> MakeStatisticsArray(
0295       MemoryPool* pool = default_memory_pool()) const;
0296 
0297  protected:
       /// \brief Constructor for subclasses; declared here and defined out of line
0298   RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
0299 
       /// Schema returned by schema()
0300   std::shared_ptr<Schema> schema_;
       /// Row count returned by num_rows()
0301   int64_t num_rows_;
0302 
0303  private:
       // Per the macro name, copy construction and copy assignment are disabled
0304   ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
0305 };
0306 
0307 struct ARROW_EXPORT RecordBatchWithMetadata {
       // Bundles a record batch with custom key-value metadata. This is the value
       // type returned by the no-argument RecordBatchReader::ReadNext() overload.

       /// The record batch; a null batch marks end of iteration (see the
       /// IterationTraits specialization for this type)
0308   std::shared_ptr<RecordBatch> batch;
       /// Custom key-value metadata accompanying the batch
0309   std::shared_ptr<KeyValueMetadata> custom_metadata;
0310 };
0311 
0312 template <>
0313 struct IterationTraits<RecordBatchWithMetadata> {
       // Defines the end-of-iteration sentinel used when iterating over
       // RecordBatchWithMetadata values.

       /// \return the sentinel value: both batch and custom_metadata are null
0314   static RecordBatchWithMetadata End() { return {NULLPTR, NULLPTR}; }
       /// \return true if val marks end of iteration. Only val.batch is inspected;
       /// custom_metadata is ignored.
0315   static bool IsEnd(const RecordBatchWithMetadata& val) { return val.batch == NULLPTR; }
0316 };
0317 
0318 /// \brief Abstract interface for reading stream of record batches
0319 class ARROW_EXPORT RecordBatchReader {
0320  public:
0321   using ValueType = std::shared_ptr<RecordBatch>;
0322 
0323   virtual ~RecordBatchReader();
0324 
0325   /// \return the shared schema of the record batches in the stream
0326   virtual std::shared_ptr<Schema> schema() const = 0;
0327 
0328   /// \brief Read the next record batch in the stream. Return null for batch
0329   /// when reaching end of stream
0330   ///
0331   /// Example:
0332   ///
0333   /// ```
0334   /// while (true) {
0335   ///   std::shared_ptr<RecordBatch> batch;
0336   ///   ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
0337   ///   if (!batch) {
0338   ///     break;
0339   ///   }
0340   ///   // handling the `batch`, the `batch->num_rows()`
0341   ///   // might be 0.
0342   /// }
0343   /// ```
0344   ///
0345   /// \param[out] batch the next loaded batch, null at end of stream. Returning
0346   /// an empty batch doesn't mean the end of stream because it is valid data.
0347   /// \return Status
0348   virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
0349 
       /// \brief Read the next record batch together with its custom metadata
       ///
       /// The default implementation returns Status::NotImplemented; readers that
       /// support custom metadata are expected to override it.
       ///
       /// \return the next batch and its custom metadata, or an error Status
0350   virtual Result<RecordBatchWithMetadata> ReadNext() {
0351     return Status::NotImplemented("ReadNext with custom metadata");
0352   }
0353 
0354   /// \brief Iterator interface
       ///
       /// Wraps ReadNext(std::shared_ptr<RecordBatch>*) in a Result.
       ///
       /// \return the next batch (null at end of stream), or the error Status
       /// reported by ReadNext
0355   Result<std::shared_ptr<RecordBatch>> Next() {
0356     std::shared_ptr<RecordBatch> batch;
0357     ARROW_RETURN_NOT_OK(ReadNext(&batch));
0358     return batch;
0359   }
0360 
0361   /// \brief finalize reader
       ///
       /// The default implementation does nothing and returns Status::OK().
0362   virtual Status Close() { return Status::OK(); }
0363 
0364   /// \brief EXPERIMENTAL: Get the device type for record batches this reader produces
0365   ///
0366   /// default implementation is to return DeviceAllocationType::kCPU
0367   virtual DeviceAllocationType device_type() const { return DeviceAllocationType::kCPU; }
0368 
       /// \brief Input iterator over the batches produced by a RecordBatchReader
       ///
       /// The iterator caches the result of the most recent read. Dereferencing
       /// yields a Result, so read errors are surfaced to the caller rather than
       /// thrown. A default-constructed iterator holds a null batch and no reader,
       /// and serves as the end sentinel.
0369   class RecordBatchReaderIterator {
0370    public:
0371     using iterator_category = std::input_iterator_tag;
0372     using difference_type = std::ptrdiff_t;
0373     using value_type = std::shared_ptr<RecordBatch>;
0374     using pointer = value_type const*;
0375     using reference = value_type const&;
0376 
         /// \brief Construct the end sentinel (null batch, null reader)
0377     RecordBatchReaderIterator() : batch_(RecordBatchEnd()), reader_(NULLPTR) {}
0378 
         /// \brief Construct an iterator over reader and eagerly read the first batch
         ///
         /// The reader is held as a raw, non-owning pointer.
0379     explicit RecordBatchReaderIterator(RecordBatchReader* reader)
0380         : batch_(RecordBatchEnd()), reader_(reader) {
0381       Next();
0382     }
0383 
         /// \brief Compare the cached read results of two iterators
         ///
         /// The reader pointer is not part of the comparison.
         /// NOTE(review): this relies on the equality semantics of
         /// Result<std::shared_ptr<RecordBatch>>; presumably an iterator compares
         /// equal to end() once its cached batch is null -- confirm.
0384     bool operator==(const RecordBatchReaderIterator& other) const {
0385       return batch_ == other.batch_;
0386     }
0387 
0388     bool operator!=(const RecordBatchReaderIterator& other) const {
0389       return !(*this == other);
0390     }
0391 
         /// \return the error Status of the last read if it failed, otherwise the
         /// cached batch
0392     Result<std::shared_ptr<RecordBatch>> operator*() {
0393       ARROW_RETURN_NOT_OK(batch_.status());
0394 
0395       return batch_;
0396     }
0397 
         /// \brief Pre-increment: read the next batch from the reader
0398     RecordBatchReaderIterator& operator++() {
0399       Next();
0400       return *this;
0401     }
0402 
         /// \brief Post-increment: read the next batch and return a copy of the
         /// iterator as it was before advancing
0403     RecordBatchReaderIterator operator++(int) {
0404       RecordBatchReaderIterator tmp(*this);
0405       Next();
0406       return tmp;
0407     }
0408 
0409    private:
         // The null batch used as the end-of-stream marker.
0410     std::shared_ptr<RecordBatch> RecordBatchEnd() {
0411       return std::shared_ptr<RecordBatch>(NULLPTR);
0412     }
0413 
         // Refresh batch_ from the reader. Without a reader (the end sentinel)
         // the cached batch is simply reset to the end marker.
0414     void Next() {
0415       if (reader_ == NULLPTR) {
0416         batch_ = RecordBatchEnd();
0417         return;
0418       }
0419       batch_ = reader_->Next();
0420     }
0421 
         // Result of the most recent read: a batch, a null batch, or an error.
0422     Result<std::shared_ptr<RecordBatch>> batch_;
         // Non-owning pointer to the reader being iterated; null for the sentinel.
0423     RecordBatchReader* reader_;
0424   };
0425   /// \brief Return an iterator to the first record batch in the stream
       ///
       /// Constructing the iterator reads one batch from this reader.
0426   RecordBatchReaderIterator begin() { return RecordBatchReaderIterator(this); }
0427 
0428   /// \brief Return an iterator to the end of the stream
0429   RecordBatchReaderIterator end() { return RecordBatchReaderIterator(); }
0430 
0431   /// \brief Consume entire stream as a vector of record batches
0432   Result<RecordBatchVector> ToRecordBatches();
0433 
0434   /// \brief Read all batches and concatenate as arrow::Table
0435   Result<std::shared_ptr<Table>> ToTable();
0436 
0437   /// \brief Create a RecordBatchReader from a vector of RecordBatch.
0438   ///
0439   /// \param[in] batches the vector of RecordBatch to read from
0440   /// \param[in] schema schema to conform to. Will be inferred from the first
0441   ///            element if not provided.
0442   /// \param[in] device_type the type of device that the batches are allocated on
0443   static Result<std::shared_ptr<RecordBatchReader>> Make(
0444       RecordBatchVector batches, std::shared_ptr<Schema> schema = NULLPTR,
0445       DeviceAllocationType device_type = DeviceAllocationType::kCPU);
0446 
0447   /// \brief Create a RecordBatchReader from an Iterator of RecordBatch.
0448   ///
0449   /// \param[in] batches an iterator of RecordBatch to read from.
0450   /// \param[in] schema schema that each record batch in iterator will conform to.
0451   /// \param[in] device_type the type of device that the batches are allocated on
0452   static Result<std::shared_ptr<RecordBatchReader>> MakeFromIterator(
0453       Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema,
0454       DeviceAllocationType device_type = DeviceAllocationType::kCPU);
0455 };
0456 
0457 /// \brief Concatenate record batches
0458 ///
0459 /// The columns of the new batch are formed by concatenating the same columns of each
0460 /// input batch. Concatenating multiple batches into a new batch requires that the schema
0461 /// be consistent. It supports merging batches without columns (only length, scenarios
0462 /// such as count(*)).
0463 ///
0464 /// \param[in] batches a vector of record batches to be concatenated
0465 /// \param[in] pool memory to store the result will be allocated from this memory pool
0466 /// \return the concatenated record batch
0467 ARROW_EXPORT
0468 Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
0469     const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());
0470 
0471 }  // namespace arrow