vladimirg-db commented on code in PR #49351: URL: https://github.com/apache/spark/pull/49351#discussion_r1910920492
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. 
+ case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. + case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", Review Comment: Shall we add a new error class to `error-conditions.json` and a new function to `QueryCompilationErrors`? 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. + case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => Review Comment: I know that developers in Catalyst really enjoy one-letter variable names in matches, but that does not make for good code health.
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. 
+ case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. + case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else { + cteDef } + + if (newCTEDef.recursive) { + // cteDefMap holds "partially" resolved (only via anchor) CTE definitions in the + // recursive case. 
+ if (newCTEDef.resolved) { + newCTEDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + if (recursiveAnchorResolved(newCTEDef).isDefined) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } else { + if (newCTEDef.resolved) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } + + newCTEDef } - w + w.copy(cteDefs = newCTEDefs) case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => - CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming) + if (ref.recursive) { + // Recursive references can be resolved from the anchor term. Non-resolved ref + // implies non-resolved definition. Since the definition was present in the map of + // resolved and "partially" resolved definitions, the only explanation is that + // definition was "partially" resolved. + val anchorResolved = recursiveAnchorResolved(cteDef) + if (anchorResolved.isDefined) { + ref.copy(_resolved = true, output = anchorResolved.get.output, + isStreaming = cteDef.isStreaming) + } else { + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else if (cteDef.resolved) { + ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming) + } else { + // In the non-recursive case, cteDefMap contains only resolved Definitions. + cteDef.failAnalysis( Review Comment: This changes the non-recursive behavior: if the def is unresolved, we would now throw an error. It is also completely unrelated to the problem. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef.
+ private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => Review Comment: It feels like this `map` body should have two distinct cases: recursive and non-recursive. We can rewrite it like this: ``` cteDefs.map { case cteDef if !cteDef.recursive => ... case cteDef => ... } ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef.
+ private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. 
+ case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => Review Comment: Maybe we can introduce an extractor object to reduce complexity here: ``` object ReplaceUnionWithUnionLoop { def unapply(plan: LogicalPlan): Option[UnionLoop] = plan match { case distinctUnion @ Distinct(Union(Seq(anchor, recursion), false, false)) => Some(UnionLoop(cteDef.id, Distinct(anchor), Except(transformRefs(recursion), UnionLoopRef(cteDef.id, cteDef.output, true), false))) case union @ Union(Seq(anchor, recursion), false, false) => Some(UnionLoop(cteDef.id, anchor, transformRefs(recursion))) case _ => None } } ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. Review Comment: Can you please explain in the method doc why exactly we are replacing all the simple refs with union refs under a `UnionLoop`? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved.
+ private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. + case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. 
+ case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else { + cteDef } + + if (newCTEDef.recursive) { + // cteDefMap holds "partially" resolved (only via anchor) CTE definitions in the + // recursive case. + if (newCTEDef.resolved) { + newCTEDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + if (recursiveAnchorResolved(newCTEDef).isDefined) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } else { + if (newCTEDef.resolved) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } + + newCTEDef } - w + w.copy(cteDefs = newCTEDefs) case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => Review Comment: How about we rewrite it a bit: ``` cteDefMap.get(ref.cteId) match { case Some(cteDef) if !ref.recursive=> ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming) case Some(cteDef) => ... 
+ case None => ref } ``` ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { Review Comment: Shall we place the methods in top-down order, i.e., in the order they are used? That is more natural to read. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { Review Comment: ```suggestion private def replaceSimpleRefsWithUnionLoopRefs(plan: LogicalPlan) = { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org