This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eeafa734a6c [HUDI-7903] Fix storage partition stats index to skip data
(#11472)
eeafa734a6c is described below
commit eeafa734a6cfb431b608cc67f8dcfe66249d7ec3
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Jun 29 20:08:11 2024 +0530
[HUDI-7903] Fix storage partition stats index to skip data (#11472)
* [HUDI-7903] Fix storage partition stats index to skip data
* Fix SQL and checkstyle
* Handle NPE in partition stats records
* Remove decimal field due to Spark 2.4 cast issue
* Address test comments
---------
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/metadata/HoodieMetadataPayload.java | 5 +-
.../apache/hudi/PartitionStatsIndexSupport.scala | 4 +-
.../TestPartitionStatsIndexWithSql.scala | 363 +++++++++++++++++----
3 files changed, 302 insertions(+), 70 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 7b9359d5dbb..36ed57c87f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -68,6 +68,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -686,10 +687,11 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath,
columnRangeMetadata.getColumnName()),
MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+ String fileName = nonEmpty(columnRangeMetadata.getFilePath()) ? new
StoragePath(columnRangeMetadata.getFilePath()).getName() : null;
HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
HoodieMetadataColumnStats.newBuilder()
- .setFileName(null)
+ .setFileName(fileName)
.setColumnName(columnRangeMetadata.getColumnName())
.setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
.setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
@@ -713,7 +715,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
@SuppressWarnings({"rawtypes", "unchecked"})
private static HoodieMetadataColumnStats
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
HoodieMetadataColumnStats newColumnStats) {
- checkArgument(Objects.equals(prevColumnStats.getFileName(),
newColumnStats.getFileName()));
checkArgument(Objects.equals(prevColumnStats.getColumnName(),
newColumnStats.getColumnName()));
// We're handling 2 cases in here
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 30e9b936aad..51b9f8eaaeb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil}
import org.apache.hudi.util.JFunction
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
@@ -39,7 +40,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
@transient metadataConfig:
HoodieMetadataConfig,
@transient metaClient: HoodieTableMetaClient,
allowCaching: Boolean = false)
- extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig,
metaClient, allowCaching) {
+ extends ColumnStatsIndexSupport(spark, tableSchema, metadataConfig,
metaClient, allowCaching) with Logging {
override def getIndexName: String = PartitionStatsIndexSupport.INDEX_NAME
@@ -51,6 +52,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
override def loadColumnStatsIndexRecords(targetColumns: Seq[String],
shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
checkState(targetColumns.nonEmpty)
val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
+ logDebug(s"Loading column stats for columns: ${targetColumns.mkString(",
")}, Encoded column names: ${encodedTargetColumnNames.mkString(", ")}")
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava,
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 0e337cc3995..2be4a37c915 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -19,11 +19,20 @@
package org.apache.hudi.functional
-import org.apache.hudi.common.model.WriteOperationType
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{FileSlice, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
-
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
+import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Tag
import scala.collection.JavaConverters._
@@ -34,74 +43,294 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
val sqlTempTable = "hudi_tbl"
test("Test partition stats index following insert, merge into, update and
delete") {
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- // Create table with date type partition
- spark.sql(
- s"""
- | create table $tableName using hudi
- | partitioned by (dt)
- | tblproperties(
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | 'hoodie.metadata.index.partition.stats.enable' = 'true'
- | )
- | location '$tablePath'
- | AS
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
cast('2021-05-06' as date) as dt
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Create table with date type partition
+ spark.sql(
+ s"""
+ | create table $tableName using hudi
+ | partitioned by (dt)
+ | tblproperties(
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | 'hoodie.metadata.index.partition.stats.enable' = 'true',
+ | 'hoodie.metadata.index.column.stats.column.list' = 'name'
+ | )
+ | location '$tablePath'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
cast('2021-05-06' as date) as dt
""".stripMargin
- )
+ )
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ HoodieSparkSqlTestBase.getLastCommitMetadata(spark,
tablePath).getOperationType
+ }
+ checkAnswer(s"select id, name, price, ts, cast(dt as string) from
$tableName")(
+ Seq(1, "a1", 10, 1000, "2021-05-06")
+ )
+
+ val partitionValue = "2021-05-06"
+
+ // Check the missing properties for spark sql
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ val properties = metaClient.getTableConfig.getProps.asScala.toMap
+
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
+ assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
+ assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
+ assertResult(tableName)(metaClient.getTableConfig.getTableName)
+ // Validate partition_stats index exists
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+ // Test insert into
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000,
cast('$partitionValue' as date))")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+ )
+ // Test merge into
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts,
'$partitionValue' as dt) s0
+ |on h0.id = s0.id
+ |when matched then update set *
+ |""".stripMargin)
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+ )
+ // Test update
+ spark.sql(s"update $tableName set price = price + 1 where id = 2")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+ )
+ // Test delete
+ spark.sql(s"delete from $tableName where id = 1")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+ )
+ }
+ }
+ }
+
+ test("Test partition stats index on string type field with insert and file
pruning") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Create table with date type partition
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | ts BIGINT,
+ | uuid STRING,
+ | rider STRING,
+ | driver STRING,
+ | city STRING,
+ | state STRING
+ |) using hudi
+ | options(
+ | type = '$tableType',
+ | primaryKey ='uuid',
+ | preCombineField = 'ts',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'rider'
+ |)
+ |PARTITIONED BY (state)
+ |location '$tablePath'
+ """.stripMargin
+ )
+ // set small file limit to 0 so that each insert creates a new file
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ // insert data in below pattern so that multiple records for 'texas'
and 'california' partition are in same file
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values
(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K','san_francisco','california'),
(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-F','driver-M','sunnyvale','california')
+ | """.stripMargin
+ )
+ spark.sql(s"INSERT INTO $tableName VALUES
(1695332066,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-B','driver-L','new
york city','new york')")
+ spark.sql(s"INSERT INTO $tableName VALUES
(1695516137,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-D','driver-M','princeton','new
jersey')")
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas')
+ | """.stripMargin
+ )
+
+ // Validate partition_stats index exists
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assertResult(tableName)(metaClient.getTableConfig.getTableName)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+ // Test pruning
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select uuid, rider, city, state from $tableName where
rider > 'rider-D'")(
+ Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin",
"texas"),
+ Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale",
"california")
+ )
+
+ verifyFilePruning(
+ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("rider", StringType)(),
Literal("rider-D")),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = true)
+
+ // Test predicate that does not match any partition, should scan no
files
+ checkAnswer(s"select uuid, rider, city, state from $tableName where
rider > 'rider-Z'")()
+ verifyFilePruning(
+ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("rider", StringType)(),
Literal("rider-Z")),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = true,
+ isNoScanExpected = true)
+ // Test predicate that matches all partitions, will end up scanning
all partitions
+ checkAnswer(s"select uuid, rider, city, state from $tableName where
rider < 'rider-Z'")(
+ Seq("334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A",
"san_francisco", "california"),
+ Seq("7a84095f-737f-40bc-b62f-6b69664712d2", "rider-B", "new york
city", "new york"),
+ Seq("e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-C", "houston",
"texas"),
+ Seq("3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04", "rider-D", "princeton",
"new jersey"),
+ Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin",
"texas"),
+ Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale",
"california")
+ )
+
+ verifyFilePruning(
+ Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ HoodieMetadataConfig.ENABLE.key -> "true"),
+ LessThan(AttributeReference("rider", StringType)(),
Literal("rider-Z")),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = false)
+ }
+ }
+ }
+
+ test(s"Test partition stats index on int type field with update and file
pruning") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'price',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'price'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3',
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+ | """.stripMargin
+ )
+
+ // Validate partition_stats index exists
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ assertResult(tableName)(metaClient.getTableConfig.getTableName)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
+ Seq(6, "a6", 4000, 30)
+ )
- assertResult(WriteOperationType.BULK_INSERT) {
- HoodieSparkSqlTestBase.getLastCommitMetadata(spark,
tablePath).getOperationType
+ // Test price update, assert latest value and ensure file pruning
+ spark.sql(s"update $tableName set price = price + 1 where id = 6")
+ checkAnswer(s"select id, name, price, ts from $tableName where
price>3000")(
+ Seq(6, "a6", 4001, 30)
+ )
+
+ verifyFilePruning(
+ Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("price", IntegerType)(),
Literal(3000)),
+ HoodieTableMetaClient.reload(metaClient),
+ isDataSkippingExpected = true)
}
- checkAnswer(s"select id, name, price, ts, cast(dt as string) from
$tableName")(
- Seq(1, "a1", 10, 1000, "2021-05-06")
- )
-
- val partitionValue = "2021-05-06"
-
- // Check the missing properties for spark sql
- val metaClient = HoodieTableMetaClient.builder()
- .setBasePath(tablePath)
- .setConf(HoodieTestUtils.getDefaultStorageConf)
- .build()
- val properties = metaClient.getTableConfig.getProps.asScala.toMap
-
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
- assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
- assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
- assertResult(tableName)(metaClient.getTableConfig.getTableName)
-
- // Test insert into
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000,
cast('$partitionValue' as date))")
- checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
- Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
- Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
- )
- // Test merge into
- spark.sql(
- s"""
- |merge into $tableName h0
- |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts,
'$partitionValue' as dt) s0
- |on h0.id = s0.id
- |when matched then update set *
- |""".stripMargin)
- checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
- Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
- Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
- )
- // Test update
- spark.sql(s"update $tableName set price = price + 1 where id = 2")
- checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
- Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
- Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
- )
- // Test delete
- spark.sql(s"delete from $tableName where id = 1")
- checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
- Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
- )
}
}
+
+ private def verifyFilePruning(opts: Map[String, String], dataFilter:
Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean,
isNoScanExpected: Boolean = false): Unit = {
+ // with data skipping
+ val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
+ var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts,
includeLogFiles = true)
+ try {
+ val filteredPartitionDirectories = fileIndex.listFiles(Seq(),
Seq(dataFilter))
+ val filteredFilesCount = filteredPartitionDirectories.flatMap(s =>
s.files).size
+ val latestDataFilesCount = getLatestDataFilesCount(metaClient =
metaClient)
+ if (isDataSkippingExpected) {
+ assertTrue(filteredFilesCount < latestDataFilesCount)
+ if (isNoScanExpected) {
+ assertTrue(filteredFilesCount == 0)
+ }
+ } else {
+ assertTrue(filteredFilesCount == latestDataFilesCount)
+ }
+
+ // with no data skipping
+ fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles =
true)
+ val filesCountWithNoSkipping = fileIndex.listFiles(Seq(),
Seq(dataFilter)).flatMap(s => s.files).size
+ assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
+ } finally {
+ fileIndex.close()
+ }
+ }
+
+ private def getLatestDataFilesCount(includeLogFiles: Boolean = true,
metaClient: HoodieTableMetaClient) = {
+ var totalLatestDataFiles = 0L
+ val fsView: HoodieMetadataFileSystemView =
getTableFileSystemView(metaClient)
+ try {
+
fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+ .values()
+ .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+ (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+ slice => totalLatestDataFiles += (if (includeLogFiles)
slice.getLogFiles.count() else 0)
+ + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ } finally {
+ fsView.close()
+ }
+ totalLatestDataFiles
+ }
+
+ private def getTableFileSystemView(metaClient: HoodieTableMetaClient):
HoodieMetadataFileSystemView = {
+ new HoodieMetadataFileSystemView(
+ new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
+ metaClient,
+ metaClient.getActiveTimeline,
+
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build())
+ }
+
}