Kimahriman commented on code in PR #785:
URL: https://github.com/apache/datafusion-comet/pull/785#discussion_r1705545450
##########
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala:
##########
@@ -950,28 +950,31 @@ class CometSparkSessionExtensions
}
// This rule is responsible for eliminating redundant transitions between row-based and
- // columnar-based operators for Comet. Currently, two potential redundant transitions are:
+ // columnar-based operators for Comet. Currently, three potential redundant transitions are:
// 1. `ColumnarToRowExec` on top of an ending `CometCollectLimitExec` operator, which is
// redundant as `CometCollectLimitExec` already wraps a `ColumnarToRowExec` for row-based
// output.
- // 2. Consecutive operators of `CometRowToColumnarExec` and `ColumnarToRowExec`.
+ // 2. Consecutive operators of `CometSparkToColumnarExec` and `ColumnarToRowExec`.
+ // 3. AQE inserts an additional `CometSparkToColumnarExec` in addition to the one inserted in the
+ // original plan.
//
// Note about the first case: The `ColumnarToRowExec` was added during
// ApplyColumnarRulesAndInsertTransitions' insertTransitions phase when Spark requests row-based
// output such as a `collect` call. It's correct to add a redundant `ColumnarToRowExec` for
// `CometExec`. However, for certain operators such as `CometCollectLimitExec` which overrides
// `executeCollect`, the redundant `ColumnarToRowExec` makes the override ineffective.
//
- // Note about the second case: When `spark.comet.rowToColumnar.enabled` is set, Comet will add
- // `CometRowToColumnarExec` on top of row-based operators first, but the downstream operator
+ // Note about the second case: When `spark.comet.sparkToColumnar.enabled` is set, Comet will add
+ // `CometSparkToColumnarExec` on top of row-based operators first, but the downstream operator
// only takes row-based input as it's a vanilla Spark operator(as Comet cannot convert it for
// various reasons) or Spark requests row-based output such as a `collect` call. Spark will adds
- // another `ColumnarToRowExec` on top of `CometRowToColumnarExec`. In this case, the pair could
+ // another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could
// be removed.
case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
val eliminatedPlan = plan transformUp {
- case ColumnarToRowExec(rowToColumnar: CometRowToColumnarExec) => rowToColumnar.child
+ case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => sparkToColumnar.child
+ case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child
Review Comment:
This is the bug fix for the duplicate `CometSparkToColumnarExec` nodes inserted by AQE.
I can move it to a separate PR if needed.
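
For illustration only, the effect of the two pattern cases can be sketched on a toy plan tree that needs no Spark dependency. Everything below (`Plan`, `Scan`, `SparkToColumnar`, `ColumnarToRow`, `EliminateSketch`) is a hypothetical stand-in, not the real Spark or Comet classes, and the hand-written `transformUp` only mimics the bottom-up order of the real one.

```scala
// Toy stand-ins for plan nodes; these are NOT the real Spark/Comet classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class SparkToColumnar(child: Plan) extends Plan
case class ColumnarToRow(child: Plan) extends Plan

object EliminateSketch {
  // Rewrite children first, then try the rule on the rebuilt node
  // (bottom-up, like `transformUp`).
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case SparkToColumnar(c) => SparkToColumnar(transformUp(c)(rule))
      case ColumnarToRow(c)   => ColumnarToRow(transformUp(c)(rule))
      case leaf               => leaf
    }
    // Nodes the rule does not match are returned unchanged.
    rule.applyOrElse(withNewChildren, (p: Plan) => p)
  }

  val rule: PartialFunction[Plan, Plan] = {
    // A row -> columnar -> row round trip: drop both transitions.
    case ColumnarToRow(s: SparkToColumnar) => s.child
    // Two stacked conversions: keep only the inner one.
    case SparkToColumnar(child: SparkToColumnar) => child
  }

  def eliminate(plan: Plan): Plan = transformUp(plan)(rule)
}
```

Because the rewrite runs bottom-up, a stack of three or more toy `SparkToColumnar` nodes also collapses to a single one in one pass: each outer node sees an already-collapsed child.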