This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 904173fa9e [VL] Remove rule `PullOutDuplicateProject` (#11667)
904173fa9e is described below
commit 904173fa9e6090321ee88e65b2527f9aa84f6d7e
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Feb 27 17:47:18 2026 +0800
[VL] Remove rule `PullOutDuplicateProject` (#11667)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 -
.../gluten/extension/PullOutDuplicateProject.scala | 137 ---------------------
.../gluten/execution/VeloxHashJoinSuite.scala | 55 +++------
3 files changed, 15 insertions(+), 179 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index fcc00389d5..773868b0c4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -120,7 +120,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
- injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
@@ -226,7 +225,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
- injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
deleted file mode 100644
index 1a8f160074..0000000000
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.execution.{BroadcastHashJoinExecTransformer, FilterExecTransformer, LimitExecTransformer, ProjectExecTransformer}
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSet, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution._
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Velox does not allow duplicate projections in HashProbe and FilterProject, this rule pull out
- * duplicate projections to a new project outside.
- */
-object PullOutDuplicateProject extends Rule[SparkPlan] with PredicateHelper {
- override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
- case l @ LimitExecTransformer(p: ProjectExecTransformer, _, _) =>
- val duplicates = calculateDuplicates(p, AttributeSet.empty)
- if (duplicates.isEmpty) {
- l
- } else {
- val pullOutAliases = new ArrayBuffer[Alias]()
- val newChild = rewriteProject(p, AttributeSet.empty, pullOutAliases, duplicates)
- outerProject(l.copy(child = newChild), l.output, pullOutAliases)
- }
- case p @ ProjectExecTransformer(_, child: ProjectExecTransformer) =>
- val duplicates = calculateDuplicates(child, AttributeSet.empty)
- if (duplicates.isEmpty) {
- p
- } else {
- val pullOutAliases = new ArrayBuffer[Alias]()
- val newChild = rewriteProject(child, AttributeSet.empty, pullOutAliases, duplicates)
- val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
- val newProjectList = p.projectList.map(replaceAliasButKeepName(_, aliasMap))
- ProjectExecTransformer(newProjectList, newChild)
- }
- case f @ FilterExecTransformer(_, child: ProjectExecTransformer) =>
- val duplicates = calculateDuplicates(child, f.references)
- if (duplicates.isEmpty) {
- f
- } else {
- val pullOutAliases = new ArrayBuffer[Alias]()
- val newChild = rewriteProject(child, f.references, pullOutAliases, duplicates)
- outerProject(f.copy(child = newChild), f.output, pullOutAliases)
- }
- case bhj: BroadcastHashJoinExecTransformer
- if bhj.streamedPlan.isInstanceOf[ProjectExecTransformer] =>
- val duplicates =
- calculateDuplicates(bhj.streamedPlan.asInstanceOf[ProjectExecTransformer], bhj.references)
- if (duplicates.isEmpty) {
- bhj
- } else {
- val pullOutAliases = new ArrayBuffer[Alias]()
- val newStreamedPlan = rewriteProject(
- bhj.streamedPlan.asInstanceOf[ProjectExecTransformer],
- bhj.references,
- pullOutAliases,
- duplicates)
- val newBhj = bhj.joinBuildSide match {
- case BuildLeft => bhj.copy(right = newStreamedPlan)
- case BuildRight => bhj.copy(left = newStreamedPlan)
- }
- outerProject(newBhj, bhj.output, pullOutAliases)
- }
- }
-
- private def outerProject(
- child: SparkPlan,
- output: Seq[Attribute],
- pullOutAliases: ArrayBuffer[Alias]): ProjectExecTransformer = {
- val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
- val newProjectList = output.map(attr => aliasMap.getOrElse(attr, attr))
- ProjectExecTransformer(newProjectList, child)
- }
-
- /** Calculate the original attributes corresponding to duplicate projections. */
- private def calculateDuplicates(
- project: ProjectExecTransformer,
- references: AttributeSet): AttributeSet = {
- val projectList = project.projectList
- AttributeSet(
- projectList
- .collect {
- case attr: Attribute if !references.contains(attr) => attr
- case a @ Alias(attr: Attribute, _)
- if !references.contains(a) && !references.contains(attr) =>
- attr
- }
- .groupBy(_.exprId)
- .filter(_._2.size > 1)
- .map(_._2.head))
- }
-
- /**
- * If there are duplicate projections and not refer to parent, only the original attribute is kept
- * in the project.
- */
- private def rewriteProject(
- project: ProjectExecTransformer,
- references: AttributeSet,
- pullOutAliases: ArrayBuffer[Alias],
- duplicates: AttributeSet): SparkPlan = {
- val projectList = project.projectList
- val newProjectList = projectList.distinct.filter {
- case a @ Alias(attr: Attribute, _) if !references.contains(a) && duplicates.contains(attr) =>
- pullOutAliases.append(a)
- false
- case _ => true
- } ++ duplicates.filter(!project.outputSet.contains(_)).toSeq
- val newProject = project.copy(projectList = newProjectList)
- newProject.copyTagsFrom(project)
- // If the output of the new project is the same as the child, delete it to simplify the plan.
- if (newProject.outputSet.equals(project.child.outputSet)) {
- project.child
- } else {
- newProject
-
- }
- }
-}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 8db6b95775..4fca03fa85 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
@@ -230,12 +230,12 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
})
}
- test("pull out duplicate projections for HashProbe and FilterProject") {
+ test("duplicate projections") {
withTable("t1", "t2", "t3") {
Seq((1, 1), (2, 2)).toDF("c1", "c2").write.saveAsTable("t1")
Seq(1, 2, 3).toDF("c1").write.saveAsTable("t2")
Seq(1, 2, 3).toDF("c1").write.saveAsTable("t3")
- // test HashProbe, pull out `c2 as a,c2 as b`.
+ // Test HashProbe.
val q1 =
"""
|select tt1.* from
@@ -260,50 +260,25 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
|left join t3
|on tt1.c1 = t3.c1
|""".stripMargin
- Seq(q1, q2, q3).foreach {
- runQueryAndCompare(_) {
- df =>
- {
- val executedPlan = getExecutedPlan(df)
- val projects = executedPlan.collect {
- case p @ ProjectExecTransformer(_, _: BroadcastHashJoinExecTransformer) => p
- }
- assert(projects.nonEmpty)
- val aliases = projects.last.projectList.collect { case a: Alias => a }
- assert(aliases.size == 2)
- }
- }
- }
-
- // test FilterProject, only pull out `c2 as b`.
val q4 =
- """
- |select c1, c2, a, b from
- |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
- |where c > -1 and b > 1
- |""".stripMargin
- runQueryAndCompare(q4) {
- df =>
- {
- val executedPlan = getExecutedPlan(df)
- val projects = executedPlan.collect {
- case p @ ProjectExecTransformer(_, _: FilterExecTransformer) => p
- }
- assert(projects.nonEmpty)
- val aliases = projects.last.projectList.collect { case a: Alias => a }
- assert(aliases.size == 1)
- }
- }
-
- // Test HashProbe operation when projecting a column multiple times without using an alias.
- val q5 =
"""
|select tt1.* from
|(select c1, c2, c2 from t1) tt1
|left join t2
|on tt1.c1 = t2.c1
|""".stripMargin
- runQueryAndCompare(q5) { _ => }
+
+ // Test FilterProject.
+ val q5 =
+ """
+ |select c1, c2, a, b from
+ |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
+ |where c > -1 and b > 1
+ |""".stripMargin
+
+ Seq(q1, q2, q3, q4, q5).foreach {
+ runQueryAndCompare(_) { _ => }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]