cloud-fan commented on code in PR #49351: URL: https://github.com/apache/spark/pull/49351#discussion_r1917868339
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] { plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { - case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) - } + case withCTE @ WithCTE(_, cteDefs) => + val newCTEDefs = cteDefs.map { + // `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The + // side effect of node substitution below is that after CTERelationRef substitution + // its cteDef is no more considered `recursive`. This code path is common for `cteDef` + // that were non-recursive from the get go, as well as those that are no more recursive + // due to node substitution. + case cteDef if !cteDef.recursive => + if (cteDef.resolved) { + cteDefMap.put(cteDef.id, cteDef) + } + cteDef + case cteDef => + cteDef.child match { + // If it is a supported recursive CTE query pattern (4 so far), extract the anchor and + // recursive plans from the Union and rewrite Union with UnionLoop. The recursive CTE + // references inside UnionLoop's recursive plan will be rewritten as UnionLoopRef, + // using the output of the resolved anchor plan. The side effect of recursive + // CTERelationRef->UnionLoopRef substitution is that `cteDef` that was originally + // considered `recursive` is no more in the context of `cteDef.recursive` method + // definition. + // + // Simple case of duplicating (UNION ALL) clause. + case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + anchor, + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None)) + cteDef.copy(child = alias.copy(child = loop)) + } + + // The case of CTE name followed by a parenthesized list of column name(s), eg. + // WITH RECURSIVE t(n). + case alias @ SubqueryAlias(_, + columnAlias @ UnresolvedSubqueryColumnAliases( + colNames, + Union(Seq(anchor, recursion), false, false) + )) => + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + anchor, + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames))) + cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop))) + } + + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. + case alias @ SubqueryAlias(_, + Distinct(Union(Seq(anchor, recursion), false, false))) => Review Comment: ```suggestion Distinct(Union(Seq(anchor, recursion), false, false))) => ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org