dtenedor commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1911181511


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala:
##########
@@ -762,4 +762,15 @@ object QueryPlan extends PredicateHelper {
       case e: AnalysisException => append(e.toString)
     }
   }
+
+  /**
+   * Generate detailed field string with different format based on type of 
input value

Review Comment:
   This comment and method name are a bit generic; could we expand them to 
mention what type of field we are referring to here, and when this might be 
used? Can we give a brief example?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>

Review Comment:
   +1, this code seems duplicated; we can use a helper to deduplicate it.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +580,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && 
allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = 
Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a 
recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" 
children are union-ed
+ * and it is also similar to a loop because the recursion continues while the 
last generated child
+ * is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an 
[[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop 
the loop earlier.
+ */
+case class UnionLoop(id: Long,

Review Comment:
   Please fix the indentation (start the params on the next line, each indented by 4 
spaces), here and elsewhere in the PR.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +580,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && 
allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = 
Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a 
recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" 
children are union-ed
+ * and it is also similar to a loop because the recursion continues while the 
last generated child
+ * is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an 
[[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop 
the loop earlier.
+ */
+case class UnionLoop(id: Long,
+                     anchor: LogicalPlan,
+                     recursion: LogicalPlan,
+                     limit: Option[Int] = None) extends UnionBase {
+  override def children: Seq[LogicalPlan] = Seq(anchor, recursion)
+
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): UnionLoop =
+    copy(anchor = newChildren(0), recursion = newChildren(1))
+}
 
-  /**
-   * Maps the constraints containing a given (original) sequence of attributes 
to those with a
-   * given (reference) sequence of attributes. Given the nature of union, we 
expect that the
-   * mapping between the original and reference sequences are symmetric.
-   */
-  private def rewriteConstraints(
-      reference: Seq[Attribute],
-      original: Seq[Attribute],
-      constraints: ExpressionSet): ExpressionSet = {
-    require(reference.size == original.size)
-    val attributeRewrites = AttributeMap(original.zip(reference))
-    constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
+/**
+ * The recursive reference in the recursive term of an [[UnionLoop]] node.
+ *
+ * @param loopId The id of the loop, inherited from [[CTERelationRef]]
+ * @param output The output attributes of this recursive reference.
+ * @param accumulated If false, the reference stands for the result of the 
previous iteration.
+ *                    If it is true, then it stands for the union of all 
previous iteration
+ *                    results.
+ */
+case class UnionLoopRef(loopId: Long,

Review Comment:
   Could we move this group of operators out of 
`basicLogicalOperators.scala` and into a new file dedicated to CTE handling? 
They're not really basic operators anymore :) and it could be useful to view 
them in the same context.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",

Review Comment:
   Yes, let's please do this.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -462,6 +462,57 @@ object Union {
   }
 }
 
+abstract class UnionBase extends LogicalPlan {

Review Comment:
   Can we have a descriptive comment for this operator saying what it 
represents, when it is constructed, examples, etc.?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.

Review Comment:
   +1, let's expand all the comments in this PR substantially to give more 
context about the overall algorithm to be performed, the steps taken, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to