Kimahriman commented on code in PR #785:
URL: https://github.com/apache/datafusion-comet/pull/785#discussion_r1705545450


##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -950,28 +950,31 @@ class CometSparkSessionExtensions
   }
 
   // This rule is responsible for eliminating redundant transitions between row-based and
-  // columnar-based operators for Comet. Currently, two potential redundant transitions are:
+  // columnar-based operators for Comet. Currently, three potential redundant transitions are:
   // 1. `ColumnarToRowExec` on top of an ending `CometCollectLimitExec` operator, which is
   //    redundant as `CometCollectLimitExec` already wraps a `ColumnarToRowExec` for row-based
   //    output.
-  // 2. Consecutive operators of `CometRowToColumnarExec` and `ColumnarToRowExec`.
+  // 2. Consecutive operators of `CometSparkToColumnarExec` and `ColumnarToRowExec`.
+  // 3. AQE inserts an additional `CometSparkToColumnarExec` in addition to the one inserted in the
+  //    original plan.
   //
   // Note about the first case: The `ColumnarToRowExec` was added during
   // ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark requests row-based
   // output such as a `collect` call. It's correct to add a redundant `ColumnarToRowExec` for
   // `CometExec`. However, for certain operators such as `CometCollectLimitExec` which overrides
   // `executeCollect`, the redundant `ColumnarToRowExec` makes the override ineffective.
   //
-  // Note about the second case: When `spark.comet.rowToColumnar.enabled` is set, Comet will add
-  // `CometRowToColumnarExec` on top of row-based operators first, but the downstream operator
+  // Note about the second case: When `spark.comet.sparkToColumnar.enabled` is set, Comet will add
+  // `CometSparkToColumnarExec` on top of row-based operators first, but the downstream operator
   // only takes row-based input as it's a vanilla Spark operator(as Comet cannot convert it for
   // various reasons) or Spark requests row-based output such as a `collect` call. Spark will adds
-  // another `ColumnarToRowExec` on top of `CometRowToColumnarExec`. In this case, the pair could
+  // another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could
   // be removed.
   case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
     override def apply(plan: SparkPlan): SparkPlan = {
       val eliminatedPlan = plan transformUp {
-        case ColumnarToRowExec(rowToColumnar: CometRowToColumnarExec) => rowToColumnar.child
+        case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => sparkToColumnar.child
+        case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child

Review Comment:
   This is the bug fix for duplicate nodes from AQE. I can make this a separate PR if needed.
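
   As a rough illustration of what that extra case handles, here is a minimal, self-contained Scala sketch. `Plan`, `Scan`, `SparkToColumnar`, `ColumnarToRow`, and the `transformUp` helper below are simplified stand-ins for illustration only, not the real Spark or Comet classes:

```scala
// Simplified stand-ins for the real plan nodes (illustration only).
sealed trait Plan
case class Scan(table: String) extends Plan
case class SparkToColumnar(child: Plan) extends Plan // stand-in for CometSparkToColumnarExec
case class ColumnarToRow(child: Plan) extends Plan   // stand-in for ColumnarToRowExec

object EliminateRedundantTransitionsSketch {

  // Bottom-up rewrite, analogous in spirit to SparkPlan#transformUp.
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewrittenChildren = plan match {
      case SparkToColumnar(c) => SparkToColumnar(transformUp(c)(rule))
      case ColumnarToRow(c)   => ColumnarToRow(transformUp(c)(rule))
      case leaf               => leaf
    }
    rule.applyOrElse(rewrittenChildren, (p: Plan) => p)
  }

  val eliminateRedundantTransitions: PartialFunction[Plan, Plan] = {
    // Case 2: a columnar-to-row transition directly on top of a row-to-columnar one.
    case ColumnarToRow(SparkToColumnar(child)) => child
    // Case 3: a second SparkToColumnar stacked on top of an existing one
    // (as AQE can do); keep only the inner node.
    case SparkToColumnar(inner: SparkToColumnar) => inner
  }

  def main(args: Array[String]): Unit = {
    val duplicatedByAqe = SparkToColumnar(SparkToColumnar(Scan("t")))
    println(transformUp(duplicatedByAqe)(eliminateRedundantTransitions))
    // prints: SparkToColumnar(Scan(t))

    val redundantPair = ColumnarToRow(SparkToColumnar(Scan("t")))
    println(transformUp(redundantPair)(eliminateRedundantTransitions))
    // prints: Scan(t)
  }
}
```

   Against a plan where AQE has stacked two consecutive transitions, the bottom-up rewrite keeps only the inner node, which mirrors the new `CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child` case in the diff.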



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

