vladimirg-db commented on code in PR #50548:
URL: https://github.com/apache/spark/pull/50548#discussion_r2053533428


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -234,9 +228,9 @@ object AnalysisContext {
     try f finally { set(originContext) }
   }
 
-  def withOuterPlan[A](outerPlan: LogicalPlan)(f: => A): A = {
+  def withOuterPlans[A](outerPlans: Seq[LogicalPlan])(f: => A): A = {

Review Comment:
   I would leave the signature as it was `withOuterPlan(...)` to relieve the 
client code of the burden of updating this stack.



##########
sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out:
##########
@@ -1209,7 +1201,7 @@ Project [c1#x, c2#x, count(1)#xL]
 SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT COUNT(*) cnt FROM t2 WHERE 
t1.c1 = t2.c1) WHERE cnt = c1 - 1)
 -- !query analysis
 Project [c1#x, c2#x, cnt#xL]
-+- LateralJoin lateral-subquery#x [c1#x && c1#x], Inner
++- LateralJoin lateral-subquery#x [c1#x], Inner

Review Comment:
   Why do we get changes in the existing golden files without nested 
correlation?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -228,6 +228,67 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
     }
   }
 
+  def checkNoNestedOuterReferencesInMainQuery(plan: LogicalPlan): Unit = {

Review Comment:
   I wonder if we can move this all to `ValidateSubqueryExpression`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2309,6 +2316,27 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
    * Note: CTEs are handled in CTESubstitution.
    */
   object ResolveSubquery extends Rule[LogicalPlan] {
+
+    /**
+     * Returns the outer scope attributes referenced in the subquery 
expressions
+     *  in current plan and the children of the current plan.
+     */
+    private def getOuterAttrsNeedToBePropagated(plan: LogicalPlan): 
Seq[Expression] = {

Review Comment:
   It feels like this method solves the same problem as 
`SubExprUtils.getOuterReferences`. Can we instead update 
`SubExprUtils.getOuterReferences` to do that?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2320,20 +2348,57 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         e: SubqueryExpression,
         outer: LogicalPlan)(
         f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): 
SubqueryExpression = {
-      val newSubqueryPlan = AnalysisContext.withOuterPlan(outer) {
-        executeSameContext(e.plan)
+      val outerPlanContext = AnalysisContext.get.outerPlans
+      val newSubqueryPlan = if (outerPlanContext.isDefined &&
+        // We don't allow lateral subquery having nested correlation
+        !e.isInstanceOf[LateralSubquery]

Review Comment:
   Ok, so `LateralSubquery` cannot reference nested scopes. But can the 
subqueries below the `LateralSubquery` reference attributes above that 
`LateralSubquery`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -228,6 +228,67 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
     }
   }
 
+  def checkNoNestedOuterReferencesInMainQuery(plan: LogicalPlan): Unit = {
+    def hasOuterScopeAttrsInSubqueryExpression(expr: Expression): Boolean = {
+      expr.exists {
+        case subExpr: SubqueryExpression if 
subExpr.getOuterScopeAttrs.nonEmpty => true
+        case _ => false
+      }
+    }
+
+    def getOuterScopeAttrsFromSubqueryExpression(
+        plan: LogicalPlan): Seq[(SubqueryExpression, AttributeSet)] = {
+      val res = plan.expressions.flatMap {
+        expr => expr.collect {
+          case subExpr: SubqueryExpression if 
subExpr.getOuterScopeAttrs.nonEmpty =>
+            (subExpr, subExpr.getOuterScopeAttrs)
+        }
+      }
+      res.map {
+        case (subExpr, nestedOuterExprs) =>
+          val attrs = nestedOuterExprs.collect {
+            case a: AttributeReference => a
+          }
+          (subExpr, AttributeSet(attrs))
+      }
+    }

Review Comment:
   Let's add some newlines.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1799,17 +1793,30 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           s.expand(plan, resolver)
         } catch {
           case e: AnalysisException =>
-            AnalysisContext.get.outerPlan.map {
-              // Only Project, Aggregate, CollectMetrics can host star 
expressions.
-              case u @ (_: Project | _: Aggregate | _: CollectMetrics) =>
-                Try(s.expand(u.children.head, resolver)) match {
-                  case Success(expanded) => expanded.map(wrapOuterReference)
-                  case Failure(_) => throw e
-                }
-              // Do not use the outer plan to resolve the star expression
-              // since the star usage is invalid.
-              case _ => throw e
-            }.getOrElse { throw e }
+            val outerPlans =
+              if (AnalysisContext.get.outerPlans.isDefined) {
+                AnalysisContext.get.outerPlans.get
+              } else {
+                Seq.empty[LogicalPlan]
+              }
+            val success = outerPlans.flatMap { plan =>

Review Comment:
   Can you add a SQL example to the scaladoc of this function when a star is 
expanded with nested correlation?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala:
##########
@@ -67,6 +68,17 @@ case class DynamicPruningSubquery(
     copy()
   }
 
+  override def withNewOuterScopeAttrs(
+    outerScopeAttrs: Seq[Expression]
+  ): DynamicPruningSubquery = {
+    // DynamicPruningSubquery should not have outer scope attrs
+    if (outerScopeAttrs.nonEmpty) {
+      throw SparkException.internalError(
+        "DynamicPruningSubquery should not have outer scope attributes.")
+    }
+    copy()

Review Comment:
   Why do we copy without changing any fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to