This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eeafa734a6c [HUDI-7903] Fix storage partition stats index to skip data (#11472)
eeafa734a6c is described below

commit eeafa734a6cfb431b608cc67f8dcfe66249d7ec3
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Jun 29 20:08:11 2024 +0530

    [HUDI-7903] Fix storage partition stats index to skip data (#11472)
    
    * [HUDI-7903] Fix storage partition stats index to skip data
    
    * Fix SQL and checkstyle
    
    * Handle NPE in partition stats records
    
    * Remove decimal field due to Spark 2.4 cast issue
    
    * Address test comments
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/metadata/HoodieMetadataPayload.java       |   5 +-
 .../apache/hudi/PartitionStatsIndexSupport.scala   |   4 +-
 .../TestPartitionStatsIndexWithSql.scala           | 363 +++++++++++++++++----
 3 files changed, 302 insertions(+), 70 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 7b9359d5dbb..36ed57c87f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -68,6 +68,7 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -686,10 +687,11 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, 
columnRangeMetadata.getColumnName()),
           MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+      String fileName = nonEmpty(columnRangeMetadata.getFilePath()) ? new 
StoragePath(columnRangeMetadata.getFilePath()).getName() : null;
 
       HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
-              .setFileName(null)
+              .setFileName(fileName)
               .setColumnName(columnRangeMetadata.getColumnName())
               
.setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
               
.setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
@@ -713,7 +715,6 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   @SuppressWarnings({"rawtypes", "unchecked"})
   private static HoodieMetadataColumnStats 
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
                                                                    
HoodieMetadataColumnStats newColumnStats) {
-    checkArgument(Objects.equals(prevColumnStats.getFileName(), 
newColumnStats.getFileName()));
     checkArgument(Objects.equals(prevColumnStats.getColumnName(), 
newColumnStats.getColumnName()));
 
     // We're handling 2 cases in here
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 30e9b936aad..51b9f8eaaeb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 
@@ -39,7 +40,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
                                  @transient metadataConfig: 
HoodieMetadataConfig,
                                  @transient metaClient: HoodieTableMetaClient,
                                  allowCaching: Boolean = false)
-  extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, 
metaClient, allowCaching) {
+  extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, 
metaClient, allowCaching) with Logging {
 
   override def getIndexName: String = PartitionStatsIndexSupport.INDEX_NAME
 
@@ -51,6 +52,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
   override def loadColumnStatsIndexRecords(targetColumns: Seq[String], 
shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
     checkState(targetColumns.nonEmpty)
     val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
+    logDebug(s"Loading column stats for columns: ${targetColumns.mkString(", 
")},  Encoded column names: ${encodedTargetColumnNames.mkString(", ")}")
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, 
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory)
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 0e337cc3995..2be4a37c915 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -19,11 +19,20 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{FileSlice, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, GreaterThan, LessThan, Literal}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Tag
 
 import scala.collection.JavaConverters._
@@ -34,74 +43,294 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
   val sqlTempTable = "hudi_tbl"
 
   test("Test partition stats index following insert, merge into, update and 
delete") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-      // Create table with date type partition
-      spark.sql(
-        s"""
-           | create table $tableName using hudi
-           | partitioned by (dt)
-           | tblproperties(
-           |    primaryKey = 'id',
-           |    preCombineField = 'ts',
-           |    'hoodie.metadata.index.partition.stats.enable' = 'true'
-           | )
-           | location '$tablePath'
-           | AS
-           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 
cast('2021-05-06' as date) as dt
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // Create table with date type partition
+        spark.sql(
+          s"""
+             | create table $tableName using hudi
+             | partitioned by (dt)
+             | tblproperties(
+             |    type = '$tableType',
+             |    primaryKey = 'id',
+             |    preCombineField = 'ts',
+             |    'hoodie.metadata.index.partition.stats.enable' = 'true',
+             |    'hoodie.metadata.index.column.stats.column.list' = 'name'
+             | )
+             | location '$tablePath'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 
cast('2021-05-06' as date) as dt
          """.stripMargin
-      )
+        )
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          HoodieSparkSqlTestBase.getLastCommitMetadata(spark, 
tablePath).getOperationType
+        }
+        checkAnswer(s"select id, name, price, ts, cast(dt as string) from 
$tableName")(
+          Seq(1, "a1", 10, 1000, "2021-05-06")
+        )
+
+        val partitionValue = "2021-05-06"
+
+        // Check the missing properties for spark sql
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        val properties = metaClient.getTableConfig.getProps.asScala.toMap
+        
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
+        assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
+        assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
+        assertResult(tableName)(metaClient.getTableConfig.getTableName)
+        // Validate partition_stats index exists
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+        // Test insert into
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 
cast('$partitionValue' as date))")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
+          Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
+          Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test merge into
+        spark.sql(
+          s"""
+             |merge into $tableName h0
+             |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'$partitionValue' as dt) s0
+             |on h0.id = s0.id
+             |when matched then update set *
+             |""".stripMargin)
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
+          Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+          Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test update
+        spark.sql(s"update $tableName set price = price + 1 where id = 2")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
+          Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+          Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+        )
+        // Test delete
+        spark.sql(s"delete from $tableName where id = 1")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
+          Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+        )
+      }
+    }
+  }
+
+  test("Test partition stats index on string type field with insert and file 
pruning") {
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // Create table partitioned by a string type column (state)
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |    ts BIGINT,
+             |    uuid STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    city STRING,
+             |    state STRING
+             |) using hudi
+             | options(
+             |    type = '$tableType',
+             |    primaryKey ='uuid',
+             |    preCombineField = 'ts',
+             |    hoodie.metadata.index.partition.stats.enable = 'true',
+             |    hoodie.metadata.index.column.stats.column.list = 'rider'
+             |)
+             |PARTITIONED BY (state)
+             |location '$tablePath'
+         """.stripMargin
+        )
+        // set small file limit to 0 so that each insert creates a new file
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        // insert data in below pattern so that multiple records for 'texas' 
and 'california' partition are in same file
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | values 
(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K','san_francisco','california'),
 
(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-F','driver-M','sunnyvale','california')
+             | """.stripMargin
+        )
+        spark.sql(s"INSERT INTO $tableName VALUES 
(1695332066,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-B','driver-L','new 
york city','new york')")
+        spark.sql(s"INSERT INTO $tableName VALUES 
(1695516137,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-D','driver-M','princeton','new
 jersey')")
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | values 
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
 
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas')
+             | """.stripMargin
+        )
+
+        // Validate partition_stats index exists
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        assertResult(tableName)(metaClient.getTableConfig.getTableName)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+        // Test pruning
+        spark.sql("set hoodie.metadata.enable=true")
+        spark.sql("set hoodie.enable.data.skipping=true")
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+        checkAnswer(s"select uuid, rider, city, state from $tableName where 
rider > 'rider-D'")(
+          Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin", 
"texas"),
+          Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale", 
"california")
+        )
+
+        verifyFilePruning(
+          Map(
+            DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+            HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("rider", StringType)(), 
Literal("rider-D")),
+          HoodieTableMetaClient.reload(metaClient),
+          isDataSkippingExpected = true)
+
+        // Test predicate that does not match any partition, should scan no 
files
+        checkAnswer(s"select uuid, rider, city, state from $tableName where 
rider > 'rider-Z'")()
+        verifyFilePruning(
+          Map(
+            DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+            HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("rider", StringType)(), 
Literal("rider-Z")),
+          HoodieTableMetaClient.reload(metaClient),
+          isDataSkippingExpected = true,
+          isNoScanExpected = true)
+        // Test predicate that matches all partitions, will end up scanning 
all partitions
+        checkAnswer(s"select uuid, rider, city, state from $tableName where 
rider < 'rider-Z'")(
+          Seq("334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", 
"san_francisco", "california"),
+          Seq("7a84095f-737f-40bc-b62f-6b69664712d2", "rider-B", "new york 
city", "new york"),
+          Seq("e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-C", "houston", 
"texas"),
+          Seq("3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04", "rider-D", "princeton", 
"new jersey"),
+          Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin", 
"texas"),
+          Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale", 
"california")
+        )
+
+        verifyFilePruning(
+          Map(
+            DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+            HoodieMetadataConfig.ENABLE.key -> "true"),
+          LessThan(AttributeReference("rider", StringType)(), 
Literal("rider-Z")),
+          HoodieTableMetaClient.reload(metaClient),
+          isDataSkippingExpected = false)
+      }
+    }
+  }
+
+  test(s"Test partition stats index on int type field with update and file 
pruning") {
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  ts long,
+             |  price int
+             |) using hudi
+             |partitioned by (ts)
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'price',
+             |  hoodie.metadata.index.partition.stats.enable = 'true',
+             |  hoodie.metadata.index.column.stats.column.list = 'price'
+             |)
+             |location '$tablePath'
+             |""".stripMargin
+        )
+
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+             | """.stripMargin
+        )
+
+        // Validate partition_stats index exists
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        assertResult(tableName)(metaClient.getTableConfig.getTableName)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+        spark.sql("set hoodie.metadata.enable=true")
+        spark.sql("set hoodie.enable.data.skipping=true")
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+        checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
+          Seq(6, "a6", 4000, 30)
+        )
 
-      assertResult(WriteOperationType.BULK_INSERT) {
-        HoodieSparkSqlTestBase.getLastCommitMetadata(spark, 
tablePath).getOperationType
+        // Test price update, assert latest value and ensure file pruning
+        spark.sql(s"update $tableName set price = price + 1 where id = 6")
+        checkAnswer(s"select id, name, price, ts from $tableName where 
price>3000")(
+          Seq(6, "a6", 4001, 30)
+        )
+
+        verifyFilePruning(
+          Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", 
HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("price", IntegerType)(), 
Literal(3000)),
+          HoodieTableMetaClient.reload(metaClient),
+          isDataSkippingExpected = true)
       }
-      checkAnswer(s"select id, name, price, ts, cast(dt as string) from 
$tableName")(
-        Seq(1, "a1", 10, 1000, "2021-05-06")
-      )
-
-      val partitionValue = "2021-05-06"
-
-      // Check the missing properties for spark sql
-      val metaClient = HoodieTableMetaClient.builder()
-        .setBasePath(tablePath)
-        .setConf(HoodieTestUtils.getDefaultStorageConf)
-        .build()
-      val properties = metaClient.getTableConfig.getProps.asScala.toMap
-      
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
-      assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
-      assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
-      assertResult(tableName)(metaClient.getTableConfig.getTableName)
-
-      // Test insert into
-      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 
cast('$partitionValue' as date))")
-      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
-        Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
-        Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
-      )
-      // Test merge into
-      spark.sql(
-        s"""
-           |merge into $tableName h0
-           |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, 
'$partitionValue' as dt) s0
-           |on h0.id = s0.id
-           |when matched then update set *
-           |""".stripMargin)
-      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
-        Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
-        Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
-      )
-      // Test update
-      spark.sql(s"update $tableName set price = price + 1 where id = 2")
-      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
-        Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
-        Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
-      )
-      // Test delete
-      spark.sql(s"delete from $tableName where id = 1")
-      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, 
name, price, ts, cast(dt as string) from $tableName order by id")(
-        Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
-      )
     }
   }
+
+  private def verifyFilePruning(opts: Map[String, String], dataFilter: 
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean, 
isNoScanExpected: Boolean = false): Unit = {
+    // with data skipping
+    val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
+    var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, 
includeLogFiles = true)
+    try {
+      val filteredPartitionDirectories = fileIndex.listFiles(Seq(), 
Seq(dataFilter))
+      val filteredFilesCount = filteredPartitionDirectories.flatMap(s => 
s.files).size
+      val latestDataFilesCount = getLatestDataFilesCount(metaClient = 
metaClient)
+      if (isDataSkippingExpected) {
+        assertTrue(filteredFilesCount < latestDataFilesCount)
+        if (isNoScanExpected) {
+          assertTrue(filteredFilesCount == 0)
+        }
+      } else {
+        assertTrue(filteredFilesCount == latestDataFilesCount)
+      }
+
+      // with no data skipping
+      fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = 
true)
+      val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), 
Seq(dataFilter)).flatMap(s => s.files).size
+      assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
+    } finally {
+      fileIndex.close()
+    }
+  }
+
+  private def getLatestDataFilesCount(includeLogFiles: Boolean = true, 
metaClient: HoodieTableMetaClient) = {
+    var totalLatestDataFiles = 0L
+    val fsView: HoodieMetadataFileSystemView = 
getTableFileSystemView(metaClient)
+    try {
+      
fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+        .values()
+        .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+          (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+            slice => totalLatestDataFiles += (if (includeLogFiles) 
slice.getLogFiles.count() else 0)
+              + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    } finally {
+      fsView.close()
+    }
+    totalLatestDataFiles
+  }
+
+  private def getTableFileSystemView(metaClient: HoodieTableMetaClient): 
HoodieMetadataFileSystemView = {
+    new HoodieMetadataFileSystemView(
+      new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
+      metaClient,
+      metaClient.getActiveTimeline,
+      
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build())
+  }
+
 }

Reply via email to