cloud-fan commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1904136373


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -37,21 +38,89 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
     }
   }
 
+  private def updateRecursiveAnchor(cteDef: CTERelationDef): CTERelationDef = {
+    cteDef.child match {
+      case SubqueryAlias(_, u: Union) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(u.children.head))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, d @ Distinct(u: Union)) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(d.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, u: Union)) 
=>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = u.children.head)))
+        } else {
+          cteDef
+        }
+      case SubqueryAlias(_, a @ UnresolvedSubqueryColumnAliases(_, d @ 
Distinct(u: Union))) =>
+        if (u.children.head.resolved) {
+          cteDef.copy(recursionAnchor = Some(a.copy(child = d.copy(child = 
u.children.head))))
+        } else {
+          cteDef
+        }
+      case _ =>
+        cteDef.failAnalysis(
+          errorClass = "INVALID_RECURSIVE_CTE",
+          messageParameters = Map.empty)
+    }
+  }
+
   private def resolveWithCTE(
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
       case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
+        val newCTEDefs = cteDefs.map { cteDef =>
+          // If a recursive CTE definition is not yet resolved then extract 
the anchor term to the
+          // definition, but if it is resolved then the extracted anchor term 
is no longer needed
+          // and can be removed.
+          val newCTEDef = if (cteDef.recursive) {
+            if (!cteDef.resolved) {
+              if (cteDef.recursionAnchor.isEmpty) {
+                updateRecursiveAnchor(cteDef)
+              } else {
+                cteDef
+              }
+            } else {
+              if (cteDef.recursionAnchor.nonEmpty) {

Review Comment:
   oh sorry I misread the code. Now the question becomes: why do we empty out 
the `recursionAnchor` if the CTE def is resolved?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to