milanisvet commented on code in PR #49571:
URL: https://github.com/apache/spark/pull/49571#discussion_r1932461569


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala:
##########
@@ -714,6 +717,133 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
     copy(children = newChildren)
 }
 
+/**
+ * The physical node for recursion. Currently only UNION ALL case is supported.
+ * In the first iteration, anchor term is executed.
+ * Then, in each following iteration, the UnionLoopRef node is substituted with the plan from the
+ * previous iteration, and such plan is executed.
+ * After every iteration, the dataframe is repartitioned.
+ * The recursion stops when the generated dataframe is empty, or either the limit or
+ * the specified maximum depth from the config is reached.
+ *
+ * @param loopId The id of the loop.
+ * @param anchor The logical plan of the initial element of the loop.
+ * @param recursion The logical plan that describes the recursion with an [[UnionLoopRef]] node.
+ * @param output The output attributes of this loop.
+ * @param limit In case we have a plan with the limit node, it is pushed down to UnionLoop and then
+ *              transferred to UnionLoopExec, to stop the recursion after specific amount of rows
+ *              is generated.
+ *              Note here: limit can be applied in the main query calling the recursive CTE, and not
+ *              inside the recursive term of recursive CTE.
+ */
+case class UnionLoopExec(
+    loopId: Long,
+    @transient anchor: LogicalPlan,
+    @transient recursion: LogicalPlan,
+    override val output: Seq[Attribute],
+    limit: Option[Int] = None) extends LeafExecNode {
+
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(anchor, recursion)
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  /**
+   * This function executes the plan (optionally with appended limit node) and caches the result,
+   * with the caching mode specified in config.
+   */
+  private def executeAndCacheAndCount(
+     plan: LogicalPlan, currentLimit: Int) = {
+    // In case limit is defined, we create a (global) limit node above the plan and execute
+    // the newly created plan.
+    // Note here: global limit requires coordination (shuffle) between partitions.
+    val planOrLimitedPlan = if (limit.isDefined) {
+      Limit(Literal(currentLimit), plan)
+    } else {
+      plan
+    }
+    val df = Dataset.ofRows(session, planOrLimitedPlan)
+    val cachedDF = df.repartition()
+    val count = cachedDF.count()
+    (cachedDF, count)
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to