cloud-fan commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1917868339


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
       plan: LogicalPlan,
       cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
     plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
-      case w @ WithCTE(_, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          if (cteDef.resolved) {
-            cteDefMap.put(cteDef.id, cteDef)
-          }
+      case withCTE @ WithCTE(_, cteDefs) =>
+        val newCTEDefs = cteDefs.map {
+          // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
+          // side effect of node substitution below is that after CTERelationRef substitution
+          // its cteDef is no longer considered `recursive`. This code path is common for `cteDef`
+          // that were non-recursive from the get-go, as well as those that are no longer recursive
+          // due to node substitution.
+          case cteDef if !cteDef.recursive =>
+            if (cteDef.resolved) {
+              cteDefMap.put(cteDef.id, cteDef)
+            }
+            cteDef
+          case cteDef =>
+            cteDef.child match {
+              // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and
+              // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE
+              // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef,
+              // using the output of the resolved anchor plan. The side effect of recursive
+              // CTERelationRef->UnionLoopRef substitution is that a `cteDef` that was originally
+              // considered `recursive` no longer is, in the sense of the `cteDef.recursive` method
+              // definition.
+              //
+              // Simple case of duplicating (UNION ALL) clause.
+              case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), 
false, false)) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, 
None))
+                  cteDef.copy(child = alias.copy(child = loop))
+                }
+
+              // The case of CTE name followed by a parenthesized list of column name(s), e.g.
+              // WITH RECURSIVE t(n).
+              case alias @ SubqueryAlias(_,
+              columnAlias @ UnresolvedSubqueryColumnAliases(
+              colNames,
+              Union(Seq(anchor, recursion), false, false)
+              )) =>
+                if (!anchor.resolved) {
+                  cteDef
+                } else {
+                  val loop = UnionLoop(
+                    cteDef.id,
+                    anchor,
+                    rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, 
Some(colNames)))
+                  cteDef.copy(child = alias.copy(child = 
columnAlias.copy(child = loop)))
+                }
+
+              // If the recursion is described with a UNION (deduplicating) clause then the
+              // recursive term should not return those rows that have been calculated previously,
+              // and we exclude those rows from the current iteration result.
+              case alias @ SubqueryAlias(_,
+              Distinct(Union(Seq(anchor, recursion), false, false))) =>

Review Comment:
   ```suggestion
                     Distinct(Union(Seq(anchor, recursion), false, false))) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to