KnightChess commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1927017233
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer()
+ if (HoodieSparkUtils.gteqSpark3_5) {
Review Comment:
This rule should be applied after the
`HoodieSpark35DataSourceV2ToV1Fallback` rule. Before RelationInsertInto
is applied again, the v2 relation has already fallen back to the
v1 relation.
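A minimal, Spark-free sketch of why the ordering matters, using invented stand-in types (`V2Relation`, `V1Relation`, `InsertInto`, `Resolved` are illustrative only, not the actual Hudi or Spark classes): the insert resolution only matches a v1 relation, so the fallback rule must run first.

```scala
// Hypothetical stand-ins for Spark's logical plan nodes; names are
// illustrative only, not the actual Hudi/Spark classes.
sealed trait Plan
case class V2Relation(table: String) extends Plan
case class V1Relation(table: String) extends Plan
case class InsertInto(target: Plan) extends Plan
case class Resolved(inner: Plan) extends Plan

type Rule = Plan => Plan

// Stand-in for HoodieSpark35DataSourceV2ToV1Fallback: rewrite v2 to v1.
val v2ToV1Fallback: Rule = {
  case InsertInto(V2Relation(t)) => InsertInto(V1Relation(t))
  case other                     => other
}

// Stand-in for the insert-into resolution rule: it only fires once the
// target is already a v1 relation, mirroring the LogicalRelation match.
val resolveInsertInto: Rule = {
  case p @ InsertInto(V1Relation(_)) => Resolved(p)
  case other                         => other
}

// Rules are applied in sequence, like entries appended to the ListBuffer.
def analyze(rules: Seq[Rule], plan: Plan): Plan =
  rules.foldLeft(plan)((p, r) => r(p))
```

With the fallback first, the insert over a v2 relation ends up resolved; in the reverse order the resolution rule never matches and the plan is only fallen back, never resolved.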
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the Resolution rules [[ResolveUserSpecifiedColumns]] and
+ * [[ResolveDefaultColumns]] are removed (see the code changes in
+ * [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The logic of resolving the user-specified columns and default values, which
+ * is required for an INSERT with a subset of columns relative to the table
+ * schema to work properly, is deferred to [[PreprocessTableInsertion]] for v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in Resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i @ InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {
Review Comment:
And with separate implementations for different versions, if full support
for v2 relations is achieved in the future, this Rule can simply be removed
without incurring significant refactoring costs. What do you think?
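As a rough, Spark-free illustration of what the removed rules did (the `Column` type and string-valued defaults here are invented for the sketch; Spark's real rules operate on attributes and expressions): expand a user-specified column subset to the full table schema, reordering to schema order and filling unspecified columns with their defaults.

```scala
// Illustrative schema type; Spark's real rules work on Attribute/Expression.
case class Column(name: String, default: Option[String])

// Expand `INSERT INTO t (userCols) VALUES (values)` to the full table
// schema, reordering to schema order and filling unspecified columns
// with their default value (or NULL when none is declared).
def resolveUserSpecifiedColumns(
    schema: Seq[Column],
    userCols: Seq[String],
    values: Seq[String]): Seq[(String, String)] = {
  require(userCols.length == values.length, "column/value arity mismatch")
  val byName = userCols.zip(values).toMap
  schema.map { col =>
    col.name -> byName.getOrElse(col.name, col.default.getOrElse("NULL"))
  }
}
```

This also covers the reordering case mentioned in the Javadoc: user columns given in a different order are mapped back to schema order by name.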
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer()
+ if (HoodieSparkUtils.gteqSpark3_5) {
+      rules += (_ => instantiateKlass(
+        "org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
Review Comment:
Yes, if the above logic is added, then the logic here can be removed.
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the Resolution rules [[ResolveUserSpecifiedColumns]] and
+ * [[ResolveDefaultColumns]] are removed (see the code changes in
+ * [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The logic of resolving the user-specified columns and default values, which
+ * is required for an INSERT with a subset of columns relative to the table
+ * schema to work properly, is deferred to [[PreprocessTableInsertion]] for v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in Resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i @ InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {
Review Comment:
> But I'm wondering if I should extract the functionality of projection so it's easier to maintain

I think both approaches are fine, but I prefer having separate
implementations for different versions, even if it leads to some code
duplication. Even if the abstraction works for Spark 3.4 and Spark 3.5 at the
moment, there might be significant changes in future versions like Spark 4.0
and Spark 4.1, making it difficult for a single implementation to adapt to
multiple versions. This could even result in the creation of abstractions like
spark3_45Common and spark4_123Common in the future, which would further
complicate things.
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the Resolution rules [[ResolveUserSpecifiedColumns]] and
+ * [[ResolveDefaultColumns]] are removed (see the code changes in
+ * [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The logic of resolving the user-specified columns and default values, which
+ * is required for an INSERT with a subset of columns relative to the table
+ * schema to work properly, is deferred to [[PreprocessTableInsertion]] for v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in Resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i @ InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {
Review Comment:
> Also are these relations v2? It seems that they are mixed with v1 logic.

Hudi is currently mostly implemented using the v1 approach, and v2 relations
in writes generally fall back to v1 relations. So I think it's OK for now.
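A tiny sketch of the fallback shape being described, with invented stand-in types (`V2WriteRelation` and `V1LogicalRelation` are not the actual classes; the real rule matches a DataSourceV2Relation and rebuilds a `LogicalRelation(relation, output, catalogTable, isStreaming = false)`): the write path unwraps the v2 relation to its underlying v1 form, so the v1-oriented logic downstream is what actually runs.

```scala
// Invented stand-ins for the v2 wrapper and the v1 relation it falls back to.
case class V1LogicalRelation(table: String, isStreaming: Boolean)
case class V2WriteRelation(table: String)

// Mirror of the fallback: unwrap a v2 relation to v1, leave anything else alone.
def fallbackToV1(rel: Any): Any = rel match {
  case V2WriteRelation(t) => V1LogicalRelation(t, isStreaming = false)
  case other              => other
}
```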
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]