This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d046f9  [HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
     new b5890cd  Merge pull request #4308 from harsh1231/HUDI-3008
7d046f9 is described below

commit 7d046f914a059b2623d7f2a7627c44b15ccc0ddb
Author: harshal patil <harshal.j.pa...@gmail.com>
AuthorDate: Tue Dec 14 17:28:18 2021 +0530

    [HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
---
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 25 ++++++++++++++++----
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 27 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index f9b68cb..0ed1b48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
 import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
@@ -37,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
 import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
 import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -108,7 +106,7 @@ case class HoodieFileIndex(
   private lazy val _partitionSchemaFromProperties: StructType = {
     val tableConfig = metaClient.getTableConfig
     val partitionColumns = tableConfig.getPartitionFields
-    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+    val nameFieldMap = generateNameFieldMap(Right(schema))
 
     if (partitionColumns.isPresent) {
       val partitionFields = partitionColumns.get().map(column =>
@@ -123,6 +121,25 @@ case class HoodieFileIndex(
     }
   }
 
+  /**
+   * This method traverses StructType recursively to build map of columnName -> StructField
+   * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"]  -> final map will have keys corresponding
+   * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"]
+   * @param structField
+   * @return map of ( columns names -> StructField )
+   */
+  private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = {
+    structField match {
+      case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
+      case Left(field) => field.dataType match {
+        case struct: StructType => generateNameFieldMap(Right(struct)).map {
+          case (key: String, sf: StructField)  => (field.name + "." + key, sf)
+        }
+        case _ => Map(field.name -> field)
+      }
+    }
+  }
+
   private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
 
   private lazy val configProperties = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 0c3918b..62f98cf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import java.util.Properties
-
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,6 +30,7 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.{lit, struct}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
 import org.apache.spark.sql.execution.datasources.PartitionDirectory
 import org.apache.spark.sql.types.StringType
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -253,6 +253,29 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
   }
 
+  @ParameterizedTest
+  @CsvSource(Array("true,a.b.c","false,a.b.c","true,c","false,c"))
+  def testQueryPartitionPathsForNestedPartition(useMetaFileList:Boolean, partitionBy:String): Unit = {
+    val inputDF = spark.range(100)
+      .withColumn("c",lit("c"))
+      .withColumn("b",struct("c"))
+      .withColumn("a",struct("b"))
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "id")
+      .option(PARTITIONPATH_FIELD.key, partitionBy)
+      .option(HoodieMetadataConfig.ENABLE.key(), useMetaFileList)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None,
+      queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
+    // test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break
+    assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
+  }
+
   private def attribute(partition: String): AttributeReference = {
     AttributeReference(partition, StringType, true)()
   }

Reply via email to