Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:10

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include "arrow/record_batch.h"
0021 #include "arrow/result.h"
0022 #include "arrow/status.h"
0023 #include "arrow/table_builder.h"
0024 #include "arrow/util/iterator.h"
0025 
0026 #include <type_traits>
0027 
0028 namespace arrow::util {
0029 
0030 namespace detail {
0031 
0032 // Default identity function row accessor. Used to for the common case where the value
0033 // of each row iterated over is it's self also directly iterable.
0034 [[nodiscard]] constexpr inline auto MakeDefaultRowAccessor() {
0035   return [](auto& x) -> Result<decltype(std::ref(x))> { return std::ref(x); };
0036 }
0037 
// Result types of calling `std::begin()` / `std::end()` on an expression of type `T`.
// Each alias is ill-formed (and so triggers SFINAE below) when the call is invalid.
template <typename T>
using range_begin_t = decltype(std::begin(std::declval<T>()));

template <typename T>
using range_end_t = decltype(std::end(std::declval<T>()));

// Trait telling whether `T` is a range, i.e. iterable with `std::begin()` and
// `std::end()`. `is_range<T>::value` is `true` for ranges and `false` otherwise.
template <typename T, typename = void>
struct is_range : std::false_type {};

// Selected only when both `range_begin_t<T>` and `range_end_t<T>` are well-formed.
template <typename T>
struct is_range<T, std::void_t<range_begin_t<T>, range_end_t<T>>> : std::true_type {};
0047 
0048 }  // namespace detail
0049 
0050 /// Delete overload for `const Range&& rows` because the data's lifetime must exceed
0051 /// the lifetime of the function call. `data` will be read when client uses the
0052 /// `RecordBatchReader`
0053 template <class Range, class DataPointConvertor,
0054           class RowAccessor = decltype(detail::MakeDefaultRowAccessor())>
0055 [[nodiscard]] typename std::enable_if_t<detail::is_range<Range>::value,
0056                                         Result<std::shared_ptr<RecordBatchReader>>>
0057 /* Result<std::shared_ptr<RecordBatchReader>>> */ RowsToBatches(
0058     const std::shared_ptr<Schema>& schema, const Range&& rows,
0059     DataPointConvertor&& data_point_convertor,
0060     RowAccessor&& row_accessor = detail::MakeDefaultRowAccessor(),
0061     MemoryPool* pool = default_memory_pool(),
0062     const std::size_t batch_size = 1024) = delete;
0063 
0064 /// \brief Utility function for converting any row-based structure into an
0065 /// `arrow::RecordBatchReader` (this can be easily converted to an `arrow::Table` using
0066 /// `arrow::RecordBatchReader::ToTable()`).
0067 ///
0068 /// Examples of supported types:
0069 /// - `std::vector<std::vector<std::variant<int, bsl::string>>>`
0070 /// - `std::vector<MyRowStruct>`
0071 
0072 /// If `rows` (client’s row-based structure) is not a valid C++ range, the client will
0073 /// need to either make it iterable, or make an adapter/wrapper that is a valid C++
0074 /// range.
0075 
0076 /// The client must provide a `DataPointConvertor` callable type that will convert the
0077 /// structure’s data points into the corresponding arrow types.
0078 
0079 /// Complex nested rows can be supported by providing a custom `row_accessor` instead
0080 /// of the default.
0081 
0082 /// Example usage:
0083 /// \code{.cpp}
0084 /// auto IntConvertor = [](ArrayBuilder& array_builder, int value) {
0085 ///  return static_cast<Int64Builder&>(array_builder).Append(value);
0086 /// };
0087 /// std::vector<std::vector<int>> data = {{1, 2, 4}, {5, 6, 7}};
0088 /// auto batches = RowsToBatches(kTestSchema, data, IntConvertor);
0089 /// \endcode
0090 
0091 /// \param[in] schema - The schema to be used in the `RecordBatchReader`
0092 
0093 /// \param[in] rows - Iterable row-based structure that will be converted to arrow
0094 /// batches
0095 
0096 /// \param[in] data_point_convertor - Client provided callable type that will convert
0097 /// the structure’s data points into the corresponding arrow types. The convertor must
0098 /// return an error `Status` if an error happens during conversion.
0099 
0100 /// \param[in] row_accessor - In the common case where the value of each row iterated
0101 /// over is it's self also directly iterable, the client can just use the default.
0102 /// The provided callable must take the values of the `rows` range and return a
0103 /// `std::reference_wrapper<Range>` to the data points in a given row. The data points
0104 /// must be in order of their corresponding fields in the schema.
0105 /// see: /ref `MakeDefaultRowAccessor`
0106 
0107 /// \param[in] pool - The MemoryPool to use for allocations.
0108 
0109 /// \param[in] batch_size - Number of rows to insert into each RecordBatch.
0110 
0111 /// \return `Result<std::shared_ptr<RecordBatchReader>>>` result will be a
0112 /// `std::shared_ptr<RecordBatchReader>>` if not errors occurred, else an error status.
0113 template <class Range, class DataPointConvertor,
0114           class RowAccessor = decltype(detail::MakeDefaultRowAccessor())>
0115 [[nodiscard]] typename std::enable_if_t<detail::is_range<Range>::value,
0116                                         Result<std::shared_ptr<RecordBatchReader>>>
0117 /* Result<std::shared_ptr<RecordBatchReader>>> */ RowsToBatches(
0118     const std::shared_ptr<Schema>& schema, const Range& rows,
0119     DataPointConvertor&& data_point_convertor,
0120     RowAccessor&& row_accessor = detail::MakeDefaultRowAccessor(),
0121     MemoryPool* pool = default_memory_pool(), const std::size_t batch_size = 1024) {
0122   auto make_next_batch =
0123       [pool = pool, batch_size = batch_size, rows_ittr = std::begin(rows),
0124        rows_ittr_end = std::end(rows), schema = schema,
0125        row_accessor = std::forward<RowAccessor>(row_accessor),
0126        data_point_convertor = std::forward<DataPointConvertor>(
0127            data_point_convertor)]() mutable -> Result<std::shared_ptr<RecordBatch>> {
0128     if (rows_ittr == rows_ittr_end) return NULLPTR;
0129 
0130     ARROW_ASSIGN_OR_RAISE(auto record_batch_builder,
0131                           RecordBatchBuilder::Make(schema, pool, batch_size));
0132 
0133     for (size_t i = 0; i < batch_size && (rows_ittr != rows_ittr_end);
0134          i++, std::advance(rows_ittr, 1)) {
0135       int col_index = 0;
0136       ARROW_ASSIGN_OR_RAISE(const auto row, row_accessor(*rows_ittr));
0137 
0138       // If the accessor returns a `std::reference_wrapper` unwrap if
0139       const auto& row_unwrapped = [&]() {
0140         if constexpr (detail::is_range<decltype(row)>::value)
0141           return row;
0142         else
0143           return row.get();
0144       }();
0145 
0146       for (auto& data_point : row_unwrapped) {
0147         ArrayBuilder* array_builder = record_batch_builder->GetField(col_index);
0148         ARROW_RETURN_IF(array_builder == NULLPTR,
0149                         Status::Invalid("array_builder == NULLPTR"));
0150 
0151         ARROW_RETURN_NOT_OK(data_point_convertor(*array_builder, data_point));
0152         col_index++;
0153       }
0154     }
0155 
0156     ARROW_ASSIGN_OR_RAISE(auto result, record_batch_builder->Flush());
0157     return result;
0158   };
0159   return RecordBatchReader::MakeFromIterator(MakeFunctionIterator(make_next_batch),
0160                                              schema);
0161 }
0162 
0163 }  // namespace arrow::util