This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 976840e8eb  [HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)
976840e8eb is described below

commit 976840e8eb5fee9bfb2ed029e65aec5b7033faf2
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Sun Apr 10 10:43:47 2022 -0700

    [HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)

    Addressing the problem of Data Skipping not respecting Metadata Table configs,
    which might differ between the write and read paths. More details can be found in HUDI-3812.

    - Fixing Data Skipping configuration to respect MT configs (on the Read path)
    - Tightening up DS handling of cases when no top-level columns are in the target query
    - Enhancing tests to cover all possible cases
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  5 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 47 +++++++++----
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 81 ++++++++++++++--------
 .../hudi/functional/TestColumnStatsIndex.scala     | 67 +++++++++---------
 4 files changed, 122 insertions(+), 78 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7439323412..4ee5a6005e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -68,7 +68,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
     if (targetColumns.nonEmpty) {
       readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
     } else {
-      readFullColumnStatsIndexInternal(spark, tableBasePath)
+      readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
     }
   }
 
@@ -181,10 +181,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
     spark.createDataFrame(transposedRDD, indexSchema)
   }
 
-  private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
+  private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
     val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
     // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
     spark.read.format("org.apache.hudi")
+      .options(metadataConfig.getProps.asScala)
       .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 0ea4d1cef2..08d0d722b2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -85,11 +85,6 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = queryPaths.asScala
 
-  def isDataSkippingEnabled: Boolean = {
-    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
-      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
-  }
-
   /**
    * Returns the FileStatus for all the base files (excluding log files). This should be used only for
    * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
@@ -196,12 +191,20 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of pruned (data-skipped) candidate base-files' names
    */
   private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
-      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+    // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
+    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case for "struct")
+    //          - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
+    //          nothing CSI in particular could be applied for)
+    lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+
+    if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
+      validateConfig()
+      Option.empty
+    } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
       Option.empty
     } else {
-      val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-
       val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
 
       // Persist DF to avoid re-computing column statistics unraveling
@@ -245,13 +248,27 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def refresh(): Unit = super.refresh()
 
-  override def inputFiles: Array[String] = {
-    val fileStatusList = allFiles
-    fileStatusList.map(_.getPath.toString).toArray
-  }
+  override def inputFiles: Array[String] =
+    allFiles.map(_.getPath.toString).toArray
 
-  override def sizeInBytes: Long = {
-    cachedFileSize
+  override def sizeInBytes: Long = cachedFileSize
+
+  private def isColumnStatsIndexAvailable =
+    HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+
+  private def isDataSkippingEnabled: Boolean =
+    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
+
+  private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
+  private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
+
+  private def validateConfig(): Unit = {
+    if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isColumnStatsIndexEnabled)) {
+      logWarning("Data skipping requires both Metadata Table and Column Stats Index to be enabled as well! " +
" + + s"(isMetadataTableEnabled = ${isMetadataTableEnabled}, isColumnStatsIndexEnabled = ${isColumnStatsIndexEnabled}") + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index feed6fd334..1d4dbfb1ea 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -25,10 +25,9 @@ import org.apache.hudi.client.HoodieJavaWriteClient import org.apache.hudi.client.common.HoodieJavaEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.engine.EngineType -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils} @@ -38,17 +37,15 @@ import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config -import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType} -import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness} +import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal} import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory} import org.apache.spark.sql.functions.{lit, struct} import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession} import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{BeforeEach, Tag, Test} +import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource} @@ -343,16 +340,19 @@ class TestHoodieFileIndex extends HoodieClientTestBase { import _spark.implicits._ val inputDF = tuples.toDF("id", "inv_id", "str", "rand") + val writeMetadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + ) + val opts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", RECORDKEY_FIELD.key -> "id", PRECOMBINE_FIELD.key -> "id", - HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" - ) + ) ++ writeMetadataOpts // If there are any failures in the Data Skipping flow, test 
     spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
@@ -368,26 +368,46 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val props = Map[String, String](
-      "path" -> basePath,
-      QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
-      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
-      // NOTE: Metadata Table has to be enabled on the read path as well
-      HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
-    )
-
-    val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
-
-    val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
-    assertEquals(10, allFilesPartitions.head.files.length)
-
-    // We're selecting a single file that contains "id" == 1 row, which there should be
-    // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
-    // truncate search space to just a single file
-    val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
-    val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
-    assertEquals(1, filteredPartitions.head.files.length)
+    case class TestCase(enableMetadata: Boolean,
+                        enableColumnStats: Boolean,
+                        enableDataSkipping: Boolean)
+
+    val testCases: Seq[TestCase] =
+      TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
+        TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) ::
+        TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
+        TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
+        TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
+        Nil
+
+    for (testCase <- testCases) {
+      val readMetadataOpts = Map(
+        // NOTE: Metadata Table has to be enabled on the read path as well
+        HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> testCase.enableColumnStats.toString,
+        HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      )
+
+      val props = Map[String, String](
+        "path" -> basePath,
+        QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
+      ) ++ readMetadataOpts
+
+      val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
+
+      val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
+      assertEquals(10, allFilesPartitions.head.files.length)
+
+      if (testCase.enableDataSkipping && testCase.enableMetadata) {
+        // We're selecting a single file that contains "id" == 1 row, which there should be
+        // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
+        // truncate search space to just a single file
+        val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
+        val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
+        assertEquals(1, filteredPartitions.head.files.length)
+      }
+    }
   }
 
   private def attribute(partition: String): AttributeReference = {
@@ -411,6 +431,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
 }
 
 object TestHoodieFileIndex {
+
   def keyGeneratorParameters(): java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
       Arguments.arguments(null.asInstanceOf[String]),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index e3cde53951..841041e40c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -19,7 +19,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -72,19 +73,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
+  @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+  def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
     val opts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-      HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
+      // NOTE: Currently only this setting is used like following by different MT partitions:
+      //          - Files: using it
+      //          - Column Stats: NOT using it (defaults to doing "point-lookups")
+      HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
-    )
+    ) ++ metadataOpts
 
     setTableName("hoodie_test")
     initMetaClient()
@@ -108,10 +115,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
-      .fromProperties(toProperties(opts))
+      .fromProperties(toProperties(metadataOpts))
       .build()
 
-    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+    val targetColumnsToRead: Seq[String] = {
+      // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
+      // MT to be read, and subsequently filtered
+      if (testCase.readFullMetadataTable) Seq.empty
+      else sourceTableSchema.fieldNames
+    }
+
+    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
     val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -151,7 +165,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
     val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsIndexUpdatedDF =
@@ -243,26 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     )
   }
 
-  def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
-    val jsonInputDF =
-      // NOTE: Schema here is provided for validation that the input date is in the appropriate format
-      spark.read
-        .schema(sourceTableSchema)
-        .json(sourceJSONTablePath)
-
-    jsonInputDF
-      .sort("c1")
-      .repartition(4, new Column("c1"))
-      .write
-      .format("parquet")
-      .mode("overwrite")
-      .save(targetParquetTablePath)
-
-    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
-    // Have to cleanup additional artefacts of Spark write
-    fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
-  }
-
   private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
     val sourceTableSchema =
       new StructType()
@@ -316,3 +310,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
   }
 }
+
+object TestColumnStatsIndex {
+
+  case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
+
+  def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
+    java.util.stream.Stream.of(
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
+    )
+}
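
Editor's note: for readers wiring this up on the query side, a minimal read-path sketch of the behavior the commit message describes might look as follows. This is an illustrative sketch, not part of the commit; the option keys are the ones exercised by the patch, while the table path, column name, and Spark session setup are hypothetical.

    // Illustrative sketch only (not part of the commit). Assumes a Hudi table at
    // file:///tmp/hudi_trips_cow with a top-level column "id".
    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("data-skipping-read-path")
      .master("local[2]")
      .getOrCreate()

    // After this fix, Data Skipping on the read path only kicks in when the Metadata Table
    // and its Column Stats Index are enabled here as well; otherwise HoodieFileIndex logs a
    // warning (see validateConfig above) and falls back to listing all files.
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key, "true")
      .load("file:///tmp/hudi_trips_cow")

    df.filter("id = 1").show()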
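
Relatedly, the NOTE added to lookupCandidateFilesInMetadataTable spells out which predicates can actually benefit. A rough illustration, reusing the hypothetical df above and assuming an additional struct column "address" with a nested field "zip" (names are made up for the example):

    // Prunable: references the top-level column "id", for which the Column Stats Index
    // keeps per-file statistics, so candidate base files can be skipped.
    df.filter("id = 1").count()

    // Not prunable: references a nested field; the Column Stats Index only stores stats
    // for top-level columns, so the file listing is not reduced for this predicate.
    df.filter("address.zip = 94103").count()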