ulysses-you commented on code in PR #49715:
URL: https://github.com/apache/spark/pull/49715#discussion_r1944256813


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -344,56 +350,50 @@ case class AdaptiveSparkPlanExec(
         if (errors.nonEmpty) {
           cleanUpAndThrowException(errors.toSeq, None)
         }
-
-        // Try re-optimizing and re-planning. Adopt the new plan if its cost 
is equal to or less
-        // than that of the current plan; otherwise keep the current physical 
plan together with
-        // the current logical plan since the physical plan's logical links 
point to the logical
-        // plan it has originated from.
-        // Meanwhile, we keep a list of the query stages that have been 
created since last plan
-        // update, which stands for the "semantic gap" between the current 
logical and physical
-        // plans. And each time before re-planning, we replace the 
corresponding nodes in the
-        // current logical plan with logical query stages to make it 
semantically in sync with
-        // the current physical plan. Once a new plan is adopted and both 
logical and physical
-        // plans are updated, we can clear the query stage list because at 
this point the two plans
-        // are semantically and physically in sync again.
-        val logicalPlan = 
replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
-        val afterReOptimize = reOptimize(logicalPlan)
-        if (afterReOptimize.isDefined) {
-          val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
-          val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
-          val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
-          if (newCost < origCost ||
-            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-            lazy val plans =
-              sideBySide(currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n")
-            logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
-            cleanUpTempTags(newPhysicalPlan)
-            currentPhysicalPlan = newPhysicalPlan
-            currentLogicalPlan = newLogicalPlan
-            stagesToReplace = Seq.empty[QueryStageExec]
+        if (!currentPhysicalPlan.isInstanceOf[ResultQueryStageExec]) {

Review Comment:
   Why do we need to skip `ResultQueryStageExec`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -215,6 +215,14 @@ case class AdaptiveSparkPlanExec(
 
   @volatile private var currentPhysicalPlan = initialPlan
 
+  // Use inputPlan logicalLink here in case some top level physical nodes may 
be removed
+  // during `initialPlan`
+  @transient @volatile private var currentLogicalPlan: LogicalPlan = {
+    inputPlan.logicalLink.get
+  }
+
+  val stagesToReplace = mutable.ArrayBuffer.empty[QueryStageExec]

Review Comment:
   Is this change necessary?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -268,25 +276,23 @@ case class AdaptiveSparkPlanExec(
 
   def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
 
-  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-    if (isFinalPlan) return currentPhysicalPlan
-
+  /**
+   * Run `fun` on finalized physical plan
+   */
+  def withFinalPlanUpdate[T](fun: SparkPlan => T): T = lock.synchronized {
+    _isFinalPlan = false

Review Comment:
   So when we call `df.collect` multiple times, we will re-optimize the final
   stage multiple times. This is because each call needs to wrap a new
   `ResultQueryStageExec`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -344,56 +350,50 @@ case class AdaptiveSparkPlanExec(
         if (errors.nonEmpty) {
           cleanUpAndThrowException(errors.toSeq, None)
         }
-
-        // Try re-optimizing and re-planning. Adopt the new plan if its cost 
is equal to or less
-        // than that of the current plan; otherwise keep the current physical 
plan together with
-        // the current logical plan since the physical plan's logical links 
point to the logical
-        // plan it has originated from.
-        // Meanwhile, we keep a list of the query stages that have been 
created since last plan
-        // update, which stands for the "semantic gap" between the current 
logical and physical
-        // plans. And each time before re-planning, we replace the 
corresponding nodes in the
-        // current logical plan with logical query stages to make it 
semantically in sync with
-        // the current physical plan. Once a new plan is adopted and both 
logical and physical
-        // plans are updated, we can clear the query stage list because at 
this point the two plans
-        // are semantically and physically in sync again.
-        val logicalPlan = 
replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
-        val afterReOptimize = reOptimize(logicalPlan)
-        if (afterReOptimize.isDefined) {
-          val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
-          val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
-          val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
-          if (newCost < origCost ||
-            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-            lazy val plans =
-              sideBySide(currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n")
-            logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
-            cleanUpTempTags(newPhysicalPlan)
-            currentPhysicalPlan = newPhysicalPlan
-            currentLogicalPlan = newLogicalPlan
-            stagesToReplace = Seq.empty[QueryStageExec]
+        if (!currentPhysicalPlan.isInstanceOf[ResultQueryStageExec]) {
+          // Try re-optimizing and re-planning. Adopt the new plan if its cost 
is equal to or less
+          // than that of the current plan; otherwise keep the current 
physical plan together with
+          // the current logical plan since the physical plan's logical links 
point to the logical
+          // plan it has originated from.
+          // Meanwhile, we keep a list of the query stages that have been 
created since last plan
+          // update, which stands for the "semantic gap" between the current 
logical and physical
+          // plans. And each time before re-planning, we replace the 
corresponding nodes in the
+          // current logical plan with logical query stages to make it 
semantically in sync with
+          // the current physical plan. Once a new plan is adopted and both 
logical and physical
+          // plans are updated, we can clear the query stage list because at 
this point the two
+          // plans are semantically and physically in sync again.
+          val logicalPlan = replaceWithQueryStagesInLogicalPlan(
+            currentLogicalPlan, stagesToReplace.toSeq)
+          val afterReOptimize = reOptimize(logicalPlan)
+          if (afterReOptimize.isDefined) {
+            val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
+            val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
+            val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
+            if (newCost < origCost ||
+              (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) 
{
+              lazy val plans = sideBySide(
+                currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n")
+              logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
+              cleanUpTempTags(newPhysicalPlan)
+              currentPhysicalPlan = newPhysicalPlan
+              currentLogicalPlan = newLogicalPlan
+              stagesToReplace.clear()
+            }
           }
         }
         // Now that some stages have finished, we can try creating new stages.
-        result = createQueryStages(currentPhysicalPlan)
+        result = createQueryStages(fun, currentPhysicalPlan, firstRun = false)
       }
-
-      // Run the final plan when there's no more unfinished stages.
-      currentPhysicalPlan = applyPhysicalRules(
-        optimizeQueryStage(result.newPlan, isFinalStage = true),
-        postStageCreationRules(supportsColumnar),
-        Some((planChangeLogger, "AQE Post Stage Creation")))
-      _isFinalPlan = true
-      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      currentPhysicalPlan
     }
+    _isFinalPlan = true
+    finalPlanUpdate
+    
currentPhysicalPlan.asInstanceOf[ResultQueryStageExec].resultOption.get().get.asInstanceOf[T]

Review Comment:
   Does this mean we would cache the result data? Is that expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to