nemanjapetr-db commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1907989728


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,89 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  private def updateRecursiveAnchor(cteDef: CTERelationDef): CTERelationDef = {
+    cteDef.child match {
+      case SubqueryAlias(_, u: Union) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(u.children.head))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, d @ Distinct(u: Union)) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(d.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, u: Union)) 
=>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, d @ 
Distinct(u: Union))) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = d.copy(child = 
u.children.head))))
+        } else {
+          cteDef
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          // If a recursive CTE definition is not yet resolved then extract 
the anchor term to the
+          // definition, but if it is resolved then the extracted anchor term 
is no longer needed
+          // and can be removed.
+          val newCTEDef = if (cteDef.recursive) {
+            if (!cteDef.resolved) {
+              if (cteDef.recursionAnchor.isEmpty) {
+                updateRecursiveAnchor(cteDef)
+              } else {
+                cteDef
+              }
+            } else {
+              if (cteDef.recursionAnchor.nonEmpty) {

Review Comment:
   If we empty out `recursionAnchor`, then `cteDef.recursionAnchorResolved` will always
   return false. That prevents the output from being fetched on line 164
   (`output = cteDef.recursionAnchor.get.output`) and instead forces the branch on
   line 167, which returns a recursive, unresolved Ref.
   
   Could that be the cause? If this is a bug, then we should not empty out
   `recursionAnchor` on line 142. In addition, the condition of the `if` on line 163
   should become
   `cteDef.recursionAnchorResolved || cteDef.resolved`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to