Github user yhuai commented on the pull request:
https://github.com/apache/spark/pull/5604#issuecomment-98883070
Adding more information about the implementation...
This PR adds support for window functions to Spark SQL
(specifically the `OVER` and `WINDOW` clauses). For every expression having an `OVER`
clause, we use a `WindowExpression` as the container of a `WindowFunction` and
the corresponding `WindowSpecDefinition` (the definition of a window,
i.e. the partition specification, order specification, and frame specification
appearing in an `OVER` clause).
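To make this structure concrete, here is a simplified, self-contained sketch of how these pieces nest. The types below are illustrative stand-ins, not the actual Catalyst classes (the real ones extend `Expression` and carry more state).
```
// Illustrative stand-ins for the nodes described above (simplified, not the
// actual Spark classes).
sealed trait FrameBoundary
case object UnboundedPreceding extends FrameBoundary
case object UnboundedFollowing extends FrameBoundary
case object CurrentRow extends FrameBoundary

// Frame specification: ROWS/RANGE BETWEEN <start> AND <end>.
case class WindowFrame(frameType: String, start: FrameBoundary, end: FrameBoundary)

// Everything inside OVER (...): partition spec, order spec, frame spec.
case class WindowSpecDefinition(
    partitionSpec: Seq[String],
    orderSpec: Seq[String],
    frame: WindowFrame)

// Container pairing a window function with its window specification.
case class WindowExpression(
    windowFunction: String,
    windowSpec: WindowSpecDefinition)

// e.g. avg(sales) OVER (PARTITION BY product)
val avgProduct = WindowExpression(
  "avg(sales)",
  WindowSpecDefinition(
    partitionSpec = Seq("product"),
    orderSpec = Nil,
    frame = WindowFrame("ROWS", UnboundedPreceding, UnboundedFollowing)))
```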
## Implementation
The high-level workflow of the implementation is as follows.
* Query parsing: During parsing, all `WindowExpression`s are
originally placed in the `projectList` of a `Project` operator or the
`aggregateExpressions` of an `Aggregate` operator. This keeps our changes
simple and keeps all parsing rules for window functions in a single place
(`nodesToWindowSpecification`). For the `WINDOW` clause in a query, we use a
`WithWindowDefinition` node as the container of the mapping from the name of a
window specification to its `WindowSpecDefinition` (see the example after this
item). This change is similar to our common table expression support.
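For example, a query like the following (the `sales` table and columns are only illustrative) defines a named window specification; the parsed plan carries the mapping from `w` to its `WindowSpecDefinition` in a `WithWindowDefinition` node, and `OVER w` becomes a `WindowSpecReference` to be resolved during analysis.
```
sql("""
  SELECT product, sales, rank() OVER w AS sales_rank
  FROM sales
  WINDOW w AS (PARTITION BY country ORDER BY sales DESC)
""")
```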
* Analysis: The query analysis process has three steps for window functions.
  * Resolve all `WindowSpecReference`s by replacing them with the
corresponding `WindowSpecDefinition`s, according to the mapping table stored in the
`WithWindowDefinition` node.
  * Resolve `WindowFunction`s in the `projectList` of a `Project` operator
or the `aggregateExpressions` of an `Aggregate` operator. For this PR, we use
Hive's functions for window functions, because a major refactoring of our
internal UDAFs is coming and it is better to switch to our own UDAFs after that
refactoring work.
  * Once all `WindowFunction`s have been resolved, we use
`ResolveWindowFunction` to extract `WindowExpression`s from `projectList` and
`aggregateExpressions`, and then create a `Window` operator for every distinct
`WindowSpecDefinition` (a small sketch of this grouping follows the example plan
below). With this choice, at execution time we can rely on the `Exchange`
operator to do all of the work of reorganizing the data, so we do not need to
worry about it in the physical `Window` operator. An example analyzed plan is
shown below.
```
sql("""
SELECT
year, country, product, sales,
avg(sales) over(partition by product) avg_product,
sum(sales) over(partition by country) sum_country
FROM sales
ORDER BY year, country, product
""").explain(true)
== Analyzed Logical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
Project
[year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28]
Window [year#34,country#35,product#36,sales#37,avg_product#27],
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37)
WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [],
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Window [year#34,country#35,product#36,sales#37],
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37)
WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [],
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Project [year#34,country#35,product#36,sales#37]
MetastoreRelation default, sales, None
```
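The two `Window` operators in the plan above come from grouping the extracted `WindowExpression`s by their specification. Here is a toy, self-contained sketch of that grouping idea (plain strings stand in for expressions and specs; this is not the actual analyzer rule):
```
// Each distinct window specification becomes one logical Window operator.
val windowExprs = Seq(
  ("avg(sales)", "PARTITION BY product"),
  ("sum(sales)", "PARTITION BY country"))

// Two distinct specs => two Window operators, matching the plan above.
val exprsPerWindowOperator = windowExprs.groupBy { case (_, spec) => spec }
println(exprsPerWindowOperator.keys.mkString(", "))
```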
* Query planning: During query planning, we simply generate the
physical `Window` operator from the logical `Window` operator. Then, to
prepare the `executedPlan`, the `EnsureRequirements`
rule adds `Exchange` and `Sort` operators where necessary; it analyzes the data
properties of each operator's child and avoids adding unnecessary shuffles and
sorts (a toy sketch of this idea follows the physical plan below). The physical
plan for the above example query is shown below.
```
== Physical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 200), []
  Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Exchange (HashPartitioning [country#35], 200), [country#35 ASC]
    Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
     Exchange (HashPartitioning [product#36], 200), [product#36 ASC]
      HiveTableScan [year#34,country#35,product#36,sales#37], (MetastoreRelation default, sales, None), None
```
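The two `HashPartitioning` exchanges above exist because neither `Window` operator's child is already partitioned on the keys it needs. Here is a toy, self-contained sketch of the idea behind `EnsureRequirements` (illustrative names, not the real Spark classes):
```
sealed trait Partitioning
case class HashPartitioned(keys: Seq[String]) extends Partitioning
case object UnknownPartitioning extends Partitioning

// Only insert a shuffle when the child's partitioning does not already satisfy
// what the Window operator requires.
def ensurePartitioning(required: Seq[String], child: Partitioning): String = child match {
  case HashPartitioned(keys) if keys == required => "reuse child output (no Exchange)"
  case _ => s"insert Exchange(HashPartitioning(${required.mkString(", ")}))"
}

// The Window computing sum_country needs partitioning on country, but its child
// is hash-partitioned on product, so an Exchange is added (as in the plan above).
println(ensurePartitioning(Seq("country"), HashPartitioned(Seq("product"))))
```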
* Execution time: At execution time, a physical `Window` operator buffers
all rows of the partition specified in the partition spec of an `OVER` clause. If
necessary, it also maintains a sliding window frame. The current implementation
tries to buffer the input parameters of a window function according to the
window frame, to avoid evaluating a row multiple times (a minimal sketch of
frame-based evaluation follows this list).
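As a minimal illustration of frame-based evaluation (not the PR's actual operator), this is how an aggregate over a sliding `ROWS` frame can be computed within one already-sorted, fully buffered partition:
```
// For each row, aggregate only the slice of the buffered partition covered by
// ROWS BETWEEN <preceding> PRECEDING AND <following> FOLLOWING.
def slidingFrameSums(partition: IndexedSeq[Double], preceding: Int, following: Int): IndexedSeq[Double] =
  partition.indices.map { i =>
    val start = math.max(0, i - preceding)
    val end = math.min(partition.length - 1, i + following)
    (start to end).map(partition).sum
  }

// sum(sales) OVER (... ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) on one partition:
slidingFrameSums(Vector(10.0, 20.0, 30.0, 40.0), preceding = 1, following = 1)
// => Vector(30.0, 60.0, 90.0, 70.0)
```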
## Future work
Here are three improvements that are not hard to add:
* Taking advantage of the window frame specification to reduce the number
of rows buffered in the physical `Window` operator. For some cases, we only
need to buffer the rows appearing in the sliding window. But for other cases,
we will not be able to reduce the number of rows buffered (e.g. `ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`).
* When a `RANGE` frame is used, for `<value> PRECEDING` and `<value>
FOLLOWING`, it would be great if the `<value>` part could be an expression (we can
start with `Literal`). Then, when the data type of the `ORDER BY` expression is a
`FractionalType`, we can support `FractionalType` as the type of `<value>`
(`<value>` still needs to evaluate to a positive value).
* When a `RANGE` frame is used, we need to support `DateType` and
`TimestampType` as the data type of the expression appearing in the order
specification. Then, the `<value>` part of `<value> PRECEDING` and `<value>
FOLLOWING` can support interval types (once we support them).
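To make the `RANGE`-frame items above concrete, this is the kind of query those improvements target (illustrative only, not supported by this PR; the table and columns are hypothetical):
```
sql("""
  SELECT
    product, sales,
    avg(sales) OVER (
      PARTITION BY country
      ORDER BY sales
      RANGE BETWEEN 10.5 PRECEDING AND CURRENT ROW) AS avg_nearby
  FROM sales
""")
```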