This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ba8c4c7b82c7 fix(spark): align CTAS partition fields by table partition order (#18899)
ba8c4c7b82c7 is described below
commit ba8c4c7b82c7c94405f712e5d1964226412f4e13
Author: fhan <[email protected]>
AuthorDate: Wed Jun 3 18:44:39 2026 +0800
fix(spark): align CTAS partition fields by table partition order (#18899)
* fix(spark): fix CTAS partition field order
* fix(spark): add a short comment
---------
Co-authored-by: fhan <[email protected]>
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 34 +++++++++++++++++-
.../spark/sql/hudi/ddl/TestCreateTable.scala | 40 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 2b790e427647..0065c910480c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -410,7 +410,8 @@ case class ResolveImplementationsEarly(spark: SparkSession)
extends Rule[Logical
// Convert to CreateHoodieTableAsSelectCommand
case ct @ CreateTable(table, mode, Some(query))
if sparkAdapter.isHoodieTable(table) && ct.query.forall(_.resolved) =>
- val alignedQuery = stripMetaFieldAttributes(query)
+ val alignedQuery = alignCtasQueryByPartitionOrder(
+ stripMetaFieldAttributes(query), table.partitionColumnNames)
CreateHoodieTableAsSelectCommand(table, mode, alignedQuery)
case ct: CreateTable =>
@@ -432,6 +433,37 @@ case class ResolveImplementationsEarly(spark:
SparkSession) extends Rule[Logical
case _ => plan
}
}
+
+  private def alignCtasQueryByPartitionOrder(query: LogicalPlan,
partitionColumns: Seq[String]): LogicalPlan = { // Returns `query` unchanged when `partitionColumns` is empty or the partition columns already appear in `partitionColumns` order; otherwise projects the data columns (original order) followed by the partition columns in `partitionColumns` order. Throws HoodieAnalysisException when the partition columns cannot be matched to the query output.
+    if (partitionColumns.isEmpty) { // no partition columns: nothing to align
+      query
+    } else {
+      val resolver = spark.sessionState.conf.resolver // compare names via the session resolver rather than ==
+      val (dataAttrs, partitionAttrs) = query.output.partition { attr => // Seq.partition keeps each side in query output order
+        !partitionColumns.exists(partition => resolver(partition, attr.name))
+      }
+
+      if (partitionAttrs.size != partitionColumns.size) { // counts differ, e.g. a partition column is missing from the output or is matched by several output columns
+        throw new HoodieAnalysisException(s"Partition columns
${partitionColumns.mkString("[", ", ", "]")} " +
+          s"do not match query output ${query.output.map(_.name).mkString("[",
", ", "]")}")
+      }
+
+      val alreadyAligned = partitionColumns.zip(partitionAttrs).forall { // checks relative order only, not that partition columns are last in the output
+        case (partition, attr) => resolver(partition, attr.name)
+      }
+      // Avoid adding a redundant Project when partition columns are already
in the table-defined order.
+      if (alreadyAligned) { // NOTE(review): partition columns interleaved with data columns but in the right relative order (e.g. year, id, month, day) are left in place, whereas the reorder below always moves them to the end; confirm this asymmetry is intended
+        query
+      } else {
+        val orderedPartitionAttrs = partitionColumns.map { partition =>
+          partitionAttrs.find(attr => resolver(partition,
attr.name)).getOrElse { // reachable when counts match but one partition name is matched twice and another not at all
+            throw new HoodieAnalysisException(s"Cannot resolve partition
column $partition in CTAS query output")
+          }
+        }
+        Project(dataAttrs ++ orderedPartitionAttrs, query) // data columns first, then partition columns in partitionColumns order
+      }
+    }
+  }
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index ead994342162..88ec4b7fb4c3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -416,6 +416,46 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
Seq(1, "a1", 10, "2021-04-01")
)
+ // Create table with multi-level partition
+ val tableNameMultiLevelPartition = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableNameMultiLevelPartition using hudi
+ | partitioned by (year, month, day)
+ | tblproperties(
+ | primaryKey = 'id',
+ | type = '$tableType'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableNameMultiLevelPartition'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, '2021' as year, '04'
as month, '01' as day
+ """.stripMargin
+ )
+
+ checkAnswer(s"select id, name, price, year, month, day from
$tableNameMultiLevelPartition")(
+ Seq(1, "a1", 10, "2021", "04", "01")
+ )
+
+ // Create table with multi-level partition and out-of-order partition
columns
+ val tableNameMultiLevelPartitionDisorder = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableNameMultiLevelPartitionDisorder using hudi
+ | partitioned by (year, month, day)
+ | tblproperties(
+ | primaryKey = 'id',
+ | type = '$tableType'
+ | )
+ | location
'${tmp.getCanonicalPath}/$tableNameMultiLevelPartitionDisorder'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, '04' as month, '01'
as day, '2021' as year
+ """.stripMargin
+ )
+
+ checkAnswer(s"select id, name, price, year, month, day from
$tableNameMultiLevelPartitionDisorder")(
+ Seq(1, "a1", 10, "2021", "04", "01")
+ )
+
// Create Partitioned table with timestamp data type
val tableName3 = generateTableName
// CTAS failed with null primaryKey