Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/5604#issuecomment-98883070
  
    Adding more information about the implementation...
    
    This PR adds support for window functions to Spark SQL 
(specifically the `OVER` and `WINDOW` clauses). For every expression having an `OVER` 
clause, we use a `WindowExpression` as the container of a `WindowFunction` and 
the corresponding `WindowSpecDefinition` (the definition of a window frame, 
i.e. the partition specification, order specification, and frame specification 
appearing in an `OVER` clause). 
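    As a rough illustration of how these pieces nest (a Python sketch with hypothetical stand-in classes, not the actual Catalyst Scala expressions):

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical stand-ins for the Catalyst expressions described above.
@dataclass
class WindowSpecDefinition:
    partition_spec: List[str]    # e.g. ["product"]
    order_spec: List[str]        # e.g. []
    frame: Optional[str]         # e.g. "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"

@dataclass
class WindowFunction:
    name: str                    # e.g. "avg"
    args: List[str]              # e.g. ["sales"]

@dataclass
class WindowExpression:
    # The container pairing a window function with its window definition.
    window_function: WindowFunction
    window_spec: WindowSpecDefinition

# avg(sales) OVER (PARTITION BY product), as in the example query below.
expr = WindowExpression(
    WindowFunction("avg", ["sales"]),
    WindowSpecDefinition(["product"], [],
                         "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"),
)
```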
    
    ## Implementation
    The high-level workflow of the implementation is as follows.
    
    * Query parsing: During query parsing, all `WindowExpression`s are 
originally placed in the `projectList` of a `Project` operator or the 
`aggregateExpressions` of an `Aggregate` operator. This keeps our changes 
simple and keeps all parsing rules for window functions in a single place 
(`nodesToWindowSpecification`). For the `WINDOW` clause in a query, we use a 
`WithWindowDefinition` as the container of the mapping from the name of a 
window specification to a `WindowSpecDefinition`. This change is similar to 
our common table expression support.
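    A minimal sketch of that named-window mapping (Python, with hypothetical names; the real mapping lives in the Catalyst `WithWindowDefinition` node):

```python
# A query such as:
#   SELECT avg(sales) OVER w FROM sales WINDOW w AS (PARTITION BY product)
# parses into a mapping from window name to its specification.
window_definitions = {
    "w": {"partition": ["product"], "order": [], "frame": None},
}

def resolve_window_reference(name, definitions):
    """Replace a reference to a named window by its definition."""
    if name not in definitions:
        raise ValueError("Window specification %s is not defined" % name)
    return definitions[name]

spec = resolve_window_reference("w", window_definitions)
```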
    
    * Analysis: The query analysis process has three steps for window functions.
      * Resolve all `WindowSpecReference`s by replacing them with the 
corresponding `WindowSpecDefinition`s according to the mapping table stored in the 
`WithWindowDefinition` node.
      * Resolve `WindowFunction`s in the `projectList` of a `Project` operator 
or the `aggregateExpressions` of an `Aggregate` operator. For this PR, we use 
Hive's implementations of window functions, because a major refactoring 
of our internal UDAFs is planned and it is better to switch our UDAFs over after that 
refactoring work. 
      * Once we have resolved all `WindowFunction`s, we use 
`ResolveWindowFunction` to extract `WindowExpression`s from `projectList` and 
`aggregateExpressions` and then create a `Window` operator for every distinct 
`WindowSpecDefinition`. With this choice, at execution time we can rely on 
the `Exchange` operator to do all of the work of reorganizing the data, and we do 
not need to worry about it in the physical `Window` operator. An example 
analyzed plan is shown below.
    ```
    sql("""
    SELECT
      year, country, product, sales,
      avg(sales) over(partition by product) avg_product,
      sum(sales) over(partition by country) sum_country
    FROM sales
    ORDER BY year, country, product
    """).explain(true)
    
    == Analyzed Logical Plan ==
    Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
     Project 
[year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28]
      Window [year#34,country#35,product#36,sales#37,avg_product#27], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37)
 WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
       Window [year#34,country#35,product#36,sales#37], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37)
 WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        Project [year#34,country#35,product#36,sales#37]
         MetastoreRelation default, sales, None
    ```
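    The extraction step above can be sketched as follows (Python, illustrative only; `group_by_spec` is a hypothetical helper, not Spark code):

```python
# Group window expressions by their WindowSpecDefinition, so that each
# distinct spec gets its own Window operator (and hence its own Exchange
# at execution time), as in the two stacked Window operators above.
from collections import defaultdict

def group_by_spec(window_exprs):
    """window_exprs: list of (spec, expr) pairs; one group per distinct spec."""
    groups = defaultdict(list)
    for spec, expr in window_exprs:
        groups[spec].append(expr)
    return dict(groups)

exprs = [
    (("partition by product",), "avg(sales) AS avg_product"),
    (("partition by country",), "sum(sales) AS sum_country"),
]
# Two distinct specs -> two Window operators in the analyzed plan.
groups = group_by_spec(exprs)
```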
    
    * Query planning: During query planning, we simply generate the 
physical `Window` operator based on the logical `Window` operator. Then, to 
prepare the `executedPlan`, the `EnsureRequirements` rule adds `Exchange` and 
`Sort` operators where necessary; it analyzes the data properties of the child 
plan and avoids adding unnecessary shuffles and sorts. The physical plan for 
the above example query is shown below.
    ```
    == Physical Plan ==
    Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
     Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 
200), []
      Window [year#34,country#35,product#36,sales#37,avg_product#27], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37)
 WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
       Exchange (HashPartitioning [country#35], 200), [country#35 ASC]
        Window [year#34,country#35,product#36,sales#37], 
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37)
 WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
         Exchange (HashPartitioning [product#36], 200), [product#36 ASC]
          HiveTableScan [year#34,country#35,product#36,sales#37], 
(MetastoreRelation default, sales, None), None
    ```
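    What `EnsureRequirements` decides can be sketched roughly as follows (Python, an illustrative model rather than Spark's actual rule):

```python
# Add an exchange (shuffle) only when the child's existing partitioning does
# not already satisfy the distribution the parent operator requires.

def ensure_requirements(required_partitioning, child_partitioning):
    """Return the operators to insert between child and parent, if any."""
    if child_partitioning == required_partitioning:
        return []                                 # data already laid out correctly
    return [("Exchange", required_partitioning)]  # otherwise, shuffle

# The avg window needs hash partitioning by product; the table scan provides
# no partitioning, so an Exchange is inserted (as in the plan above).
plan = ensure_requirements(("hash", "product"), None)
```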
    
    * Execution time: At execution time, a physical `Window` operator buffers 
all rows of a partition specified by the partition spec of an `OVER` clause. If 
necessary, it also maintains a sliding window frame. The current implementation 
buffers the input parameters of a window function according to the 
window frame, to avoid evaluating a row multiple times. 
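    A toy model of this buffering behavior (Python sketch under simplifying assumptions; the real operator works on Catalyst rows and full frame specifications):

```python
from itertools import groupby

def evaluate_window(rows, partition_key, frame, func):
    """Buffer each partition's rows, then evaluate `func` over the frame for
    every row. `frame` is (preceding, following) in row counts; None means
    unbounded on that side."""
    out = []
    rows = sorted(rows, key=partition_key)         # groupby needs sorted input
    for _, group in groupby(rows, key=partition_key):
        buffered = list(group)                     # buffer the whole partition
        n = len(buffered)
        for i in range(n):
            lo = 0 if frame[0] is None else max(0, i - frame[0])
            hi = n if frame[1] is None else min(n, i + frame[1] + 1)
            out.append(func(buffered[lo:hi]))      # evaluate over the frame
    return out

# sum(sales) with ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,
# partitioned by the first column:
vals = [("a", 1), ("a", 3), ("b", 10)]
sums = evaluate_window(vals, lambda r: r[0], (None, None),
                       lambda w: sum(r[1] for r in w))
```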
    
    ## Future work
    Here are three improvements that are not hard to add:
    * Take advantage of the window frame specification to reduce the number 
of rows buffered in the physical `Window` operator. In some cases, we only 
need to buffer the rows appearing in the sliding window; in other cases, 
we will not be able to reduce the number of rows buffered (e.g. `ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`). 
    
    * When a `RANGE` frame is used, for `<value> PRECEDING` and `<value> 
FOLLOWING`, it would be great if the `<value>` part could be an expression (we can 
start with `Literal`). Then, when the data type of the `ORDER BY` expression is a 
`FractionalType`, we can support `FractionalType` as the type of `<value>` 
(`<value>` still needs to evaluate to a positive value).
    
    * When a `RANGE` frame is used, we need to support `DateType` and 
`TimestampType` as the data type of the expression appearing in the order 
specification. Then, the `<value>` part of `<value> PRECEDING` and `<value> 
FOLLOWING` can support interval types (once we support them).
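    For a numeric `ORDER BY` column, the intended `RANGE` frame semantics can be sketched as follows (Python, illustrative; `range_frame` is a hypothetical helper, not part of this PR):

```python
# A RANGE frame contains every row whose ORDER BY value lies within
# [current - preceding, current + following], rather than counting rows.

def range_frame(order_values, i, preceding, following):
    """Return indices of rows inside the RANGE frame of row i.
    order_values must be sorted; preceding/following are positive offsets,
    which may be fractional when the ORDER BY column is a FractionalType."""
    lo = order_values[i] - preceding
    hi = order_values[i] + following
    return [j for j, v in enumerate(order_values) if lo <= v <= hi]

# RANGE BETWEEN 1.5 PRECEDING AND 0.5 FOLLOWING over fractional values:
vals = [1.0, 2.0, 3.5, 6.0]
frame = range_frame(vals, 1, 1.5, 0.5)
```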

