nemanjapetr-db commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1915799531


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +582,58 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
-
-  private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output))
-
-  /**
-   * Maps the constraints containing a given (original) sequence of attributes to those with a
-   * given (reference) sequence of attributes. Given the nature of union, we expect that the
-   * mapping between the original and reference sequences are symmetric.
-   */
-  private def rewriteConstraints(
-      reference: Seq[Attribute],
-      original: Seq[Attribute],
-      constraints: ExpressionSet): ExpressionSet = {
-    require(reference.size == original.size)
-    val attributeRewrites = AttributeMap(original.zip(reference))
-    constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def merge(a: ExpressionSet, b: ExpressionSet): ExpressionSet = {
-    val common = a.intersect(b)
-    // The constraint with only one reference could be easily inferred as predicate
-    // Grouping the constraints by it's references so we can combine the constraints with same
-    // reference together
-    val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
-    val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
-    // loose the constraints by: A1 && B1 || A2 && B2  ->  (A1 || A2) && (B1 || B2)
-    val others = (othera.keySet intersect otherb.keySet).map { attr =>
-      Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And))
-    }
-    common ++ others
-  }
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" children are union-ed
+ * and it is also similar to a loop because the recursion continues as long as the last generated
+ * child is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop the loop earlier.
+ */
+case class UnionLoop(
+                      id: Long,
+                      anchor: LogicalPlan,
+                      recursion: LogicalPlan,
+                      limit: Option[Int] = None) extends UnionBase {
+  override def children: Seq[LogicalPlan] = Seq(anchor, recursion)
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): UnionLoop =
+    copy(anchor = newChildren(0), recursion = newChildren(1))
+}
 
-  override protected lazy val validConstraints: ExpressionSet = {
-    children
-      .map(child => rewriteConstraints(children.head.output, child.output, 
child.constraints))
-      .reduce(merge(_, _))
+/**
+ * The recursive reference in the recursive term of an [[UnionLoop]] node.
+ *
+ * @param loopId The id of the loop, inherited from [[CTERelationRef]]
+ * @param output The output attributes of this recursive reference.
+ * @param accumulated If false then the reference stands for the result of the previous iteration.
+ *                    If it is true then it stands for the union of all previous iteration
+ *                    results.
+ */
+case class UnionLoopRef(

Review Comment:
   Removed InsertLoops.scala as the substitution is now done from within ResolveWithCTE.
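
   For reference, a minimal sketch (not the PR's exact code, only reusing the constructors shown in the diff above) of how the new nodes compose for the motivating query `WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t)`, mirroring the plan built in ResolveRecursiveCTESuite further down:

       import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
       import org.apache.spark.sql.catalyst.plans.logical._

       // Anchor term: SELECT 1
       val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
       // The CTE definition whose id ties the loop to its recursive reference.
       val cteDef = CTERelationDef(OneRowRelation())
       // Recursive term: reads the previous iteration's rows via UnionLoopRef (accumulated = false).
       val recursion = Project(anchor.output, UnionLoopRef(cteDef.id, anchor.output, false))
       // UnionLoop unions the anchor with the rows generated by each iteration,
       // looping until an iteration produces no new rows.
       val loop = UnionLoop(cteDef.id, anchor, recursion)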



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {

Review Comment:
   Renamed to be more readable and moved below.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>

Review Comment:
   The change was not possible. Regardless, the comment is now moot as we changed the underlying code.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +588,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" children are union-ed
+ * and it is also similar to a loop because the recursion continues as long as the last generated
+ * child is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop the loop earlier.
+ */
+case class UnionLoop(id: Long,
+    anchor: LogicalPlan,
+    recursion: LogicalPlan,
+    limit: Option[Int] = None) extends UnionBase {
+  override def children: Seq[LogicalPlan] = Seq(anchor, recursion)
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): UnionLoop =
+    copy(anchor = newChildren(0), recursion = newChildren(1))
+}
 
-  /**
-   * Maps the constraints containing a given (original) sequence of attributes to those with a
-   * given (reference) sequence of attributes. Given the nature of union, we expect that the
-   * mapping between the original and reference sequences are symmetric.
-   */
-  private def rewriteConstraints(
-      reference: Seq[Attribute],
-      original: Seq[Attribute],
-      constraints: ExpressionSet): ExpressionSet = {
-    require(reference.size == original.size)
-    val attributeRewrites = AttributeMap(original.zip(reference))
-    constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
+/**
+ * The recursive reference in the recursive term of an [[UnionLoop]] node.
+ *
+ * @param loopId The id of the loop, inherited from [[CTERelationRef]]
+ * @param output The output attributes of this recursive reference.
+ * @param accumulated If false then the reference stands for the result of the previous iteration.
+ *                    If it is true then it stands for the union of all previous iteration
+ *                    results.
+ */
+case class UnionLoopRef(loopId: Long,
+    override val output: Seq[Attribute],

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
+          // side effect of node substitution below is that after CTERelationRef substitution
+          // its cteDef is no longer considered `recursive`. This code path is common for `cteDef`
+          // that were non-recursive from the get-go, as well as those that are no longer recursive
+          // due to node substitution.
+          case cteDef if !cteDef.recursive =>
+            if (cteDef.resolved) {
+              cteDefMap.put(cteDef.id, cteDef)
+            }
+            cteDef
+          case cteDef =>
+            cteDef.child match {
+              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
+              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
+              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
+              // using the output of the resolved anchor plan. The side effect of recursive
+              // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
+              // considered `recursive` no longer is, in the context of the `cteDef.recursive` method
+              // definition.
+              //
+              // Simple case of duplicating (UNION ALL) clause.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s), e.g.
+              // WITH RECURSIVE t(n).
+              case alias @ SubqueryAlias(_,
+              columnAlias @ UnresolvedSubqueryColumnAliases(
+              colNames,
+              Union(Seq(anchor, recursion), false, false)
+              )) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
+                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
+                }
+
+              // If the recursion is described with a UNION (deduplicating) clause then the
+              // recursive term should not return those rows that have been calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case alias @ SubqueryAlias(_,
+              Distinct(Union(Seq(anchor, recursion), false, false))) =>

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -539,48 +588,56 @@ case class Union(
     children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
   }
 
-  private lazy val lazyOutput: Seq[Attribute] = computeOutput()
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =
+    copy(children = newChildren)
+}
 
-  private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output))
+/**
+ * The logical node for recursion, that contains an initial (anchor) and a recursion describing term,
+ * that contains an [[UnionLoopRef]] node.
+ * The node is very similar to [[Union]] because the initial and "generated" children are union-ed
+ * and it is also similar to a loop because the recursion continues as long as the last generated
+ * child is not empty.
+ *
+ * @param id The id of the loop, inherited from [[CTERelationDef]]
+ * @param anchor The plan of the initial element of the loop.
+ * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node.
+ * @param limit An optional limit that can be pushed down to the node to stop the loop earlier.
+ */
+case class UnionLoop(id: Long,
+    anchor: LogicalPlan,

Review Comment:
   Done



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDef.id, false, Seq(), false, recursive = true)))
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", Union(Seq(anchor, recursionPart))))
+
+      val outerProject = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDefFinal.id, false, Seq(), false, recursive = false)))
+
+      val finalPlan = WithCTE(outerProject, Seq(cteDefFinal))
+      finalPlan

Review Comment:
   Done.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDef.id, false, Seq(), false, recursive = true)))
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", Union(Seq(anchor, recursionPart))))
+
+      val outerProject = Project(anchor.output,

Review Comment:
   Done.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDef.id, false, Seq(), false, recursive = true)))
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", Union(Seq(anchor, recursionPart))))
+
+      val outerProject = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDefFinal.id, false, Seq(), false, recursive = false)))
+
+      val finalPlan = WithCTE(outerProject, Seq(cteDefFinal))
+      finalPlan
+    }
+
+    def getAfterPlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
+          // side effect of node substitution below is that after CTERelationRef substitution
+          // its cteDef is no longer considered `recursive`. This code path is common for `cteDef`
+          // that were non-recursive from the get-go, as well as those that are no longer recursive
+          // due to node substitution.
+          case cteDef if !cteDef.recursive =>
+            if (cteDef.resolved) {
+              cteDefMap.put(cteDef.id, cteDef)
+            }
+            cteDef
+          case cteDef =>
+            cteDef.child match {
+              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
+              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
+              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
+              // using the output of the resolved anchor plan. The side effect of recursive
+              // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
+              // considered `recursive` no longer is, in the context of the `cteDef.recursive` method
+              // definition.
+              //
+              // Simple case of duplicating (UNION ALL) clause.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s), e.g.
+              // WITH RECURSIVE t(n).
+              case alias @ SubqueryAlias(_,
+              columnAlias @ UnresolvedSubqueryColumnAliases(
+              colNames,
+              Union(Seq(anchor, recursion), false, false)
+              )) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
+                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
+                }
+
+              // If the recursion is described with a UNION (deduplicating) clause then the
+              // recursive term should not return those rows that have been calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case alias @ SubqueryAlias(_,
+              Distinct(Union(Seq(anchor, recursion), false, false))) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    Distinct(anchor),
+                    Except(
+                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
+                      UnionLoopRef(cteDef.id, anchor.output, true),
+                      isAll = false
+                    )
+                  )
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s).
+              case alias @ SubqueryAlias(_,
+              columnAlias@UnresolvedSubqueryColumnAliases(
+              colNames,
+              Distinct(Union(Seq(anchor, recursion), false, false))
+              )) =>

Review Comment:
   Done.
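
   To make the deduplicating branch concrete, a small sketch (illustrative only; it reuses the minimal test-style plans rather than the PR's actual rewrite code) of the shape the UNION DISTINCT case is rewritten into, with the anchor deduplicated and the recursive term subtracting everything accumulated so far:

       import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
       import org.apache.spark.sql.catalyst.plans.logical._

       val cteDef = CTERelationDef(OneRowRelation())
       // Anchor term: SELECT 1, deduplicated because the query used UNION (not UNION ALL).
       val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
       // Recursive term over the previous iteration's rows (accumulated = false).
       val recursion = Project(anchor.output, UnionLoopRef(cteDef.id, anchor.output, false))
       // Each iteration drops rows already produced in any earlier iteration
       // by subtracting the accumulated reference (accumulated = true).
       val loop = UnionLoop(
         cteDef.id,
         Distinct(anchor),
         Except(
           recursion,
           UnionLoopRef(cteDef.id, anchor.output, true),
           isAll = false))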



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,19 +43,102 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          case cteDef if !cteDef.recursive =>
+            val newCTEDef = cteDef
+            if (newCTEDef.resolved) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+            newCTEDef
+          case cteDef =>

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,23 +39,159 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+        throw QueryCompilationErrors.recursiveCteError()
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>

Review Comment:
   Moot at this point. Closing.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
+          // side effect of node substitution below is that after CTERelationRef substitution
+          // its cteDef is no longer considered `recursive`. This code path is common for `cteDef`
+          // that were non-recursive from the get-go, as well as those that are no longer recursive
+          // due to node substitution.
+          case cteDef if !cteDef.recursive =>
+            if (cteDef.resolved) {
+              cteDefMap.put(cteDef.id, cteDef)
+            }
+            cteDef
+          case cteDef =>
+            cteDef.child match {
+              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
+              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
+              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
+              // using the output of the resolved anchor plan. The side effect of recursive
+              // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
+              // considered `recursive` no longer is, in the context of the `cteDef.recursive` method
+              // definition.
+              //
+              // Simple case of duplicating (UNION ALL) clause.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s), e.g.
+              // WITH RECURSIVE t(n).
+              case alias @ SubqueryAlias(_,
+              columnAlias @ UnresolvedSubqueryColumnAliases(
+              colNames,
+              Union(Seq(anchor, recursion), false, false)
+              )) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)))
+                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
+                }
+
+              // If the recursion is described with a UNION (deduplicating) clause then the
+              // recursive term should not return those rows that have been calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case alias @ SubqueryAlias(_,
+              Distinct(Union(Seq(anchor, recursion), false, false))) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    Distinct(anchor),
+                    Except(
+                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None),
+                      UnionLoopRef(cteDef.id, anchor.output, true),
+                      isAll = false
+                    )
+                  )
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s).
+              case alias @ SubqueryAlias(_,
+              columnAlias@UnresolvedSubqueryColumnAliases(
+              colNames,
+              Distinct(Union(Seq(anchor, recursion), false, false))
+              )) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    Distinct(anchor),
+                    Except(
+                      rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)),
+                      UnionLoopRef(cteDef.id, anchor.output, true),
+                      isAll = false
+                    )
+                  )
+                  cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop)))
+                }
+
+              case other if !other.exists(_.isInstanceOf[UnionLoop]) =>

Review Comment:
   Done.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -297,6 +297,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
   * Generate detailed field string with different format based on type of input value
    */
+  // TODO(nemanja.petro...@databricks.com) Delete method as it is duplicated in QueryPlan.scala.

Review Comment:
   Follow up please.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
+          // side effect of node substitution below is that after CTERelationRef substitution
+          // its cteDef is no longer considered `recursive`. This code path is common for `cteDef`
+          // that were non-recursive from the get-go, as well as those that are no longer recursive
+          // due to node substitution.
+          case cteDef if !cteDef.recursive =>
+            if (cteDef.resolved) {
+              cteDefMap.put(cteDef.id, cteDef)
+            }
+            cteDef
+          case cteDef =>
+            cteDef.child match {
+              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
+              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
+              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
+              // using the output of the resolved anchor plan. The side effect of recursive
+              // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally
+              // considered `recursive` no longer is, in the context of the `cteDef.recursive` method
+              // definition.
+              //
+              // Simple case of duplicating (UNION ALL) clause.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None))
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s), e.g.
+              // WITH RECURSIVE t(n).
+              case alias @ SubqueryAlias(_,
+              columnAlias @ UnresolvedSubqueryColumnAliases(
+              colNames,
+              Union(Seq(anchor, recursion), false, false)
+              )) =>

Review Comment:
   Done.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDef.id, false, Seq(), false, recursive = true)))
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", Union(Seq(anchor, recursionPart))))
+
+      val outerProject = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDefFinal.id, false, Seq(), false, recursive = false)))
+
+      val finalPlan = WithCTE(outerProject, Seq(cteDefFinal))
+      finalPlan
+    }
+
+    def getAfterPlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val saRecursion = SubqueryAlias("t",
+        UnionLoopRef(cteDef.id, anchor.output, false))
+      val recursionPart = Project(saRecursion.output, saRecursion)
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", UnionLoop(cteDef.id, anchor, recursionPart)))
+
+      val outerCteRef = CTERelationRef(cteDefFinal.id, true, cteDefFinal.output, false,
+        recursive = false)
+      val outerProject = Project(outerCteRef.output, SubqueryAlias("t", outerCteRef))
+
+      val finalPlan = WithCTE(outerProject, Seq(cteDefFinal))
+      finalPlan
+    }
+
+    val beforePlan = getBeforePlan(cteDef)
+    val afterPlan = getAfterPlan(cteDef)
+
+    comparePlans(analyzer.execute(beforePlan), afterPlan)

Review Comment:
   No analyze method, queryExecution, execution or the like is available here, it seems to me.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -4311,4 +4311,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       origin = origin
     )
   }
+
+  def recursiveCteError(error: String): Throwable = {

Review Comment:
   Done.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3048,6 +3048,12 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_RECURSIVE_CTE" : {

Review Comment:
   Done.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3048,6 +3048,12 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_RECURSIVE_CTE" : {
+    "message" : [
+      "Invalid recursive definition found. Recursive queries must contain an UNION or an UNION ALL statement with 2 children. The first child needs to be the anchor term without any recursive references. <error>"

Review Comment:
   Removed.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDef.id, false, Seq(), false, recursive = true)))
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", Union(Seq(anchor, recursionPart))))
+
+      val outerProject = Project(anchor.output,
+        SubqueryAlias("t",
+          CTERelationRef(cteDefFinal.id, false, Seq(), false, recursive = false)))
+
+      val finalPlan = WithCTE(outerProject, Seq(cteDefFinal))
+      finalPlan
+    }
+
+    def getAfterPlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val saRecursion = SubqueryAlias("t",
+        UnionLoopRef(cteDef.id, anchor.output, false))
+      val recursionPart = Project(saRecursion.output, saRecursion)
+
+      val cteDefFinal = cteDef.copy(child =
+        SubqueryAlias("t", UnionLoop(cteDef.id, anchor, recursionPart)))
+
+      val outerCteRef = CTERelationRef(cteDefFinal.id, true, cteDefFinal.output, false,

Review Comment:
   No, it is a non-recursive Ref, outside of Union/UnionLoop.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,

Review Comment:
   Simplified it. I was following a valid SQL plan.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveRecursiveCTESuite.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.ResolveSubqueryColumnAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ResolveRecursiveCTESuite extends AnalysisTest {
+  // Motivated by:
+  // WITH RECURSIVE t AS (SELECT 1 UNION ALL SELECT * FROM t) SELECT * FROM t;
+  test("ResolveWithCTE rule on recursive CTE without UnresolvedSubqueryColumnAliases") {
+    // The analyzer will repeat ResolveWithCTE rule twice.
+    val rules = Seq(ResolveWithCTE, ResolveWithCTE)
+    val analyzer = new RuleExecutor[LogicalPlan] {
+      override val batches = Seq(Batch("Resolution", Once, rules: _*))
+    }
+    // Since cteDef IDs need to be the same, cteDef for each case will be created by copying
+    // this one with its child replaced.
+    val cteDef = CTERelationDef(OneRowRelation())
+
+    def getBeforePlan(cteDef: CTERelationDef): LogicalPlan = {
+      val anchor = Project(Seq(Alias(Literal(1), "1")()), OneRowRelation())
+
+      val recursionPart = Project(anchor.output,

Review Comment:
   As discussed offline, I will do it in a follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

