yihua commented on code in PR #18126:
URL: https://github.com/apache/hudi/pull/18126#discussion_r3250719239


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -488,6 +534,76 @@ object SparkHoodieTableFileIndex extends 
SparkAdapterSupport {
   private val LOG = LoggerFactory.getLogger(classOf[SparkHoodieTableFileIndex])
   private val PUT_LEAF_FILES_METHOD_NAME = "putLeafFiles"
 
+  private case class NestedFieldNode(
+      leafType: Option[org.apache.spark.sql.types.DataType],
+      children: scala.collection.mutable.LinkedHashMap[String, NestedFieldNode]

Review Comment:
   Import `org.apache.spark.sql.types.DataType` and 
`scala.collection.mutable.LinkedHashMap`



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -2616,9 +2616,300 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals("row3", results(2).getAs[String]("_row_key"))
     assertEquals("value3", results(2).getAs[String]("data"))
   }
+
+  @Test
+  def testNestedFieldPartition(): Unit = {
+    TestCOWDataSource.runNestedFieldPartitionTest(spark, basePath, storage, 
"COW")
+  }
 }
 
 object TestCOWDataSource {
+
+  /**
+   * Shared test logic for nested field partition (COW and MOR).
+   * Used by TestCOWDataSource.testNestedFieldPartition and 
TestMORDataSource.testNestedFieldPartition.
+   */
+  def runNestedFieldPartitionTest(spark: SparkSession, basePath: String, 
storage: HoodieStorage, tableType: String): Unit = {
+    // Define schema with nested_record containing level field
+    val nestedSchema = StructType(Seq(
+      StructField("nested_int", IntegerType, nullable = false),
+      StructField("level", StringType, nullable = false)
+    ))
+
+    val schema = StructType(Seq(
+      StructField("key", StringType, nullable = false),
+      StructField("ts", LongType, nullable = false),
+      StructField("level", StringType, nullable = false),
+      StructField("int_field", IntegerType, nullable = false),
+      StructField("string_field", StringType, nullable = true),
+      StructField("nested_record", nestedSchema, nullable = true)
+    ))
+
+    // Create test data where top-level 'level' and 'nested_record.level' have 
DIFFERENT values
+    // This helps verify we're correctly partitioning/filtering on the nested 
field
+    val recordsCommit1 = Seq(
+      Row("key1", 1L, "L1", 1, "str1", Row(10, "INFO")),
+      Row("key2", 2L, "L2", 2, "str2", Row(20, "ERROR")),
+      Row("key3", 3L, "L3", 3, "str3", Row(30, "INFO")),
+      Row("key4", 4L, "L4", 4, "str4", Row(40, "DEBUG")),
+      Row("key5", 5L, "L5", 5, "str5", Row(50, "INFO"))
+    )
+
+    val tableTypeOptVal = if (tableType == "MOR") {
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+    } else {
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL
+    }
+
+    val baseWriteOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "nested_record.level",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
+      HoodieWriteConfig.TBL_NAME.key -> "test_nested_partition",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableTypeOptVal
+    )
+    val writeOpts = if (tableType == "MOR") {
+      baseWriteOpts + ("hoodie.compact.inline" -> "false")
+    } else {
+      baseWriteOpts
+    }
+
+    // Commit 1 - Initial insert
+    val inputDF1 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit1),
+      schema
+    )
+    inputDF1.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val commit1 = DataSourceTestUtils.latestCommitCompletionTime(storage, 
basePath)
+
+    // Commit 2 - Upsert: update key1 (int_field 1->100), insert key6 (INFO)
+    val recordsCommit2 = Seq(
+      Row("key1", 10L, "L1", 100, "str1", Row(10, "INFO")),
+      Row("key6", 6L, "L6", 6, "str6", Row(60, "INFO"))
+    )
+    val inputDF2 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit2),
+      schema
+    )
+    inputDF2.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit2 = DataSourceTestUtils.latestCommitCompletionTime(storage, 
basePath)
+
+    // Commit 3 - Upsert: update key3 (int_field 3->300), insert key7 (INFO)
+    val recordsCommit3 = Seq(
+      Row("key3", 30L, "L3", 300, "str3", Row(30, "INFO")),
+      Row("key7", 7L, "L7", 7, "str7", Row(70, "INFO"))
+    )
+    val inputDF3 = spark.createDataFrame(
+      spark.sparkContext.parallelize(recordsCommit3),
+      schema
+    )
+    inputDF3.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit3 = DataSourceTestUtils.latestCommitCompletionTime(storage, 
basePath)
+
+    // Verify partition structure - we should have 3 partitions: INFO, ERROR, 
DEBUG
+    val allPartitions = storage.listDirectEntries(new StoragePath(basePath))
+      .asScala.filter(_.isDirectory)
+      .map(_.getPath.getName)
+      .filterNot(_.startsWith("."))  // Filter out .hoodie and other hidden 
directories
+      .sorted
+    assertEquals(3, allPartitions.size, s"Expected 3 partitions for 
$tableType, but got: ${allPartitions.mkString(", ")}")
+    assertTrue(allPartitions.contains("INFO"), s"Missing INFO partition for 
$tableType")
+    assertTrue(allPartitions.contains("ERROR"), s"Missing ERROR partition for 
$tableType")
+    assertTrue(allPartitions.contains("DEBUG"), s"Missing DEBUG partition for 
$tableType")
+
+    // Snapshot read - filter on nested_record.level = 'INFO' (latest state: 5 
records)
+    val snapshotDF = spark.read.format("hudi")
+      .load(basePath)
+      .filter("nested_record.level = 'INFO'")
+      .select("key", "ts", "level", "int_field", "string_field", 
"nested_record")
+      .orderBy("key")
+
+    // VERIFICATION 1: Check partition schema contains the nested field
+    val snapshotRelation = 
snapshotDF.queryExecution.optimizedPlan.collectFirst {
+      case lr: org.apache.spark.sql.execution.datasources.LogicalRelation => lr
+    }
+    assertTrue(snapshotRelation.isDefined, s"LogicalRelation should exist for 
$tableType")
+    val fileIndex = snapshotRelation.get.relation match {
+      case fsRelation: 
org.apache.spark.sql.execution.datasources.HadoopFsRelation =>
+        fsRelation.location.asInstanceOf[org.apache.hudi.HoodieFileIndex]
+      case baseRelation: org.apache.hudi.HoodieBaseRelation =>

Review Comment:
   import `org.apache.spark.sql.execution.datasources.HadoopFsRelation`, 
`org.apache.hudi.HoodieFileIndex`, `org.apache.hudi.HoodieBaseRelation`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to