GitHub user scwf commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5604#discussion_r29730645
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -529,6 +532,203 @@ class Analyzer(
               makeGeneratorOutput(p.generator, p.generatorOutput), p.child)
         }
       }
    +
    +  /**
    +   * Extracts [[WindowExpression]]s from the projectList of a [[Project]] 
operator and
    +   * aggregateExpressions of an [[Aggregate]] operator and creates 
individual [[Window]]
    +   * operators for every distinct [[WindowSpecDefinition]].
    +   *
    +   * This rule handles three cases:
    +   *  - A [[Project]] having [[WindowExpression]]s in its projectList;
    +   *  - An [[Aggregate]] having [[WindowExpression]]s in its 
aggregateExpressions.
    +   *  - An [[Filter]]->[[Aggregate]] pattern representing GROUP BY with a 
HAVING
    +   *    clause and the [[Aggregate]] has [[WindowExpression]]s in its 
aggregateExpressions.
    +   * Note: If there is a GROUP BY clause in the query, aggregations and 
corresponding
    +   * filters (expressions in the HAVING clause) should be evaluated before 
any
    +   * [[WindowExpression]]. If a query has SELECT DISTINCT, the DISTINCT 
part should be
    +   * evaluated after all [[WindowExpression]]s.
    +   *
    +   * For every case, the transformation works as follows:
    +   * 1. For a list of [[Expression]]s (a projectList or an 
aggregateExpressions), partitions
    +   *    it two lists of [[Expression]]s, one for all [[WindowExpression]]s 
and another for
    +   *    all regular expressions.
    +   * 2. For all [[WindowExpression]]s, groups them based on their 
[[WindowSpecDefinition]]s.
    +   * 3. For every distinct [[WindowSpecDefinition]], creates a [[Window]] 
operator and inserts
    +   *    it into the plan tree.
    +   */
    +  object ExtractWindowExpressions extends Rule[LogicalPlan] {
    +    def hasWindowFunction(projectList: Seq[NamedExpression]): Boolean =
    +      projectList.exists(hasWindowFunction)
    +
    +    def hasWindowFunction(expr: NamedExpression): Boolean = {
    +      expr.find {
    +        case window: WindowExpression => true
    +        case _ => false
    +      }.isDefined
    +    }
    +
    +    /**
    +     * From a Seq of [[NamedExpression]]s, extract window expressions and
    +     * other regular expressions.
    +     */
    +    def extract(
    +        expressions: Seq[NamedExpression]): (Seq[NamedExpression], 
Seq[NamedExpression]) = {
    +      // First, we simple partition the input expressions to two part, one 
having
    +      // WindowExpressions and another one without WindowExpressions.
    +      val (windowExpressions, regularExpressions) = 
expressions.partition(hasWindowFunction)
    +
    +      // Then, we need to extract those regular expressions used in the 
WindowExpression.
    +      // For example, when we have col1 - Sum(col2 + col3) OVER (PARTITION 
BY col4 ORDER BY col5),
    +      // we need to make sure that col1 to col5 are all projected from the 
child of the Window
    +      // operator.
    +      val extractedExprBuffer = new ArrayBuffer[NamedExpression]()
    +      def extractExpr(expr: Expression): Expression = expr match {
    +        case ne: NamedExpression =>
    +          // If a named expression is not in regularExpressions, add 
extract it and replace it
    +          // with an AttributeReference.
    +          val missingExpr =
    +            AttributeSet(Seq(expr)) -- (regularExpressions ++ 
extractedExprBuffer)
    +          if (missingExpr.nonEmpty) {
    +            extractedExprBuffer += ne
    +          }
    +          ne.toAttribute
    +        case e: Expression if e.foldable =>
    +          e // No need to create an attribute reference if it will be 
evaluated as a Literal.
    +        case e: Expression =>
    +          // For other expressions, we extract it and replace it with an 
AttributeReference (with
    +          // an interal column name, e.g. "_w0").
    +          val withName = Alias(e, s"_w${extractedExprBuffer.length}")()
    +          extractedExprBuffer += withName
    +          withName.toAttribute
    +      }
    +
    +      // Now, we extract expressions from windowExpressions by using 
extractExpr.
    +      val newWindowExpressions = windowExpressions.map {
    +        _.transform {
    +          // Extracts children expressions of a WindowFunction (input 
parameters of
    +          // a WindowFunction).
    +          case wf : WindowFunction =>
    +            val newChildren = wf.children.map(extractExpr(_))
    +            wf.withNewChildren(newChildren)
    +
    +          // Extracts expressions from the partition spec and order spec.
    +          case wsc @ WindowSpecDefinition(partitionSpec, orderSpec, _) =>
    +            val newPartitionSpec = partitionSpec.map(extractExpr(_))
    +            val newOrderSpec = orderSpec.map { so =>
    +              val newChild = extractExpr(so.child)
    +              so.copy(child = newChild)
    +            }
    +            wsc.copy(partitionSpec = newPartitionSpec, orderSpec = 
newOrderSpec)
    +
    +          // Extracts AggregateExpression. For example, for SUM(x) - 
Sum(y) OVER (...),
    +          // we need to extract SUM(x).
    +          case agg: AggregateExpression =>
    +            val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
    +            extractedExprBuffer += withName
    +            withName.toAttribute
    +        }.asInstanceOf[NamedExpression]
    +      }
    +
    +      (newWindowExpressions, regularExpressions ++ extractedExprBuffer)
    +    }
    +
    +    /**
    +     * Adds operators for Window Expressions. Every Window operator 
handles a single Window Spec.
    +     */
    +    def addWindow(windowExpressions: Seq[NamedExpression], child: 
LogicalPlan): LogicalPlan = {
    +      // First, we group window expressions based on their Window Spec.
    +      val groupedWindowExpression = windowExpressions.groupBy { expr =>
    +        val windowExpression = expr.find {
    +          case window: WindowExpression => true
    +          case other => false
    +        }.map(_.asInstanceOf[WindowExpression].windowSpec)
    +        windowExpression.getOrElse(
    +          failAnalysis(s"$windowExpressions does not have any 
WindowExpression."))
    +      }.toSeq
    +
    +      // For every Window Spec, we add a Window operator and set 
currentChild as the child of it.
    +      var currentChild = child
    +      var i = 0
    +      while (i < groupedWindowExpression.size) {
    +        val (windowSpec, windowExpressions) = groupedWindowExpression(i)
    +        // Set currentChild to the newly created Window operator.
    +        currentChild = Window(currentChild.output, windowExpressions, 
windowSpec, currentChild)
    +
    +        // Move to next WindowExpression.
    +        i += 1
    +      }
    +
    +      // We return the top operator.
    +      currentChild
    +    }
    +
    +    // We have to use transformDown at here to make sure the rule of
    +    // "Aggregate with Having clause" will be triggered.
    +    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    +      // Lookup WindowSpecDefinitions. This rule works with unresolved 
children.
    +      case WithWindowDefinition(windowDefinitions, child) =>
    +        child.transform {
    +          case plan => plan.transformExpressions {
    +            case UnresolvedWindowExpression(c, 
WindowSpecReference(windowName)) =>
    +              val errorMessage =
    +                s"Window specification $windowName is not defined in the 
WINDOW clause."
    +              val windowSpecDefinition =
    +                windowDefinitions
    +                  .get(windowName)
    +                  .getOrElse(failAnalysis(errorMessage))
    +              WindowExpression(c, windowSpecDefinition)
    +          }
    +        }
    +
    +      // Aggregate with Having clause. This rule works with an unresolved 
Aggregate because
    +      // a resolved Aggregate will not have Window Functions.
    +      case f @ Filter(condition, a @ Aggregate(groupingExprs, 
aggregateExprs, child))
    +        if child.resolved &&
    +           hasWindowFunction(aggregateExprs) &&
    +           !a.expressions.exists(!_.resolved) =>
    --- End diff --
    
    minor: `a.expressions.forall(_.resolved)` would be more readable than `!a.expressions.exists(!_.resolved)` (the two are logically equivalent).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to