cloud-fan commented on code in PR #49351: URL: https://github.com/apache/spark/pull/49351#discussion_r1903881211
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ########## @@ -539,48 +582,58 @@ case class Union( children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible } - private lazy val lazyOutput: Seq[Attribute] = computeOutput() - - private def computeOutput(): Seq[Attribute] = Union.mergeChildOutputs(children.map(_.output)) - - /** - * Maps the constraints containing a given (original) sequence of attributes to those with a - * given (reference) sequence of attributes. Given the nature of union, we expect that the - * mapping between the original and reference sequences are symmetric. - */ - private def rewriteConstraints( - reference: Seq[Attribute], - original: Seq[Attribute], - constraints: ExpressionSet): ExpressionSet = { - require(reference.size == original.size) - val attributeRewrites = AttributeMap(original.zip(reference)) - constraints.map(_ transform { - case a: Attribute => attributeRewrites(a) - }) - } + override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union = + copy(children = newChildren) +} - private def merge(a: ExpressionSet, b: ExpressionSet): ExpressionSet = { - val common = a.intersect(b) - // The constraint with only one reference could be easily inferred as predicate - // Grouping the constraints by it's references so we can combine the constraints with same - // reference together - val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head) - val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head) - // loose the constraints by: A1 && B1 || A2 && B2 -> (A1 || A2) && (B1 || B2) - val others = (othera.keySet intersect otherb.keySet).map { attr => - Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And)) - } - common ++ others - } +/** + * The logical node for recursion, that contains an initial (anchor) and a recursion describing term, + * that 
contains an [[UnionLoopRef]] node. + * The node is very similar to [[Union]] because the initial and "generated" children are union-ed + * and it is also similar to a loop because the recursion continues until the last generated child + * is not empty. + * + * @param id The id of the loop, inherited from [[CTERelationDef]] + * @param anchor The plan of the initial element of the loop. + * @param recursion The plan that describes the recursion with an [[UnionLoopRef]] node. + * @param limit An optional limit that can be pushed down to the node to stop the loop earlier. + */ +case class UnionLoop( + id: Long, + anchor: LogicalPlan, + recursion: LogicalPlan, + limit: Option[Int] = None) extends UnionBase { + override def children: Seq[LogicalPlan] = Seq(anchor, recursion) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): UnionLoop = + copy(anchor = newChildren(0), recursion = newChildren(1)) +} - override protected lazy val validConstraints: ExpressionSet = { - children - .map(child => rewriteConstraints(children.head.output, child.output, child.constraints)) - .reduce(merge(_, _)) +/** + * The recursive reference in the recursive term of an [[UnionLoop]] node. + * + * @param loopId The id of the loop, inherited from [[CTERelationRef]] + * @param output The output attributes of this recursive reference. + * @param accumulated If false, the reference stands for the result of the previous iteration. + * If it is true, then it stands for the union of all previous iteration + * results. + */ +case class UnionLoopRef( Review Comment: where do we create `UnionLoopRef`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org