nemanjapetr-db commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1907989728


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,89 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  private def updateRecursiveAnchor(cteDef: CTERelationDef): CTERelationDef = {
+    cteDef.child match {
+      case SubqueryAlias(_, u: Union) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(u.children.head))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, d @ Distinct(u: Union)) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(d.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, u: Union)) 
=>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, d @ 
Distinct(u: Union))) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = d.copy(child = 
u.children.head))))
+        } else {
+          cteDef
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          // If a recursive CTE definition is not yet resolved then extract 
the anchor term to the
+          // definition, but if it is resolved then the extracted anchor term 
is no longer needed
+          // and can be removed.
+          val newCTEDef = if (cteDef.recursive) {
+            if (!cteDef.resolved) {
+              if (cteDef.recursionAnchor.isEmpty) {
+                updateRecursiveAnchor(cteDef)
+              } else {
+                cteDef
+              }
+            } else {
+              if (cteDef.recursionAnchor.nonEmpty) {

Review Comment:
   `recursionAnchor` removed, comment is now moot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to