This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad8f3d4e68a [HUDI-6891] Fix RO queries with RLI and record key predicate (#11975)
ad8f3d4e68a is described below

commit ad8f3d4e68a605c05e9aef7c443e56ecf4b01892
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Sep 25 17:00:58 2024 +0530

    [HUDI-6891] Fix RO queries with RLI and record key predicate (#11975)
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  4 ++
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  8 +--
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  2 +-
 .../org/apache/hudi/RecordLevelIndexSupport.scala  | 27 ++++++-
 .../org/apache/hudi/SparkBaseIndexSupport.scala    |  9 ++-
 .../apache/hudi/functional/TestMORDataSource.scala | 82 +++++++++++++++++++++-
 6 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5a0fd79fcc4..4688c160d69 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -195,6 +195,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
     resetTableMetadata(null);
   }
 
+  public HoodieTableQueryType getQueryType() {
+    return queryType;
+  }
+
   protected String[] getPartitionColumns() {
     return partitionColumns;
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index c3b5228d195..c471649d9a6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -348,10 +348,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       // NOTE: Explicit conversion is required for Scala 2.11
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
-        toScalaOption(record.getData.getInsertValue(null, null))
-          .map(metadataRecord => 
metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
-          .orNull
-      }))
+          toScalaOption(record.getData.getInsertValue(null, null))
+            .map(metadataRecord => 
metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+            .orNull
+        }))
         .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => 
columnStatsRecord != null))
 
     columnStatsRecords
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 5ea4d460714..0c645c81893 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -400,7 +400,7 @@ case class HoodieFileIndex(spark: SparkSession,
     lazy val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
     if (isDataSkippingEnabled) {
       for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
-        if (indexSupport.isIndexAvailable) {
+        if (indexSupport.isIndexAvailable && 
indexSupport.supportsQueryType(options)) {
           val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, 
this, queryFilters, queryReferencedColumns,
             prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
           if (prunedFileNames.nonEmpty) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index f37beef0727..708e0d47a0a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -17,17 +17,20 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, 
TIME_TRAVEL_AS_OF_INSTANT}
 import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.hudi.common.model.HoodieTableQueryType.SNAPSHOT
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import 
org.apache.hudi.common.table.timeline.HoodieTimeline.{GREATER_THAN_OR_EQUALS, 
compareTimestamps}
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.hudi.storage.StoragePath
-
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, In, Literal}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 
 import scala.collection.JavaConverters._
 import scala.collection.{JavaConverters, mutable}
@@ -91,6 +94,28 @@ class RecordLevelIndexSupport(spark: SparkSession,
   def isIndexAvailable: Boolean = {
     metadataConfig.isEnabled && 
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
   }
+
+  /**
+   * Returns true if the query type is supported by the index.
+   */
+  override def supportsQueryType(options: Map[String, String]): Boolean = {
+    if (!options.getOrElse(QUERY_TYPE.key, 
QUERY_TYPE.defaultValue).equalsIgnoreCase(SNAPSHOT.name)) {
+      // Disallow RLI for non-snapshot query types
+      false
+    } else {
+      // Now handle the time-travel case for snapshot queries
+      options.get(TIME_TRAVEL_AS_OF_INSTANT.key)
+        .fold {
+          // No time travel instant specified, so allow if it's a snapshot 
query
+          true
+        } { instant =>
+          // Check if the as.of.instant is greater than or equal to the last 
completed instant.
+          // We can still use RLI for data skipping for the latest snapshot.
+          compareTimestamps(HoodieSqlCommonUtils.formatQueryInstant(instant),
+            GREATER_THAN_OR_EQUALS, 
metaClient.getCommitsTimeline.filterCompletedInstants.lastInstant.get.getTimestamp)
+        }
+    }
+  }
 }
 
 object RecordLevelIndexSupport {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 2371e4b066e..32c318f1049 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
 import org.apache.hudi.util.JFunction
-
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{And, Expression}
 import 
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -45,6 +44,14 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
 
   def isIndexAvailable: Boolean
 
+  /**
+   * Returns true if the query type is supported by the index.
+   *
+   * TODO: The default implementation should be changed to throw
+   * an exception once time travel support for metadata table is added.
+   */
+  def supportsQueryType(options: Map[String, String]): Boolean = true
+
   def computeCandidateIsStrict(spark: SparkSession,
                                fileIndex: HoodieFileIndex,
                                queryFilters: Seq[Expression],
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 48fecc6cd95..a78b216cc38 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -35,9 +35,9 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
HoodieSparkClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, 
DataSourceWriteOptions, HoodieDataSourceHelpers, DefaultSparkRecordMerger, 
SparkDatasetMixin}
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, 
DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, 
SparkDatasetMixin}
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.QuickstartUtils.convertToStringList
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -49,7 +49,6 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.slf4j.LoggerFactory
 
 import java.util.function.Consumer
-
 import scala.collection.JavaConverters._
 
 /**
@@ -1418,4 +1417,81 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
     metaClient = createMetaClient(spark, basePath)
     assertEquals(metaClient.getTableConfig.getRecordMergerStrategy, 
mergerStrategyName)
   }
+
+  /**
+   * Test Read-Optimized and time travel query on MOR table with RECORD_INDEX 
enabled.
+   */
+  @Test
+  def testReadOptimizedAndTimeTravelWithRecordIndex(): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts()
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+    )
+    readOpts = readOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    )
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // Create a MOR table and add three records to the table.
+    val records = recordsToStrings(dataGen.generateInserts("000", 
3)).asScala.toSeq
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    var roDf = spark.read.format("hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    // assert count
+    assertEquals(3, roDf.count())
+
+    // choose a record to delete
+    val deleteRecord = 
recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 1)).asScala.toSeq
+    // get the record key from the deleted record records2
+    val recordKey = 
deleteRecord.head.split(",")(1).split(":")(1).trim.replace("\"", "")
+    // delete the record
+    val inputDF2 = 
spark.read.json(spark.sparkContext.parallelize(deleteRecord, 1))
+    inputDF2.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // load RO view again with data skipping enabled
+    roDf = spark.read.format("hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+
+    // There should still be 3 records in RO view
+    assertEquals(3, roDf.count())
+    // deleted record should still show in RO view
+    assertEquals(1, roDf.where(s"_row_key = '$recordKey'").count())
+
+    // load snapshot view
+    val snapshotDF = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    // There should be only 2 records in snapshot view
+    assertEquals(2, snapshotDF.count())
+    // deleted record should NOT show in snapshot view
+    assertEquals(0, snapshotDF.where(s"_row_key = '$recordKey'").count())
+
+    // get the first instant on the timeline
+    val firstInstant = 
metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
+    // do a time travel query with data skipping enabled
+    val timeTravelDF = spark.read.format("hudi")
+      .options(readOpts)
+      .option("as.of.instant", firstInstant.getTimestamp)
+      .load(basePath)
+    // there should still be 3 records in time travel view
+    assertEquals(3, timeTravelDF.count())
+    // deleted record should still show in time travel view
+    assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
+  }
 }

Reply via email to