[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske closed FLINK-6249.
--------------------------------
    Resolution: Duplicate
 Fix Version/s: 1.6.0

Implemented via FLINK-8689

> Distinct Aggregates for OVER window
> -----------------------------------
>
>                 Key: FLINK-6249
>                 URL: https://issues.apache.org/jira/browse/FLINK-6249
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: radu
>            Priority: Major
>              Labels: features, patch
>             Fix For: 1.6.0
>
>
> Time target: ProcTime/EventTime
>
> SQL targeted query examples:
> ----------------------------
>
> Q1. Boundaries are expressed in windows and apply to the elements to be
> aggregated.
>
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
>
> General comments:
>
> - DISTINCT only makes sense within the context of windows or some other
> bounded structure. Otherwise the operation would have to keep an unbounded
> amount of data to ensure uniqueness, and certain functions (e.g. aggregates)
> would never trigger.
> - We can consider the implementation of DISTINCT for UNBOUNDED sliding
> windows as a sub-JIRA issue. However, there would be no control over the
> data structure that keeps the seen data (used to check that an element is
> not re-processed). -> We need to decide whether we want to support this (and
> create the appropriate JIRA issues).
> => We will open sub-JIRA issues to extend the current functionality of
> aggregates to the DISTINCT case.
> => Aggregations over distinct elements without any boundary (i.e.
> within the SELECT clause) do not make sense, just as aggregations do not
> make sense without groupings or windows.
>
> Description:
> ------------
>
> The DISTINCT operator requires processing the elements to ensure uniqueness.
> Whether the operation is used to select all distinct elements or to apply
> typical aggregation functions over a set of elements, a collection of
> elements must first be formed.
> This requires windows or grouping methods. Therefore, the distinct function
> will be implemented within windows. Depending on the type of window
> definition, there are several options:
> - Main scope: If distinct is applied to window aggregations as in the Q1
> examples, then we either extend the implementation with distinct aggregates
> (less preferred) or extend the sliding-window aggregates implementation in
> the ProcessFunction with support for identifying distinct elements
> (preferred). The latter option is preferred because a query can carry
> multiple aggregates, including several that have the DISTINCT keyword set.
> Identifying distinct elements in the process function avoids duplicating
> the data structure that marks what was seen across multiple aggregates. It
> also makes the implementation more robust and resilient, as we can keep the
> data structure that marks the seen elements in state (MapState).
>
> Functionality example
> ---------------------
>
> We exemplify below the functionality of DISTINCT when working with
> streams.
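The preferred option from the description (identifying distinct elements once in the process function and keeping the seen elements in map state) can be sketched as below. This is a minimal sketch under stated assumptions, not Flink's implementation: plain Python stands in for a Flink ProcessFunction, a `Counter` stands in for MapState, a single key and a ROWS-bounded OVER window are assumed, and the class `DistinctRowsOverWindow` and its methods are hypothetical. One map of seen values is shared by two DISTINCT aggregates (SUM and COUNT), so the deduplication bookkeeping is not duplicated per aggregate.

```python
from collections import Counter, deque


class DistinctRowsOverWindow:
    """Hypothetical sketch of an OVER window operator for
    ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW that identifies
    distinct values once and shares that state across DISTINCT aggregates.
    A Counter stands in for Flink MapState (value -> multiplicity)."""

    def __init__(self, preceding):
        self.preceding = preceding  # e.g. 2 for ROWS BETWEEN 2 PRECEDING
        self.buffer = deque()       # values of the rows currently in the window
        self.seen = Counter()       # shared "seen" map for all DISTINCT aggregates
        self.distinct_sum = 0       # SUM(DISTINCT ...), maintained incrementally
        self.distinct_count = 0     # COUNT(DISTINCT ...), maintained incrementally

    def _add(self, value):
        self.seen[value] += 1
        if self.seen[value] == 1:   # first copy in the window: feed the aggregates
            self.distinct_sum += value
            self.distinct_count += 1

    def _retract(self, value):
        self.seen[value] -= 1
        if self.seen[value] == 0:   # last copy left the window: retract it
            del self.seen[value]
            self.distinct_sum -= value
            self.distinct_count -= 1

    def process_element(self, value):
        """Add the current row, evict the row that fell out of the window,
        and return (distinct_sum, distinct_count) for the current row."""
        self.buffer.append(value)
        self._add(value)
        if len(self.buffer) > self.preceding + 1:
            self._retract(self.buffer.popleft())
        return self.distinct_sum, self.distinct_count


if __name__ == "__main__":
    op = DistinctRowsOverWindow(preceding=2)
    print([op.process_element(v)[0] for v in [1, 2, 2, 2]])  # [1, 3, 3, 2]
```

The sketch stores a count per value instead of a binary seen flag: in a sliding window, a value that occurs several times must stay visible to the aggregates until its last copy is evicted, which a single flag cannot track. Fed the values 1, 2, 2, 2 with `preceding=2`, it returns the distinct sums 1, 3, 3, 2.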
>
> `Query: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
>
> ||Proctime||IngestionTime(Event)||Stream1||Query result||
> |10:00:01| |(ab,1)|1|
> |10:05:00| |(aa,2)|3|
> |11:03:00| |(aa,2)|3|
> |11:09:00| |(aa,2)|2|
> |...|
>
> Implementation option
> ---------------------
>
> Since the behavior depends on the OVER window semantics, the implementation
> will reuse the existing implementation of the OVER window functions, which
> is based on ProcessFunction. As mentioned in the description section, there
> are two options to consider:
> 1) Supporting distinct within the aggregates implementation, by extending
> the current aggregates in Flink with distinct aggregate implementations. For
> this we define additional JIRA issues, one per aggregate, to support the
> DISTINCT keyword.
> 2) Applying distinct as a selection step within the process logic, before
> calling the aggregates. This requires a new implementation of the
> ProcessFunction used for computing the aggregates. The ProcessFunction will
> also carry the logic of taking each element only once. For this, two options
> are possible. Option 1 (to be used within the ProcessFunction) trades memory
> for computation: it requires a hash map (e.g. MapState) with binary values
> that mark whether an event was seen before. This map is created once per
> window and is reused across multiple distinct aggregates. Option 2 trades
> computation for memory: it requires sorting the window contents and
> eliminating identical elements. The sorting can be done on hash values when
> the events are non-numeric or composite, or do not have an id that marks
> their uniqueness. Option 2 is not preferred for incremental aggregates and
> should be considered only if certain aggregates require a window
> implementation that recomputes everything from scratch.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
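Option 2 of the second implementation approach (sort the window contents and eliminate identical elements, recomputing from scratch) can be sketched as follows. This is an illustrative sketch only, assuming plain Python in place of a Flink ProcessFunction; the functions `distinct_by_sorting` and `rows_over_distinct_sum` are hypothetical names. Sorting is done on hash values, as the text suggests for non-numeric or composite events, but equality is still checked inside each run of equal hashes, because two different elements may share a hash.

```python
from itertools import groupby


def distinct_by_sorting(values):
    """Hypothetical sketch: return the distinct elements of `values` by
    sorting on hash values and dropping duplicates within each hash run."""
    unique = []
    for _, run in groupby(sorted(values, key=hash), key=hash):
        kept = []
        for v in run:
            if v not in kept:  # explicit equality check guards against hash collisions
                kept.append(v)
        unique.extend(kept)
    return unique


def rows_over_distinct_sum(stream, preceding):
    """Recompute SUM(DISTINCT ...) from scratch for every row over
    ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    results = []
    for i in range(len(stream)):
        window = stream[max(0, i - preceding): i + 1]
        results.append(sum(distinct_by_sorting(window)))
    return results


if __name__ == "__main__":
    print(rows_over_distinct_sum([1, 2, 2, 2], preceding=2))  # [1, 3, 3, 2]
```

Each row sorts its whole window, costing O(w log w) for a window of w rows, whereas the hash-map variant (Option 1) needs only a constant number of map updates per row. This is why sorting only pays off when an aggregate recomputes the entire window anyway.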