[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954190#comment-15954190 ]
Fabian Hueske commented on FLINK-6249:
--------------------------------------

Hi [~rtudoran],

thanks for creating the JIRA.
I think the plan to implement the deduplication in the ProcessFunction is good. It gives more flexibility and might also allow us to use user-defined aggregation functions with distinct.
For now, I would not implement DISTINCT for unbounded OVER windows.

I think the discussion of the non-OVER window DISTINCT cases is a bit confusing in the context of this issue. Can you move those parts into separate JIRAs to keep the discussion focused on DISTINCT with OVER windows?

> Distinct Aggregates for OVER window
> -----------------------------------
>
> Key: FLINK-6249
> URL: https://issues.apache.org/jira/browse/FLINK-6249
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: radu
> Labels: features, patch
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ----------------------------
> Q1. Boundaries are expressed in windows and meant for the elements to be
> aggregated
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN
> INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN
> INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> General comments:
> - The DISTINCT operation makes sense only within the context of windows or
>   other bounded structures. Otherwise the operation would have to keep
>   an infinite amount of data to ensure uniqueness and would not
>   trigger for certain functions (e.g. aggregates).
> - We can consider as a sub-JIRA issue the implementation of DISTINCT
>   for UNBOUNDED sliding windows.
>   However, there would be no control over
>   the data structure that keeps the seen data (to check that it is not
>   re-processed). -> We need to decide whether we want to support this (and
>   create the appropriate JIRA issues).
> => We will open sub-JIRA issues to extend the current functionality of
> aggregates for the DISTINCT case (Q1.{1-4}). (This is the main target of
> this JIRA)
> => Aggregations over distinct elements without any boundary (i.e.
> within the SELECT clause) do not make sense, just as aggregations do not
> make sense without groupings or windows.
> Other similar query support
> ------------
> Q2. Boundaries are expressed in the GROUP BY clause and distinct is applied
> to the elements of the aggregate(s)
> `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime()
> TO HOUR)`
> => We need to decide whether we aim to support distinct aggregates for
> GROUP BY (Q2) in this release. If so, sub-JIRA issues need to be created.
> We can follow the same design/implementation.
> => We can consider as a sub-JIRA issue the implementation of DISTINCT
> for select clauses. However, there is no control over the growing
> size of the data structure, and it will unavoidably exhaust memory.
> Q3. Distinct is applied to the collection of outputs to be selected.
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY
> FLOOR(procTime() TO DAY)`
> Description:
> ------------
> The DISTINCT operator requires processing the elements to ensure
> uniqueness. Whether the operation is used to SELECT ALL distinct
> elements or to apply typical aggregation functions over a set of
> elements, a collection of elements must first be formed.
> This brings the need to use windows or grouping methods. Therefore the
> distinct function will be implemented within windows.
> Depending on the type of window definition there are several options:
> - Main Scope: If distinct is applied as in the Q1 example for window
>   aggregations, then either we extend the implementation with distinct
>   aggregates (less preferred) or we extend the sliding window aggregates
>   implementation in the ProcessFunction with support for identifying
>   distinct elements (preferred). The latter option is preferred because a
>   query can carry multiple aggregates, including multiple aggregates that
>   have the distinct keyword set. Implementing the distinction between
>   elements in the process function avoids the need to duplicate the data
>   structure that marks what was seen across multiple aggregates. It also
>   makes the implementation more robust and resilient, as we can keep the
>   data structure for marking the seen elements in state (MapState).
> - If distinct is applied as in the Q2 example on group elements, then
>   either we define a new implementation if selection is general or
>   extend the current implementation of grouped aggregates with
>   distinct group aggregates.
> - If distinct is applied as in the Q3 example for selecting all elements,
>   then a new implementation needs to be defined. This would work over
>   a specific window, and the uniqueness of the results to be processed
>   would be enforced within the window function.
> Functionality example
> ---------------------
> We exemplify below the functionality of DISTINCT when working with
> streams.
> `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME TO HOUR) `
> `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO
> HOUR) `
> `Q3: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2
> PRECEDING AND CURRENT ROW) FROM stream1`
> ||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||Q3||
> ||10:00:01| (ab,1)| | | 1 |
> ||10:05:00| (aa,2)| | | 3 |
> ||11:00:00| | ab,aa | 2 | |
> ||11:03:00| (aa,2)| | | 3 |
> ||11:09:00| (aa,2)| | | 2 |
> ||12:00:00| | aa | 1 | |
> |...|
> Implementation option
> ---------------------
> Since the behavior depends on the OVER window behavior, the
> implementation will reuse the existing implementation of the
> OVER window functions, which is based on ProcessFunction. As mentioned in
> the description section, there are 2 options to consider:
> 1) Using distinct within the aggregates implementation, by extending the
>    current aggregates in Flink with distinct variants. For this we would
>    define additional JIRA issues for each implementation to support the
>    distinct keyword.
> 2) Using distinct for selection within the process logic when calling the
>    aggregates. This requires a new implementation of the ProcessFunction
>    used for computing the aggregates. The ProcessFunction will also carry
>    the logic of taking each element only once. For this, 2 options are
>    possible.
>    Option 1 (to be used within the ProcessFunction) trades memory: it would
>    require creating a hash map (e.g. MapState) with binary values that mark
>    whether an event was seen before. This will be created once per window
>    and will be reused across multiple distinct aggregates.
>    Option 2 trades computation: it would require sorting the window
>    contents and eliminating identical elements. The sorting can be done
>    based on hash values in case the events are non-numeric or composite, or
>    do not possess an id to mark the uniqueness.
>    Option 2 is not preferred for incremental aggregates and should be
>    considered only if certain aggregates would require a window
>    implementation that recomputes everything from scratch.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
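
To illustrate the two options from the "Implementation option" section of the quoted description, here is a minimal sketch in plain Python. It is not Flink code and not the proposed implementation. The function names and the in-memory dict/deque are hypothetical stand-ins for the ProcessFunction and its MapState. One assumption goes beyond the description: the map stores an occurrence count instead of a binary flag, so a value can be retracted from the sum once its last copy leaves a ROWS window. The input is column a of the Q3 rows in the functionality table.

```python
from collections import deque


def distinct_sum_over_rows(values, preceding=2):
    """SUM(DISTINCT v) OVER (ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW).

    Option 1 style (hypothetical sketch): `seen` stands in for a map state
    kept once per window and shared by all distinct aggregates.
    """
    window = deque()  # rows currently inside the window, oldest first
    seen = {}         # value -> number of occurrences inside the window
    total = 0         # running sum over the distinct values
    results = []
    for v in values:
        window.append(v)
        if seen.get(v, 0) == 0:
            total += v              # first occurrence in the window: accumulate
        seen[v] = seen.get(v, 0) + 1
        if len(window) > preceding + 1:
            old = window.popleft()  # row that falls out of the window
            seen[old] -= 1
            if seen[old] == 0:      # last occurrence left the window: retract
                del seen[old]
                total -= old
        results.append(total)
    return results


def distinct_sum_recompute(values, preceding=2):
    """Option 2 style (hypothetical sketch): sort each window, skip duplicates."""
    results = []
    for i in range(len(values)):
        ordered = sorted(values[max(0, i - preceding): i + 1])
        total = 0
        for j, v in enumerate(ordered):
            if j == 0 or v != ordered[j - 1]:  # keep one copy of equal neighbours
                total += v
        results.append(total)
    return results


rows = [1, 2, 2, 2]  # column a of (ab,1), (aa,2), (aa,2), (aa,2)
print(distinct_sum_over_rows(rows))   # [1, 3, 3, 2]
print(distinct_sum_recompute(rows))   # [1, 3, 3, 2]
```

Both functions reproduce the Q3 column of the table (1, 3, 3, 2). The first does constant work per row at the cost of keeping the map. The second re-sorts the window for every row, which matches the remark that Option 2 only fits aggregates that are recomputed from scratch anyway.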