![]() |
|
|||
File indexing completed on 2025-08-28 08:27:10
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include "arrow/record_batch.h" 0021 #include "arrow/result.h" 0022 #include "arrow/status.h" 0023 #include "arrow/table_builder.h" 0024 #include "arrow/util/iterator.h" 0025 0026 #include <type_traits> 0027 0028 namespace arrow::util { 0029 0030 namespace detail { 0031 0032 // Default identity function row accessor. Used to for the common case where the value 0033 // of each row iterated over is it's self also directly iterable. 0034 [[nodiscard]] constexpr inline auto MakeDefaultRowAccessor() { 0035 return [](auto& x) -> Result<decltype(std::ref(x))> { return std::ref(x); }; 0036 } 0037 0038 // Meta-function to check if a type `T` is a range (iterable using `std::begin()` / 0039 // `std::end()`). `is_range<T>::value` will be false if `T` is not a valid range. 
0040 template <typename T, typename = void> 0041 struct is_range : std::false_type {}; 0042 0043 template <typename T> 0044 struct is_range<T, std::void_t<decltype(std::begin(std::declval<T>())), 0045 decltype(std::end(std::declval<T>()))>> : std::true_type { 0046 }; 0047 0048 } // namespace detail 0049 0050 /// Delete overload for `const Range&& rows` because the data's lifetime must exceed 0051 /// the lifetime of the function call. `data` will be read when client uses the 0052 /// `RecordBatchReader` 0053 template <class Range, class DataPointConvertor, 0054 class RowAccessor = decltype(detail::MakeDefaultRowAccessor())> 0055 [[nodiscard]] typename std::enable_if_t<detail::is_range<Range>::value, 0056 Result<std::shared_ptr<RecordBatchReader>>> 0057 /* Result<std::shared_ptr<RecordBatchReader>>> */ RowsToBatches( 0058 const std::shared_ptr<Schema>& schema, const Range&& rows, 0059 DataPointConvertor&& data_point_convertor, 0060 RowAccessor&& row_accessor = detail::MakeDefaultRowAccessor(), 0061 MemoryPool* pool = default_memory_pool(), 0062 const std::size_t batch_size = 1024) = delete; 0063 0064 /// \brief Utility function for converting any row-based structure into an 0065 /// `arrow::RecordBatchReader` (this can be easily converted to an `arrow::Table` using 0066 /// `arrow::RecordBatchReader::ToTable()`). 0067 /// 0068 /// Examples of supported types: 0069 /// - `std::vector<std::vector<std::variant<int, bsl::string>>>` 0070 /// - `std::vector<MyRowStruct>` 0071 0072 /// If `rows` (client’s row-based structure) is not a valid C++ range, the client will 0073 /// need to either make it iterable, or make an adapter/wrapper that is a valid C++ 0074 /// range. 0075 0076 /// The client must provide a `DataPointConvertor` callable type that will convert the 0077 /// structure’s data points into the corresponding arrow types. 0078 0079 /// Complex nested rows can be supported by providing a custom `row_accessor` instead 0080 /// of the default. 
///
/// Example usage:
/// \code{.cpp}
/// auto IntConvertor = [](ArrayBuilder& array_builder, int value) {
///   return static_cast<Int64Builder&>(array_builder).Append(value);
/// };
/// std::vector<std::vector<int>> data = {{1, 2, 4}, {5, 6, 7}};
/// auto batches = RowsToBatches(kTestSchema, data, IntConvertor);
/// \endcode
///
/// \param[in] schema - The schema to be used in the `RecordBatchReader`
///
/// \param[in] rows - Iterable row-based structure that will be converted to arrow
/// batches
///
/// \param[in] data_point_convertor - Client provided callable type that will convert
/// the structure's data points into the corresponding arrow types. The convertor must
/// return an error `Status` if an error happens during conversion.
///
/// \param[in] row_accessor - In the common case where the value of each row iterated
/// over is itself also directly iterable, the client can just use the default.
/// The provided callable must take the values of the `rows` range and return a
/// `std::reference_wrapper<Range>` to the data points in a given row. The data points
/// must be in order of their corresponding fields in the schema.
/// \see `MakeDefaultRowAccessor`
///
/// \param[in] pool - The MemoryPool to use for allocations.
///
/// \param[in] batch_size - Number of rows to insert into each RecordBatch.
///
/// \return `Result<std::shared_ptr<RecordBatchReader>>`: the result will hold a
/// `std::shared_ptr<RecordBatchReader>` if no errors occurred, else an error status.
0113 template <class Range, class DataPointConvertor, 0114 class RowAccessor = decltype(detail::MakeDefaultRowAccessor())> 0115 [[nodiscard]] typename std::enable_if_t<detail::is_range<Range>::value, 0116 Result<std::shared_ptr<RecordBatchReader>>> 0117 /* Result<std::shared_ptr<RecordBatchReader>>> */ RowsToBatches( 0118 const std::shared_ptr<Schema>& schema, const Range& rows, 0119 DataPointConvertor&& data_point_convertor, 0120 RowAccessor&& row_accessor = detail::MakeDefaultRowAccessor(), 0121 MemoryPool* pool = default_memory_pool(), const std::size_t batch_size = 1024) { 0122 auto make_next_batch = 0123 [pool = pool, batch_size = batch_size, rows_ittr = std::begin(rows), 0124 rows_ittr_end = std::end(rows), schema = schema, 0125 row_accessor = std::forward<RowAccessor>(row_accessor), 0126 data_point_convertor = std::forward<DataPointConvertor>( 0127 data_point_convertor)]() mutable -> Result<std::shared_ptr<RecordBatch>> { 0128 if (rows_ittr == rows_ittr_end) return NULLPTR; 0129 0130 ARROW_ASSIGN_OR_RAISE(auto record_batch_builder, 0131 RecordBatchBuilder::Make(schema, pool, batch_size)); 0132 0133 for (size_t i = 0; i < batch_size && (rows_ittr != rows_ittr_end); 0134 i++, std::advance(rows_ittr, 1)) { 0135 int col_index = 0; 0136 ARROW_ASSIGN_OR_RAISE(const auto row, row_accessor(*rows_ittr)); 0137 0138 // If the accessor returns a `std::reference_wrapper` unwrap if 0139 const auto& row_unwrapped = [&]() { 0140 if constexpr (detail::is_range<decltype(row)>::value) 0141 return row; 0142 else 0143 return row.get(); 0144 }(); 0145 0146 for (auto& data_point : row_unwrapped) { 0147 ArrayBuilder* array_builder = record_batch_builder->GetField(col_index); 0148 ARROW_RETURN_IF(array_builder == NULLPTR, 0149 Status::Invalid("array_builder == NULLPTR")); 0150 0151 ARROW_RETURN_NOT_OK(data_point_convertor(*array_builder, data_point)); 0152 col_index++; 0153 } 0154 } 0155 0156 ARROW_ASSIGN_OR_RAISE(auto result, record_batch_builder->Flush()); 0157 return result; 0158 
}; 0159 return RecordBatchReader::MakeFromIterator(MakeFunctionIterator(make_next_batch), 0160 schema); 0161 } 0162 0163 } // namespace arrow::util
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |