This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-Part2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a2886f4340ef90adb653849a38307c5b7f9a5c9 Author: YueZhang <zhangyue921...@163.com> AuthorDate: Mon Mar 31 21:29:02 2025 +0800 Flink && Spark query adopt partition bucket index based buckid pruning --- .../apache/hudi/index/bucket/BucketIdentifier.java | 10 +- .../index/bucket/partition/NumBucketsFunction.java | 22 ++- .../model/PartitionBucketIndexHashingConfig.java | 34 +++- .../java/org/apache/hudi/source/FileIndex.java | 65 ++++--- .../hudi/source/prune/PrimaryKeyPruners.java | 4 +- .../org/apache/hudi/table/HoodieTableSource.java | 26 ++- .../hudi/table/TestPartitionBucketPruning.java | 196 ++++++++++++++++++++ .../scala/org/apache/hudi/BucketIndexSupport.scala | 4 +- .../scala/org/apache/hudi/HoodieFileIndex.scala | 12 +- .../apache/hudi/PartitionBucketIndexSupport.scala | 85 +++++++++ .../TestPartitionBucketIndexSupport.scala | 201 +++++++++++++++++++++ 11 files changed, 613 insertions(+), 46 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java index 1f7b141061c..5c15a4c5683 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -38,7 +38,15 @@ public class BucketIdentifier implements Serializable { } public static int getBucketId(List<String> hashKeyFields, int numBuckets) { - return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets; + return getBucketId(getFieldsHashing(hashKeyFields), numBuckets); + } + + public static int getBucketId(int fieldsHashing, int numBuckets) { + return fieldsHashing % numBuckets; + } + + public static int getFieldsHashing(List<String> hashKeyFields) { + return (hashKeyFields.hashCode() & Integer.MAX_VALUE); } protected static List<String> getHashKeys(String recordKey, String indexKeyFields) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java index 0587304d760..c290fcf23b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java @@ -18,6 +18,8 @@ package org.apache.hudi.index.bucket.partition; +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -49,8 +51,6 @@ public class NumBucketsFunction implements Serializable { * Calculator for partition-specific bucket numbers. */ private final PartitionBucketIndexCalculator calculator; - private final String expressions; - private final String ruleType; /** * Creates a NumBucketsFunction with the given configuration. 
@@ -59,8 +59,6 @@ public class NumBucketsFunction implements Serializable { */ public NumBucketsFunction(String expressions, String ruleType, int defaultBucketNumber) { this.defaultBucketNumber = defaultBucketNumber; - this.expressions = expressions; - this.ruleType = ruleType; this.isPartitionLevelBucketIndexEnabled = StringUtils.nonEmpty(expressions); if (isPartitionLevelBucketIndexEnabled) { this.calculator = PartitionBucketIndexCalculator.getInstance( @@ -73,6 +71,12 @@ public class NumBucketsFunction implements Serializable { } } + public NumBucketsFunction(int defaultBucketNumber) { + this.defaultBucketNumber = defaultBucketNumber; + this.isPartitionLevelBucketIndexEnabled = false; + this.calculator = null; + } + public static NumBucketsFunction fromWriteConfig(HoodieWriteConfig writeConfig) { String expression = writeConfig.getBucketIndexPartitionExpression(); String ruleType = writeConfig.getBucketIndexPartitionRuleType(); @@ -80,6 +84,16 @@ public class NumBucketsFunction implements Serializable { return new NumBucketsFunction(expression, ruleType, numBuckets); } + public static NumBucketsFunction fromMetaClient(HoodieTableMetaClient metaClient, int defaultBucketNumber) { + if (PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(metaClient.getStorageConf(), metaClient.getBasePath().toString())) { + PartitionBucketIndexHashingConfig hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient); + return new NumBucketsFunction(hashingConfig.getExpressions(), hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber()); + } else { + return new NumBucketsFunction(defaultBucketNumber); + } + } + + /** * Gets the number of buckets for the given partition path. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java index c186e4f049e..9c3431050b3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java @@ -196,24 +196,50 @@ public class PartitionBucketIndexHashingConfig implements Serializable { /** * Get Latest committed hashing config instant to load. + * If instant is empty, then return latest hashing config instant */ - public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) { + public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) { try { List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient); - return allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1); + if (instant.isPresent()) { + Option<String> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get()); + // fall back to look up archived hashing config instant before return empty + return res.isPresent() ? 
res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get()); + } else { + return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1)); + } } catch (Exception e) { throw new HoodieException("Failed to get hashing config instant to load.", e); } } + private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) { + List<String> res = hashingConfigInstants.stream().filter(hashingConfigInstant -> { + return hashingConfigInstant.compareTo(instant) <= 0; + }).collect(Collectors.toList()); + return res.isEmpty() ? Option.empty() : Option.of(res.get(res.size() - 1)); + } + public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) { - String instantToLoad = getLatestHashingConfigInstantToLoad(metaClient); - Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad)); + Option<String> instantToLoad = getHashingConfigInstantToLoad(metaClient, Option.empty()); + ValidationUtils.checkArgument(instantToLoad.isPresent(), "Can not load latest hashing config " + instantToLoad); + Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad.get())); ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + instantToLoad); return latestHashingConfig.get(); } + public static Option<PartitionBucketIndexHashingConfig> loadingLatestHashingConfigBeforeOrOn(HoodieTableMetaClient metaClient, String instant) { + Option<String> hashingConfigInstantToLoad = getHashingConfigInstantToLoad(metaClient, Option.of(instant)); + if (hashingConfigInstantToLoad.isPresent()) { + Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), hashingConfigInstantToLoad.get())); + ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + hashingConfigInstantToLoad + " based on " + instant); + return latestHashingConfig; + } else { + return Option.empty(); + } + } + /** * Archive hashing config. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index a8ff13fe621..fdc629f9d56 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.index.bucket.partition.NumBucketsFunction; import org.apache.hudi.source.prune.ColumnStatsProbe; import org.apache.hudi.source.prune.PartitionPruners; import org.apache.hudi.source.prune.PrimaryKeyPruners; @@ -68,9 +69,10 @@ public class FileIndex implements Serializable { private final org.apache.hadoop.conf.Configuration hadoopConf; private final PartitionPruners.PartitionPruner partitionPruner; // for partition pruning private final ColumnStatsProbe colStatsProbe; // for probing column stats - private final int dataBucket; // for bucket pruning + private final int dataBucketHashing; // for bucket pruning private List<String> partitionPaths; // cache of partition paths private final FileStatsIndex fileStatsIndex; // for data skipping + private final NumBucketsFunction numBucketsFunction; private FileIndex( StoragePath path, @@ -78,15 +80,19 @@ public class FileIndex implements Serializable { RowType rowType, ColumnStatsProbe colStatsProbe, PartitionPruners.PartitionPruner partitionPruner, - int dataBucket) { + int dataBucketHashing, + NumBucketsFunction numBucketsFunction) { this.path = path; this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf); this.metadataConfig = StreamerUtil.metadataConfig(conf); this.colStatsProbe = isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? colStatsProbe : null; this.partitionPruner = partitionPruner; - this.dataBucket = dataBucket; + this.dataBucketHashing = dataBucketHashing; this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, metadataConfig); + this.numBucketsFunction = numBucketsFunction == null + ? 
new NumBucketsFunction(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)) + : numBucketsFunction; } /** @@ -157,28 +163,35 @@ public class FileIndex implements Serializable { if (partitions.length < 1) { return Collections.emptyList(); } - List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions( - new HoodieFlinkEngineContext(hadoopConf), - new HoodieHadoopStorage(path, HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), partitions) - .values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + Map<String, List<StoragePathInfo>> partition2Files = FSUtils.getFilesInPartitions( + new HoodieFlinkEngineContext(hadoopConf), + new HoodieHadoopStorage(path, HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), partitions); + + List<StoragePathInfo> allFiles; + // bucket pruning + if (this.dataBucketHashing >= 0) { + allFiles = partition2Files.entrySet().stream().flatMap(entry -> { + String partitionPath = entry.getKey(); + int numBuckets = numBucketsFunction.getNumBuckets(partitionPath); + int bucketId = BucketIdentifier.getBucketId(this.dataBucketHashing, numBuckets); + String bucketIdStr = BucketIdentifier.bucketIdStr(bucketId); + List<StoragePathInfo> innerAllFiles = entry.getValue(); + return innerAllFiles.stream().filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr)); + }).collect(Collectors.toList()); + } else { + allFiles = FSUtils.getFilesInPartitions( + new HoodieFlinkEngineContext(hadoopConf), + new HoodieHadoopStorage(path, HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), partitions) + .values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } if (allFiles.isEmpty()) { // returns early for empty table. 
return allFiles; } - // bucket pruning - if (this.dataBucket >= 0) { - String bucketIdStr = BucketIdentifier.bucketIdStr(this.dataBucket); - List<StoragePathInfo> filesAfterBucketPruning = allFiles.stream() - .filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr)) - .collect(Collectors.toList()); - logPruningMsg(allFiles.size(), filesAfterBucketPruning.size(), "bucket pruning"); - allFiles = filesAfterBucketPruning; - } - // data skipping Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles( colStatsProbe, allFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList())); @@ -286,7 +299,8 @@ public class FileIndex implements Serializable { private RowType rowType; private ColumnStatsProbe columnStatsProbe; private PartitionPruners.PartitionPruner partitionPruner; - private int dataBucket = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING; + private int dataBucketHashing = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING; + private NumBucketsFunction numBucketFunction; private Builder() { } @@ -316,14 +330,19 @@ public class FileIndex implements Serializable { return this; } - public Builder dataBucket(int dataBucket) { - this.dataBucket = dataBucket; + public Builder dataBucketHashing(int dataBucketHashing) { + this.dataBucketHashing = dataBucketHashing; + return this; + } + + public Builder numBucketFunction(NumBucketsFunction function) { + this.numBucketFunction = function; return this; } public FileIndex build() { return new FileIndex(Objects.requireNonNull(path), Objects.requireNonNull(conf), Objects.requireNonNull(rowType), - columnStatsProbe, partitionPruner, dataBucket); + columnStatsProbe, partitionPruner, dataBucketHashing, numBucketFunction); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java index 05b69bdcf76..98cf6a1bede 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java @@ -45,7 +45,7 @@ public class PrimaryKeyPruners { public static final int BUCKET_ID_NO_PRUNING = -1; - public static int getBucketId(List<ResolvedExpression> hashKeyFilters, Configuration conf) { + public static int getBucketFieldHashing(List<ResolvedExpression> hashKeyFilters, Configuration conf) { List<String> pkFields = Arrays.asList(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); // step1: resolve the hash key values final boolean logicalTimestamp = OptionsResolver.isConsistentLogicalTimestampEnabled(conf); @@ -60,7 +60,7 @@ public class PrimaryKeyPruners { .map(Pair::getValue) .collect(Collectors.toList()); // step2: generate bucket id - return BucketIdentifier.getBucketId(values, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)); + return BucketIdentifier.getFieldsHashing(values); } private static Pair<FieldReferenceExpression, ValueLiteralExpression> castChildAs(List<Expression> children) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 5c9cde00ba9..c2c9f10ac5a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -38,6 +38,8 @@ 
import org.apache.hudi.configuration.OptionsInference; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.index.bucket.partition.NumBucketsFunction; import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.source.ExpressionEvaluators; import org.apache.hudi.source.ExpressionPredicates; @@ -156,7 +158,7 @@ public class HoodieTableSource implements private List<Predicate> predicates; private ColumnStatsProbe columnStatsProbe; private PartitionPruners.PartitionPruner partitionPruner; - private int dataBucket; + private int dataBucketHashing; private transient FileIndex fileIndex; public HoodieTableSource( @@ -177,7 +179,7 @@ public class HoodieTableSource implements @Nullable List<Predicate> predicates, @Nullable ColumnStatsProbe columnStatsProbe, @Nullable PartitionPruners.PartitionPruner partitionPruner, - int dataBucket, + int dataBucketHashing, @Nullable int[] requiredPos, @Nullable Long limit, @Nullable HoodieTableMetaClient metaClient, @@ -191,7 +193,7 @@ public class HoodieTableSource implements this.predicates = Optional.ofNullable(predicates).orElse(Collections.emptyList()); this.columnStatsProbe = columnStatsProbe; this.partitionPruner = partitionPruner; - this.dataBucket = dataBucket; + this.dataBucketHashing = dataBucketHashing; this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() -> IntStream.range(0, this.tableRowType.getFieldCount()).toArray()); this.limit = Optional.ofNullable(limit).orElse(NO_LIMIT_CONSTANT); this.hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)); @@ -266,7 +268,7 @@ public class HoodieTableSource implements @Override public DynamicTableSource copy() { return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, - conf, predicates, columnStatsProbe, partitionPruner, dataBucket, requiredPos, limit, metaClient, internalSchemaManager); + conf, predicates, columnStatsProbe, partitionPruner, dataBucketHashing, requiredPos, limit, metaClient, internalSchemaManager); } @Override @@ -281,7 +283,7 @@ public class HoodieTableSource implements this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0); this.columnStatsProbe = ColumnStatsProbe.newInstance(splitFilters.f0); this.partitionPruner = createPartitionPruner(splitFilters.f1, columnStatsProbe); - this.dataBucket = getDataBucket(splitFilters.f0); + this.dataBucketHashing = getDataBucketFieldHashing(splitFilters.f0); // refuse all the filters now return SupportsFilterPushDown.Result.of(new ArrayList<>(splitFilters.f1), new ArrayList<>(filters)); } @@ -365,7 +367,7 @@ public class HoodieTableSource implements .build(); } - private int getDataBucket(List<ResolvedExpression> dataFilters) { + private int getDataBucketFieldHashing(List<ResolvedExpression> dataFilters) { if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) { return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING; } @@ -374,7 +376,7 @@ public class HoodieTableSource implements if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) { return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING; } - return PrimaryKeyPruners.getBucketId(indexKeyFilters, conf); + return PrimaryKeyPruners.getBucketFieldHashing(indexKeyFilters, conf); } private List<MergeOnReadInputSplit> buildInputSplits() { @@ -611,7 +613,8 @@ public class HoodieTableSource implements 
.rowType(this.tableRowType) .columnStatsProbe(this.columnStatsProbe) .partitionPruner(this.partitionPruner) - .dataBucket(this.dataBucket) + .dataBucketHashing(this.dataBucketHashing) + .numBucketFunction(NumBucketsFunction.fromMetaClient(metaClient, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))) .build(); } return this.fileIndex; @@ -696,6 +699,11 @@ public class HoodieTableSource implements @VisibleForTesting public int getDataBucket() { - return dataBucket; + return BucketIdentifier.getBucketId(this.dataBucketHashing, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)); + } + + @VisibleForTesting + public int getDataBucketHashing() { + return dataBucketHashing; } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java new file mode 100644 index 00000000000..b2f3f50a861 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.source.prune.PrimaryKeyPruners; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; +import org.apache.hudi.util.SerializableSchema; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; +import org.apache.hadoop.fs.Path; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class TestPartitionBucketPruning { + + @TempDir + File tempFile; + + /** + * test single primary key filtering + * @throws Exception + */ + @Test + void testPartitionBucketPruningWithSinglePK() throws Exception { + String tablePath1 = new Path(tempFile.getAbsolutePath(), "tbl1").toString(); + int bucketNumber = 10000; + String expression = "par1|par2|par3|par4,4"; + String rule = "regex"; + Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1); + conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET"); + conf1.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber); + conf1.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression); + conf1.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule); + + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf1); + PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, expression, rule, bucketNumber, null); + + // test single primary key filtering + TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf1); + HoodieTableSource tableSource1 = createHoodieTableSource(conf1); + tableSource1.applyFilters(Collections.singletonList( + createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); + + assertThat(BucketIdentifier.getBucketId(tableSource1.getDataBucketHashing(), 4), is(1)); + List<StoragePathInfo> fileList = tableSource1.getReadFiles(); + assertThat("Files should be pruned by bucket id 1", fileList.size(), CoreMatchers.is(2)); + } + + /** + * test multiple primary keys filtering + * @throws Exception + */ + @Test + void testPartitionBucketPruningWithMultiPK() throws Exception { + String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString(); + int bucketNumber = 10000; + String expression = "par1|par2|par3|par4,4"; + String rule = "regex"; + Configuration conf = TestConfigurations.getDefaultConf(tablePath); + conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET"); + conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule); 
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name"); + conf.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX"); + + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf); + PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, expression, rule, bucketNumber, null); + TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf); + HoodieTableSource tableSource = createHoodieTableSource(conf); + tableSource.applyFilters(Arrays.asList( + createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"), + createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"))); + assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), is(3)); + List<StoragePathInfo> fileList = tableSource.getReadFiles(); + assertThat("Files should be pruned by bucket id 3", fileList.size(), CoreMatchers.is(3)); + } + + /** + * test partial primary keys filtering + * @throws Exception + */ + @Test + void testPartialPartitionBucketPruningWithMultiPK() throws Exception { + String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString(); + int bucketNumber = 10000; + String expression = "par1|par2|par3|par4,4"; + String rule = "regex"; + Configuration conf = TestConfigurations.getDefaultConf(tablePath); + conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET"); + conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name"); + conf.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX"); + + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf); + PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, expression, rule, bucketNumber, null); + TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf); + HoodieTableSource tableSource = createHoodieTableSource(conf); + tableSource.applyFilters(Collections.singletonList( + createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); + + assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), is(PrimaryKeyPruners.BUCKET_ID_NO_PRUNING)); + List<StoragePathInfo> fileList = tableSource.getReadFiles(); + assertThat("Partial pk filtering does not prune any files", fileList.size(), + CoreMatchers.is(7)); + } + + /** + * test single primary keys filtering together with non-primary key predicate + * @throws Exception + */ + @Test + void testPartitionBucketPruningWithMixedFilter() throws Exception { + String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString(); + int bucketNumber = 10000; + String expression = "par1|par2|par3|par4,4"; + String rule = "regex"; + Configuration conf = TestConfigurations.getDefaultConf(tablePath); + conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET"); + conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression); + conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule); + + HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf); + PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, expression, rule, bucketNumber, null); + TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf); + HoodieTableSource tableSource = createHoodieTableSource(conf); + tableSource.applyFilters(Arrays.asList( + createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"), + createLitEquivalenceExpr("name", 1, 
DataTypes.STRING().notNull(), "Danny"))); + + assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), is(1)); + List<StoragePathInfo> fileList = tableSource.getReadFiles(); + assertThat("Files should be pruned by bucket id 1", fileList.size(), CoreMatchers.is(2)); + } + + private HoodieTableSource createHoodieTableSource(Configuration conf) { + return new HoodieTableSource( + SerializableSchema.create(TestConfigurations.TABLE_SCHEMA), + new StoragePath(conf.getString(FlinkOptions.PATH)), + Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), + "default-par", + conf); + } + + private ResolvedExpression createLitEquivalenceExpr(String fieldName, int fieldIdx, DataType dataType, Object val) { + FieldReferenceExpression ref = new FieldReferenceExpression(fieldName, dataType, fieldIdx, fieldIdx); + ValueLiteralExpression literal = new ValueLiteralExpression(val, dataType); + return new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(ref, literal), + DataTypes.BOOLEAN()); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala index f91fa8b0a42..a0cbd8dfb1d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala @@ -101,8 +101,8 @@ class BucketIndexSupport(spark: SparkSession, candidateFiles.toSet } - def filterQueriesWithBucketHashField(queryFilters: Seq[Expression]): Option[BitSet] = { - val bucketNumber = metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS) + def filterQueriesWithBucketHashField(queryFilters: Seq[Expression], + bucketNumber: Int = metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS)): Option[BitSet] = { if (indexBucketHashFieldsOpt.isEmpty || queryFilters.isEmpty) { None } else { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index a8c0dd87ec0..769f82090a2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException +import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction @@ -103,6 +104,15 @@ case class HoodieFileIndex(spark: SparkSession, endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key)) with FileIndex { @transient protected var hasPushedDownPartitionPredicates: Boolean = false + private val isPartitionSimpleBucketIndex = PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(spark.sparkContext.hadoopConfiguration, + metaClient.getBasePath.toString) + + @transient private lazy val bucketIndexSupport = if (isPartitionSimpleBucketIndex) { + val specifiedQueryInstant = 
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant) + new PartitionBucketIndexSupport(spark, metadataConfig, metaClient, specifiedQueryInstant) + } else { + new BucketIndexSupport(spark, metadataConfig, metaClient) + } /** * NOTE: [[indicesSupport]] is a transient state, since it's only relevant while logical plan @@ -112,7 +122,7 @@ case class HoodieFileIndex(spark: SparkSession, */ @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = List( new RecordLevelIndexSupport(spark, metadataConfig, metaClient), - new BucketIndexSupport(spark, metadataConfig, metaClient), + bucketIndexSupport, new SecondaryIndexSupport(spark, metadataConfig, metaClient), new ExpressionIndexSupport(spark, schema, metadataConfig, metaClient), new BloomFiltersIndexSupport(spark, metadataConfig, metaClient), diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala new file mode 100644 index 00000000000..7913c31fbe0 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, PartitionBucketIndexHashingConfig} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.index.bucket.partition.PartitionBucketIndexCalculator + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression +import org.slf4j.LoggerFactory + +class PartitionBucketIndexSupport(spark: SparkSession, + metadataConfig: HoodieMetadataConfig, + metaClient: HoodieTableMetaClient, + specifiedQueryInstant: Option[String] = Option.empty) + extends BucketIndexSupport(spark, metadataConfig, metaClient) { + + private val log = LoggerFactory.getLogger(getClass) + private def initCalc(): Option[PartitionBucketIndexCalculator] = { + if (specifiedQueryInstant.isDefined) { + val hashingConfigOption = PartitionBucketIndexHashingConfig.loadingLatestHashingConfigBeforeOrOn(metaClient, specifiedQueryInstant.get) + if (hashingConfigOption.isPresent) { + Option.apply(PartitionBucketIndexCalculator.getInstance(hashingConfigOption.get().getExpressions, + hashingConfigOption.get().getRule, + hashingConfigOption.get().getDefaultBucketNumber)) + } + // could be empty, if user upgrade to partition level bucket index after specifiedQueryInstant + Option.empty + } else { + // specifiedQueryInstant is null or empty, so load latest hashing config directly + val hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient) + Option.apply(PartitionBucketIndexCalculator.getInstance(hashingConfig.getExpressions, + hashingConfig.getRule, hashingConfig.getDefaultBucketNumber)) + } + } + + + private val calc: Option[PartitionBucketIndexCalculator] = initCalc() + + override def computeCandidateFileNames(fileIndex: HoodieFileIndex, + queryFilters: Seq[Expression], + queryReferencedColumns: Seq[String], + prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])], + shouldPushDownFilesFilter: Boolean): Option[Set[String]] = { + if (calc.isEmpty) { + // fall back to BucketIndexSupport + super.computeCandidateFileNames(fileIndex, queryFilters, queryReferencedColumns, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) + } else { + Option.apply(prunedPartitionsAndFileSlices.flatMap(v => { + val partitionPathOption = v._1 + val fileSlices = v._2 + val numBuckets = calc.get.computeNumBuckets(partitionPathOption.getOrElse(new PartitionPath("", Array())).path) + val bucketIdsBitMapByFilter = filterQueriesWithBucketHashField(queryFilters, numBuckets) + if (bucketIdsBitMapByFilter.isDefined && bucketIdsBitMapByFilter.get.cardinality() > 0) { + val allFilesName = getPrunedPartitionsAndFileNames(fileIndex, Seq((partitionPathOption, fileSlices)))._2 + getCandidateFiles(allFilesName, bucketIdsBitMapByFilter.get) + } else { + Seq() + }}).toSet) + } + } +} + +object PartitionBucketIndexSupport { + val INDEX_NAME = "BUCKET" +} + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala new file mode 100644 index 00000000000..029e887d601 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.{BaseHoodieTableFileIndex, HoodieFileIndex, PartitionBucketIndexSupport} +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, PartitionBucketIndexHashingConfig} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.config.HoodieIndexConfig +import org.apache.hudi.index.bucket.BucketIdentifier +import org.apache.hudi.keygen.NonpartitionedKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.storage.{StoragePath, StoragePathInfo} + +import org.apache.avro.generic.GenericData +import org.apache.spark.sql.HoodieCatalystExpressionUtils +import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder +import org.junit.jupiter.api.{BeforeEach, Tag, Test} +import org.mockito.Mockito + +@Tag("functional") +class TestPartitionBucketIndexSupport extends TestBucketIndexSupport { + + private val DEFAULT_RULE = "regex" + private val EXPRESSION_BUCKET_NUMBER = 19 + private val DEFAULT_EXPRESSIONS = "\\d{4}\\-(06\\-(01|17|18)|11\\-(01|10|11))," + EXPRESSION_BUCKET_NUMBER + private val DEFAULT_BUCKET_NUMBER = 10 + private val DEFAULT_PARTITION_PATH = Array("2025-06-17", "2025-06-18") + private var fileIndex: HoodieFileIndex = null + @BeforeEach + override def setUp(): Unit = { + super.setUp() + PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, DEFAULT_EXPRESSIONS, DEFAULT_RULE, DEFAULT_BUCKET_NUMBER, null) + fileIndex = Mockito.mock(classOf[HoodieFileIndex]) + } + + @Test + override def testSingleHashFieldsExpression: Unit = { + val configProperties = new TypedProperties() + configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, "A") + configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A") + configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, "A") + + configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, String.valueOf(DEFAULT_BUCKET_NUMBER)) + metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), avroSchemaStr) + val metadataConfig = HoodieMetadataConfig.newBuilder + .fromProperties(configProperties) + .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key, true)).build() + val bucketIndexSupport: PartitionBucketIndexSupport = new PartitionBucketIndexSupport(spark, metadataConfig, metaClient) + + + // init + val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties) + var record = new GenericData.Record(avroSchema) + record.put("A", "1") + val bucket1Id4 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "2") + val bucket2Id5 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "3") + val bucket3Id6 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "4") + val bucket4Id7 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "5") + val bucket5Id8 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", EXPRESSION_BUCKET_NUMBER) + assert(bucket1Id4 == 4 && bucket2Id5 == 5 && bucket3Id6 == 6 && bucket4Id7 == 7 && bucket5Id8 == 8) + + // fileIdStr + val token = FSUtils.makeWriteToken(1, 0, 1) + val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + + + val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName) + var equalTo = "A = 3" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket3Id6FileName), allFileNames) + equalTo = "A = 3 And A = 4 and B = '6'" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.empty, allFileNames) + equalTo = "A = 5 And B = 'abc'" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName), allFileNames) + equalTo = "A = C and A = 1" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket1Id4FileName), allFileNames) + equalTo = "A = 5 Or A = 2" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames) + equalTo = "A = 5 Or A = 2 Or A = 8" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames) + equalTo = "A = 5 Or (A = 2 and B = 'abc')" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames) + equalTo = "A = 5 And (A = 2 Or B = 'abc')" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName), allFileNames) + equalTo = "A = 5 And (A = 2 Or B = 'abc')" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket5Id8FileName), allFileNames) + } + + @Test + override def testMultipleHashFieldsExpress(): Unit = { + val configProperties = new TypedProperties() + 
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, "A,B") + configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A,B") + configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, "A,B") + configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, String.valueOf(DEFAULT_BUCKET_NUMBER)) + metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), avroSchemaStr) + val metadataConfig = HoodieMetadataConfig.newBuilder + .fromProperties(configProperties) + .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key, true)).build() + val bucketIndexSupport = new PartitionBucketIndexSupport(spark, metadataConfig, metaClient) + val keyGenerator = bucketIndexSupport.getKeyGenerator + assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator]) + + // init + val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties) + var record = new GenericData.Record(avroSchema) + record.put("A", "1") + record.put("B", "2") + val bucket1Id4 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A,B", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "2") + record.put("B", "3") + val bucket2Id5 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A,B", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "3") + record.put("B", "4") + val bucket3Id6 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A,B", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "4") + record.put("B", "5") + val bucket4Id7 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A,B", EXPRESSION_BUCKET_NUMBER) + record = new GenericData.Record(avroSchema) + record.put("A", "5") + record.put("B", "6") + val bucket5Id8 = BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A,B", EXPRESSION_BUCKET_NUMBER) + assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 && bucket4Id7 == 4 && bucket5Id8 == 17) + + // fileIdStr + val token = FSUtils.makeWriteToken(1, 0, 1) + val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + "-0", HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension) + + val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName) + + var equalTo = "A = 2 and B = '3'" + exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket2Id5FileName), allFileNames) + + equalTo = "A = 4 and B = '5'" + 
exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.apply(bucket4Id7FileName), allFileNames) + } + + def exprFilePathAnswerCheck(bucketIndexSupport: PartitionBucketIndexSupport, exprRaw: String, expectResult: Set[String], + allFileStatus: Set[String]): Unit = { + val resolveExpr = HoodieCatalystExpressionUtils.resolveExpr(spark, exprRaw, structSchema) + val optimizerPlan = spark.sessionState.optimizer.execute(DummyExpressionHolder(Seq(resolveExpr))) + val optimizerExpr = optimizerPlan.asInstanceOf[DummyExpressionHolder].exprs.head + + // split input files into different partitions + val partitionPath1 = DEFAULT_PARTITION_PATH(0) + val allFileSlices1: Seq[FileSlice] = allFileStatus.slice(0, 3).map(fileName => { + val slice = new FileSlice(partitionPath1, "00000000000000000", FSUtils.getFileId(fileName)) + slice.setBaseFile(new HoodieBaseFile(new StoragePathInfo(new StoragePath(fileName), 0L, false, 0, 0, 0))) + slice + }).toSeq + + val partitionPath2 = DEFAULT_PARTITION_PATH(1) + val allFileSlices2: Seq[FileSlice] = allFileStatus.slice(3, 5).map(fileName => { + val slice = new FileSlice(partitionPath1, "00000000000000000", FSUtils.getFileId(fileName)) + slice.setBaseFile(new HoodieBaseFile(new StoragePathInfo(new StoragePath(fileName), 0L, false, 0, 0, 0))) + slice + }).toSeq + + val input = Seq((Option.apply(new BaseHoodieTableFileIndex.PartitionPath(partitionPath1, Array())), allFileSlices1), + (Option.apply(new BaseHoodieTableFileIndex.PartitionPath(partitionPath2, Array())), allFileSlices2)) + val candidate = bucketIndexSupport.computeCandidateFileNames(fileIndex, splitConjunctivePredicates(optimizerExpr), + Seq(), input, false) + + assert(candidate.get.equals(expectResult)) + } +}
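For readers skimming the diff, here is a minimal, self-contained sketch of the pruning flow this commit wires into the Flink FileIndex and the Spark PartitionBucketIndexSupport: the pushed-down key filter is reduced once to a partition-independent hash (the new BucketIdentifier.getFieldsHashing), and the bucket id is derived per partition (the new BucketIdentifier.getBucketId(int, int)), because with a partition-level bucket index each partition may carry a different bucket count. This is illustrative only, not Hudi code; the helper numBucketsForPartition, the 4-vs-8 bucket rule, and the sample file names are hypothetical stand-ins for NumBucketsFunction#getNumBuckets and real file slices.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

/**
 * Sketch of partition-bucket-index pruning under the assumptions stated above.
 */
public class BucketPruningSketch {

  // Mirrors the new BucketIdentifier.getFieldsHashing: hash of the resolved key values.
  static int fieldsHashing(List<String> hashKeyValues) {
    return hashKeyValues.hashCode() & Integer.MAX_VALUE;
  }

  // Mirrors the new BucketIdentifier.getBucketId(int, int): hash modulo bucket count.
  static int bucketId(int fieldsHashing, int numBuckets) {
    return fieldsHashing % numBuckets;
  }

  // numBucketsForPartition stands in for NumBucketsFunction#getNumBuckets, which
  // resolves the per-partition bucket count from the committed hashing config.
  static List<String> pruneFiles(List<String> fileNames,
                                 String partitionPath,
                                 int fieldsHashing,
                                 ToIntFunction<String> numBucketsForPartition) {
    int numBuckets = numBucketsForPartition.applyAsInt(partitionPath);
    // Bucket id is encoded as a zero-padded prefix of the file name, e.g. "00000003".
    String bucketIdStr = String.format("%08d", bucketId(fieldsHashing, numBuckets));
    return fileNames.stream()
        .filter(name -> name.contains(bucketIdStr))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hash computed once from the equality filter on the record key (e.g. uuid = 'id1').
    int hashing = fieldsHashing(Arrays.asList("id1"));
    // Hypothetical rule: partition "par1" uses 4 buckets, everything else 8.
    ToIntFunction<String> numBuckets = p -> p.equals("par1") ? 4 : 8;
    List<String> files = Arrays.asList(
        "00000001-0000-0000-0000-000000000000_1-0-1_00000000000000000.parquet",
        "00000003-0000-0000-0000-000000000000_1-0-1_00000000000000000.parquet");
    System.out.println(pruneFiles(files, "par1", hashing, numBuckets));
  }
}
```

The design point the sketch captures is why the commit renames dataBucket to dataBucketHashing: a single precomputed bucket id is no longer meaningful across partitions once the bucket count is partition-dependent, so only the hash is cached and the modulo is applied per partition at file-listing time.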