nemanjapetr-db commented on code in PR #49351: URL: https://github.com/apache/spark/pull/49351#discussion_r1912546917
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. + case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => Review Comment: I will address this one in the next round. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. 
+ case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. + case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else { + cteDef } + + if (newCTEDef.recursive) { + // cteDefMap holds "partially" resolved (only via anchor) CTE definitions in the + // recursive case. 
+ if (newCTEDef.resolved) { + newCTEDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + if (recursiveAnchorResolved(newCTEDef).isDefined) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } else { + if (newCTEDef.resolved) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } + + newCTEDef } - w + w.copy(cteDefs = newCTEDefs) case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => - CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output, cteDef.isStreaming) + if (ref.recursive) { + // Recursive references can be resolved from the anchor term. Non-resolved ref + // implies non-resolved definition. Since the definition was present in the map of + // resolved and "partially" resolved definitions, the only explanation is that + // definition was "partially" resolved. + val anchorResolved = recursiveAnchorResolved(cteDef) + if (anchorResolved.isDefined) { + ref.copy(_resolved = true, output = anchorResolved.get.output, + isStreaming = cteDef.isStreaming) + } else { + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else if (cteDef.resolved) { + ref.copy(_resolved = true, output = cteDef.output, isStreaming = cteDef.isStreaming) + } else { + // In the non-recursive case, cteDefMap contains only resolved Definitions. + cteDef.failAnalysis( Review Comment: In the non-recursive case: We are pulling this def out from cteDefMap, and it is in that map precisely because it is resolved. The map insertion code above is: if (newCTEDef.resolved) { cteDefMap.put(newCTEDef.id, newCTEDef) } This is a sanity check that is enforcing the invariant. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ########## @@ -539,48 +580,56 @@ case class Union( children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible } - private lazy val lazyOutput: Seq[Attribute] = computeOutput() + override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union = + copy(children = newChildren) +} - private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output)) +/** + * The logical node for recursion, that contains an initial (anchor) and a recursion describing term, + * that contains an [[UnionLoopRef]] node. + * The node is very similar to [[Union]] because the initial and "generated" children are union-ed + * and it is also similar to a loop because the recursion continues as long as the last generated + * child is not empty. + * + * @param id The id of the loop, inherited from [[CTERelationDef]] + * @param anchor The plan of the initial element of the loop. + * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node. + * @param limit An optional limit that can be pushed down to the node to stop the loop earlier. + */ +case class UnionLoop(id: Long, + anchor: LogicalPlan, + recursion: LogicalPlan, + limit: Option[Int] = None) extends UnionBase { + override def children: Seq[LogicalPlan] = Seq(anchor, recursion) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): UnionLoop = + copy(anchor = newChildren(0), recursion = newChildren(1)) +} - /** - * Maps the constraints containing a given (original) sequence of attributes to those with a - * given (reference) sequence of attributes. Given the nature of union, we expect that the - * mapping between the original and reference sequences are symmetric.
- */ - private def rewriteConstraints( - reference: Seq[Attribute], - original: Seq[Attribute], - constraints: ExpressionSet): ExpressionSet = { - require(reference.size == original.size) - val attributeRewrites = AttributeMap(original.zip(reference)) - constraints.map(_ transform { - case a: Attribute => attributeRewrites(a) - }) - } +/** + * The recursive reference in the recursive term of a [[UnionLoop]] node. + * + * @param loopId The id of the loop, inherited from [[CTERelationRef]] + * @param output The output attributes of this recursive reference. + * @param accumulated If false, the reference stands for the result of the previous iteration. + * If it is true, then it stands for the union of all previous iteration + * results. + */ +case class UnionLoopRef(loopId: Long, Review Comment: Will do at the very end, when all other non-refactoring comments are resolved. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala: ########## @@ -762,4 +762,15 @@ object QueryPlan extends PredicateHelper { case e: AnalysisException => append(e.toString) } } + + /** + * Generate detailed field string with different format based on type of input value Review Comment: Done. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved.
+ private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. + case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. 
+ case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", Review Comment: Done. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef. + private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. 
+ private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. + case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => Review Comment: Addressed, though this code may be changed to address the other feedback. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ########## @@ -539,48 +580,56 @@ case class Union( children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible } - private lazy val lazyOutput: Seq[Attribute] = computeOutput() + override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union = + copy(children = newChildren) +} - private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output)) +/** + * The logical node for recursion, that contains an initial (anchor) and a recursion describing term, + * that contains an [[UnionLoopRef]] node. + * The node is very similar to [[Union]] because the initial and "generated" children are union-ed + * and it is also similar to a loop because the recursion continues as long as the last generated + * child is not empty. + * + * @param id The id of the loop, inherited from [[CTERelationDef]] + * @param anchor The plan of the initial element of the loop. + * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node. + * @param limit An optional limit that can be pushed down to the node to stop the loop earlier. + */ +case class UnionLoop(id: Long, Review Comment: Done. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ########## @@ -462,6 +462,57 @@ object Union { } } +abstract class UnionBase extends LogicalPlan { Review Comment: Added a comment that reflects the helper nature of this class: it is never instantiated per se and captures the commonalities between two different use cases of UNION (ALL). ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -37,21 +38,150 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } } + // Substitute CTERelationRef with UnionLoopRef.
+ private def transformRefs(plan: LogicalPlan) = { + plan.transformWithPruning(_.containsPattern(CTE)) { + case r: CTERelationRef if r.recursive => + UnionLoopRef(r.cteId, r.output, false) + } + } + + // Update the definition's recursiveAnchor if the anchor is resolved. + private def recursiveAnchorResolved(cteDef: CTERelationDef): Option[LogicalPlan] = { + cteDef.child match { + case SubqueryAlias(_, ul: UnionLoop) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, Distinct(ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, ul: UnionLoop)) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(_, Distinct(ul: UnionLoop))) => + if (ul.anchor.resolved) { + Some(ul.anchor) + } else { + None + } + case _ => + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } + private def resolveWithCTE( plan: LogicalPlan, cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = { plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) { case w @ WithCTE(_, cteDefs) => - cteDefs.foreach { cteDef => - if (cteDef.resolved) { - cteDefMap.put(cteDef.id, cteDef) + val newCTEDefs = cteDefs.map { cteDef => + val newCTEDef = if (cteDef.recursive) { + cteDef.child match { + // Substitutions to UnionLoop and UnionLoopRef. 
+ case a @ SubqueryAlias(_, Union(Seq(anchor, recursion), false, false)) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, + Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, anchor, transformRefs(recursion))))) + // If the recursion is described with an UNION (deduplicating) clause then the + // recursive term should not return those rows that have been calculated previously, + // and we exclude those rows from the current iteration result. + case a @ SubqueryAlias(_, Distinct(Union(Seq(anchor, recursion), false, false))) => + cteDef.copy(child = + a.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false)))) + case a @ SubqueryAlias(_, + ca @ UnresolvedSubqueryColumnAliases(_, Distinct(Union(Seq(anchor, recursion), + false, false)))) => + cteDef.copy(child = + a.copy(child = + ca.copy(child = + UnionLoop(cteDef.id, + Distinct(anchor), + Except( + transformRefs(recursion), + UnionLoopRef(cteDef.id, cteDef.output, true), + false))))) + case _ => + // We do not support cases of sole Union (needs a SubqueryAlias above it), nor + // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the + // Project yet), leaving us with cases of SubqueryAlias->Union and SubqueryAlias-> + // UnresolvedSubqueryColumnAliases->Union. The same applies to Distinct Union. + cteDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + } else { + cteDef } + + if (newCTEDef.recursive) { + // cteDefMap holds "partially" resolved (only via anchor) CTE definitions in the + // recursive case. 
+ if (newCTEDef.resolved) { + newCTEDef.failAnalysis( + errorClass = "INVALID_RECURSIVE_CTE", + messageParameters = Map.empty) + } + if (recursiveAnchorResolved(newCTEDef).isDefined) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } else { + if (newCTEDef.resolved) { + cteDefMap.put(newCTEDef.id, newCTEDef) + } + } + + newCTEDef } - w + w.copy(cteDefs = newCTEDefs) case ref: CTERelationRef if !ref.resolved => cteDefMap.get(ref.cteId).map { cteDef => Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org