viirya commented on code in PR #49416:
URL: https://github.com/apache/spark/pull/49416#discussion_r1908336236


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffset.scala:
##########
@@ -43,33 +42,61 @@ object InsertSortForLimitAndOffset extends Rule[SparkPlan] {
     plan transform {
       case l @ GlobalLimitExec(
           _,
-          SinglePartitionShuffleWithGlobalOrdering(ordering),
-          _) =>
-        val newChild = SortExec(ordering, global = false, child = l.child)
-        l.withNewChildren(Seq(newChild))
-    }
-  }
-
-  object SinglePartitionShuffleWithGlobalOrdering {
-    @tailrec
-    def unapply(plan: SparkPlan): Option[Seq[SortOrder]] = plan match {
-      case ShuffleExchangeExec(SinglePartition, 
SparkPlanWithGlobalOrdering(ordering), _, _) =>
-        Some(ordering)
-      case p: AQEShuffleReadExec => unapply(p.child)
-      case p: ShuffleQueryStageExec => unapply(p.plan)
-      case _ => None
+          // Should not match AQE shuffle stage because we only target 
un-submitted stages which
+          // we can still rewrite the query plan.
+          s @ ShuffleExchangeExec(SinglePartition, child, _, _),
+          _) if child.logicalLink.isDefined =>
+        extractOrderingAndPropagateOrderingColumns(child) match {
+          case Some((ordering, newChild)) =>
+            val newShuffle = s.withNewChildren(Seq(newChild))
+            val sorted = SortExec(ordering, global = false, child = newShuffle)
+            // We must set the logical plan link to avoid losing the added 
SortExec and ProjectExec
+            // during AQE re-optimization, where we turn physical plan back to 
logical plan.
+            val logicalSort = Sort(ordering, global = false, child = 
s.child.logicalLink.get)
+            sorted.setLogicalLink(logicalSort)
+            val projected = if (sorted.output == s.output) {
+              sorted
+            } else {
+              val p = ProjectExec(s.output, sorted)
+              p.setLogicalLink(Project(s.output, logicalSort))
+              p
+            }
+            l.withNewChildren(Seq(projected))
+          case _ => l
+        }
     }
   }
 
   // Note: this is not implementing a generalized notion of "global order 
preservation", but just
-  // tackles the regular ORDER BY semantics with optional LIMIT (top-K).
-  object SparkPlanWithGlobalOrdering {
-    @tailrec
-    def unapply(plan: SparkPlan): Option[Seq[SortOrder]] = plan match {
-      case p: SortExec if p.global => Some(p.sortOrder)
-      case p: LocalLimitExec => unapply(p.child)
-      case p: WholeStageCodegenExec => unapply(p.child)
-      case _ => None
-    }
+  // a best effort to catch the common query patterns that the data ordering 
should be preserved.
+  private def extractOrderingAndPropagateOrderingColumns(
+      plan: SparkPlan): Option[(Seq[SortOrder], SparkPlan)] = plan match {
+    case p: SortExec if p.global => Some(p.sortOrder, p)
+    case p: UnaryExecNode if
+        p.isInstanceOf[LocalLimitExec] ||
+          p.isInstanceOf[WholeStageCodegenExec] ||
+          p.isInstanceOf[FilterExec] ||
+          p.isInstanceOf[EvalPythonExec] =>
+      extractOrderingAndPropagateOrderingColumns(p.child) match {
+        case Some((ordering, newChild)) => Some((ordering, 
p.withNewChildren(Seq(newChild))))
+        case _ => None
+      }
+    case p: ProjectExec =>
+      extractOrderingAndPropagateOrderingColumns(p.child) match {
+        case Some((ordering, newChild)) =>

Review Comment:
   Hmm, so can I say that you want to keep the data ordering unchanged after the
(single-partition) shuffle? It may not form a particular ordering by column, but just
keeps the physical data ordering the same before and after the shuffle of the global
limit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to