nemanjapetr-db commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1912546917


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>

Review Comment:
   I will address this one in the next round.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",
+                    messageParameters = Map.empty)
+            }
+          } else {
+            cteDef
           }
+
+          if (newCTEDef.recursive) {
+            // cteDefMap holds "partially" resolved (only via anchor) CTE 
definitions in the
+            // recursive case.
+            if (newCTEDef.resolved) {
+              newCTEDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+            if (recursiveAnchorResolved(newCTEDef).isDefined) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          } else {
+            if (newCTEDef.resolved) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          }
+
+          newCTEDef
         }
-        w
+        w.copy(cteDefs = newCTEDefs)
 
       case ref: CTERelationRef if !ref.resolved =>
         cteDefMap.get(ref.cteId).map { cteDef =>
-          CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, 
cteDef.isStreaming)
+          if (ref.recursive) {
+            // Recursive references can be resolved from the anchor term. 
Non-resolved ref
+            // implies non-resolved definition. Since the definition was 
present in the map of
+            // resolved and "partially" resolved definitions, the only 
explanation is that
+            // definition was "partially" resolved.
+            val anchorResolved = recursiveAnchorResolved(cteDef)
+            if (anchorResolved.isDefined) {
+              ref.copy(_resolved = true, output = anchorResolved.get.output,
+                isStreaming = cteDef.isStreaming)
+            } else {
+              cteDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+          } else if (cteDef.resolved) {
+            ref.copy(_resolved = true, output = cteDef.output, isStreaming = 
cteDef.isStreaming)
+          } else {
+            // In the non-recursive case, cteDefMap contains only resolved 
Definitions.
+            cteDef.failAnalysis(

Review Comment:
   In the non-recursive case:
   
   We are pulling this def out from cteDefMap, and it is in that map precisely 
because it is resolved. The map insertion code above is:
   
   if (newCTEDef.resolved) {
     cteDefMap.put(newCTEDef.id, newCTEDef)
   }
   
   This is a sanity check that enforces that invariant.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +580,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && 
allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = 
Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a 
recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" 
children are union-ed
+ * and it is also similar to a loop because the recursion continues while the 
last generated child
+ * is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an 
[[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop 
the loop earlier.
+ */
+case class UnionLoop(id: Long,
+                     anchor: LogicalPlan,
+                     recursion: LogicalPlan,
+                     limit: Option[Int] = None) extends UnionBase {
+  override def children: Seq[LogicalPlan] = Seq(anchor, recursion)
+
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): UnionLoop =
+    copy(anchor = newChildren(0), recursion = newChildren(1))
+}
 
-  /**
-   * Maps the constraints containing a given (original) sequence of attributes 
to those with a
-   * given (reference) sequence of attributes. Given the nature of union, we 
expect that the
-   * mapping between the original and reference sequences are symmetric.
-   */
-  private def rewriteConstraints(
-      reference: Seq[Attribute],
-      original: Seq[Attribute],
-      constraints: ExpressionSet): ExpressionSet = {
-    require(reference.size == original.size)
-    val attributeRewrites = AttributeMap(original.zip(reference))
-    constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
+/**
+ * The recursive reference in the recursive term of an [[UnionLoop]] node.
+ *
+ * @param loopId The id of the loop, inherited from [[CTERelationRef]]
+ * @param output The output attributes of this recursive reference.
+ * @param accumulated If false, the reference stands for the result of the 
previous iteration.
+ *                    If true, it stands for the union of all 
previous iteration
+ *                    results.
+ */
+case class UnionLoopRef(loopId: Long,

Review Comment:
   Will do at the very end, when all other non-refactoring comments are 
resolved.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala:
##########
@@ -762,4 +762,15 @@ object QueryPlan extends PredicateHelper {
       case e: AnalysisException => append(e.toString)
     }
   }
+
+  /**
+   * Generate detailed field string with different format based on type of 
input value

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>

Review Comment:
   Addressed, though this code may be changed to address the other feedback.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +580,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && 
allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = 
Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a 
recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" 
children are union-ed
+ * and it is also similar to a loop because the recursion continues while the 
last generated child
+ * is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an 
[[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop 
the loop earlier.
+ */
+case class UnionLoop(id: Long,

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -462,6 +462,57 @@ object Union {
   }
 }
 
+abstract class UnionBase extends LogicalPlan {

Review Comment:
   Added a comment that reflects the helper nature of this class: it is never 
instantiated per se and captures the commonalities between two different use cases 
of UNION (ALL).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",
+                    messageParameters = Map.empty)
+            }
+          } else {
+            cteDef
           }
+
+          if (newCTEDef.recursive) {
+            // cteDefMap holds "partially" resolved (only via anchor) CTE 
definitions in the
+            // recursive case.
+            if (newCTEDef.resolved) {
+              newCTEDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+            if (recursiveAnchorResolved(newCTEDef).isDefined) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          } else {
+            if (newCTEDef.resolved) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          }
+
+          newCTEDef
         }
-        w
+        w.copy(cteDefs = newCTEDefs)
 
       case ref: CTERelationRef if !ref.resolved =>
         cteDefMap.get(ref.cteId).map { cteDef =>

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to