GitHub user frreiss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13155#discussion_r63798715
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }
     
       /**
    +   * Statically evaluate an expression containing one or more aggregates on an empty input.
    +   */
    +  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
    +    // AggregateExpressions are Unevaluable, so we need to replace all aggregates
    +    // in the expression with the value they would return for zero input tuples.
    +    val rewrittenExpr = expr transform {
    +      case a @ AggregateExpression(aggFunc, _, _, resultId) =>
    +        val resultLit = aggFunc.defaultResult match {
    +          case Some(lit) => lit
    +          case None => Literal.default(NullType)
    +        }
    +        Alias(resultLit, "aggVal") (exprId = resultId)
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
        * Construct a new child plan by left joining the given subqueries to a base plan.
        */
       private def constructLeftJoins(
           child: LogicalPlan,
           subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
         subqueries.foldLeft(child) {
           case (currentChild, ScalarSubquery(query, conditions, _)) =>
    +        val aggOutputExpr = query.asInstanceOf[Aggregate].aggregateExpressions.head
    --- End diff --
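The `evalOnZeroTups` helper in the diff works in two steps: replace every aggregate with the value it returns on empty input, then evaluate what remains, since the aggregates themselves are Unevaluable. The toy Scala sketch below models only that idea. `Expr`, `Lit`, `Add`, `Coalesce`, `Agg`, and `ZeroTupleEval` are hypothetical stand-ins, not Catalyst classes, and `None` plays the role of SQL NULL, much as `Option(rewrittenExpr.eval())` does in the diff.

```scala
// Hypothetical, simplified expression tree; NOT Spark's Catalyst classes.
sealed trait Expr
final case class Lit(value: Option[Long]) extends Expr // None models SQL NULL
final case class Add(left: Expr, right: Expr) extends Expr
final case class Coalesce(first: Expr, second: Expr) extends Expr
final case class Agg(fn: String) extends Expr // e.g. "count", "sum", "max"

object ZeroTupleEval {
  // Result an aggregate yields on empty input: COUNT gives 0, the others here give NULL.
  def defaultResult(fn: String): Option[Long] = fn match {
    case "count" => Some(0L)
    case _       => None
  }

  // Step 1: swap every aggregate for a literal holding its empty-input result.
  def replaceAggs(e: Expr): Expr = e match {
    case Agg(fn)        => Lit(defaultResult(fn))
    case Add(l, r)      => Add(replaceAggs(l), replaceAggs(r))
    case Coalesce(a, b) => Coalesce(replaceAggs(a), replaceAggs(b))
    case lit: Lit       => lit
  }

  // Step 2: evaluate the aggregate-free tree; NULL propagates through Add.
  def eval(e: Expr): Option[Long] = e match {
    case Lit(v)         => v
    case Add(l, r)      => for (a <- eval(l); b <- eval(r)) yield a + b
    case Coalesce(a, b) => eval(a).orElse(eval(b))
    case Agg(fn)        => throw new IllegalStateException(s"unevaluable aggregate: $fn")
  }

  def evalOnZeroTuples(e: Expr): Option[Long] = eval(replaceAggs(e))
}
```

For instance, `count(*) + 1` evaluates to 1 on empty input, while a bare `sum(x)` evaluates to NULL (`None`).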
    
    Update: The relevant upstream checks are in CheckAnalysis.checkAnalysis():
    ```scala
    case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>
        ...
        def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
          case SubqueryAlias(_, child) => cleanQuery(child)
          case Project(_, child) => cleanQuery(child)
          case child => child
        }
    
        cleanQuery(query) match {
          case a: Aggregate => checkAggregate(a)
          case Filter(_, a: Aggregate) => checkAggregate(a)
          case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail")
        }
    ```
    These checks accept any chain of operators above the Aggregate that matches
    the following regex, reading from the root of the subquery plan downward:
        (SubqueryAlias|Project)*(Filter)?
    
    It doesn't look like AstBuilder will generate a SubqueryAlias nested immediately inside another SubqueryAlias, but every other chain matching the above regex is possible.
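To make the accepted shapes concrete, here is a toy Scala sketch that mirrors the `cleanQuery` logic quoted above on a simplified plan tree. The node classes and `ChainCheck` are hypothetical stand-ins for the Catalyst operators, carrying only the child pointers needed to show which chains pass.

```scala
// Hypothetical, simplified logical-plan nodes; NOT Spark's Catalyst classes.
sealed trait Plan
final case class SubqueryAlias(alias: String, child: Plan) extends Plan
final case class Project(child: Plan) extends Plan
final case class Filter(child: Plan) extends Plan
final case class Aggregate(child: Plan) extends Plan
case object Relation extends Plan

object ChainCheck {
  // Mirrors cleanQuery(): peel off any mix of SubqueryAlias and Project nodes from the top.
  def cleanQuery(p: Plan): Plan = p match {
    case SubqueryAlias(_, child) => cleanQuery(child)
    case Project(child)          => cleanQuery(child)
    case other                   => other
  }

  // True exactly for chains of the form (SubqueryAlias|Project)*(Filter)? above an Aggregate.
  def accepted(query: Plan): Boolean = cleanQuery(query) match {
    case Aggregate(_)         => true
    case Filter(Aggregate(_)) => true
    case _                    => false
  }
}
```

For example, `Project(SubqueryAlias("t", Filter(Aggregate(Relation))))` is accepted, while `Filter(Project(Aggregate(Relation)))` is rejected because its Filter sits above a Project instead of directly on the Aggregate.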
    
    I'll update `evalOnZeroTups()` above to rewrite combinations of Project, Filter, and Aggregate into a single expression and then statically evaluate that expression.
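A minimal sketch of what evaluating such a chain on zero input tuples has to account for, assuming a toy model in which a row is a `Map` from column names to `Option[Long]` (with `None` as SQL NULL). `ChainEval`, `AggregateNode`, `ProjectNode`, and `FilterNode` are hypothetical names. Unlike the plan described above, this walks the chain node by node rather than first folding it into a single expression. The case it highlights is a Filter (e.g. a HAVING clause) whose predicate is not true on the aggregate's default row: the subquery then produces no rows, which a scalar subquery reports as NULL.

```scala
// Hypothetical, simplified model of the operator chain above the Aggregate.
// A row maps column names to values; None models SQL NULL.
object ChainEval {
  type Row = Map[String, Option[Long]]

  sealed trait Node
  // On empty input, an Aggregate without grouping yields one row of per-aggregate defaults.
  final case class AggregateNode(defaults: Row) extends Node
  // Project computes new named columns from the row its child produces.
  final case class ProjectNode(exprs: Map[String, Row => Option[Long]], child: Node) extends Node
  // Filter keeps the row only if the predicate is definitely true (NULL counts as false).
  final case class FilterNode(pred: Row => Option[Boolean], child: Node) extends Node

  // None means the chain emits no row at all, which the scalar subquery surfaces as NULL.
  def evalOnZeroTuples(plan: Node): Option[Row] = plan match {
    case AggregateNode(defaults) => Some(defaults)
    case ProjectNode(exprs, child) =>
      evalOnZeroTuples(child).map(row => exprs.map { case (name, f) => name -> f(row) })
    case FilterNode(pred, child) =>
      evalOnZeroTuples(child).filter(row => pred(row).contains(true))
  }
}
```

With an aggregate row of `cnt -> Some(0)`, a Project computing `cnt + 1` yields `Some(1)`, while a Filter on `cnt > 0` drops the row, so the whole chain evaluates to `None`.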

