KnightChess commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1927017233


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
   def customResolutionRules: Seq[RuleBuilder] = {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
+    if (HoodieSparkUtils.gteqSpark3_5) {

Review Comment:
   This rule should be applied after the `HoodieSpark35DataSourceV2ToV1Fallback` rule. By the time `RelationInsertInto` is applied again, the v2 relation has already fallen back to the v1 relation.
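
   The ordering concern can be illustrated with a tiny single-pass simulation (hypothetical names, not Hudi's actual classes — and unlike a real Catalyst batch, this runs each rule exactly once): the resolve rule only matches v1 relations, so it is a no-op when it runs before the fallback.

   ```scala
   // Hypothetical, simplified model of an analyzer pass: a "plan" is just a
   // tagged relation, and each rule is a Plan => Plan function.
   sealed trait Plan
   case class V2Relation(table: String) extends Plan
   case class V1Relation(table: String) extends Plan
   case class ResolvedInsert(table: String) extends Plan

   // Stand-in for HoodieSpark35DataSourceV2ToV1Fallback: rewrites v2 to v1.
   val fallbackRule: Plan => Plan = {
     case V2Relation(t) => V1Relation(t)
     case other         => other
   }

   // Stand-in for the column-resolution rule: it only matches v1 relations,
   // so it does nothing if the fallback has not run yet.
   val resolveInsertRule: Plan => Plan = {
     case V1Relation(t) => ResolvedInsert(t)
     case other         => other
   }

   // Apply the rules once each, in list order.
   def runRules(plan: Plan, rules: Seq[Plan => Plan]): Plan =
     rules.foldLeft(plan)((p, r) => r(p))
   ```

   With the fallback first, `runRules(V2Relation("t"), Seq(fallbackRule, resolveInsertRule))` yields a resolved insert; in the reverse order the resolve rule never fires in this pass.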



##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
     LogicalRelation(relation, output, catalogTable, isStreaming = false)
   }
 }
+
+/**
+ * In Spark 3.5, the following resolution rules are removed:
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving user-specified columns and default values, which
+ * is required for an INSERT with a user-specified subset of the table schema's
+ * columns to work properly, is deferred to [[PreprocessTableInsertion]] for
+ * v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i@InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {

Review Comment:
   And with separate implementations for different versions, if full support for v2 relations is achieved in the future, this rule can simply be removed without incurring significant refactoring costs. What do you think?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
   def customResolutionRules: Seq[RuleBuilder] = {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      rules += (_ => instantiateKlass(
+        "org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))

Review Comment:
   Yes, if the above logic is added, then the logic here can be removed.
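
   For context, `instantiateKlass` in `HoodieAnalysis` loads a version-specific rule class by its fully qualified name via reflection, so the common module never links against Spark-3.5-only classes at compile time. A minimal standalone sketch of that pattern, using `java.lang.StringBuilder` as a stand-in for a rule class, might look like:

   ```scala
   // Hypothetical sketch of reflection-based instantiation in the style of
   // instantiateKlass: the class name is resolved only at runtime, which lets
   // a common module register rules that exist only in a version-specific jar.
   def instantiateByName[T](className: String): T =
     Class.forName(className)
       .getDeclaredConstructor()       // assumes a public no-arg constructor
       .newInstance()
       .asInstanceOf[T]

   // Example: instantiate a class purely from its name.
   val sb = instantiateByName[java.lang.StringBuilder]("java.lang.StringBuilder")
   ```

   The cast is unchecked, so a mismatched class name fails only at runtime — the usual trade-off of this kind of version shim.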



##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
     LogicalRelation(relation, output, catalogTable, isStreaming = false)
   }
 }
+
+/**
+ * In Spark 3.5, the following resolution rules are removed:
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving user-specified columns and default values, which
+ * is required for an INSERT with a user-specified subset of the table schema's
+ * columns to work properly, is deferred to [[PreprocessTableInsertion]] for
+ * v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i@InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {

Review Comment:
   > But I'm wondering if I should extract the functionality of projection so it's easier to maintain
   
   I think both approaches are fine, but I prefer having separate implementations for different versions, even if that leads to some code duplication. Even if an abstraction works for Spark 3.4 and Spark 3.5 at the moment, there might be significant changes in future versions like Spark 4.0 and Spark 4.1, making it difficult for a single implementation to adapt to multiple versions. This could even result in the creation of abstractions like spark3_45Common and spark4_123Common in the future, which would further complicate things.
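
   As a rough, self-contained sketch of the projection functionality under discussion (hypothetical names; the real logic in `PreprocessTableInsertion` operates on Catalyst attributes and expressions, not strings): reorder the user-specified columns into the table schema's order and fill each unspecified column from its default.

   ```scala
   // Illustrative stand-in for a resolved table column with an optional default.
   case class Column(name: String, default: Option[String] = None)

   // Project an INSERT's values (paired with user-specified column names) onto
   // the full table schema, in schema order, filling gaps with defaults.
   def projectForInsert(
       tableSchema: Seq[Column],
       userCols: Seq[String],
       values: Seq[String]): Either[String, Seq[String]] =
     if (userCols.length != values.length)
       Left(s"expected ${userCols.length} values, got ${values.length}")
     else {
       val byName = userCols.zip(values).toMap
       // Walk the schema in order: take the user's value if the column was
       // specified, otherwise fall back to the column default.
       val resolved = tableSchema.map { col =>
         byName.get(col.name).orElse(col.default)
           .toRight(s"no value or default for column '${col.name}'")
       }
       // Collect either the first error or the fully projected row.
       resolved.foldRight(Right(Nil): Either[String, List[String]]) { (e, acc) =>
         for { v <- e; vs <- acc } yield v :: vs
       }
     }
   ```

   For a schema `(id, name, ts DEFAULT '0')`, `projectForInsert(schema, Seq("name", "id"), Seq("a", "1"))` produces the row in schema order with the default filled in; omitting a column that has no default is an error, which mirrors the behavior the resolution rule needs to reproduce.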



##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
     LogicalRelation(relation, output, catalogTable, isStreaming = false)
   }
 }
+
+/**
+ * In Spark 3.5, the following resolution rules are removed:
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving user-specified columns and default values, which
+ * is required for an INSERT with a user-specified subset of the table schema's
+ * columns to work properly, is deferred to [[PreprocessTableInsertion]] for
+ * v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * an INSERT with a user-specified subset of columns work, the custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the project logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed in the case of an INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i@InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {

Review Comment:
   > Also are these relations v2? It seems that they are mixed with v1 logic.
   
   Hudi is currently mostly implemented using the v1 approach, and v2 relations in writes generally fall back to v1 relations, so I think it's OK for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
