Pajaraja commented on code in PR #50717:
URL: https://github.com/apache/spark/pull/50717#discussion_r2079513363
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -456,6 +456,196 @@ abstract class Optimizer(catalogManager: CatalogManager)
}
}
+ /**
+ * Attempts to eliminate the reading of unneeded columns from the query plan.
+ *
+ * Since adding Project before Filter conflicts with PushPredicatesThroughProject, this rule will
+ * remove the Project p2 in the following pattern:
+ *
+ * p1 @ Project(_, Filter(_, p2 @ Project(_, child))) if p2.outputSet.subsetOf(p2.inputSet)
+ *
+ * p2 is usually inserted by this rule and is useless, since p1 can prune the columns anyway.
+ */
+ object ColumnPruning extends Rule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(
+ plan.transformWithPruning(AlwaysProcess.fn, ruleId) {
+ // Prunes the unused columns from project list of
Project/Aggregate/Expand
+ case p @ Project(_, p2: Project) if
!p2.outputSet.subsetOf(p.references) =>
+ p.copy(child = p2.copy(projectList =
p2.projectList.filter(p.references.contains)))
+ case p @ Project(_, a: Aggregate) if
!a.outputSet.subsetOf(p.references) =>
+ p.copy(
+ child = a.copy(aggregateExpressions =
+ a.aggregateExpressions.filter(p.references.contains)))
+ case a @ Project(_, e @ Expand(_, _, grandChild)) if
!e.outputSet.subsetOf(a.references) =>
+ val newOutput = e.output.filter(a.references.contains(_))
+ val newProjects = e.projections.map { proj =>
+ proj.zip(e.output).filter { case (_, a) =>
+ newOutput.contains(a)
+ }.map(_._1)
+ }
+ a.copy(child = Expand(newProjects, newOutput, grandChild))
+
+ // Prune and drop AttachDistributedSequence if the produced attribute
is not referred.
+ case p @ Project(_, a @ AttachDistributedSequence(_, grandChild))
+ if !p.references.contains(a.sequenceAttr) =>
+ p.copy(child = prunedChild(grandChild, p.references))
+
+ // Prunes the unused columns from child of `DeserializeToObject`
+ case d @ DeserializeToObject(_, _, child) if
!child.outputSet.subsetOf(d.references) =>
+ d.copy(child = prunedChild(child, d.references))
+
+ // Prunes the unused columns from child of
Aggregate/Expand/Generate/ScriptTransformation
+ case a @ Aggregate(_, _, child, _) if
!child.outputSet.subsetOf(a.references) =>
+ a.copy(child = prunedChild(child, a.references))
+ case f @ FlatMapGroupsInPandas(_, _, _, child) if
!child.outputSet.subsetOf(f.references) =>
+ f.copy(child = prunedChild(child, f.references))
+ case e @ Expand(_, _, child) if
!child.outputSet.subsetOf(e.references) =>
+ e.copy(child = prunedChild(child, e.references))
+
+ // prune unused columns from child of MergeRows for row-level
operations
+ case e @ MergeRows(_, _, _, _, _, _, _, child) if
!child.outputSet.subsetOf(e.references) =>
+ e.copy(child = prunedChild(child, e.references))
+
+ // prune unrequired references
+ case p @ Project(_, g: Generate) if p.references != g.outputSet =>
+ val requiredAttrs = p.references -- g.producedAttributes ++
g.generator.references
+ val newChild = prunedChild(g.child, requiredAttrs)
+ val unrequired = g.generator.references -- p.references
+ val unrequiredIndices = newChild.output.zipWithIndex
+ .filter(t => unrequired.contains(t._1))
+ .map(_._2)
+ p.copy(child = g.copy(child = newChild, unrequiredChildIndex =
unrequiredIndices))
+
+ // prune unrequired nested fields from `Generate`.
+ case GeneratorNestedColumnAliasing(rewrittenPlan) => rewrittenPlan
+
+ // Eliminate unneeded attributes from right side of a Left Existence
Join.
+ case j @ Join(_, right, LeftExistence(_), _, _) =>
+ j.copy(right = prunedChild(right, j.references))
+
+ // all the columns will be used to compare, so we can't prune them
+ case p @ Project(_, _: SetOperation) => p
+ case p @ Project(_, _: Distinct) => p
+ // Eliminate unneeded attributes from children of Union.
+ case p @ Project(_, u: Union) =>
+ if (!u.outputSet.subsetOf(p.references)) {
+ val firstChild = u.children.head
+ val newOutput = prunedChild(firstChild, p.references).output
+ // pruning the columns of all children based on the pruned first
child.
+ val newChildren = u.children.map { p =>
+ val selected = p.output.zipWithIndex.filter { case (a, i) =>
+ newOutput.contains(firstChild.output(i))
+ }.map(_._1)
+ Project(selected, p)
+ }
+ p.copy(child = u.withNewChildren(newChildren))
+ } else {
+ p
+ }
+
+ case p @ Project(_, ul: UnionLoop) =>
Review Comment:
Changed!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]