JoshRosen commented on code in PR #49416: URL: https://github.com/apache/spark/pull/49416#discussion_r1908148124
########## sql/core/src/main/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffset.scala: ########## @@ -43,33 +41,60 @@ object InsertSortForLimitAndOffset extends Rule[SparkPlan] { plan transform { case l @ GlobalLimitExec( _, - SinglePartitionShuffleWithGlobalOrdering(ordering), - _) => - val newChild = SortExec(ordering, global = false, child = l.child) - l.withNewChildren(Seq(newChild)) - } - } - - object SinglePartitionShuffleWithGlobalOrdering { - @tailrec - def unapply(plan: SparkPlan): Option[Seq[SortOrder]] = plan match { - case ShuffleExchangeExec(SinglePartition, SparkPlanWithGlobalOrdering(ordering), _, _) => - Some(ordering) - case p: AQEShuffleReadExec => unapply(p.child) - case p: ShuffleQueryStageExec => unapply(p.plan) - case _ => None + // Should not match AQE shuffle stage because we only target un-submitted stages which + // we can still rewrite the query plan. + s @ ShuffleExchangeExec(SinglePartition, child, _, _), + _) if child.logicalLink.isDefined => + extractOrderingAndPropagateOrderingColumns(child) match { + case Some((ordering, newChild)) => + val newShuffle = s.withNewChildren(Seq(newChild)) + val sorted = SortExec(ordering, global = false, child = newShuffle) + // We must set the logical plan link to avoid losing the added SortExec and ProjectExec + // during AQE re-optimization, where we turn physical plan back to logical plan. + val logicalSort = Sort(ordering, global = false, child = s.child.logicalLink.get) + sorted.setLogicalLink(logicalSort) + val projected = if (sorted.output == s.output) { + sorted + } else { + val p = ProjectExec(s.output, sorted) + p.setLogicalLink(Project(s.output, logicalSort)) + p + } + l.withNewChildren(Seq(projected)) + case _ => l + } } } // Note: this is not implementing a generalized notion of "global order preservation", but just // tackles the regular ORDER BY semantics with optional LIMIT (top-K). 
Review Comment:
Brainstorming: are there any other types of operators that could sit between the global sort and the global limit for which we might want to preserve ordering? Off the top of my head, one case that might not be covered is external UDF evaluation.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org