This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b52e27eb3e [HUDI-7954] Fix data skipping with secondary index when there are no log files (#11575)
4b52e27eb3e is described below

commit 4b52e27eb3eb8e607e24a62e5ff672c67d72b020
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jul 5 21:54:49 2024 +0530

    [HUDI-7954] Fix data skipping with secondary index when there are no log files (#11575)
---
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  9 +++++++--
 .../hudi/functional/SecondaryIndexTestBase.scala   | 22 ++++++++++++++--------
 .../functional/TestSecondaryIndexPruning.scala     | 10 ++++++++--
 3 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index aa13ab0f0ad..04839c57a94 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -952,7 +952,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new 
HashMap<>();
     if (reader == null) {
       // No base file at all
-      timings.add(timer.endTimer());
       logRecordsMap.forEach((secondaryKey, logRecords) -> {
         List<HoodieRecord<HoodieMetadataPayload>> recordList = new 
ArrayList<>();
         logRecords.values().forEach(record -> {
@@ -960,13 +959,19 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         });
         resultMap.put(secondaryKey, recordList);
       });
+      timings.add(timer.endTimer());
       return resultMap;
     }
 
     HoodieTimer readTimer = HoodieTimer.start();
-
     Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap =
         fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName);
+    if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
+      // file slice has only base file
+      timings.add(timer.endTimer());
+      return baseFileRecordsMap;
+    }
+
     logRecordsMap.forEach((secondaryKey, logRecords) -> {
       if (!baseFileRecordsMap.containsKey(secondaryKey)) {
         List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
index ae3f383ec63..e346da37181 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
@@ -90,22 +90,28 @@ class SecondaryIndexTestBase extends 
HoodieSparkClientTestBase {
     var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
     val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
Seq(dataFilter))
     val filteredFilesCount = filteredPartitionDirectories.flatMap(s => 
s.files).size
-    assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+    val latestDataFilesCount = getLatestDataFilesCount(opts)
+    assertTrue(filteredFilesCount > 0 && filteredFilesCount < 
latestDataFilesCount)
 
     // with no data skipping
     fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = 
true)
     val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
Seq(dataFilter)).flatMap(s => s.files).size
-    assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
+    assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
   }
 
   private def getLatestDataFilesCount(opts: Map[String, String], 
includeLogFiles: Boolean = true) = {
     var totalLatestDataFiles = 0L
-    
getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
-      .values()
-      .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
-        (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
-          slice => totalLatestDataFiles += (if (includeLogFiles) 
slice.getLogFiles.count() else 0)
-            + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    val fsView: HoodieMetadataFileSystemView = getTableFileSystemView(opts)
+    try {
+      
fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+        .values()
+        .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+          (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+            slice => totalLatestDataFiles += (if (includeLogFiles) 
slice.getLogFiles.count() else 0)
+              + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    } finally {
+      fsView.close()
+    }
     totalLatestDataFiles
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 9db6f6b9d99..e90e752502e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -71,14 +71,20 @@ class TestSecondaryIndexPruning extends 
SecondaryIndexTestBase {
         .setConf(HoodieTestUtils.getDefaultStorageConf)
         .build()
       
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
-      // validate data skipping
-      verifyQueryPredicate(hudiOpts, "not_record_key_col")
       // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
         Seq("abc", "row1"),
         Seq("cde", "row2"),
         Seq("def", "row3")
       )
+      // validate data skipping with filters on secondary key column
+      spark.sql("set hoodie.metadata.enable=true")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+      checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+        Seq(1, "row1", "abc", "p1")
+      )
+      verifyQueryPredicate(hudiOpts, "not_record_key_col")
 
       // create another secondary index on non-string column
       spark.sql(s"create index idx_ts on $tableName using secondary_index(ts)")

Reply via email to