[ 
https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868220#comment-15868220
 ] 

Fabian Hueske commented on FLINK-5655:
--------------------------------------

Hi [~wheat9], 

First of all, the term sliding window is a bit overloaded. What we call Sliding 
(Group) Window in the DataStream API is not the same as a Sliding (Row) Window 
(hence your examples (1) and (2) are not the semantically equivalent!) I think 
the sliding row window semantics are more common, but now we have the term in 
Flink coined differently and I don't think there is consensus to change that. 
For example this document from the Calcite community calls what Flink calls 
"Sliding Windows" "Hopping Windows": http://calcite.apache.org/docs/stream.html

Sorry for the confusion. 

It is possible to define sliding group windows (as described in FLIP-11) in 
SQL, however, it is a bit cumbersome.
For instance a sliding window of size 5 minutes that slides every minute could 
be defined as 

{code}
SELECT SUM(b) OVER (PARTITION BY a ORDER BY rowtime ROWS BETWEEN 5 PRECEDING 
AND CURRENT ROW)
FROM (
  SELECT a, SUM(b) AS b, MAX(rowtime) AS rowtime
  FROM tab
  GROUP BY a, FLOOR(rowtime TO MINUTE)
)
{code}

This query basically first computes partial aggregates using a tumbling window 
and then the final aggregates using a row window based on row counts.
However, there are a few issues with that. 
- we do not want to support event-time OVER ROW windows because they might 
cause very expensive updates for late data.
- this is very hard to translate to Flink's built-in windows (or the Table API 
windows) because the logic is distributed across several operators.

Hope this helps, Fabian


> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ----------------------------------------------------------------
>
>                 Key: FLINK-5655
>                 URL: https://issues.apache.org/jira/browse/FLINK-5655
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Shaoxuan Wang
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to