This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 7d046f9  [HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
     new b5890cd  Merge pull request #4308 from harsh1231/HUDI-3008
7d046f9 is described below

commit 7d046f914a059b2623d7f2a7627c44b15ccc0ddb
Author: harshal patil <harshal.j.pa...@gmail.com>
AuthorDate: Tue Dec 14 17:28:18 2021 +0530

    [HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
---
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 25 ++++++++++++++++----
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 27 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index f9b68cb..0ed1b48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
 import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
@@ -37,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
 import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
 import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -108,7 +106,7 @@ case class HoodieFileIndex(
   private lazy val _partitionSchemaFromProperties: StructType = {
     val tableConfig = metaClient.getTableConfig
     val partitionColumns = tableConfig.getPartitionFields
-    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+    val nameFieldMap = generateNameFieldMap(Right(schema))
 
     if (partitionColumns.isPresent) {
       val partitionFields = partitionColumns.get().map(column =>
@@ -123,6 +121,25 @@
     }
   }
 
+  /**
+   * Traverses a StructType recursively to build a map of column name -> StructField.
+   * Note: for nested columns such as ["a.b.c.d", "a.b.c.e"], the resulting map only contains keys
+   * for the leaf paths ["a.b.c.d", "a.b.c.e"], not for prefixes such as ["a.b.c", "a.b"].
+   * @param structField either a single StructField or a StructType to traverse
+   * @return map of (column name -> StructField)
+   */
+  private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = {
+    structField match {
+      case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
+      case Left(field) => field.dataType match {
+        case struct: StructType => generateNameFieldMap(Right(struct)).map {
+          case (key: String, sf: StructField) => (field.name + "." + key, sf)
+        }
+        case _ => Map(field.name -> field)
+      }
+    }
+  }
+
   private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
 
   private lazy val configProperties = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 0c3918b..62f98cf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import java.util.Properties
-
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,6 +30,7 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.{lit, struct}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
 import org.apache.spark.sql.execution.datasources.PartitionDirectory
 import org.apache.spark.sql.types.StringType
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -253,6 +253,29 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
   }
 
+  @ParameterizedTest
+  @CsvSource(Array("true,a.b.c", "false,a.b.c", "true,c", "false,c"))
+  def testQueryPartitionPathsForNestedPartition(useMetaFileList: Boolean, partitionBy: String): Unit = {
+    val inputDF = spark.range(100)
+      .withColumn("c", lit("c"))
+      .withColumn("b", struct("c"))
+      .withColumn("a", struct("b"))
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "id")
+      .option(PARTITIONPATH_FIELD.key, partitionBy)
+      .option(HoodieMetadataConfig.ENABLE.key(), useMetaFileList)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None,
+      queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
+    // verify that getAllQueryPartitionPaths does not break when the table is partitioned on nested columns
+    assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
+  }
+
   private def attribute(partition: String): AttributeReference = {
     AttributeReference(partition, StringType, true)()
   }
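For reference, a minimal standalone sketch of the flattening behaviour that generateNameFieldMap introduces. The object and method names here (NestedFieldMapSketch, flatten) are hypothetical and not part of the commit; only Spark's StructType/StructField types are assumed. It illustrates why a partition column declared as "a.b.c" can be resolved against the table schema: only leaf column paths become keys in the name-to-field map.

// Sketch (hypothetical names): flatten a nested StructType into a map keyed by leaf column paths,
// mirroring the lookup used for partition column resolution.
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object NestedFieldMapSketch {

  // Recursively walk the schema; only leaf (non-struct) fields become entries,
  // keyed by their dotted path, e.g. "a.b.c".
  def flatten(schema: StructType, prefix: String = ""): Map[String, StructField] =
    schema.fields.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case nested: StructType => flatten(nested, path)
        case _                  => Map(path -> f)
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    // Same shape as the DataFrame built in testQueryPartitionPathsForNestedPartition: a.b.c is a string leaf.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("a", StructType(Seq(
        StructField("b", StructType(Seq(
          StructField("c", StringType)))))))))
    // Prints: Set(id, a.b.c) -- neither "a" nor "a.b" is a key, only leaf paths are.
    println(flatten(schema).keySet)
  }
}

In the new test above, every row stores the literal "c" in column a.b.c, so the single resolved partition path is "c", which is what the assertion on getAllQueryPartitionPaths checks for both the metadata-based and the direct file listing.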