cloud-fan commented on code in PR #50402: URL: https://github.com/apache/spark/pull/50402#discussion_r2062943825
########## sql/core/src/main/scala/org/apache/spark/sql/execution/UnionLoopExec.scala: ########## @@ -127,28 +134,67 @@ case class UnionLoopExec( val numAnchorOutputRows = longMetric("numAnchorOutputRows") val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT) val rowLimit = conf.getConf(SQLConf.CTE_RECURSION_ROW_LIMIT) + val AnchorRowLimitToConverToLocalLimit = + conf.getConf(SQLConf.CTE_RECURSION_ANCHOR_ROWS_LIMIT_TO_CONVERT_TO_LOCAL_RELATION) // currentLimit is initialized from the limit argument, and in each step it is decreased by // the number of rows generated in that step. // If limit is not passed down, currentLimit is set to be zero and won't be considered in the // condition of while loop down (limit.isEmpty will be true). var currentLimit = limit.getOrElse(-1) - val unionChildren = mutable.ArrayBuffer.empty[LogicalRDD] + val unionChildren = mutable.ArrayBuffer.empty[LogicalPlan] var (prevDF, prevCount) = executeAndCacheAndCount(anchor, currentLimit) numAnchorOutputRows += prevCount var currentLevel = 1 - var currentNumRows = 0 + var currentNumRows: Long = 0 var limitReached: Boolean = false val numPartitions = prevDF.queryExecution.toRdd.partitions.length + + // In the case we return a sufficiently small number of rows when executing the anchor, + // we convert the result of the anchor into a LocalRelation, so that, if the recursion doesn't + // reference any external tables, we are able to calculate everything in the optimizer, using + // the ConvertToLocalRelation rule, which significantly improves runtime. + if (prevCount <= AnchorRowLimitToConverToLocalLimit && + !prevDF.queryExecution.optimizedPlan.isInstanceOf[LocalRelation]) { + val local = LocalRelation.fromExternalRows(anchor.output, prevDF.collect().toIndexedSeq) + prevDF = Dataset.ofRows(session, local) + } // Main loop for obtaining the result of the recursive query. 
while (prevCount > 0 && !limitReached) { + var prevPlan: LogicalPlan = null + // the current plan is created by substituting UnionLoopRef node with the project node of + // the previous plan. + // This way we support only UNION ALL case. Additional case should be added for UNION case. + // One way of supporting UNION case can be seen at SPARK-24497 PR from Peter Toth. + val newRecursion = recursion.transform { + case r: UnionLoopRef if r.loopId == loopId => + prevDF.queryExecution.optimizedPlan match { + case l: LocalRelation => + prevPlan = l + l.copy(output = r.output) + case p @ Project(projectList, _: OneRowRelation) => Review Comment: Can we add a comment explaining that this case is still needed when the to-local-relation optimization is disabled? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org