This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 976840e8eb [HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)
976840e8eb is described below

commit 976840e8eb5fee9bfb2ed029e65aec5b7033faf2
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Sun Apr 10 10:43:47 2022 -0700

    [HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)
    
    Addressing the problem of Data Skipping not respecting Metadata Table configs, which might differ between the write and read paths. More details can be found in HUDI-3812.
    
    - Fixing Data Skipping configuration to respect MT configs (on the Read path)
    - Tightening up DS handling of cases when no top-level columns are in the target query
    - Enhancing tests to cover all possible cases
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  5 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 47 +++++++++----
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 81 ++++++++++++++--------
 .../hudi/functional/TestColumnStatsIndex.scala     | 67 +++++++++---------
 4 files changed, 122 insertions(+), 78 deletions(-)

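For illustration, a minimal read-side setup consistent with this change might look like the sketch below. This is illustrative only: spark and basePath are assumed to be in scope, and the option keys are the same constants referenced in the diff that follows.

    // Sketch: Data Skipping only kicks in when the Metadata Table and its
    // Column Stats Index are also enabled on the read path
    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.common.config.HoodieMetadataConfig

    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key, "true")
      .load(basePath)
      .where("id = 1")
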
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7439323412..4ee5a6005e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -68,7 +68,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
       if (targetColumns.nonEmpty) {
         readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
       } else {
-        readFullColumnStatsIndexInternal(spark, tableBasePath)
+        readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
       }
     }
 
@@ -181,10 +181,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
     spark.createDataFrame(transposedRDD, indexSchema)
   }
 
-  private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
+  private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
     val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
     // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
     spark.read.format("org.apache.hudi")
+      .options(metadataConfig.getProps.asScala)
       .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
   }
 
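For context, after this hunk the full-index read amounts to the following. This is a sketch only, assuming spark, metadataConfig and tableBasePath are in scope; the calls mirror the patched method body above.

    import scala.collection.JavaConverters._
    import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}

    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
    val colStatsDF = spark.read.format("org.apache.hudi")
      // Forward the reader's Metadata Table configs; previously this read fell back
      // to defaults regardless of what the caller had configured
      .options(metadataConfig.getProps.asScala)
      .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
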
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 0ea4d1cef2..08d0d722b2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -85,11 +85,6 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = queryPaths.asScala
 
-  def isDataSkippingEnabled: Boolean = {
-    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
-      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
-  }
-
   /**
    * Returns the FileStatus for all the base files (excluding log files). This should be used only for
    * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
@@ -196,12 +191,20 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of pruned (data-skipped) candidate base-files' names
    */
   private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
-    if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
-      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+    // NOTE: Data Skipping is only effective when it references columns that are indexed within
+    //       the Column Stats Index (CSI). The following cases cannot be effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (e.g., filters like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case for "struct")
+    //          - Any expression not directly referencing a top-level column (e.g., sub-queries, since there's
+    //          nothing the CSI in particular could be applied to)
+    lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+
+    if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
+      validateConfig()
+      Option.empty
+    } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
       Option.empty
     } else {
-      val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-
       val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
 
       // Persist DF to avoid re-computing column statistics unraveling
@@ -245,13 +248,27 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def refresh(): Unit = super.refresh()
 
-  override def inputFiles: Array[String] = {
-    val fileStatusList = allFiles
-    fileStatusList.map(_.getPath.toString).toArray
-  }
+  override def inputFiles: Array[String] =
+    allFiles.map(_.getPath.toString).toArray
 
-  override def sizeInBytes: Long = {
-    cachedFileSize
+  override def sizeInBytes: Long = cachedFileSize
+
+  private def isColumnStatsIndexAvailable =
+    HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+      .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+
+  private def isDataSkippingEnabled: Boolean =
+    options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+      spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
+
+  private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
+  private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
+
+  private def validateConfig(): Unit = {
+    if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isColumnStatsIndexEnabled)) {
+      logWarning("Data skipping requires both Metadata Table and Column Stats Index to be enabled as well! " +
+        s"(isMetadataTableEnabled = ${isMetadataTableEnabled}, isColumnStatsIndexEnabled = ${isColumnStatsIndexEnabled})")
+    }
   }
 }
 
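Taken together, the new read-path gating means file pruning is attempted only when the Metadata Table, its Column Stats Index, and Data Skipping are all enabled, and the query actually has filters referencing indexed top-level columns. A standalone restatement of that logic (illustrative names only, not part of the patch):

    // Hypothetical helper mirroring the checks in lookupCandidateFilesInMetadataTable above
    def shouldAttemptDataSkipping(metadataTableEnabled: Boolean,
                                  columnStatsIndexAvailable: Boolean,
                                  dataSkippingEnabled: Boolean,
                                  hasQueryFilters: Boolean,
                                  referencesIndexedTopLevelColumns: Boolean): Boolean =
      metadataTableEnabled && columnStatsIndexAvailable && dataSkippingEnabled &&
        hasQueryFilters && referencesIndexedTopLevelColumns

    // e.g. Data Skipping enabled but MT disabled on the read path => no pruning (a warning is logged instead)
    assert(!shouldAttemptDataSkipping(metadataTableEnabled = false, columnStatsIndexAvailable = true,
      dataSkippingEnabled = true, hasQueryFilters = true, referencesIndexedTopLevelColumns = true))
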
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index feed6fd334..1d4dbfb1ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -25,10 +25,9 @@ import org.apache.hudi.client.HoodieJavaWriteClient
 import org.apache.hudi.client.common.HoodieJavaEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.engine.EngineType
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
@@ -38,17 +37,15 @@ import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}
-import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness}
+import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
 import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
 import org.apache.spark.sql.functions.{lit, struct}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Tag, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
 
@@ -343,16 +340,19 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     import _spark.implicits._
     val inputDF = tuples.toDF("id", "inv_id", "str", "rand")
 
+    val writeMetadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
     val opts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "id",
       PRECOMBINE_FIELD.key -> "id",
-      HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
-    )
+    ) ++ writeMetadataOpts
 
     // If there are any failures in the Data Skipping flow, test should fail
     spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
@@ -368,26 +368,46 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val props = Map[String, String](
-      "path" -> basePath,
-      QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
-      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
-      // NOTE: Metadata Table has to be enabled on the read path as well
-      HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
-    )
-
-    val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
-
-    val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
-    assertEquals(10, allFilesPartitions.head.files.length)
-
-    // We're selecting a single file that contains "id" == 1 row, which there should be
-    // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
-    // truncate search space to just a single file
-    val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
-    val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
-    assertEquals(1, filteredPartitions.head.files.length)
+    case class TestCase(enableMetadata: Boolean,
+                        enableColumnStats: Boolean,
+                        enableDataSkipping: Boolean)
+
+    val testCases: Seq[TestCase] =
+      TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
+      TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) ::
+      TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
+      TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
+      TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
+      Nil
+
+    for (testCase <- testCases) {
+      val readMetadataOpts = Map(
+        // NOTE: Metadata Table has to be enabled on the read path as well
+        HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> testCase.enableColumnStats.toString,
+        HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      )
+
+      val props = Map[String, String](
+        "path" -> basePath,
+        QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
+      ) ++ readMetadataOpts
+
+      val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
+
+      val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
+      assertEquals(10, allFilesPartitions.head.files.length)
+
+      if (testCase.enableDataSkipping && testCase.enableMetadata) {
+        // We're selecting a single file that contains "id" == 1 row, which there should be
+        // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
+        // truncate search space to just a single file
+        val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
+        val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
+        assertEquals(1, filteredPartitions.head.files.length)
+      }
+    }
   }
 
   private def attribute(partition: String): AttributeReference = {
@@ -411,6 +431,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
 }
 
 object TestHoodieFileIndex {
+
   def keyGeneratorParameters(): java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
       Arguments.arguments(null.asInstanceOf[String]),
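The new test matrix above exercises the relevant combinations of the three read-path flags; only the combinations with both Data Skipping and the Metadata Table enabled are expected to prune down to a single file, which is what the conditional assertion checks. A compact restatement of that expectation (illustrative, not part of the patch):

    case class TestCase(enableMetadata: Boolean, enableColumnStats: Boolean, enableDataSkipping: Boolean)

    // Mirrors the guard in the test: pruning is only asserted when both flags are on
    def expectsPruning(tc: TestCase): Boolean = tc.enableDataSkipping && tc.enableMetadata

    assert(expectsPruning(TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true)))
    assert(!expectsPruning(TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true)))
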
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index e3cde53951..841041e40c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -19,7 +19,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
 import org.apache.spark.sql._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -72,19 +73,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
+  @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+  def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
     val opts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieMetadataConfig.ENABLE.key -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
-      HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
+      // NOTE: Currently this setting is used as follows by the different MT partitions:
+      //          - Files: using it
+      //          - Column Stats: NOT using it (defaults to doing "point-lookups")
+      HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
-    )
+    ) ++ metadataOpts
 
     setTableName("hoodie_test")
     initMetaClient()
@@ -108,10 +115,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
     val metadataConfig = HoodieMetadataConfig.newBuilder()
-      .fromProperties(toProperties(opts))
+      .fromProperties(toProperties(metadataOpts))
       .build()
 
-    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+    val targetColumnsToRead: Seq[String] = {
+      // Providing an empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
+      // MT being read, and subsequently filtered
+      if (testCase.readFullMetadataTable) Seq.empty
+      else sourceTableSchema.fieldNames
+    }
+
+    val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
     val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -151,7 +165,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
 
     metaClient = HoodieTableMetaClient.reload(metaClient)
 
-    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+    val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
     val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
 
     val expectedColStatsIndexUpdatedDF =
@@ -243,26 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     )
   }
 
-  def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
-    val jsonInputDF =
-    // NOTE: Schema here is provided for validation that the input date is in the appropriate format
-      spark.read
-        .schema(sourceTableSchema)
-        .json(sourceJSONTablePath)
-
-    jsonInputDF
-      .sort("c1")
-      .repartition(4, new Column("c1"))
-      .write
-      .format("parquet")
-      .mode("overwrite")
-      .save(targetParquetTablePath)
-
-    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
-    // Have to cleanup additional artefacts of Spark write
-    fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
-  }
-
   private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
     val sourceTableSchema =
       new StructType()
@@ -316,3 +310,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
   }
 
 }
+
+object TestColumnStatsIndex {
+
+  case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
+
+  def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
+    java.util.stream.Stream.of(
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
+      Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
+    )
+}
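As the TestColumnStatsIndex changes above illustrate, the parameterized cases now cover both read strategies of the Column Stats Index: passing explicit target columns versus passing an empty sequence, which reads the whole column_stats partition and filters it afterwards. A sketch of the two call shapes, assuming the enclosing test's scope (spark, basePath, metadataConfig and sourceTableSchema, with ColumnStatsIndexSupport mixed in):

    // Targeted read: only the stats for the listed columns are loaded
    val targetedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)

    // Full read: an empty column list loads the entire column_stats partition, filtered downstream
    val fullColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, Seq.empty)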
