Github user frreiss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13155#discussion_r63954767
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1648,16 +1648,56 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
       }
     
       /**
    +   * Statically evaluate an expression containing one or more aggregates on an empty input.
    +   */
    +  private def evalOnZeroTups(expr : Expression) : Option[Any] = {
    +    // AggregateExpressions are Unevaluable, so we need to replace all aggregates
    +    // in the expression with the value they would return for zero input tuples.
    +    val rewrittenExpr = expr transform {
    +      case a @ AggregateExpression(aggFunc, _, _, resultId) =>
    +        val resultLit = aggFunc.defaultResult match {
    +          case Some(lit) => lit
    +          case None => Literal.default(NullType)
    +        }
    +        Alias(resultLit, "aggVal") (exprId = resultId)
    +    }
    +    Option(rewrittenExpr.eval())
    +  }
    +
    +  /**
        * Construct a new child plan by left joining the given subqueries to a base plan.
        */
       private def constructLeftJoins(
           child: LogicalPlan,
           subqueries: ArrayBuffer[ScalarSubquery]): LogicalPlan = {
         subqueries.foldLeft(child) {
           case (currentChild, ScalarSubquery(query, conditions, _)) =>
    +        val aggOutputExpr = query.asInstanceOf[Aggregate].aggregateExpressions.head
    --- End diff --
    
    I came across an additional counterexample just now:
    ```sql
    select l.a from l where
            (select case when count(*) = 1 then null else count(*) end
             from r where l.a = r.c) = 0
    ```
    This subquery returns null when count(*) on the inner query block is 1. The rewrite in this PR turns the overall query into:
    ```sql
    select *
    from l left outer join
        (select c, case when count(*) = 1 then null else count(*) end as cnt
         from r group by c) sq
        on l.a = sq.c
    where coalesce(sq.cnt, 0) = 0
    ```
    This result is incorrect: if exactly one tuple from R joins with a tuple of L, `sq.cnt` is null, `coalesce(sq.cnt, 0)` evaluates to 0, and the query returns the L tuple. The original query compares null with 0 for that tuple, so it should not be returned.
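
    To make the discrepancy concrete, here is a small plain-Scala sketch (no Spark involved) that models SQL NULL as `None` and compares the two predicates for a given number of matching R tuples. The object and method names are invented for illustration; they are not part of the PR.

    ```scala
    object CounterexampleSketch {
      // SQL NULL is modelled as None; comparing anything with NULL yields NULL.
      def sqlEquals(a: Option[Long], b: Option[Long]): Option[Boolean] =
        for (x <- a; y <- b) yield x == y

      // case when count(*) = 1 then null else count(*) end
      def subqueryValue(count: Long): Option[Long] =
        if (count == 1L) None else Some(count)

      // Original query: the correlated subquery is evaluated per outer tuple.
      // `matches` is the number of tuples in r with r.c = l.a.
      // A WHERE clause keeps a tuple only when the predicate is true (not NULL).
      def originalKeeps(matches: Long): Boolean =
        sqlEquals(subqueryValue(matches), Some(0L)).contains(true)

      // Rewritten query: the grouped subquery has no row for a key without
      // matches, so the left outer join yields NULL for cnt in that case.
      def rewrittenKeeps(matches: Long): Boolean = {
        val cnt: Option[Long] = if (matches == 0L) None else subqueryValue(matches)
        val coalesced: Option[Long] = cnt.orElse(Some(0L)) // coalesce(sq.cnt, 0)
        sqlEquals(coalesced, Some(0L)).contains(true)
      }
    }
    ```

    Under this model the two predicates agree for zero matches and for two or more matches, and disagree only when exactly one tuple matches.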
    
    So, to summarize, the operators above the outer join need to be able to distinguish among *four* different cases:
    1. A tuple from the outer query block joins with one or more tuples in the subquery and produces an aggregate result that is *not* null
    2. A tuple from the outer query block joins with one or more tuples in the subquery and produces an aggregate result that *is* null
    3. A tuple from the outer query block does not join with any tuples in the subquery
    4. A tuple from the outer query block joins with one or more tuples in the subquery and produces an aggregate result that is filtered out by a HAVING clause
    
    A single result column is simply not able to encode all four cases (or even the first three). The output of the outer join needs to have at least two columns to pass through enough information.

    Here's a scheme that will work as far as I can see: The result of the outer join contains the correlation columns, plus a column `aggVal` for the aggregate value, plus an additional column `isFiltered`. The four cases above are encoded as follows:
    1. `aggVal is not null and isFiltered = false`
    2. `aggVal is null and isFiltered = false`
    3. `isFiltered is null`
    4. `isFiltered = true`
    
    Then the coalesce expression above the outer join turns into a CASE expression:
    ```sql
    case
       when isFiltered is null then coalesce(aggVal, <SQ return value on empty input>)
       when isFiltered then null
       else aggVal
    end
    ```
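
    As a rough illustration of that expression, the following plain-Scala sketch decodes the two columns into the subquery's value, again with `None` standing in for SQL NULL. The name `decode` and the parameter `valueOnEmptyInput` are made up for this example; `Long` is an arbitrary choice of aggregate type.

    ```scala
    object IsFilteredDecode {
      // aggVal and isFiltered as they come out of the left outer join.
      // valueOnEmptyInput is the subquery's result when zero tuples join.
      def decode(
          aggVal: Option[Long],
          isFiltered: Option[Boolean],
          valueOnEmptyInput: Option[Long]): Option[Long] =
        isFiltered match {
          // Case 3: no join partner, so every subquery column is NULL.
          case None => aggVal.orElse(valueOnEmptyInput)
          // Case 4: the HAVING clause removed the aggregate row.
          case Some(true) => None
          // Cases 1 and 2: pass the aggregate value through, NULL or not.
          case Some(false) => aggVal
        }
    }
    ```

    The point of the second column is visible in the first and last branches: a null `aggVal` alone cannot tell case 2 from case 3, but `isFiltered` can.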
    
    Here's pseudocode for a rewrite that should produce the correct values of `isFiltered` and `aggVal`:
    ```
    if (subquery returns null when zero tuples join)
    then
      Use the original rewrite.
    else
      if (subquery has a Filter above the Aggregate node)
      then
        Replace the Filter node with a Project that computes the value of isFiltered.
        Rewrite nodes above the Filter node so they pass through the isFiltered column.
      else
        Add an isFiltered column with a hard-coded value of false to the top Project in the subquery.
      endif
      Create a left outer join between the outer query block and the rewritten subquery.
      Put the case statement in the previous listing into the Project operator above the outer join.
    endif
    ```
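
    The `else` branch of the pseudocode can be imitated on in-memory collections. The sketch below is a toy model, not Catalyst code: it assumes a `count(*)`-based aggregate, integer join keys, and an optional HAVING predicate, and all names (`SubqueryRow`, `rewrittenSubquery`, `evaluate`) are hypothetical.

    ```scala
    object RewriteSketch {
      // One row of the rewritten subquery: correlation key, aggVal, isFiltered.
      final case class SubqueryRow(key: Int, aggVal: Option[Long], isFiltered: Boolean)

      // The HAVING predicate no longer drops rows; it only sets isFiltered.
      def rewrittenSubquery(
          r: Seq[Int],
          agg: Long => Option[Long],
          having: Long => Boolean): Map[Int, SubqueryRow] =
        r.groupBy(identity).map { case (key, rows) =>
          val count = rows.size.toLong
          key -> SubqueryRow(key, agg(count), isFiltered = !having(count))
        }

      // Left outer join of l with the rewritten subquery, followed by the
      // CASE expression that recovers the original subquery value.
      def evaluate(
          l: Seq[Int],
          r: Seq[Int],
          agg: Long => Option[Long],
          having: Long => Boolean,
          valueOnEmptyInput: Option[Long]): Seq[(Int, Option[Long])] = {
        val sq = rewrittenSubquery(r, agg, having)
        l.map { a =>
          val value = sq.get(a) match {
            case None => valueOnEmptyInput             // no join partner
            case Some(row) if row.isFiltered => None   // removed by HAVING
            case Some(row) => row.aggVal               // aggregate value, possibly NULL
          }
          a -> value
        }
      }
    }
    ```

    With the aggregate from the counterexample, an outer tuple with exactly one match now gets a null value instead of the empty-input value, so a predicate such as `= 0` above the join no longer lets it through.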

