vladimirg-db commented on code in PR #50548: URL: https://github.com/apache/spark/pull/50548#discussion_r2053533428
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -234,9 +228,9 @@ object AnalysisContext { try f finally { set(originContext) } } - def withOuterPlan[A](outerPlan: LogicalPlan)(f: => A): A = { + def withOuterPlans[A](outerPlans: Seq[LogicalPlan])(f: => A): A = { Review Comment: I would leave the signature as it was `withOuterPlan(...)` to relieve the client code of the burden of updating this stack. ########## sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out: ########## @@ -1209,7 +1201,7 @@ Project [c1#x, c2#x, count(1)#xL] SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT COUNT(*) cnt FROM t2 WHERE t1.c1 = t2.c1) WHERE cnt = c1 - 1) -- !query analysis Project [c1#x, c2#x, cnt#xL] -+- LateralJoin lateral-subquery#x [c1#x && c1#x], Inner ++- LateralJoin lateral-subquery#x [c1#x], Inner Review Comment: Why do we get changes in the existing golden files without nested correlation? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ########## @@ -228,6 +228,67 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString } } + def checkNoNestedOuterReferencesInMainQuery(plan: LogicalPlan): Unit = { Review Comment: I wonder if we can move this all to `ValidateSubqueryExpression`? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -2309,6 +2316,27 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor * Note: CTEs are handled in CTESubstitution. */ object ResolveSubquery extends Rule[LogicalPlan] { + + /** + * Returns the outer scope attributes referenced in the subquery expressions + * in current plan and the children of the current plan. + */ + private def getOuterAttrsNeedToBePropagated(plan: LogicalPlan): Seq[Expression] = { Review Comment: It feels like this method solves the same problem as `SubExprUtils.getOuterReferences`. 
Can we instead update `SubExprUtils.getOuterReferences` to do that? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -2320,20 +2348,57 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor e: SubqueryExpression, outer: LogicalPlan)( f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = { - val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) { - executeSameContext(e.plan) + val outerPlanContext = AnalysisContext.get.outerPlans + val newSubqueryPlan = if (outerPlanContext.isDefined && + // We don't allow lateral subquery having nested correlation + !e.isInstanceOf[LateralSubquery] Review Comment: Ok, so `LateralSubquery` cannot reference nested scopes. But can the subqueries below the `LateralSubquery` reference attributes above that `LateralSubquery`? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ########## @@ -228,6 +228,67 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString } } + def checkNoNestedOuterReferencesInMainQuery(plan: LogicalPlan): Unit = { + def hasOuterScopeAttrsInSubqueryExpression(expr: Expression): Boolean = { + expr.exists { + case subExpr: SubqueryExpression if subExpr.getOuterScopeAttrs.nonEmpty => true + case _ => false + } + } + + def getOuterScopeAttrsFromSubqueryExpression( + plan: LogicalPlan): Seq[(SubqueryExpression, AttributeSet)] = { + val res = plan.expressions.flatMap { + expr => expr.collect { + case subExpr: SubqueryExpression if subExpr.getOuterScopeAttrs.nonEmpty => + (subExpr, subExpr.getOuterScopeAttrs) + } + } + res.map { + case (subExpr, nestedOuterExprs) => + val attrs = nestedOuterExprs.collect { + case a: AttributeReference => a + } + (subExpr, AttributeSet(attrs)) + } + } Review Comment: Let's add some newlines. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -1799,17 +1793,30 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor s.expand(plan, resolver) } catch { case e: AnalysisException => - AnalysisContext.get.outerPlan.map { - // Only Project, Aggregate, CollectMetrics can host star expressions. - case u @ (_: Project | _: Aggregate | _: CollectMetrics) => - Try(s.expand(u.children.head, resolver)) match { - case Success(expanded) => expanded.map(wrapOuterReference) - case Failure(_) => throw e - } - // Do not use the outer plan to resolve the star expression - // since the star usage is invalid. - case _ => throw e - }.getOrElse { throw e } + val outerPlans = + if (AnalysisContext.get.outerPlans.isDefined) { + AnalysisContext.get.outerPlans.get + } else { + Seq.empty[LogicalPlan] + } + val success = outerPlans.flatMap { plan => Review Comment: Can you add a SQL example to the scaladoc of this function that shows a star being expanded with nested correlation? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala: ########## @@ -67,6 +68,17 @@ case class DynamicPruningSubquery( copy() } + override def withNewOuterScopeAttrs( + outerScopeAttrs: Seq[Expression] + ): DynamicPruningSubquery = { + // DynamicPruningSubquery should not have outer scope attrs + if (outerScopeAttrs.nonEmpty) { + throw SparkException.internalError( + "DynamicPruningSubquery should not have outer scope attributes.") + } + copy() Review Comment: Why do we copy without changing any fields? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org