This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch HUDI-8990-Part2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2a2886f4340ef90adb653849a38307c5b7f9a5c9
Author: YueZhang <zhangyue921...@163.com>
AuthorDate: Mon Mar 31 21:29:02 2025 +0800

    Flink and Spark queries adopt partition bucket index based bucket id pruning
---
 .../apache/hudi/index/bucket/BucketIdentifier.java |  10 +-
 .../index/bucket/partition/NumBucketsFunction.java |  22 ++-
 .../model/PartitionBucketIndexHashingConfig.java   |  34 +++-
 .../java/org/apache/hudi/source/FileIndex.java     |  65 ++++---
 .../hudi/source/prune/PrimaryKeyPruners.java       |   4 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  26 ++-
 .../hudi/table/TestPartitionBucketPruning.java     | 196 ++++++++++++++++++++
 .../scala/org/apache/hudi/BucketIndexSupport.scala |   4 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  12 +-
 .../apache/hudi/PartitionBucketIndexSupport.scala  |  85 +++++++++
 .../TestPartitionBucketIndexSupport.scala          | 201 +++++++++++++++++++++
 11 files changed, 613 insertions(+), 46 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 1f7b141061c..5c15a4c5683 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -38,7 +38,15 @@ public class BucketIdentifier implements Serializable {
   }
 
   public static int getBucketId(List<String> hashKeyFields, int numBuckets) {
-    return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return getBucketId(getFieldsHashing(hashKeyFields), numBuckets);
+  }
+
+  public static int getBucketId(int fieldsHashing, int numBuckets) {
+    return fieldsHashing % numBuckets;
+  }
+
+  public static int getFieldsHashing(List<String> hashKeyFields) {
+    return (hashKeyFields.hashCode() & Integer.MAX_VALUE);
   }
 
  protected static List<String> getHashKeys(String recordKey, String indexKeyFields) {
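
(For context on the change above: splitting getBucketId into getFieldsHashing plus a
separate modulo keeps the arithmetic identical, while letting callers hash the key
fields once at filter push-down time and defer the modulo until the per-partition
bucket count is known. A minimal sketch, not part of the patch, with a made-up class
name:)

import java.util.Arrays;
import java.util.List;

class BucketIdSplitSketch {
  public static void main(String[] args) {
    List<String> hashKeyFields = Arrays.asList("id1"); // hypothetical record key values
    int numBuckets = 4;

    // old single-step formula
    int oldStyle = (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;

    // new two-step split: hash once, apply the modulo later
    int fieldsHashing = hashKeyFields.hashCode() & Integer.MAX_VALUE; // getFieldsHashing(...)
    int newStyle = fieldsHashing % numBuckets;                        // getBucketId(fieldsHashing, numBuckets)

    System.out.println(oldStyle == newStyle); // always true
  }
}
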
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java
index 0587304d760..c290fcf23b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/partition/NumBucketsFunction.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.index.bucket.partition;
 
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 
@@ -49,8 +51,6 @@ public class NumBucketsFunction implements Serializable {
    * Calculator for partition-specific bucket numbers.
    */
   private final PartitionBucketIndexCalculator calculator;
-  private final String expressions;
-  private final String ruleType;
 
   /**
    * Creates a NumBucketsFunction with the given configuration.
@@ -59,8 +59,6 @@ public class NumBucketsFunction implements Serializable {
    */
  public NumBucketsFunction(String expressions, String ruleType, int defaultBucketNumber) {
     this.defaultBucketNumber = defaultBucketNumber;
-    this.expressions = expressions;
-    this.ruleType = ruleType;
    this.isPartitionLevelBucketIndexEnabled = StringUtils.nonEmpty(expressions);
     if (isPartitionLevelBucketIndexEnabled) {
       this.calculator = PartitionBucketIndexCalculator.getInstance(
@@ -73,6 +71,12 @@ public class NumBucketsFunction implements Serializable {
     }
   }
 
+  public NumBucketsFunction(int defaultBucketNumber) {
+    this.defaultBucketNumber = defaultBucketNumber;
+    this.isPartitionLevelBucketIndexEnabled = false;
+    this.calculator = null;
+  }
+
  public static NumBucketsFunction fromWriteConfig(HoodieWriteConfig writeConfig) {
     String expression = writeConfig.getBucketIndexPartitionExpression();
     String ruleType = writeConfig.getBucketIndexPartitionRuleType();
@@ -80,6 +84,16 @@ public class NumBucketsFunction implements Serializable {
     return new NumBucketsFunction(expression, ruleType, numBuckets);
   }
 
+  public static NumBucketsFunction fromMetaClient(HoodieTableMetaClient metaClient, int defaultBucketNumber) {
+    if (PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(metaClient.getStorageConf(), metaClient.getBasePath().toString())) {
+      PartitionBucketIndexHashingConfig hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient);
+      return new NumBucketsFunction(hashingConfig.getExpressions(), hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+    } else {
+      return new NumBucketsFunction(defaultBucketNumber);
+    }
+  }
+
+
   /**
    * Gets the number of buckets for the given partition path.
    *
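
(For context: fromMetaClient checks whether the table uses the partition-level simple
bucket index; if so it builds the function from the latest committed hashing config,
otherwise every partition falls back to the fixed default. A hedged usage sketch, not
part of the patch, with an illustrative wrapper class and method name:)

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.index.bucket.partition.NumBucketsFunction;

class NumBucketsFunctionUsageSketch {
  // Resolve the bucket count for one partition, falling back to the fixed default
  // when the table does not use the partition-level bucket index.
  static int resolveNumBuckets(HoodieTableMetaClient metaClient, String partitionPath, int defaultBucketNumber) {
    NumBucketsFunction numBucketsFunction = NumBucketsFunction.fromMetaClient(metaClient, defaultBucketNumber);
    return numBucketsFunction.getNumBuckets(partitionPath);
  }
}
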
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
index c186e4f049e..9c3431050b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java
@@ -196,24 +196,50 @@ public class PartitionBucketIndexHashingConfig implements Serializable {
 
   /**
    * Get Latest committed hashing config instant to load.
+   * If instant is empty, then return latest hashing config instant
    */
-  public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) {
+  public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
    try {
      List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient);
-      return allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1);
+      if (instant.isPresent()) {
+        Option<String> res = getHashingConfigInstantToLoadBeforeOrOn(allCommittedHashingConfig, instant.get());
+        // fall back to look up archived hashing config instant before return empty
+        return res.isPresent() ? res : getHashingConfigInstantToLoadBeforeOrOn(getArchiveHashingConfigInstants(metaClient), instant.get());
+      } else {
+        return Option.of(allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1));
+      }
    } catch (Exception e) {
      throw new HoodieException("Failed to get hashing config instant to load.", e);
    }
  }

+  private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) {
+    List<String> res = hashingConfigInstants.stream().filter(hashingConfigInstant -> {
+      return hashingConfigInstant.compareTo(instant) <= 0;
+    }).collect(Collectors.toList());
+    return res.isEmpty() ? Option.empty() : Option.of(res.get(res.size() - 1));
+  }
+
  public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) {
-    String instantToLoad = getLatestHashingConfigInstantToLoad(metaClient);
-    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad));
+    Option<String> instantToLoad = getHashingConfigInstantToLoad(metaClient, Option.empty());
+    ValidationUtils.checkArgument(instantToLoad.isPresent(), "Can not load latest hashing config " + instantToLoad);
+    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad.get()));
    ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + instantToLoad);

    return latestHashingConfig.get();
  }

+  public static Option<PartitionBucketIndexHashingConfig> loadingLatestHashingConfigBeforeOrOn(HoodieTableMetaClient metaClient, String instant) {
+    Option<String> hashingConfigInstantToLoad = getHashingConfigInstantToLoad(metaClient, Option.of(instant));
+    if (hashingConfigInstantToLoad.isPresent()) {
+      Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), hashingConfigInstantToLoad.get()));
+      ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load hashing config " + hashingConfigInstantToLoad + " based on " + instant);
+      return latestHashingConfig;
+    } else {
+      return Option.empty();
+    }
+  }
+
   /**
    * Archive hashing config.
    */
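
(For context: hashing-config instants are lexicographically ordered timestamps, so
getHashingConfigInstantToLoadBeforeOrOn is a floor lookup, i.e. the latest instant at or
before the query instant, with a fallback to archived configs when nothing matches in
the committed list. A minimal sketch of that floor step, using java.util.Optional in
place of Hudi's Option and invented instant values:)

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class HashingConfigFloorSketch {
  // Last instant <= the query instant, or empty if every config is newer.
  static Optional<String> floorInstant(List<String> hashingConfigInstants, String instant) {
    return hashingConfigInstants.stream()
        .filter(i -> i.compareTo(instant) <= 0)
        .reduce((first, second) -> second); // keep the latest match
  }

  public static void main(String[] args) {
    List<String> committed = Arrays.asList("20250301101010000", "20250315101010000", "20250401101010000");
    System.out.println(floorInstant(committed, "20250320000000000")); // Optional[20250315101010000]
    System.out.println(floorInstant(committed, "20250101000000000")); // Optional.empty
  }
}
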
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index a8ff13fe621..fdc629f9d56 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
@@ -68,9 +69,10 @@ public class FileIndex implements Serializable {
   private final org.apache.hadoop.conf.Configuration hadoopConf;
  private final PartitionPruners.PartitionPruner partitionPruner; // for partition pruning
  private final ColumnStatsProbe colStatsProbe;                   // for probing column stats
-  private final int dataBucket;                                   // for bucket pruning
+  private final int dataBucketHashing;                            // for bucket pruning
  private List<String> partitionPaths;                            // cache of partition paths
  private final FileStatsIndex fileStatsIndex;                    // for data skipping
+  private final NumBucketsFunction numBucketsFunction;
 
   private FileIndex(
       StoragePath path,
@@ -78,15 +80,19 @@ public class FileIndex implements Serializable {
       RowType rowType,
       ColumnStatsProbe colStatsProbe,
       PartitionPruners.PartitionPruner partitionPruner,
-      int dataBucket) {
+      int dataBucketHashing,
+      NumBucketsFunction numBucketsFunction) {
     this.path = path;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
     this.metadataConfig = StreamerUtil.metadataConfig(conf);
    this.colStatsProbe = isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? colStatsProbe : null;
    this.partitionPruner = partitionPruner;
-    this.dataBucket = dataBucket;
+    this.dataBucketHashing = dataBucketHashing;
    this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, metadataConfig);
+    this.numBucketsFunction = numBucketsFunction == null
+        ? new NumBucketsFunction(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))
+        : numBucketsFunction;
   }
 
   /**
@@ -157,28 +163,35 @@ public class FileIndex implements Serializable {
     if (partitions.length < 1) {
       return Collections.emptyList();
     }
-    List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions(
-            new HoodieFlinkEngineContext(hadoopConf),
-            new HoodieHadoopStorage(path, HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), partitions)
-        .values().stream()
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+    Map<String, List<StoragePathInfo>> partition2Files = FSUtils.getFilesInPartitions(
+        new HoodieFlinkEngineContext(hadoopConf),
+        new HoodieHadoopStorage(path, HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), partitions);
+
+    List<StoragePathInfo> allFiles;
+    // bucket pruning
+    if (this.dataBucketHashing >= 0) {
+      allFiles = partition2Files.entrySet().stream().flatMap(entry -> {
+        String partitionPath = entry.getKey();
+        int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
+        int bucketId = BucketIdentifier.getBucketId(this.dataBucketHashing, numBuckets);
+        String bucketIdStr = BucketIdentifier.bucketIdStr(bucketId);
+        List<StoragePathInfo> innerAllFiles = entry.getValue();
+        return innerAllFiles.stream().filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr));
+      }).collect(Collectors.toList());
+    } else {
+      allFiles = partition2Files.values().stream()
+          .flatMap(Collection::stream)
+          .collect(Collectors.toList());
+    }

    if (allFiles.isEmpty()) {
      // returns early for empty table.
      return allFiles;
    }

-    // bucket pruning
-    if (this.dataBucket >= 0) {
-      String bucketIdStr = BucketIdentifier.bucketIdStr(this.dataBucket);
-      List<StoragePathInfo> filesAfterBucketPruning = allFiles.stream()
-          .filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr))
-          .collect(Collectors.toList());
-      logPruningMsg(allFiles.size(), filesAfterBucketPruning.size(), "bucket pruning");
-      allFiles = filesAfterBucketPruning;
-    }
-
    // data skipping
    Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(
        colStatsProbe, allFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList()));
@@ -286,7 +299,8 @@ public class FileIndex implements Serializable {
     private RowType rowType;
     private ColumnStatsProbe columnStatsProbe;
     private PartitionPruners.PartitionPruner partitionPruner;
-    private int dataBucket = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    private int dataBucketHashing = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+    private NumBucketsFunction numBucketFunction;
 
     private Builder() {
     }
@@ -316,14 +330,19 @@ public class FileIndex implements Serializable {
       return this;
     }
 
-    public Builder dataBucket(int dataBucket) {
-      this.dataBucket = dataBucket;
+    public Builder dataBucketHashing(int dataBucketHashing) {
+      this.dataBucketHashing = dataBucketHashing;
+      return this;
+    }
+
+    public Builder numBucketFunction(NumBucketsFunction function) {
+      this.numBucketFunction = function;
       return this;
     }
 
     public FileIndex build() {
      return new FileIndex(Objects.requireNonNull(path), Objects.requireNonNull(conf), Objects.requireNonNull(rowType),
-          columnStatsProbe, partitionPruner, dataBucket);
+          columnStatsProbe, partitionPruner, dataBucketHashing, numBucketFunction);
     }
   }
 }
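
(For context on the reworked listing above: each partition may now carry its own bucket
count, so the pushed-down key hash is reduced modulo that partition's count before file
names are matched, instead of using one table-wide bucket id. A simplified sketch, not
part of the patch; plain maps and an assumed 8-digit zero-padded bucket prefix stand in
for NumBucketsFunction and BucketIdentifier:)

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BucketPruningSketch {
  // Keep only the files of each partition whose name carries the bucket id derived from
  // the pushed-down hash and that partition's bucket count.
  static List<String> pruneByBucket(Map<String, List<String>> partitionToFileNames,
                                    Map<String, Integer> numBucketsPerPartition,
                                    int dataBucketHashing) {
    return partitionToFileNames.entrySet().stream().flatMap(entry -> {
      int numBuckets = numBucketsPerPartition.get(entry.getKey());
      int bucketId = dataBucketHashing % numBuckets;
      String bucketIdStr = String.format("%08d", bucketId); // assumed zero-padded prefix format
      return entry.getValue().stream().filter(name -> name.contains(bucketIdStr));
    }).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> files = Map.of(
        "par1", Arrays.asList("00000001-aaa.parquet", "00000003-bbb.parquet"),
        "par2", Arrays.asList("00000001-ccc.parquet", "00000005-ddd.parquet"));
    Map<String, Integer> buckets = Map.of("par1", 4, "par2", 8);
    // hash 13 -> bucket 1 in par1 (13 % 4) and bucket 5 in par2 (13 % 8)
    System.out.println(pruneByBucket(files, buckets, 13));
  }
}
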
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
index 05b69bdcf76..98cf6a1bede 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
@@ -45,7 +45,7 @@ public class PrimaryKeyPruners {
 
   public static final int BUCKET_ID_NO_PRUNING = -1;
 
-  public static int getBucketId(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
+  public static int getBucketFieldHashing(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
    List<String> pkFields = Arrays.asList(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
    // step1: resolve the hash key values
    final boolean logicalTimestamp = OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
@@ -60,7 +60,7 @@ public class PrimaryKeyPruners {
        .map(Pair::getValue)
        .collect(Collectors.toList());
    // step2: generate bucket id
-    return BucketIdentifier.getBucketId(values, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+    return BucketIdentifier.getFieldsHashing(values);
  }

  private static Pair<FieldReferenceExpression, ValueLiteralExpression> castChildAs(List<Expression> children) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5c9cde00ba9..c2c9f10ac5a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -38,6 +38,8 @@ import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.ExpressionEvaluators;
 import org.apache.hudi.source.ExpressionPredicates;
@@ -156,7 +158,7 @@ public class HoodieTableSource implements
   private List<Predicate> predicates;
   private ColumnStatsProbe columnStatsProbe;
   private PartitionPruners.PartitionPruner partitionPruner;
-  private int dataBucket;
+  private int dataBucketHashing;
   private transient FileIndex fileIndex;
 
   public HoodieTableSource(
@@ -177,7 +179,7 @@ public class HoodieTableSource implements
       @Nullable List<Predicate> predicates,
       @Nullable ColumnStatsProbe columnStatsProbe,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
-      int dataBucket,
+      int dataBucketHashing,
       @Nullable int[] requiredPos,
       @Nullable Long limit,
       @Nullable HoodieTableMetaClient metaClient,
@@ -191,7 +193,7 @@ public class HoodieTableSource implements
     this.predicates = 
Optional.ofNullable(predicates).orElse(Collections.emptyList());
     this.columnStatsProbe = columnStatsProbe;
     this.partitionPruner = partitionPruner;
-    this.dataBucket = dataBucket;
+    this.dataBucketHashing = dataBucketHashing;
     this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() -> 
IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
     this.limit = Optional.ofNullable(limit).orElse(NO_LIMIT_CONSTANT);
     this.hadoopConf = new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf));
@@ -266,7 +268,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, predicates, columnStatsProbe, partitionPruner, dataBucket, 
requiredPos, limit, metaClient, internalSchemaManager);
+        conf, predicates, columnStatsProbe, partitionPruner, 
dataBucketHashing, requiredPos, limit, metaClient, internalSchemaManager);
   }
 
   @Override
@@ -281,7 +283,7 @@ public class HoodieTableSource implements
     this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
     this.columnStatsProbe = ColumnStatsProbe.newInstance(splitFilters.f0);
     this.partitionPruner = createPartitionPruner(splitFilters.f1, 
columnStatsProbe);
-    this.dataBucket = getDataBucket(splitFilters.f0);
+    this.dataBucketHashing = getDataBucketFieldHashing(splitFilters.f0);
     // refuse all the filters now
     return SupportsFilterPushDown.Result.of(new ArrayList<>(splitFilters.f1), 
new ArrayList<>(filters));
   }
@@ -365,7 +367,7 @@ public class HoodieTableSource implements
         .build();
   }
 
-  private int getDataBucket(List<ResolvedExpression> dataFilters) {
+  private int getDataBucketFieldHashing(List<ResolvedExpression> dataFilters) {
     if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
       return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
     }
@@ -374,7 +376,7 @@ public class HoodieTableSource implements
    if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) {
       return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
     }
-    return PrimaryKeyPruners.getBucketId(indexKeyFilters, conf);
+    return PrimaryKeyPruners.getBucketFieldHashing(indexKeyFilters, conf);
   }
 
   private List<MergeOnReadInputSplit> buildInputSplits() {
@@ -611,7 +613,8 @@ public class HoodieTableSource implements
           .rowType(this.tableRowType)
           .columnStatsProbe(this.columnStatsProbe)
           .partitionPruner(this.partitionPruner)
-          .dataBucket(this.dataBucket)
+          .dataBucketHashing(this.dataBucketHashing)
+          .numBucketFunction(NumBucketsFunction.fromMetaClient(metaClient, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
           .build();
     }
     return this.fileIndex;
@@ -696,6 +699,11 @@ public class HoodieTableSource implements
 
   @VisibleForTesting
   public int getDataBucket() {
-    return dataBucket;
+    return BucketIdentifier.getBucketId(this.dataBucketHashing, conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+  }
+
+  @VisibleForTesting
+  public int getDataBucketHashing() {
+    return dataBucketHashing;
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java
new file mode 100644
index 00000000000..b2f3f50a861
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestPartitionBucketPruning.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.SerializableSchema;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class TestPartitionBucketPruning {
+
+  @TempDir
+  File tempFile;
+
+  /**
+   * test single primary key filtering
+   * @throws Exception
+   */
+  @Test
+  void testPartitionBucketPruningWithSinglePK() throws Exception {
+    String tablePath1 = new Path(tempFile.getAbsolutePath(), 
"tbl1").toString();
+    int bucketNumber = 10000;
+    String expression = "par1|par2|par3|par4,4";
+    String rule = "regex";
+    Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1);
+    conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf1.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber);
+    conf1.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression);
+    conf1.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule);
+
+    HoodieTableMetaClient metaClient = 
StreamerUtil.initTableIfNotExists(conf1);
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, 
expression, rule, bucketNumber, null);
+
+    // test single primary key filtering
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf1);
+    HoodieTableSource tableSource1 = createHoodieTableSource(conf1);
+    tableSource1.applyFilters(Collections.singletonList(
+        createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
+
+    
assertThat(BucketIdentifier.getBucketId(tableSource1.getDataBucketHashing(), 
4), is(1));
+    List<StoragePathInfo> fileList = tableSource1.getReadFiles();
+    assertThat("Files should be pruned by bucket id 1", fileList.size(), 
CoreMatchers.is(2));
+  }
+
+  /**
+   * test multiple primary keys filtering
+   * @throws Exception
+   */
+  @Test
+  void testPartitionBucketPruningWithMultiPK() throws Exception {
+    String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
+    int bucketNumber = 10000;
+    String expression = "par1|par2|par3|par4,4";
+    String rule = "regex";
+    Configuration conf = TestConfigurations.getDefaultConf(tablePath);
+    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
+    conf.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
+
+    HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, 
expression, rule, bucketNumber, null);
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    tableSource.applyFilters(Arrays.asList(
+        createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1"),
+        createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
+    
assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), 
is(3));
+    List<StoragePathInfo> fileList = tableSource.getReadFiles();
+    assertThat("Files should be pruned by bucket id 3", fileList.size(), 
CoreMatchers.is(3));
+  }
+
+  /**
+   * test partial primary keys filtering
+   * @throws Exception
+   */
+  @Test
+  void testPartialPartitionBucketPruningWithMultiPK() throws Exception {
+    String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
+    int bucketNumber = 10000;
+    String expression = "par1|par2|par3|par4,4";
+    String rule = "regex";
+    Configuration conf = TestConfigurations.getDefaultConf(tablePath);
+    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
+    conf.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
+
+    HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, 
expression, rule, bucketNumber, null);
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    tableSource.applyFilters(Collections.singletonList(
+        createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
+
+    
assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), 
is(PrimaryKeyPruners.BUCKET_ID_NO_PRUNING));
+    List<StoragePathInfo> fileList = tableSource.getReadFiles();
+    assertThat("Partial pk filtering does not prune any files", 
fileList.size(),
+        CoreMatchers.is(7));
+  }
+
+  /**
+   * test single primary keys filtering together with non-primary key predicate
+   * @throws Exception
+   */
+  @Test
+  void testPartitionBucketPruningWithMixedFilter() throws Exception {
+    String tablePath = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
+    int bucketNumber = 10000;
+    String expression = "par1|par2|par3|par4,4";
+    String rule = "regex";
+    Configuration conf = TestConfigurations.getDefaultConf(tablePath);
+    conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, bucketNumber);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, expression);
+    conf.set(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, rule);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.initTableIfNotExists(conf);
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, 
expression, rule, bucketNumber, null);
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    tableSource.applyFilters(Arrays.asList(
+        createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1"),
+        createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
+
+    
assertThat(BucketIdentifier.getBucketId(tableSource.getDataBucketHashing(), 4), 
is(1));
+    List<StoragePathInfo> fileList = tableSource.getReadFiles();
+    assertThat("Files should be pruned by bucket id 1", fileList.size(), 
CoreMatchers.is(2));
+  }
+
+  private HoodieTableSource createHoodieTableSource(Configuration conf) {
+    return new HoodieTableSource(
+        SerializableSchema.create(TestConfigurations.TABLE_SCHEMA),
+        new StoragePath(conf.getString(FlinkOptions.PATH)),
+        
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+        "default-par",
+        conf);
+  }
+
+  private ResolvedExpression createLitEquivalenceExpr(String fieldName, int 
fieldIdx, DataType dataType, Object val) {
+    FieldReferenceExpression ref = new FieldReferenceExpression(fieldName, 
dataType, fieldIdx, fieldIdx);
+    ValueLiteralExpression literal = new ValueLiteralExpression(val, dataType);
+    return new CallExpression(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(ref, literal),
+        DataTypes.BOOLEAN());
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index f91fa8b0a42..a0cbd8dfb1d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -101,8 +101,8 @@ class BucketIndexSupport(spark: SparkSession,
     candidateFiles.toSet
   }
 
-  def filterQueriesWithBucketHashField(queryFilters: Seq[Expression]): Option[BitSet] = {
-    val bucketNumber = metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS)
+  def filterQueriesWithBucketHashField(queryFilters: Seq[Expression],
+                                       bucketNumber: Int = metadataConfig.getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS)): Option[BitSet] = {
     if (indexBucketHashFieldsOpt.isEmpty || queryFilters.isEmpty) {
       None
     } else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index a8c0dd87ec0..769f82090a2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.{FileSlice, 
HoodieBaseFile, HoodieLogFile}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, 
TimestampBasedKeyGenerator}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
@@ -103,6 +104,15 @@ case class HoodieFileIndex(spark: SparkSession,
     endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key)) 
with FileIndex {
 
   @transient protected var hasPushedDownPartitionPredicates: Boolean = false
+  private val isPartitionSimpleBucketIndex = PartitionBucketIndexUtils.isPartitionSimpleBucketIndex(spark.sparkContext.hadoopConfiguration,
+    metaClient.getBasePath.toString)
+
+  @transient private lazy val bucketIndexSupport = if (isPartitionSimpleBucketIndex) {
+    val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant)
+    new PartitionBucketIndexSupport(spark, metadataConfig, metaClient, specifiedQueryInstant)
+  } else {
+    new BucketIndexSupport(spark, metadataConfig, metaClient)
+  }
 
   /**
    * NOTE: [[indicesSupport]] is a transient state, since it's only relevant 
while logical plan
@@ -112,7 +122,7 @@ case class HoodieFileIndex(spark: SparkSession,
    */
   @transient private lazy val indicesSupport: List[SparkBaseIndexSupport] = 
List(
     new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
-    new BucketIndexSupport(spark, metadataConfig, metaClient),
+    bucketIndexSupport,
     new SecondaryIndexSupport(spark, metadataConfig, metaClient),
     new ExpressionIndexSupport(spark, schema, metadataConfig, metaClient),
     new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala
new file mode 100644
index 00000000000..7913c31fbe0
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionBucketIndexSupport.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{FileSlice, 
PartitionBucketIndexHashingConfig}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.index.bucket.partition.PartitionBucketIndexCalculator
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.slf4j.LoggerFactory
+
+class PartitionBucketIndexSupport(spark: SparkSession,
+                         metadataConfig: HoodieMetadataConfig,
+                         metaClient: HoodieTableMetaClient,
+                         specifiedQueryInstant: Option[String] = Option.empty)
+  extends BucketIndexSupport(spark, metadataConfig, metaClient) {
+
+  private val log = LoggerFactory.getLogger(getClass)
+  private def initCalc(): Option[PartitionBucketIndexCalculator] = {
+    if (specifiedQueryInstant.isDefined) {
+      val hashingConfigOption = PartitionBucketIndexHashingConfig.loadingLatestHashingConfigBeforeOrOn(metaClient, specifiedQueryInstant.get)
+      if (hashingConfigOption.isPresent) {
+        Option.apply(PartitionBucketIndexCalculator.getInstance(hashingConfigOption.get().getExpressions,
+          hashingConfigOption.get().getRule,
+          hashingConfigOption.get().getDefaultBucketNumber))
+      } else {
+        // could be empty, if the user upgraded to partition level bucket index after specifiedQueryInstant
+        Option.empty
+      }
+    } else {
+      // specifiedQueryInstant is null or empty, so load latest hashing config directly
+      val hashingConfig = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient)
+      Option.apply(PartitionBucketIndexCalculator.getInstance(hashingConfig.getExpressions,
+        hashingConfig.getRule, hashingConfig.getDefaultBucketNumber))
+    }
+  }
+
+
+  private val calc: Option[PartitionBucketIndexCalculator] = initCalc()
+
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean): 
Option[Set[String]] = {
+    if (calc.isEmpty) {
+      // fall back to BucketIndexSupport
+      super.computeCandidateFileNames(fileIndex, queryFilters, 
queryReferencedColumns, prunedPartitionsAndFileSlices, 
shouldPushDownFilesFilter)
+    } else {
+      Option.apply(prunedPartitionsAndFileSlices.flatMap(v => {
+        val partitionPathOption = v._1
+        val fileSlices = v._2
+        val numBuckets = 
calc.get.computeNumBuckets(partitionPathOption.getOrElse(new PartitionPath("", 
Array())).path)
+        val bucketIdsBitMapByFilter = 
filterQueriesWithBucketHashField(queryFilters, numBuckets)
+        if (bucketIdsBitMapByFilter.isDefined && 
bucketIdsBitMapByFilter.get.cardinality() > 0) {
+          val allFilesName = getPrunedPartitionsAndFileNames(fileIndex, 
Seq((partitionPathOption, fileSlices)))._2
+          getCandidateFiles(allFilesName, bucketIdsBitMapByFilter.get)
+        } else {
+          Seq()
+        }}).toSet)
+    }
+  }
+}
+
+object PartitionBucketIndexSupport {
+  val INDEX_NAME = "BUCKET"
+}
+
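
(For context on PartitionBucketIndexSupport above: on the Spark read path the pushed-down
filters yield a BitSet of candidate buckets per partition, computed against that
partition's bucket count, and candidate files are those whose bucket prefix falls inside
the set. A rough sketch of the final filtering step only, assuming an 8-digit zero-padded
bucket prefix at the start of each file name:)

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

class BucketBitSetPruningSketch {
  // Keep the files whose leading bucket prefix is set in the candidate-bucket BitSet.
  static List<String> candidateFiles(List<String> fileNames, BitSet candidateBuckets) {
    return fileNames.stream()
        .filter(name -> candidateBuckets.get(Integer.parseInt(name.substring(0, 8))))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    BitSet buckets = new BitSet();
    buckets.set(5); // e.g. the bucket matched by an equality filter under this partition's count
    List<String> files = Arrays.asList("00000004-aaa.parquet", "00000005-bbb.parquet");
    System.out.println(candidateFiles(files, buckets)); // [00000005-bbb.parquet]
  }
}
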
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala
new file mode 100644
index 00000000000..029e887d601
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionBucketIndexSupport.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.{BaseHoodieTableFileIndex, HoodieFileIndex, 
PartitionBucketIndexSupport}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
PartitionBucketIndexHashingConfig}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import org.apache.avro.generic.GenericData
+import org.apache.spark.sql.HoodieCatalystExpressionUtils
+import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
+import org.junit.jupiter.api.{BeforeEach, Tag, Test}
+import org.mockito.Mockito
+
+@Tag("functional")
+class TestPartitionBucketIndexSupport extends TestBucketIndexSupport {
+
+  private val DEFAULT_RULE = "regex"
+  private val EXPRESSION_BUCKET_NUMBER = 19
+  private val DEFAULT_EXPRESSIONS = 
"\\d{4}\\-(06\\-(01|17|18)|11\\-(01|10|11))," + EXPRESSION_BUCKET_NUMBER
+  private val DEFAULT_BUCKET_NUMBER = 10
+  private val DEFAULT_PARTITION_PATH = Array("2025-06-17", "2025-06-18")
+  private var fileIndex: HoodieFileIndex = null
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, 
DEFAULT_EXPRESSIONS, DEFAULT_RULE, DEFAULT_BUCKET_NUMBER, null)
+    fileIndex = Mockito.mock(classOf[HoodieFileIndex])
+  }
+
+  @Test
+  override def testSingleHashFieldsExpression: Unit = {
+    val configProperties = new TypedProperties()
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, "A")
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A")
+    configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
"A")
+
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 
String.valueOf(DEFAULT_BUCKET_NUMBER))
+    metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), 
avroSchemaStr)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key, 
true)).build()
+    val bucketIndexSupport: PartitionBucketIndexSupport = new 
PartitionBucketIndexSupport(spark, metadataConfig, metaClient)
+
+
+    // init
+    val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+    var record = new GenericData.Record(avroSchema)
+    record.put("A", "1")
+    val bucket1Id4 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "2")
+    val bucket2Id5 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "3")
+    val bucket3Id6 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "4")
+    val bucket4Id7 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "5")
+    val bucket5Id8 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, "A", 
EXPRESSION_BUCKET_NUMBER)
+    assert(bucket1Id4 == 4 && bucket2Id5 == 5 && bucket3Id6 == 6 && bucket4Id7 
== 7 && bucket5Id8 == 8)
+
+    // fileIdStr
+    val token = FSUtils.makeWriteToken(1, 0, 1)
+    val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+
+    val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, 
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+    var equalTo = "A = 3"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket3Id6FileName), allFileNames)
+    equalTo = "A = 3 And A = 4 and B = '6'"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, Set.empty, 
allFileNames)
+    equalTo = "A = 5 And B = 'abc'"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames)
+    equalTo = "A = C and A = 1"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket1Id4FileName), allFileNames)
+    equalTo = "A = 5 Or A = 2"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames)
+    equalTo = "A = 5 Or A = 2 Or A = 8"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames)
+    equalTo = "A = 5 Or (A = 2 and B = 'abc')"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName, bucket2Id5FileName), allFileNames)
+    equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames)
+    equalTo = "A = 5 And (A = 2 Or B = 'abc')"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket5Id8FileName), allFileNames)
+  }
+
+  @Test
+  override def testMultipleHashFieldsExpress(): Unit = {
+    val configProperties = new TypedProperties()
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, 
"A,B")
+    configProperties.setProperty(HoodieTableConfig.RECORDKEY_FIELDS.key, "A,B")
+    configProperties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
"A,B")
+    
configProperties.setProperty(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 
String.valueOf(DEFAULT_BUCKET_NUMBER))
+    metaClient.getTableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA.key(), 
avroSchemaStr)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key, 
true)).build()
+    val bucketIndexSupport = new PartitionBucketIndexSupport(spark, 
metadataConfig, metaClient)
+    val keyGenerator = bucketIndexSupport.getKeyGenerator
+    assert(keyGenerator.isInstanceOf[NonpartitionedKeyGenerator])
+
+    // init
+    val testKeyGenerator = new NonpartitionedKeyGenerator(configProperties)
+    var record = new GenericData.Record(avroSchema)
+    record.put("A", "1")
+    record.put("B", "2")
+    val bucket1Id4 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, 
"A,B", EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "2")
+    record.put("B", "3")
+    val bucket2Id5 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, 
"A,B", EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "3")
+    record.put("B", "4")
+    val bucket3Id6 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, 
"A,B", EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "4")
+    record.put("B", "5")
+    val bucket4Id7 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, 
"A,B", EXPRESSION_BUCKET_NUMBER)
+    record = new GenericData.Record(avroSchema)
+    record.put("A", "5")
+    record.put("B", "6")
+    val bucket5Id8 = 
BucketIdentifier.getBucketId(testKeyGenerator.getKey(record).getRecordKey, 
"A,B", EXPRESSION_BUCKET_NUMBER)
+    assert(bucket1Id4 == 3 && bucket2Id5 == 16 && bucket3Id6 == 10 && 
bucket4Id7 == 4 && bucket5Id8 == 17)
+
+    // fileIdStr
+    val token = FSUtils.makeWriteToken(1, 0, 1)
+    val bucket1Id4FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket1Id4) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket2Id5FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket2Id5) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket3Id6FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket3Id6) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket4Id7FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket4Id7) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+    val bucket5Id8FileName = FSUtils.makeBaseFileName("00000000000000000", 
token, BucketIdentifier.newBucketFileIdPrefix(bucket5Id8) + "-0", 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue.getFileExtension)
+
+    val allFileNames = Set.apply(bucket1Id4FileName, bucket2Id5FileName, 
bucket3Id6FileName, bucket4Id7FileName, bucket5Id8FileName)
+
+    var equalTo = "A = 2 and B = '3'"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket2Id5FileName), allFileNames)
+
+    equalTo = "A = 4 and B = '5'"
+    exprFilePathAnswerCheck(bucketIndexSupport, equalTo, 
Set.apply(bucket4Id7FileName), allFileNames)
+  }
+
+  def exprFilePathAnswerCheck(bucketIndexSupport: PartitionBucketIndexSupport, 
exprRaw: String, expectResult: Set[String],
+                              allFileStatus: Set[String]): Unit = {
+    val resolveExpr = HoodieCatalystExpressionUtils.resolveExpr(spark, 
exprRaw, structSchema)
+    val optimizerPlan = 
spark.sessionState.optimizer.execute(DummyExpressionHolder(Seq(resolveExpr)))
+    val optimizerExpr = 
optimizerPlan.asInstanceOf[DummyExpressionHolder].exprs.head
+
+    // split input files into different partitions
+    val partitionPath1 = DEFAULT_PARTITION_PATH(0)
+    val allFileSlices1: Seq[FileSlice] = allFileStatus.slice(0, 
3).map(fileName => {
+      val slice = new FileSlice(partitionPath1, "00000000000000000", 
FSUtils.getFileId(fileName))
+      slice.setBaseFile(new HoodieBaseFile(new StoragePathInfo(new 
StoragePath(fileName), 0L, false, 0, 0, 0)))
+      slice
+    }).toSeq
+
+    val partitionPath2 = DEFAULT_PARTITION_PATH(1)
+    val allFileSlices2: Seq[FileSlice] = allFileStatus.slice(3, 
5).map(fileName => {
+      val slice = new FileSlice(partitionPath2, "00000000000000000", FSUtils.getFileId(fileName))
+      slice.setBaseFile(new HoodieBaseFile(new StoragePathInfo(new 
StoragePath(fileName), 0L, false, 0, 0, 0)))
+      slice
+    }).toSeq
+
+    val input = Seq((Option.apply(new 
BaseHoodieTableFileIndex.PartitionPath(partitionPath1, Array())), 
allFileSlices1),
+      (Option.apply(new BaseHoodieTableFileIndex.PartitionPath(partitionPath2, 
Array())), allFileSlices2))
+    val candidate = bucketIndexSupport.computeCandidateFileNames(fileIndex, 
splitConjunctivePredicates(optimizerExpr),
+      Seq(), input, false)
+
+    assert(candidate.get.equals(expectResult))
+  }
+}
