dtenedor commented on code in PR #49518: URL: https://github.com/apache/spark/pull/49518#discussion_r1924249528
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -183,4 +184,52 @@ object ResolveWithCTE extends Rule[LogicalPlan] { columnNames.map(UnresolvedSubqueryColumnAliases(_, ref)).getOrElse(ref) } } + + /** + * Checks if data types of anchor and recursive terms of a recursive CTE definition match. + */ + def checkDataTypesAnchorAndRecursiveTerm(unionLoop: UnionLoop): Unit = { + val anchorOutputDatatypes = unionLoop.anchor.output.map(_.dataType) + val recursiveTermOutputDatatypes = unionLoop.recursion.output.map(_.dataType) + + if (!anchorOutputDatatypes.zip(recursiveTermOutputDatatypes).forall { + case (anchorDT, recursionDT) => DataType.equalsStructurally(anchorDT, recursionDT, true) Review Comment: it's a bit confusing to have the inline pattern-match within an `if` like this, let's pull it out into a helper method ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala: ########## @@ -183,4 +184,52 @@ object ResolveWithCTE extends Rule[LogicalPlan] { columnNames.map(UnresolvedSubqueryColumnAliases(_, ref)).getOrElse(ref) } } + + /** + * Checks if data types of anchor and recursive terms of a recursive CTE definition match. 
+ */ + def checkDataTypesAnchorAndRecursiveTerm(unionLoop: UnionLoop): Unit = { + val anchorOutputDatatypes = unionLoop.anchor.output.map(_.dataType) + val recursiveTermOutputDatatypes = unionLoop.recursion.output.map(_.dataType) + + if (!anchorOutputDatatypes.zip(recursiveTermOutputDatatypes).forall { + case (anchorDT, recursionDT) => DataType.equalsStructurally(anchorDT, recursionDT, true) + }) { + throw new AnalysisException( + errorClass = "INVALID_RECURSIVE_REFERENCE.DATA_TYPE", + messageParameters = Map.empty) + } + } + + /** + * Throws error if self-reference is placed in places which are not allowed: + * right side of left outer/semi/anti joins, left side of right outer joins, + * in full outer joins and in aggregates + */ + def checkIfSelfReferenceIsPlacedCorrectly(unionLoop: UnionLoop): Unit = { + def unionLoopRefNotAllowedUnderCurrentNode(currentNode: LogicalPlan) : Unit = + currentNode.foreach { + case UnionLoopRef(unionLoop.id, _, _) => + throw new AnalysisException( + errorClass = "INVALID_RECURSIVE_REFERENCE.PLACE", + messageParameters = Map.empty) + case other => () + } + unionLoop.foreach { + case Join(left, right, LeftOuter, _, _) => + unionLoopRefNotAllowedUnderCurrentNode(right) + case Join(left, right, RightOuter, _, _) => + unionLoopRefNotAllowedUnderCurrentNode(left) + case Join(left, right, LeftSemi, _, _) => + unionLoopRefNotAllowedUnderCurrentNode(right) + case Join(left, right, LeftAnti, _, _) => + unionLoopRefNotAllowedUnderCurrentNode(right) + case Join(left, right, _, _, _) => + unionLoopRefNotAllowedUnderCurrentNode(left) + unionLoopRefNotAllowedUnderCurrentNode(right) + case Aggregate(_, _, child, _) => + unionLoopRefNotAllowedUnderCurrentNode(child) + case other => () Review Comment: you don't need the `()` since the function returns `Unit`, you can just remove it. 
Same elsewhere. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala: ########## @@ -423,4 +429,20 @@ object CTESubstitution extends Rule[LogicalPlan] { case _ => WithCTE(p, cteDefs) } } + + /** + * Counts number of self-references in a recursive CTE definition and throws an error + * if that number is bigger than 1. + */ + private def checkNumberOfSelfReferences(cteDef: CTERelationDef): Unit = { + val numOfSelfRef = cteDef.map[Boolean] { + case CTERelationRef(cteDef.id, _, _, _, _, true) => true + case other => false + }.count(_ == true) Review Comment: I think you can replace `.count(_ == true)` with `.count(identity)`: comparing a `Boolean` with `true` is redundant, so it will have the same result with simpler code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org