vladimirg-db commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1910920492


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",

Review Comment:
   Shall we add a new error class to `error-conditions.json` and a new function 
to `QueryCompilationErrors`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>

Review Comment:
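   I know that developers in Catalyst really enjoy one-letter variables in
matches, but that does not feel like good code health. Descriptive names (e.g. `alias` instead of `a`, `columnAliases` instead of `ca`) would make these matches easier to read.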



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",
+                    messageParameters = Map.empty)
+            }
+          } else {
+            cteDef
           }
+
+          if (newCTEDef.recursive) {
+            // cteDefMap holds "partially" resolved (only via anchor) CTE 
definitions in the
+            // recursive case.
+            if (newCTEDef.resolved) {
+              newCTEDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+            if (recursiveAnchorResolved(newCTEDef).isDefined) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          } else {
+            if (newCTEDef.resolved) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          }
+
+          newCTEDef
         }
-        w
+        w.copy(cteDefs = newCTEDefs)
 
       case ref: CTERelationRef if !ref.resolved =>
         cteDefMap.get(ref.cteId).map { cteDef =>
-          CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, 
cteDef.isStreaming)
+          if (ref.recursive) {
+            // Recursive references can be resolved from the anchor term. 
Non-resolved ref
+            // implies non-resolved definition. Since the definition was 
present in the map of
+            // resolved and "partially" resolved definitions, the only 
explanation is that
+            // definition was "partially" resolved.
+            val anchorResolved = recursiveAnchorResolved(cteDef)
+            if (anchorResolved.isDefined) {
+              ref.copy(_resolved = true, output = anchorResolved.get.output,
+                isStreaming = cteDef.isStreaming)
+            } else {
+              cteDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+          } else if (cteDef.resolved) {
+            ref.copy(_resolved = true, output = cteDef.output, isStreaming = 
cteDef.isStreaming)
+          } else {
+            // In the non-recursive case, cteDefMap contains only resolved 
Definitions.
+            cteDef.failAnalysis(

Review Comment:
   This changes the non-recursive behavior: if the definition is unresolved, we
would now throw an error. It is also unrelated to the problem this PR addresses.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>

Review Comment:
   It feels like this `map` body should have two distinct cases, recursive and
non-recursive. We can rewrite it like this:
   ```
   cteDefs.map {
     case cteDef if !cteDef.recursive =>
       ...
     case cteDef =>
       ...
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>

Review Comment:
   Maybe we can introduce an extractor to reduce complexity here. Since the
replacement needs `cteDef`, the extractor can be a small class instantiated per definition:
   
   ```
   class ReplaceUnionWithUnionLoop(cteDef: CTERelationDef) {
     def unapply(plan: LogicalPlan): Option[UnionLoop] = plan match {
       case Distinct(Union(Seq(anchor, recursion), false, false)) =>
         Some(UnionLoop(cteDef.id, Distinct(anchor),
           Except(transformRefs(recursion), UnionLoopRef(cteDef.id, cteDef.output, true), false)))
       case Union(Seq(anchor, recursion), false, false) =>
         Some(UnionLoop(cteDef.id, anchor, transformRefs(recursion)))
       case _ =>
         None
     }
   }
   ```
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.

Review Comment:
   Can you please explain in the method doc why exactly we are replacing all
the simple refs with `UnionLoopRef`s under a `UnionLoop`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {
+    plan.transformWithPruning(_.containsPattern(CTE)) {
+      case r: CTERelationRef if r.recursive =>
+        UnionLoopRef(r.cteId, r.output, false)
+    }
+  }
+
+  // Update the definition's recursiveAnchor if the anchor is resolved.
+  private def recursiveAnchorResolved(cteDef: CTERelationDef): 
Option[LogicalPlan] = {
+    cteDef.child match {
+      case SubqueryAlias(_, ul: UnionLoop) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, Distinct(ul: UnionLoop)) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) 
=>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: 
UnionLoop))) =>
+        if (ul.anchor.resolved) {
+          Some(ul.anchor)
+        } else {
+          None
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          val newCTEDef = if (cteDef.recursive) {
+            cteDef.child match {
+              // Substitutions to UnionLoop and UnionLoopRef.
+              case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, 
false)) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id, anchor, transformRefs(recursion))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_,
+              Union(Seq(anchor, recursion), false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id, anchor, transformRefs(recursion)))))
+              // If the recursion is described with an UNION (deduplicating) 
clause then the
+              // recursive term should not return those rows that have been 
calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), 
false, false))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    UnionLoop(cteDef.id,
+                      Distinct(anchor),
+                      Except(
+                        transformRefs(recursion),
+                        UnionLoopRef(cteDef.id, cteDef.output, true),
+                        false))))
+              case a @ SubqueryAlias(_,
+              ca @ UnresolvedSubqueryColumnAliases(_, 
Distinct(Union(Seq(anchor, recursion),
+              false, false)))) =>
+                cteDef.copy(child =
+                  a.copy(child =
+                    ca.copy(child =
+                      UnionLoop(cteDef.id,
+                        Distinct(anchor),
+                        Except(
+                          transformRefs(recursion),
+                          UnionLoopRef(cteDef.id, cteDef.output, true),
+                          false)))))
+              case _ =>
+                // We do not support cases of sole Union (needs a 
SubqueryAlias above it), nor
+                // Project (as UnresolvedSubqueryColumnAliases have not been 
substituted with the
+                // Project yet), leaving us with cases of SubqueryAlias->Union 
and SubqueryAlias->
+                // UnresolvedSubqueryColumnAliases->Union. The same applies to 
Distinct Union.
+                cteDef.failAnalysis(
+                    errorClass = "INVALID_RECURSIVE_CTE",
+                    messageParameters = Map.empty)
+            }
+          } else {
+            cteDef
           }
+
+          if (newCTEDef.recursive) {
+            // cteDefMap holds "partially" resolved (only via anchor) CTE 
definitions in the
+            // recursive case.
+            if (newCTEDef.resolved) {
+              newCTEDef.failAnalysis(
+                errorClass = "INVALID_RECURSIVE_CTE",
+                messageParameters = Map.empty)
+            }
+            if (recursiveAnchorResolved(newCTEDef).isDefined) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          } else {
+            if (newCTEDef.resolved) {
+              cteDefMap.put(newCTEDef.id, newCTEDef)
+            }
+          }
+
+          newCTEDef
         }
-        w
+        w.copy(cteDefs = newCTEDefs)
 
       case ref: CTERelationRef if !ref.resolved =>
         cteDefMap.get(ref.cteId).map { cteDef =>

Review Comment:
   How about we rewrite it a bit:
   
   ```
   cteDefMap.get(ref.cteId) match {
     case Some(cteDef) if !ref.recursive =>
       ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming)
     case Some(cteDef) =>
       ...
     case None =>
       ref
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {

Review Comment:
   Shall we order the methods top-down, in the order in which they are used?
That is more natural to read.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  // Substitute CTERelationRef with UnionLoopRef.
+  private def transformRefs(plan: LogicalPlan) = {

Review Comment:
   ```suggestion
     private def replaceSimpleRefsWithUnionLoopRefs(plan: LogicalPlan) = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to