File indexing completed on 2025-08-28 08:26:53
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/type.h"  // for DataType, FieldRef, Field and Schema
0027
0028 namespace arrow {
0029
0030 using internal::checked_cast;
0031
0032 namespace acero {
0033
0034
0035
/// Identifiers for the row schemas that take part in a hash join.
///
/// Used as the ProjectionIdEnum of HashJoinProjectionMaps. Enumerators are
/// numbered consecutively starting at 0 (INPUT) through 4 (OUTPUT).
enum class HashJoinProjection : int { INPUT = 0, KEY, PAYLOAD, FILTER, OUTPUT };
0043
/// Non-owning view that translates column indices of one schema projection
/// ("source") into column indices of another ("target") through a shared base.
///
/// Column i of the source maps to base column source_to_base[i], which in turn
/// maps to target column base_to_target[source_to_base[i]]. The arrays are not
/// owned: they must outlive every call to get().
struct SchemaProjectionMap {
  /// Sentinel stored in the mapping arrays for "field not present".
  static constexpr int kMissingField = -1;

  /// Number of columns in the source projection (valid range of get()).
  int num_cols = 0;
  /// source column index -> base column index; num_cols entries.
  const int* source_to_base = nullptr;
  /// base column index -> target column index, or kMissingField when the
  /// target projection does not contain that base column.
  const int* base_to_target = nullptr;

  /// \brief Map source column `i` to the corresponding target column.
  ///
  /// \param[in] i source column index, 0 <= i < num_cols (checked in debug only)
  /// \return the target column index, or kMissingField if the target
  ///         projection has no column for this field
  int get(int i) const {
    assert(source_to_base != nullptr && base_to_target != nullptr);
    assert(i >= 0 && i < num_cols);
    assert(source_to_base[i] != kMissingField);
    return base_to_target[source_to_base[i]];
  }
};
0055
0056
0057
0058
0059
0060
0061 template <typename ProjectionIdEnum>
0062 class SchemaProjectionMaps {
0063 public:
0064 static constexpr int kMissingField = -1;
0065
0066 Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema,
0067 const std::vector<ProjectionIdEnum>& projection_handles,
0068 const std::vector<const std::vector<FieldRef>*>& projections) {
0069 assert(projection_handles.size() == projections.size());
0070 ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema));
0071 for (size_t i = 0; i < projections.size(); ++i) {
0072 ARROW_RETURN_NOT_OK(
0073 RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema));
0074 }
0075 RegisterEnd();
0076 return Status::OK();
0077 }
0078
0079 int num_cols(ProjectionIdEnum schema_handle) const {
0080 int id = schema_id(schema_handle);
0081 return static_cast<int>(schemas_[id].second.data_types.size());
0082 }
0083
0084 bool is_empty(ProjectionIdEnum schema_handle) const {
0085 return num_cols(schema_handle) == 0;
0086 }
0087
0088 const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const {
0089 int id = schema_id(schema_handle);
0090 return schemas_[id].second.field_names[field_id];
0091 }
0092
0093 const std::shared_ptr<DataType>& data_type(ProjectionIdEnum schema_handle,
0094 int field_id) const {
0095 int id = schema_id(schema_handle);
0096 return schemas_[id].second.data_types[field_id];
0097 }
0098
0099 const std::vector<std::shared_ptr<DataType>>& data_types(
0100 ProjectionIdEnum schema_handle) const {
0101 int id = schema_id(schema_handle);
0102 return schemas_[id].second.data_types;
0103 }
0104
0105 SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const {
0106 int id_from = schema_id(from);
0107 int id_to = schema_id(to);
0108 SchemaProjectionMap result;
0109 result.num_cols = num_cols(from);
0110 result.source_to_base = mappings_[id_from].data();
0111 result.base_to_target = inverse_mappings_[id_to].data();
0112 return result;
0113 }
0114
0115 protected:
0116 struct FieldInfos {
0117 std::vector<int> field_paths;
0118 std::vector<std::string> field_names;
0119 std::vector<std::shared_ptr<DataType>> data_types;
0120 };
0121
0122 Status RegisterSchema(ProjectionIdEnum handle, const Schema& schema) {
0123 FieldInfos out_fields;
0124 const FieldVector& in_fields = schema.fields();
0125 out_fields.field_paths.resize(in_fields.size());
0126 out_fields.field_names.resize(in_fields.size());
0127 out_fields.data_types.resize(in_fields.size());
0128 for (size_t i = 0; i < in_fields.size(); ++i) {
0129 const std::string& name = in_fields[i]->name();
0130 const std::shared_ptr<DataType>& type = in_fields[i]->type();
0131 out_fields.field_paths[i] = static_cast<int>(i);
0132 out_fields.field_names[i] = name;
0133 out_fields.data_types[i] = type;
0134 }
0135 schemas_.push_back(std::make_pair(handle, out_fields));
0136 return Status::OK();
0137 }
0138
0139 Status RegisterProjectedSchema(ProjectionIdEnum handle,
0140 const std::vector<FieldRef>& selected_fields,
0141 const Schema& full_schema) {
0142 FieldInfos out_fields;
0143 const FieldVector& in_fields = full_schema.fields();
0144 out_fields.field_paths.resize(selected_fields.size());
0145 out_fields.field_names.resize(selected_fields.size());
0146 out_fields.data_types.resize(selected_fields.size());
0147 for (size_t i = 0; i < selected_fields.size(); ++i) {
0148
0149 ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema));
0150 const std::string& name = in_fields[match[0]]->name();
0151 const std::shared_ptr<DataType>& type = in_fields[match[0]]->type();
0152 out_fields.field_paths[i] = match[0];
0153 out_fields.field_names[i] = name;
0154 out_fields.data_types[i] = type;
0155 }
0156 schemas_.push_back(std::make_pair(handle, out_fields));
0157 return Status::OK();
0158 }
0159
0160 void RegisterEnd() {
0161 size_t size = schemas_.size();
0162 mappings_.resize(size);
0163 inverse_mappings_.resize(size);
0164 int id_base = 0;
0165 for (size_t i = 0; i < size; ++i) {
0166 GenerateMapForProjection(static_cast<int>(i), id_base);
0167 }
0168 }
0169
0170 int schema_id(ProjectionIdEnum schema_handle) const {
0171 for (size_t i = 0; i < schemas_.size(); ++i) {
0172 if (schemas_[i].first == schema_handle) {
0173 return static_cast<int>(i);
0174 }
0175 }
0176
0177 assert(false);
0178 return -1;
0179 }
0180
0181 void GenerateMapForProjection(int id_proj, int id_base) {
0182 int num_cols_proj = static_cast<int>(schemas_[id_proj].second.data_types.size());
0183 int num_cols_base = static_cast<int>(schemas_[id_base].second.data_types.size());
0184
0185 std::vector<int>& mapping = mappings_[id_proj];
0186 std::vector<int>& inverse_mapping = inverse_mappings_[id_proj];
0187 mapping.resize(num_cols_proj);
0188 inverse_mapping.resize(num_cols_base);
0189
0190 if (id_proj == id_base) {
0191 for (int i = 0; i < num_cols_base; ++i) {
0192 mapping[i] = inverse_mapping[i] = i;
0193 }
0194 } else {
0195 const FieldInfos& fields_proj = schemas_[id_proj].second;
0196 const FieldInfos& fields_base = schemas_[id_base].second;
0197 for (int i = 0; i < num_cols_base; ++i) {
0198 inverse_mapping[i] = SchemaProjectionMap::kMissingField;
0199 }
0200 for (int i = 0; i < num_cols_proj; ++i) {
0201 int field_id = SchemaProjectionMap::kMissingField;
0202 for (int j = 0; j < num_cols_base; ++j) {
0203 if (fields_proj.field_paths[i] == fields_base.field_paths[j]) {
0204 field_id = j;
0205
0206
0207 break;
0208 }
0209 }
0210 assert(field_id != SchemaProjectionMap::kMissingField);
0211 mapping[i] = field_id;
0212 inverse_mapping[field_id] = i;
0213 }
0214 }
0215 }
0216
0217
0218 std::vector<std::pair<ProjectionIdEnum, FieldInfos>> schemas_;
0219 std::vector<std::vector<int>> mappings_;
0220 std::vector<std::vector<int>> inverse_mappings_;
0221 };
0222
/// Projection maps keyed by the hash-join row schema identifiers
/// (HashJoinProjection).
using HashJoinProjectionMaps = SchemaProjectionMaps<HashJoinProjection>;
0224
0225 }
0226 }