[
https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Hueske updated FLINK-6473:
---------------------------------
Description:
Add support for OVER windows for batch tables.
Since OVER windows are supported for streaming tables, this issue is not about
the API (which is available) but about adding the execution strategies and
translation for OVER windows on batch tables.
The feature could be implemented using the following plans
*UNBOUNDED OVER*
{code}
DataSet[Row] input = ...
DataSet[Row] result = input
.groupBy(partitionKeys)
.sortGroup(orderByKeys)
.reduceGroup(computeAggregates)
{code}
This implementation is quite straightforward because we don't need to retract
rows.
*BOUNDED OVER*
A bit more challenging are BOUNDED OVER windows, because we need to retract
values from aggregates and we don't want to store rows temporarily on the heap.
{code}
DataSet[Row] input = ...
DataSet[Row] sorted = input
.partitionByHash(partitionKey)
.sortPartition(partitionKeys, orderByKeys)
DataSet[Row] result = sorted.coGroup(sorted)
.where(partitionKey).equalTo(partitionKey)
.with(computeAggregates)
{code}
With this, the data set should be partitioned and sorted once. The sorted
{{DataSet}} would be consumed twice (the optimizer should inject a temp barrier
on one of the inputs to avoid a consumption deadlock). The {{CoGroupFunction}}
would accumulate new rows into the aggregates from one input and retract them
from the other. Since both input streams are properly sorted, this can happen
in a zigzag fashion. We need verify that the generated plan is was we want it
to be.
was:
Add support for OVER windows for batch tables.
Since OVER windows are supported for streaming tables, this issue is not about
the API (which is available) but about adding the execution strategies and
translation for OVER windows on batch tables.
> Add OVER window support for batch tables
> ----------------------------------------
>
> Key: FLINK-6473
> URL: https://issues.apache.org/jira/browse/FLINK-6473
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Priority: Major
>
> Add support for OVER windows for batch tables.
> Since OVER windows are supported for streaming tables, this issue is not
> about the API (which is available) but about adding the execution strategies
> and translation for OVER windows on batch tables.
> The feature could be implemented using the following plans
> *UNBOUNDED OVER*
> {code}
> DataSet[Row] input = ...
> DataSet[Row] result = input
> .groupBy(partitionKeys)
> .sortGroup(orderByKeys)
> .reduceGroup(computeAggregates)
> {code}
> This implementation is quite straightforward because we don't need to retract
> rows.
> *BOUNDED OVER*
> A bit more challenging are BOUNDED OVER windows, because we need to retract
> values from aggregates and we don't want to store rows temporarily on the
> heap.
> {code}
> DataSet[Row] input = ...
> DataSet[Row] sorted = input
> .partitionByHash(partitionKey)
> .sortPartition(partitionKeys, orderByKeys)
> DataSet[Row] result = sorted.coGroup(sorted)
> .where(partitionKey).equalTo(partitionKey)
> .with(computeAggregates)
> {code}
> With this, the data set should be partitioned and sorted once. The sorted
> {{DataSet}} would be consumed twice (the optimizer should inject a temp
> barrier on one of the inputs to avoid a consumption deadlock). The
> {{CoGroupFunction}} would accumulate new rows into the aggregates from one
> input and retract them from the other. Since both input streams are properly
> sorted, this can happen in a zigzag fashion. We need verify that the
> generated plan is was we want it to be.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)