cloud-fan commented on code in PR #50402:
URL: https://github.com/apache/spark/pull/50402#discussion_r2062943825


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala:
##########
@@ -127,28 +134,67 @@ case class UnionLoopExec(
     val numAnchorOutputRows = longMetric("numAnchorOutputRows")
     val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT)
     val rowLimit = conf.getConf(SQLConf.CTE_RECURSION_ROW_LIMIT)
+    val AnchorRowLimitToConverToLocalLimit =
+      conf.getConf(SQLConf.CTE_RECURSION_ANCHOR_ROWS_LIMIT_TO_CONVERT_TO_LOCAL_RELATION)
 
     // currentLimit is initialized from the limit argument, and in each step it is decreased by
     // the number of rows generated in that step.
     // If limit is not passed down, currentLimit is set to be zero and won't be considered in the
     // condition of the while loop below (limit.isEmpty will be true).
     var currentLimit = limit.getOrElse(-1)
 
-    val unionChildren = mutable.ArrayBuffer.empty[LogicalRDD]
+    val unionChildren = mutable.ArrayBuffer.empty[LogicalPlan]
 
     var (prevDF, prevCount) = executeAndCacheAndCount(anchor, currentLimit)
 
     numAnchorOutputRows += prevCount
 
     var currentLevel = 1
 
-    var currentNumRows = 0
+    var currentNumRows: Long = 0
 
     var limitReached: Boolean = false
 
     val numPartitions = prevDF.queryExecution.toRdd.partitions.length
+
+    // In the case we return a sufficiently small number of rows when executing the anchor,
+    // we convert the result of the anchor into a LocalRelation, so that, if the recursion doesn't
+    // reference any external tables, we are able to calculate everything in the optimizer, using
+    // the ConvertToLocalRelation rule, which significantly improves runtime.
+    if (prevCount <= AnchorRowLimitToConverToLocalLimit &&
+      !prevDF.queryExecution.optimizedPlan.isInstanceOf[LocalRelation]) {
+      val local = LocalRelation.fromExternalRows(anchor.output, prevDF.collect().toIndexedSeq)
+      prevDF = Dataset.ofRows(session, local)
+    }
     // Main loop for obtaining the result of the recursive query.
     while (prevCount > 0 && !limitReached) {
+      var prevPlan: LogicalPlan = null
+      // The current plan is created by substituting the UnionLoopRef node with the project node
+      // of the previous plan.
+      // This way we support only UNION ALL. An additional case should be added to support UNION.
+      // One way of supporting the UNION case can be seen in the SPARK-24497 PR from Peter Toth.
+      val newRecursion = recursion.transform {
+        case r: UnionLoopRef if r.loopId == loopId =>
+          prevDF.queryExecution.optimizedPlan match {
+            case l: LocalRelation =>
+              prevPlan = l
+              l.copy(output = r.output)
+            case p @ Project(projectList, _: OneRowRelation) =>

Review Comment:
   Can we add a comment to explain that this case is still needed if the to-local-relation optimization is disabled.
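
For context, the loop being reviewed materializes a recursive CTE step by step: execute the
anchor, then repeatedly substitute the previous step's result into the recursive member until no
new rows are produced or a level/row limit is hit. A rough, simplified sketch of that iteration
using only the public DataFrame API (the names anchorDF, stepFn, levelLimit and rowLimit are
illustrative, not taken from the PR):

import org.apache.spark.sql.DataFrame

// Hypothetical illustration only; the real operator works on Catalyst plans
// (LogicalRDD/LocalRelation) and re-optimizes each step.
def runRecursiveUnionAll(
    anchorDF: DataFrame,               // result of the anchor member
    stepFn: DataFrame => DataFrame,    // one recursive step: previous rows -> new rows
    levelLimit: Int,                   // non-positive means "no limit" in this sketch
    rowLimit: Long): DataFrame = {
  var prev = anchorDF.cache()
  var prevCount = prev.count()         // mirrors executeAndCacheAndCount
  var result = prev                    // accumulated UNION ALL result
  var totalRows = prevCount
  var level = 1
  while (prevCount > 0 && (levelLimit <= 0 || level <= levelLimit) &&
      (rowLimit <= 0 || totalRows < rowLimit)) {
    // Feed the previous step's rows back into the recursive member, analogous to
    // replacing the UnionLoopRef node with the previous plan.
    val next = stepFn(prev).cache()
    prevCount = next.count()
    if (prevCount > 0) result = result.union(next)
    totalRows += prevCount
    prev = next
    level += 1
  }
  result
}

The diff above adds one more wrinkle on top of this: if the anchor produces no more rows than
CTE_RECURSION_ANCHOR_ROWS_LIMIT_TO_CONVERT_TO_LOCAL_RELATION allows, its result is collected into
a LocalRelation so that, when the recursion references no external tables, the optimizer's
ConvertToLocalRelation rule can fold each step locally instead of launching Spark jobs.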



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
