This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4b52e27eb3e [HUDI-7954] Fix data skipping with secondary index when there are no log files (#11575)
4b52e27eb3e is described below
commit 4b52e27eb3eb8e607e24a62e5ff672c67d72b020
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jul 5 21:54:49 2024 +0530
[HUDI-7954] Fix data skipping with secondary index when there are no log files (#11575)
---
.../hudi/metadata/HoodieBackedTableMetadata.java | 9 +++++++--
.../hudi/functional/SecondaryIndexTestBase.scala | 22 ++++++++++++++--------
.../functional/TestSecondaryIndexPruning.scala | 10 ++++++++--
3 files changed, 29 insertions(+), 12 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index aa13ab0f0ad..04839c57a94 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -952,7 +952,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new
HashMap<>();
if (reader == null) {
// No base file at all
- timings.add(timer.endTimer());
logRecordsMap.forEach((secondaryKey, logRecords) -> {
List<HoodieRecord<HoodieMetadataPayload>> recordList = new
ArrayList<>();
logRecords.values().forEach(record -> {
@@ -960,13 +959,19 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
});
resultMap.put(secondaryKey, recordList);
});
+ timings.add(timer.endTimer());
return resultMap;
}
HoodieTimer readTimer = HoodieTimer.start();
-
Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap =
fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName);
+ if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
+ // file slice has only base file
+ timings.add(timer.endTimer());
+ return baseFileRecordsMap;
+ }
+
logRecordsMap.forEach((secondaryKey, logRecords) -> {
if (!baseFileRecordsMap.containsKey(secondaryKey)) {
List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
index ae3f383ec63..e346da37181 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/SecondaryIndexTestBase.scala
@@ -90,22 +90,28 @@ class SecondaryIndexTestBase extends
HoodieSparkClientTestBase {
var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
Seq(dataFilter))
val filteredFilesCount = filteredPartitionDirectories.flatMap(s =>
s.files).size
- assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+ val latestDataFilesCount = getLatestDataFilesCount(opts)
+ assertTrue(filteredFilesCount > 0 && filteredFilesCount <
latestDataFilesCount)
// with no data skipping
fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles =
true)
val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
Seq(dataFilter)).flatMap(s => s.files).size
- assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
+ assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
}
private def getLatestDataFilesCount(opts: Map[String, String],
includeLogFiles: Boolean = true) = {
var totalLatestDataFiles = 0L
-
getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
- .values()
- .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
- (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
- slice => totalLatestDataFiles += (if (includeLogFiles)
slice.getLogFiles.count() else 0)
- + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ val fsView: HoodieMetadataFileSystemView = getTableFileSystemView(opts)
+ try {
+
fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+ .values()
+ .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+ (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+ slice => totalLatestDataFiles += (if (includeLogFiles)
slice.getLogFiles.count() else 0)
+ + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ } finally {
+ fsView.close()
+ }
totalLatestDataFiles
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 9db6f6b9d99..e90e752502e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -71,14 +71,20 @@ class TestSecondaryIndexPruning extends
SecondaryIndexTestBase {
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
- // validate data skipping
- verifyQueryPredicate(hudiOpts, "not_record_key_col")
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
Seq("abc", "row1"),
Seq("cde", "row2"),
Seq("def", "row3")
)
+ // validate data skipping with filters on secondary key column
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+ Seq(1, "row1", "abc", "p1")
+ )
+ verifyQueryPredicate(hudiOpts, "not_record_key_col")
// create another secondary index on non-string column
spark.sql(s"create index idx_ts on $tableName using secondary_index(ts)")