cloud-fan commented on code in PR #49955:
URL: https://github.com/apache/spark/pull/49955#discussion_r1959756734
########## sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala:
##########
@@ -714,6 +717,177 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
     copy(children = newChildren)
 }

+/**
+ * The physical node for recursion. Currently only the UNION ALL case is supported.
+ * For details about the execution, see the comment above the doExecute function.
+ *
+ * A simple recursive query:
+ * {{{
+ * WITH RECURSIVE t(n) AS (
+ *   SELECT 1
+ *   UNION ALL
+ *   SELECT n+1 FROM t WHERE n < 5)
+ * SELECT * FROM t;
+ * }}}
+ * Corresponding logical plan for the recursive query above:
+ * {{{
+ * WithCTE
+ * :- CTERelationDef 0, false
+ * :  +- SubqueryAlias t
+ * :     +- Project [1#0 AS n#3]
+ * :        +- UnionLoop 0
+ * :           :- Project [1 AS 1#0]
+ * :           :  +- OneRowRelation
+ * :           +- Project [(n#1 + 1) AS (n + 1)#2]
+ * :              +- Filter (n#1 < 5)
+ * :                 +- SubqueryAlias t
+ * :                    +- Project [1#0 AS n#1]
+ * :                       +- UnionLoopRef 0, [1#0], false
+ * +- Project [n#3]
+ *    +- SubqueryAlias t
+ *       +- CTERelationRef 0, true, [n#3], false, false
+ * }}}
+ *
+ * @param loopId The id of the CTERelationDef containing the recursive query. Its value is
+ *               first passed down to UnionLoop when creating it, and then to UnionLoopExec
+ *               in SparkStrategies.
+ * @param anchor The logical plan of the initial element of the loop.
+ * @param recursion The logical plan that describes the recursion with a [[UnionLoopRef]] node.
+ *                  A CTERelationRef that is marked as recursive gets substituted with
+ *                  [[UnionLoopRef]] in ResolveWithCTE.
+ *                  Both anchor and recursion are marked with the @transient annotation so that
+ *                  they are not serialized.
+ * @param output The output attributes of this loop.
+ * @param limit If defined, the total number of rows output by this operator is bounded by
+ *              limit.
+ *              Its value is pushed down to UnionLoop by the Optimizer when a Limit node is
+ *              present in the logical plan, and then transferred to UnionLoopExec in
+ *              SparkStrategies.
+ *              Note that the limit can only be applied in the main query that calls the
+ *              recursive CTE, not inside the recursive term of the CTE.
+ */
+case class UnionLoopExec(
+    loopId: Long,
+    @transient anchor: LogicalPlan,
+    @transient recursion: LogicalPlan,
+    override val output: Seq[Attribute],
+    limit: Option[Int] = None) extends LeafExecNode {
+
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(anchor, recursion)
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numRecursiveLoops" -> SQLMetrics.createMetric(sparkContext, "number of recursive loops"))
+
+  /**
+   * This function executes the plan (optionally with a Limit node appended) and caches the
+   * result, with the caching mode specified in the config.
+   */
+  private def executeAndCacheAndCount(
+      plan: LogicalPlan, currentLimit: Int) = {
+    // In case limit is defined, we create a (global) limit node above the plan and execute
+    // the newly created plan.
+    // Note that a global limit requires coordination (a shuffle) between partitions.
+    val planOrLimitedPlan = if (limit.isDefined) {
+      Limit(Literal(currentLimit), plan)
+    } else {
+      plan
+    }
+    val df = Dataset.ofRows(session, planOrLimitedPlan)
+    val cachedDF = df.repartition()
+    val count = cachedDF.count()
+    (cachedDF, count)
+  }
+
+  /**
+   * In the first iteration, the anchor term is executed.
+   * Then, in each following iteration, the UnionLoopRef node is substituted with the plan from
+   * the previous iteration, and the resulting plan is executed.
+   * After every iteration, the DataFrame is repartitioned.
+   * The recursion stops when the generated DataFrame is empty, or when either the limit or
+   * the maximum recursion depth from the config is reached.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val numOutputRows = longMetric("numOutputRows")
+    val numRecursiveLoops = longMetric("numRecursiveLoops")
+    val levelLimit = conf.getConf(SQLConf.CTE_RECURSION_LEVEL_LIMIT)
+
+    // currentLimit is initialized from the limit argument, and in each step it is decreased
+    // by the number of rows generated in that step.
+    // If no limit is passed down, currentLimit is set to zero and is not considered in the
+    // while-loop condition below (limit.isEmpty will be true).
+    var currentLimit = limit.getOrElse(0)
+    val unionChildren = mutable.ArrayBuffer.empty[LogicalRDD]
+
+    var (prevDF, prevCount) = executeAndCacheAndCount(anchor, currentLimit)
+
+    var currentLevel = 1
+
+    // Main loop for obtaining the result of the recursive query.
+    while (prevCount > 0 && (limit.isEmpty || currentLimit > 0)) {

Review Comment:
   ```suggestion
       while (prevCount > 0 && currentLimit > 0) {
   ```
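[Editor's note: not part of the PR or the review thread, but a minimal runnable sketch may help readers follow the loop under discussion. The snippet below (plain Spark; the object and variable names are made up) unrolls the example CTE from the Scaladoc the same way the quoted doExecute loop does — execute the anchor, then repeatedly re-run the recursive step against the previous iteration's rows until none are produced — while omitting what the real operator handles internally (UnionLoopRef substitution in the logical plan, caching via repartition, metrics, and limit pushdown).]

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative stand-alone sketch; not UnionLoopExec itself.
object UnionLoopSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("union-loop-sketch").getOrCreate()
    import spark.implicits._

    val levelLimit = 100                    // stand-in for CTE_RECURSION_LEVEL_LIMIT
    var prev: DataFrame = Seq(1).toDF("n")  // anchor: SELECT 1
    var acc: DataFrame = prev               // running UNION ALL of all iterations
    var level = 1

    // Re-run the recursive step against the previous iteration's rows until it
    // produces none. The real operator counts a cached, repartitioned result;
    // prev.count() here re-evaluates the lineage on every check.
    while (prev.count() > 0 && level <= levelLimit) {
      val next = prev.filter($"n" < 5).select(($"n" + 1).as("n")) // SELECT n+1 FROM t WHERE n < 5
      acc = acc.union(next)
      prev = next
      level += 1
    }

    acc.show() // rows 1 through 5
    spark.stop()
  }
}
```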
########## sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala:
##########
@@ -714,6 +717,177 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
+  /**
+   * This function executes the plan (optionally with a Limit node appended) and caches the
+   * result, with the caching mode specified in the config.
+   */
+  private def executeAndCacheAndCount(
+      plan: LogicalPlan, currentLimit: Int) = {
+    // In case limit is defined, we create a (global) limit node above the plan and execute
+    // the newly created plan.
+    // Note that a global limit requires coordination (a shuffle) between partitions.
+    val planOrLimitedPlan = if (limit.isDefined) {

Review Comment:
   maybe `currentLimit > 0`?
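[Editor's note: both review comments hinge on how currentLimit behaves when no limit is pushed down. The self-contained model below (plain Scala, illustrative names only — not the PR's code) walks through the bookkeeping described in the quoted diff: currentLimit starts at the pushed-down limit and is decreased by each iteration's row count, so the remaining budget caps every step.]

```scala
// Illustrative model of UnionLoopExec's limit bookkeeping; not the PR's code.
object LimitBookkeepingSketch {
  def main(args: Array[String]): Unit = {
    val limit: Option[Int] = Some(7) // a pushed-down LIMIT 7 from the main query
    var currentLimit = limit.getOrElse(0)

    // Stand-in for executeAndCacheAndCount: cap each iteration's output at the
    // remaining budget, mirroring the Limit(Literal(currentLimit), plan) wrapping.
    def runIteration(rowsProduced: Int): Int =
      if (limit.isDefined) math.min(rowsProduced, currentLimit) else rowsProduced

    var prevCount = runIteration(3) // suppose the anchor yields 3 rows
    while (prevCount > 0 && (limit.isEmpty || currentLimit > 0)) {
      currentLimit -= prevCount
      println(s"emitted $prevCount rows, remaining budget: $currentLimit")
      prevCount = runIteration(3)   // suppose each recursive step yields 3 rows
    }
    // Prints budgets 4, 1, 0: the loop emits 3 + 3 + 1 = 7 rows in total.
  }
}
```

Under the quoted initialization `currentLimit = limit.getOrElse(0)`, the suggested condition `while (prevCount > 0 && currentLimit > 0)` would terminate an unlimited recursion immediately, so it presumably goes together with a different initialization for the no-limit case — which appears to be the trade-off both comments are probing.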