This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 904173fa9e [VL] Remove rule `PullOutDuplicateProject` (#11667)
904173fa9e is described below

commit 904173fa9e6090321ee88e65b2527f9aa84f6d7e
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Feb 27 17:47:18 2026 +0800

    [VL] Remove rule `PullOutDuplicateProject` (#11667)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 -
 .../gluten/extension/PullOutDuplicateProject.scala | 137 ---------------------
 .../gluten/execution/VeloxHashJoinSuite.scala      |  55 +++------
 3 files changed, 15 insertions(+), 179 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index fcc00389d5..773868b0c4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -120,7 +120,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
     injector.injectPostTransform(_ => EliminateLocalSort)
-    injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
@@ -226,7 +225,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
     injector.injectPostTransform(_ => EliminateLocalSort)
-    injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
deleted file mode 100644
index 1a8f160074..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.execution.{BroadcastHashJoinExecTransformer, 
FilterExecTransformer, LimitExecTransformer, ProjectExecTransformer}
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, AttributeSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution._
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Velox does not allow duplicate projections in HashProbe and FilterProject, 
so this rule pulls
- * out duplicate projections into a new outer project.
- */
-object PullOutDuplicateProject extends Rule[SparkPlan] with PredicateHelper {
-  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case l @ LimitExecTransformer(p: ProjectExecTransformer, _, _) =>
-      val duplicates = calculateDuplicates(p, AttributeSet.empty)
-      if (duplicates.isEmpty) {
-        l
-      } else {
-        val pullOutAliases = new ArrayBuffer[Alias]()
-        val newChild = rewriteProject(p, AttributeSet.empty, pullOutAliases, 
duplicates)
-        outerProject(l.copy(child = newChild), l.output, pullOutAliases)
-      }
-    case p @ ProjectExecTransformer(_, child: ProjectExecTransformer) =>
-      val duplicates = calculateDuplicates(child, AttributeSet.empty)
-      if (duplicates.isEmpty) {
-        p
-      } else {
-        val pullOutAliases = new ArrayBuffer[Alias]()
-        val newChild = rewriteProject(child, AttributeSet.empty, 
pullOutAliases, duplicates)
-        val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> 
a))
-        val newProjectList = p.projectList.map(replaceAliasButKeepName(_, 
aliasMap))
-        ProjectExecTransformer(newProjectList, newChild)
-      }
-    case f @ FilterExecTransformer(_, child: ProjectExecTransformer) =>
-      val duplicates = calculateDuplicates(child, f.references)
-      if (duplicates.isEmpty) {
-        f
-      } else {
-        val pullOutAliases = new ArrayBuffer[Alias]()
-        val newChild = rewriteProject(child, f.references, pullOutAliases, 
duplicates)
-        outerProject(f.copy(child = newChild), f.output, pullOutAliases)
-      }
-    case bhj: BroadcastHashJoinExecTransformer
-        if bhj.streamedPlan.isInstanceOf[ProjectExecTransformer] =>
-      val duplicates =
-        
calculateDuplicates(bhj.streamedPlan.asInstanceOf[ProjectExecTransformer], 
bhj.references)
-      if (duplicates.isEmpty) {
-        bhj
-      } else {
-        val pullOutAliases = new ArrayBuffer[Alias]()
-        val newStreamedPlan = rewriteProject(
-          bhj.streamedPlan.asInstanceOf[ProjectExecTransformer],
-          bhj.references,
-          pullOutAliases,
-          duplicates)
-        val newBhj = bhj.joinBuildSide match {
-          case BuildLeft => bhj.copy(right = newStreamedPlan)
-          case BuildRight => bhj.copy(left = newStreamedPlan)
-        }
-        outerProject(newBhj, bhj.output, pullOutAliases)
-      }
-  }
-
-  private def outerProject(
-      child: SparkPlan,
-      output: Seq[Attribute],
-      pullOutAliases: ArrayBuffer[Alias]): ProjectExecTransformer = {
-    val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
-    val newProjectList = output.map(attr => aliasMap.getOrElse(attr, attr))
-    ProjectExecTransformer(newProjectList, child)
-  }
-
-  /** Calculate the original attributes corresponding to duplicate 
projections. */
-  private def calculateDuplicates(
-      project: ProjectExecTransformer,
-      references: AttributeSet): AttributeSet = {
-    val projectList = project.projectList
-    AttributeSet(
-      projectList
-        .collect {
-          case attr: Attribute if !references.contains(attr) => attr
-          case a @ Alias(attr: Attribute, _)
-              if !references.contains(a) && !references.contains(attr) =>
-            attr
-        }
-        .groupBy(_.exprId)
-        .filter(_._2.size > 1)
-        .map(_._2.head))
-  }
-
-  /**
-   * If there are duplicate projections that are not referenced by the 
parent, only the original
-   * attribute is kept in the project.
-   */
-  private def rewriteProject(
-      project: ProjectExecTransformer,
-      references: AttributeSet,
-      pullOutAliases: ArrayBuffer[Alias],
-      duplicates: AttributeSet): SparkPlan = {
-    val projectList = project.projectList
-    val newProjectList = projectList.distinct.filter {
-      case a @ Alias(attr: Attribute, _) if !references.contains(a) && 
duplicates.contains(attr) =>
-        pullOutAliases.append(a)
-        false
-      case _ => true
-    } ++ duplicates.filter(!project.outputSet.contains(_)).toSeq
-    val newProject = project.copy(projectList = newProjectList)
-    newProject.copyTagsFrom(project)
-    // If the output of the new project is the same as the child, delete it to 
simplify the plan.
-    if (newProject.outputSet.equals(project.child.outputSet)) {
-      project.child
-    } else {
-      newProject
-
-    }
-  }
-}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 8db6b95775..4fca03fa85 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec}
 
@@ -230,12 +230,12 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
         })
   }
 
-  test("pull out duplicate projections for HashProbe and FilterProject") {
+  test("duplicate projections") {
     withTable("t1", "t2", "t3") {
       Seq((1, 1), (2, 2)).toDF("c1", "c2").write.saveAsTable("t1")
       Seq(1, 2, 3).toDF("c1").write.saveAsTable("t2")
       Seq(1, 2, 3).toDF("c1").write.saveAsTable("t3")
-      // test HashProbe, pull out `c2 as a,c2 as b`.
+      // Test HashProbe.
       val q1 =
         """
           |select tt1.* from
@@ -260,50 +260,25 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
           |left join t3
           |on tt1.c1 = t3.c1
           |""".stripMargin
-      Seq(q1, q2, q3).foreach {
-        runQueryAndCompare(_) {
-          df =>
-            {
-              val executedPlan = getExecutedPlan(df)
-              val projects = executedPlan.collect {
-                case p @ ProjectExecTransformer(_, _: 
BroadcastHashJoinExecTransformer) => p
-              }
-              assert(projects.nonEmpty)
-              val aliases = projects.last.projectList.collect { case a: Alias 
=> a }
-              assert(aliases.size == 2)
-            }
-        }
-      }
-
-      // test FilterProject, only pull out `c2 as b`.
       val q4 =
-        """
-          |select c1, c2, a, b from
-          |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
-          |where c > -1 and b > 1
-          |""".stripMargin
-      runQueryAndCompare(q4) {
-        df =>
-          {
-            val executedPlan = getExecutedPlan(df)
-            val projects = executedPlan.collect {
-              case p @ ProjectExecTransformer(_, _: FilterExecTransformer) => p
-            }
-            assert(projects.nonEmpty)
-            val aliases = projects.last.projectList.collect { case a: Alias => 
a }
-            assert(aliases.size == 1)
-          }
-      }
-
-      // Test HashProbe operation when projecting a column multiple times 
without using an alias.
-      val q5 =
         """
           |select tt1.* from
           |(select c1, c2, c2 from t1) tt1
           |left join t2
           |on tt1.c1 = t2.c1
           |""".stripMargin
-      runQueryAndCompare(q5) { _ => }
+
+      // Test FilterProject.
+      val q5 =
+        """
+          |select c1, c2, a, b from
+          |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
+          |where c > -1 and b > 1
+          |""".stripMargin
+
+      Seq(q1, q2, q3, q4, q5).foreach {
+        runQueryAndCompare(_) { _ => }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to