Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/type.h"  // for DataType, FieldRef, Field and Schema
0027 
0028 namespace arrow {
0029 
0030 using internal::checked_cast;
0031 
0032 namespace acero {
0033 
/// Handles identifying each of the row schemas (projections) a hash join
/// works with. The underlying values are stable: INPUT is 0 and every
/// following handle is one greater than the one before it.
enum class HashJoinProjection : int {
  INPUT = 0,
  KEY,      // 1
  PAYLOAD,  // 2
  FILTER,   // 3
  OUTPUT    // 4
};
0043 
/// Non-owning view that maps column indices of a source projection to column
/// indices of a target projection, going through a shared base schema.
///
/// The two arrays are not owned by this struct; whoever fills them in (e.g.
/// SchemaProjectionMaps::map) must keep the storage alive while the view is used.
/// All members have default initializers so a default-constructed view is an
/// empty map (zero columns, null arrays) rather than indeterminate garbage.
struct SchemaProjectionMap {
  /// Marker stored in a mapping array for a column that has no counterpart.
  static constexpr int kMissingField = -1;
  /// Number of columns in the source projection; valid `i` for get() is
  /// [0, num_cols).
  int num_cols = 0;
  /// source column index -> base column index (num_cols entries).
  const int* source_to_base = nullptr;
  /// base column index -> target column index, or kMissingField when the base
  /// column is not part of the target projection.
  const int* base_to_target = nullptr;

  /// Returns the target column index corresponding to source column `i`, or
  /// kMissingField if that column does not appear in the target projection.
  /// Preconditions (checked only in debug builds): 0 <= i < num_cols and the
  /// source column has a base counterpart.
  int get(int i) const {
    assert(i >= 0 && i < num_cols);
    assert(source_to_base[i] != kMissingField);
    return base_to_target[source_to_base[i]];
  }
};
0055 
0056 /// Helper class for managing different projections of the same row schema.
0057 /// Used to efficiently map any field in one projection to a corresponding field in
0058 /// another projection.
0059 /// Materialized mappings are generated lazily at the time of the first access.
0060 /// Thread-safe apart from initialization.
0061 template <typename ProjectionIdEnum>
0062 class SchemaProjectionMaps {
0063  public:
0064   static constexpr int kMissingField = -1;
0065 
0066   Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema,
0067               const std::vector<ProjectionIdEnum>& projection_handles,
0068               const std::vector<const std::vector<FieldRef>*>& projections) {
0069     assert(projection_handles.size() == projections.size());
0070     ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema));
0071     for (size_t i = 0; i < projections.size(); ++i) {
0072       ARROW_RETURN_NOT_OK(
0073           RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema));
0074     }
0075     RegisterEnd();
0076     return Status::OK();
0077   }
0078 
0079   int num_cols(ProjectionIdEnum schema_handle) const {
0080     int id = schema_id(schema_handle);
0081     return static_cast<int>(schemas_[id].second.data_types.size());
0082   }
0083 
0084   bool is_empty(ProjectionIdEnum schema_handle) const {
0085     return num_cols(schema_handle) == 0;
0086   }
0087 
0088   const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const {
0089     int id = schema_id(schema_handle);
0090     return schemas_[id].second.field_names[field_id];
0091   }
0092 
0093   const std::shared_ptr<DataType>& data_type(ProjectionIdEnum schema_handle,
0094                                              int field_id) const {
0095     int id = schema_id(schema_handle);
0096     return schemas_[id].second.data_types[field_id];
0097   }
0098 
0099   const std::vector<std::shared_ptr<DataType>>& data_types(
0100       ProjectionIdEnum schema_handle) const {
0101     int id = schema_id(schema_handle);
0102     return schemas_[id].second.data_types;
0103   }
0104 
0105   SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const {
0106     int id_from = schema_id(from);
0107     int id_to = schema_id(to);
0108     SchemaProjectionMap result;
0109     result.num_cols = num_cols(from);
0110     result.source_to_base = mappings_[id_from].data();
0111     result.base_to_target = inverse_mappings_[id_to].data();
0112     return result;
0113   }
0114 
0115  protected:
0116   struct FieldInfos {
0117     std::vector<int> field_paths;
0118     std::vector<std::string> field_names;
0119     std::vector<std::shared_ptr<DataType>> data_types;
0120   };
0121 
0122   Status RegisterSchema(ProjectionIdEnum handle, const Schema& schema) {
0123     FieldInfos out_fields;
0124     const FieldVector& in_fields = schema.fields();
0125     out_fields.field_paths.resize(in_fields.size());
0126     out_fields.field_names.resize(in_fields.size());
0127     out_fields.data_types.resize(in_fields.size());
0128     for (size_t i = 0; i < in_fields.size(); ++i) {
0129       const std::string& name = in_fields[i]->name();
0130       const std::shared_ptr<DataType>& type = in_fields[i]->type();
0131       out_fields.field_paths[i] = static_cast<int>(i);
0132       out_fields.field_names[i] = name;
0133       out_fields.data_types[i] = type;
0134     }
0135     schemas_.push_back(std::make_pair(handle, out_fields));
0136     return Status::OK();
0137   }
0138 
0139   Status RegisterProjectedSchema(ProjectionIdEnum handle,
0140                                  const std::vector<FieldRef>& selected_fields,
0141                                  const Schema& full_schema) {
0142     FieldInfos out_fields;
0143     const FieldVector& in_fields = full_schema.fields();
0144     out_fields.field_paths.resize(selected_fields.size());
0145     out_fields.field_names.resize(selected_fields.size());
0146     out_fields.data_types.resize(selected_fields.size());
0147     for (size_t i = 0; i < selected_fields.size(); ++i) {
0148       // All fields must be found in schema without ambiguity
0149       ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema));
0150       const std::string& name = in_fields[match[0]]->name();
0151       const std::shared_ptr<DataType>& type = in_fields[match[0]]->type();
0152       out_fields.field_paths[i] = match[0];
0153       out_fields.field_names[i] = name;
0154       out_fields.data_types[i] = type;
0155     }
0156     schemas_.push_back(std::make_pair(handle, out_fields));
0157     return Status::OK();
0158   }
0159 
0160   void RegisterEnd() {
0161     size_t size = schemas_.size();
0162     mappings_.resize(size);
0163     inverse_mappings_.resize(size);
0164     int id_base = 0;
0165     for (size_t i = 0; i < size; ++i) {
0166       GenerateMapForProjection(static_cast<int>(i), id_base);
0167     }
0168   }
0169 
0170   int schema_id(ProjectionIdEnum schema_handle) const {
0171     for (size_t i = 0; i < schemas_.size(); ++i) {
0172       if (schemas_[i].first == schema_handle) {
0173         return static_cast<int>(i);
0174       }
0175     }
0176     // We should never get here
0177     assert(false);
0178     return -1;
0179   }
0180 
0181   void GenerateMapForProjection(int id_proj, int id_base) {
0182     int num_cols_proj = static_cast<int>(schemas_[id_proj].second.data_types.size());
0183     int num_cols_base = static_cast<int>(schemas_[id_base].second.data_types.size());
0184 
0185     std::vector<int>& mapping = mappings_[id_proj];
0186     std::vector<int>& inverse_mapping = inverse_mappings_[id_proj];
0187     mapping.resize(num_cols_proj);
0188     inverse_mapping.resize(num_cols_base);
0189 
0190     if (id_proj == id_base) {
0191       for (int i = 0; i < num_cols_base; ++i) {
0192         mapping[i] = inverse_mapping[i] = i;
0193       }
0194     } else {
0195       const FieldInfos& fields_proj = schemas_[id_proj].second;
0196       const FieldInfos& fields_base = schemas_[id_base].second;
0197       for (int i = 0; i < num_cols_base; ++i) {
0198         inverse_mapping[i] = SchemaProjectionMap::kMissingField;
0199       }
0200       for (int i = 0; i < num_cols_proj; ++i) {
0201         int field_id = SchemaProjectionMap::kMissingField;
0202         for (int j = 0; j < num_cols_base; ++j) {
0203           if (fields_proj.field_paths[i] == fields_base.field_paths[j]) {
0204             field_id = j;
0205             // If there are multiple matches for the same input field,
0206             // it will be mapped to the first match.
0207             break;
0208           }
0209         }
0210         assert(field_id != SchemaProjectionMap::kMissingField);
0211         mapping[i] = field_id;
0212         inverse_mapping[field_id] = i;
0213       }
0214     }
0215   }
0216 
0217   // vector used as a mapping from ProjectionIdEnum to fields
0218   std::vector<std::pair<ProjectionIdEnum, FieldInfos>> schemas_;
0219   std::vector<std::vector<int>> mappings_;
0220   std::vector<std::vector<int>> inverse_mappings_;
0221 };
0222 
/// SchemaProjectionMaps instantiated with the hash-join projection identifiers
/// (HashJoinProjection) as schema handles.
using HashJoinProjectionMaps = SchemaProjectionMaps<HashJoinProjection>;
0224 
0225 }  // namespace acero
0226 }  // namespace arrow