vladimirg-db commented on code in PR #50548:
URL: https://github.com/apache/spark/pull/50548#discussion_r2036222349


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2319,18 +2380,56 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
      */
     private def resolveSubQueries(plan: LogicalPlan, outer: LogicalPlan): LogicalPlan = {
       plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION), ruleId) {
-        case s @ ScalarSubquery(sub, _, exprId, _, _, _, _) if !sub.resolved =>
-          resolveSubQuery(s, outer)(ScalarSubquery(_, _, exprId))
-        case e @ Exists(sub, _, exprId, _, _) if !sub.resolved =>
-          resolveSubQuery(e, outer)(Exists(_, _, exprId))
-        case InSubquery(values, l @ ListQuery(_, _, exprId, _, _, _))
+        // There are four kinds of outer references here:
+        // 1. Outer references which are newly introduced in the subquery `res`
+        //  which can be resolved in current `plan`.
+        //  It is extracted by `SubExprUtils.getOuterReferences(res.plan)` and
+        //  stored among res.outerAttrs
+        // 2. Outer references which are newly introduced in the subquery `res`
+        //  which cannot be resolved in current `plan`
+        //  It is extracted by `SubExprUtils.getOuterReferences(res.plan)` with
+        //  `getNestedOuterReferences(res, plan)` filter and stored in
+        //  res.nestedOuterAttrs
+        // 3. Outer references which are introduced by nested subquery within `res.plan`
+        //  which can be resolved in current `plan`
+        //  It is extracted by `getOuterAttrsNeedToBePropagated(res.plan)`, filtered
+        //  by `plan.inputSet.contains(_)`, need to be stored in res.outerAttrs
+        // 4. Outer references which are introduced by nested subquery within `res.plan`
+        //  which cannot be resolved in current `plan`
+        //  It is extracted by `getOuterAttrsNeedToBePropagated(res.plan)`, filtered
+        //  by `!plan.inputSet.contains(_)`, need to be stored in
+        //  res.outerAttrs and res.nestedOuterAttrs
+        case s @ ScalarSubquery(sub, _, _, exprId, _, _, _, _) if !sub.resolved =>
+          val res = resolveSubQuery(s, outer)(ScalarSubquery(_, _, Seq.empty, exprId))
+          val nestedOuterReferences = getNestedOuterReferences(res, plan)
+          res.withNewNestedOuterAttrs(nestedOuterReferences)
+        case e @ Exists(sub, _, _, exprId, _, _) if !sub.resolved =>
+          val res = resolveSubQuery(e, outer)(Exists(_, _, Seq.empty, exprId))
+          val nestedOuterReferences = getNestedOuterReferences(res, plan)
+          res.withNewNestedOuterAttrs(nestedOuterReferences)
+        case InSubquery(values, l)
            if values.forall(_.resolved) && !l.resolved =>
          val expr = resolveSubQuery(l, outer)((plan, exprs) => {
-            ListQuery(plan, exprs, exprId, plan.output.length)
-          })
-          InSubquery(values, expr.asInstanceOf[ListQuery])
-        case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
-          resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
+            ListQuery(plan, exprs, Seq.empty, l.exprId, plan.output.length)
+          }).asInstanceOf[ListQuery]
+          val nestedOuterReferences = getNestedOuterReferences(expr, plan)
+          val newExpr = expr.withNewNestedOuterAttrs(nestedOuterReferences)
+          InSubquery(values, newExpr)
+        case s @ LateralSubquery(sub, _, _, exprId, _, _) if !sub.resolved =>
+          val res = resolveSubQuery(s, outer)(LateralSubquery(_, _, Seq.empty, exprId))
+          val nestedOuterReferences = getNestedOuterReferences(res, plan)
+          if (nestedOuterReferences.nonEmpty) {
+            // Outer references within lateral subquery can only refer to attributes
+            // in the containing plan of lateral subquery. Nested outer references
+            // are not allowed.
+            throw new SparkIllegalArgumentException(
+              errorClass = "FIELD_NOT_FOUND",
+              messageParameters = Map(
+                "fieldName" -> toSQLId(nestedOuterReferences.head.prettyName),
+                "fields" -> plan.inputSet.map(f => toSQLId(f.name)).mkString(", "))
+            )
+          }
+          res

Review Comment:
   Do we need to extract nested outer aggs in `UpdateOuterReferences`?
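   
   For context, a minimal sketch (not from this PR; the table and column names are hypothetical) of the query shape that needs `nestedOuterAttrs`: the innermost scalar subquery references `t1.a`, which cannot be resolved against the plan of the enclosing `EXISTS` subquery, so it has to be carried as a nested outer reference and propagated one level up.
   
   ```scala
   // Assumes a SparkSession `spark` and tables t1(a), t2(b), t3(c, d) exist.
   spark.sql("""
     SELECT *
     FROM t1
     WHERE EXISTS (
       SELECT 1
       FROM t2
       WHERE t2.b = (
         -- t1.a skips over t2's scope: it is only resolvable against the
         -- outermost plan, i.e. a nested outer reference for the EXISTS subquery.
         SELECT MAX(t3.c) FROM t3 WHERE t3.d = t1.a
       )
     )
   """)
   ```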



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


