This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 62af6583e96 [HUDI-8326] Add some more functional index tests (#12153)
62af6583e96 is described below

commit 62af6583e96e91ffeee1b5e25ff890b90e0a4feb
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Nov 5 07:30:07 2024 +0530

    [HUDI-8326] Add some more functional index tests (#12153)
    
    * base method
    
    * [HUDI-8326] Add some more functional index tests
    
    * fix hive sync test for func index
---
 .../org/apache/hudi/FunctionalIndexSupport.scala   |  10 +-
 .../hudi/command/index/TestFunctionalIndex.scala   | 221 ++++++++++++++++++++-
 2 files changed, 216 insertions(+), 15 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 5f66782f733..8d7ee0bf342 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -100,13 +100,11 @@ class FunctionalIndexSupport(spark: SparkSession,
       checkState(functionToColumnNames.size == 1, "Currently, only one 
function with functional index in the query is supported")
       val (indexFunction, targetColumnName) = functionToColumnNames.head
       val indexDefinitions = 
metaClient.getIndexMetadata.get().getIndexDefinitions
-      indexDefinitions.asScala.foreach {
-        case (indexPartition, indexDefinition) =>
-          if (indexDefinition.getIndexFunction.equals(indexFunction) && 
indexDefinition.getSourceFields.contains(targetColumnName)) {
-            Option.apply(indexPartition)
-          }
+      indexDefinitions.asScala.collectFirst {
+        case (indexPartition, indexDefinition)
+          if indexDefinition.getIndexFunction.equals(indexFunction) && 
indexDefinition.getSourceFields.contains(targetColumnName) =>
+          indexPartition
       }
-      Option.empty
     } else {
       Option.empty
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index c4b76a6de88..7a25a442f95 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -19,26 +19,34 @@
 
 package org.apache.spark.sql.hudi.command.index
 
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex, 
HoodieSparkUtils}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.Option
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
 import org.apache.hudi.hive.HiveSyncConfigHolder._
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
-import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, 
MetadataPartitionType}
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, 
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.hudi.util.JFunction
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, 
FromUnixTime, Literal}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.hudi.command.{CreateIndexCommand, 
ShowIndexesCommand}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.scalatest.Ignore
 
+import scala.collection.JavaConverters
+
 @Ignore
 class TestFunctionalIndex extends HoodieSparkSqlTestBase {
 
@@ -100,11 +108,13 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           tool.syncHoodieTable()
 
           // assert table created and no partition metadata
-          val hiveClient = new 
HoodieHiveSyncClient(HiveTestUtil.getHiveSyncConfig, metaClient)
-          assertTrue(hiveClient.tableExists("h0_ro"))
-          assertTrue(hiveClient.tableExists("h0_rt"))
-          assertEquals(0, hiveClient.getAllPartitions("h0_ro").size())
-          assertEquals(0, hiveClient.getAllPartitions("h0_rt").size())
+          val hiveClient = new 
HoodieHiveSyncClient(HiveTestUtil.getHiveSyncConfig, 
HoodieTableMetaClient.reload(metaClient))
+          val roTable = tableName + "_ro"
+          val rtTable = tableName + "_rt"
+          assertTrue(hiveClient.tableExists(roTable))
+          assertTrue(hiveClient.tableExists(rtTable))
+          assertEquals(0, hiveClient.getAllPartitions(roTable).size())
+          assertEquals(0, hiveClient.getAllPartitions(rtTable).size())
 
           // check query result
           checkAnswer(s"select id, name from $tableName where 
from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-01'")(
@@ -648,4 +658,197 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     assertResult(Some(expectedDatabaseName))(catalogTable.identifier.database)
     assertResult(expectedTableName)(catalogTable.identifier.table)
   }
+
+  test("Test Functional Index Insert after Initialization") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          val isPartitioned = true
+          val tableName = generateTableName + s"_init_$tableType$isPartitioned"
+          val partitionByClause = if (isPartitioned) "partitioned by(price)" 
else ""
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  price int
+               |) using hudi
+               | options (
+               |  primaryKey ='id',
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
+               | $partitionByClause
+               | location '$basePath'
+       """.stripMargin)
+
+          writeRecordsAndValidateFunctionalIndex(tableName, basePath, 
"update", isDelete = false, shouldCompact = false, shouldCluster = false, 
shouldRollback = false)
+        }
+      }
+    }
+  }
+
+  test("Test Functional Index Rollback") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          val isPartitioned = true
+          val tableName = generateTableName + 
s"_rollback_$tableType$isPartitioned"
+          val partitionByClause = if (isPartitioned) "partitioned by(price)" 
else ""
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  price int
+               |) using hudi
+               | options (
+               |  primaryKey ='id',
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
+               | $partitionByClause
+               | location '$basePath'
+       """.stripMargin)
+
+          writeRecordsAndValidateFunctionalIndex(tableName, basePath, 
"update", isDelete = false, shouldCompact = false, shouldCluster = false, 
shouldRollback = true)
+        }
+      }
+    }
+  }
+
+  /**
+   * Insert records, create a functional index, verify file pruning, then insert 
or delete a record, validate the functional index, and optionally roll back.
+   */
+  private def writeRecordsAndValidateFunctionalIndex(tableName: String,
+                                                     basePath: String,
+                                                     operationType: String,
+                                                     isDelete: Boolean,
+                                                     shouldCompact: Boolean,
+                                                     shouldCluster: Boolean,
+                                                     shouldRollback: Boolean,
+                                                     shouldValidate: Boolean = 
true): Unit = {
+    // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+    spark.sql(s"insert into $tableName values(1, 'a1', 1601098924, 10)")
+    // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+    spark.sql(s"insert into $tableName values(2, 'a2', 1632634924, 100)")
+    // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+    spark.sql(s"insert into $tableName values(3, 'a3', 1664170924, 1000)")
+    // create functional index
+    spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) 
options(func='from_unixtime', format='yyyy-MM-dd')")
+    val metaClient = createMetaClient(spark, basePath)
+    // verify file pruning with filter on from_unixtime(ts, 'yyyy-MM-dd') = 
2024-03-26
+    val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+    val dataFilter = {
+      val tsColumn = UnresolvedAttribute("ts")
+
+      // Define the format "yyyy-MM-dd" as a literal
+      val format = Literal("yyyy-MM-dd")
+
+      // Create the from_unixtime(ts, 'yyyy-MM-dd') expression
+      val fromUnixTimeExpr = FromUnixTime(tsColumn, format)
+
+      // Define the date to compare against as a literal
+      val targetDate = Literal("2024-03-26")
+
+      // Create the equality expression from_unixtime(ts, 'yyyy-MM-dd') = 
'2024-03-26'
+      EqualTo(fromUnixTimeExpr, targetDate)
+    }
+    verifyFilePruning(opts, dataFilter, metaClient)
+
+    // do the operation
+    if (isDelete) {
+      spark.sql(s"delete from $tableName where id=1")
+    } else {
+      spark.sql(s"insert into $tableName values(4, 'a4', 1727329324, 10000)")
+    }
+
+    // query ColumnStatsMetadata min/max values from the metadata table (type=3)
+    val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from 
hudi_metadata('$tableName') where type=3"
+    // validate the functional index
+    checkAnswer(metadataSql)(
+      Seq("2020-09-26", "2020-09-26", false),
+      Seq("2021-09-26", "2021-09-26", false),
+      Seq("2022-09-26", "2022-09-26", false),
+      Seq("2024-09-26", "2024-09-26", false)
+    )
+
+    if (shouldRollback) {
+      // rollback the operation
+      val lastCompletedInstant = 
metaClient.reloadActiveTimeline().getCommitsTimeline.filterCompletedInstants().lastInstant()
+      val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), 
getWriteConfig(Map.empty, metaClient.getBasePath.toString))
+      writeClient.rollback(lastCompletedInstant.get().getTimestamp)
+      // validate the functional index
+      checkAnswer(metadataSql)(
+        // the last commit is rolled back, so there are no records for it
+        Seq("2020-09-26", "2020-09-26", false),
+        Seq("2021-09-26", "2021-09-26", false),
+        Seq("2022-09-26", "2022-09-26", false)
+      )
+    }
+  }
+
+  private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean 
= false, isNoScanExpected: Boolean = false): Unit = {
+    // with data skipping
+    val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
+    try {
+      val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
Seq(dataFilter))
+      val filteredFilesCount = filteredPartitionDirectories.flatMap(s => 
s.files).size
+      val latestDataFilesCount = getLatestDataFilesCount(metaClient = 
metaClient)
+      if (isDataSkippingExpected) {
+        assertTrue(filteredFilesCount < latestDataFilesCount)
+        if (isNoScanExpected) {
+          assertTrue(filteredFilesCount == 0)
+        } else {
+          assertTrue(filteredFilesCount > 0)
+        }
+      } else {
+        assertTrue(filteredFilesCount == latestDataFilesCount)
+      }
+
+      // with no data skipping
+      fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = 
true)
+      val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
Seq(dataFilter)).flatMap(s => s.files).size
+      assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
+    } finally {
+      fileIndex.close()
+    }
+  }
+
+  private def getLatestDataFilesCount(includeLogFiles: Boolean = true, 
metaClient: HoodieTableMetaClient) = {
+    var totalLatestDataFiles = 0L
+    val fsView: HoodieMetadataFileSystemView = 
getTableFileSystemView(metaClient)
+    try {
+      
fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+        .values()
+        .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+          (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+            slice => totalLatestDataFiles += (if (includeLogFiles) 
slice.getLogFiles.count() else 0)
+              + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    } finally {
+      fsView.close()
+    }
+    totalLatestDataFiles
+  }
+
+  private def getTableFileSystemView(metaClient: HoodieTableMetaClient): 
HoodieMetadataFileSystemView = {
+    new HoodieMetadataFileSystemView(
+      new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
+      metaClient,
+      metaClient.getActiveTimeline,
+      
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build())
+  }
+
+  private def getWriteConfig(hudiOpts: Map[String, String], basePath: String): 
HoodieWriteConfig = {
+    val props = 
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+  }
 }

Reply via email to