cloud-fan commented on code in PR #49571: URL: https://github.com/apache/spark/pull/49571#discussion_r1924881213
########## sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala: ########## @@ -714,6 +718,147 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { copy(children = newChildren) } +/** + * The physical node for recursion. Currently only UNION ALL case is supported. + * + * @param loopId The id of the loop. + * @param anchor The logical plan of the initial element of the loop. + * @param recursion The logical plan that describes the recursion with an [[UnionLoopRef]] node. + * @param output The output attributes of this loop. + * @param limit In case we have a plan with the limit node, it is pushed down to UnionLoop and then + * transferred to UnionLoopExec, to stop the recursion after specific amount of rows + * is generated. + */ +case class UnionLoopExec( + loopId: Long, + @transient anchor: LogicalPlan, + @transient recursion: LogicalPlan, + override val output: Seq[Attribute], + limit: Option[Int] = None) extends LeafExecNode { + + val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT) + val cacheMode = CTERecursionCacheMode.withName(conf.getConf(SQLConf.CTE_RECURSION_CACHE_MODE)) + + // We store the initial and generated children of the loop in this buffer. + // Please note that all elements are cached because of performance reasons as they are needed for + // next iteration. + @transient private val unionDFs = mutable.ArrayBuffer.empty[DataFrame] + + override def innerChildren: Seq[QueryPlan[_]] = Seq(anchor, recursion) + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + /** + * This function executes the plan (optionally with appended limit node) and caches the result, + * with the caching mode specified in config. + */ + private def executeAndCacheAndCount( + plan: LogicalPlan, limit: Option[Long]) = { + // In case limit is defined, we create a limit node above the plan and execute + // the newly created plan. 
+ val limitedPlan = limit.map(l => Limit(Literal(l.toInt), plan)).getOrElse(plan) Review Comment: ah it's actually local limit: https://github.com/apache/spark/pull/49571/files#diff-11264d807efa58054cca2d220aae8fba644ee0f0f2a4722c46d52828394846efR853 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org