[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954190#comment-15954190 ]

Fabian Hueske commented on FLINK-6249:
--------------------------------------

Hi [~rtudoran],

thanks for creating the JIRA.
I think the plan to implement the deduplication in the ProcessFunction is good.
It gives more flexibility and might also allow user-defined aggregation
functions to be used with DISTINCT.
For now, I would not implement DISTINCT for unbounded OVER windows.

I think the discussion of the non-OVER window DISTINCT cases is a bit confusing 
in the context of this issue. Can you move those parts into separate JIRAs to 
keep the discussion focused on DISTINCT with OVER windows?



> Distinct Aggregates for OVER window
> -----------------------------------
>
>                 Key: FLINK-6249
>                 URL: https://issues.apache.org/jira/browse/FLINK-6249
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: radu
>              Labels: features, patch
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ----------------------------
> Q1. The boundaries are defined by the OVER window and determine which elements
> are aggregated.
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
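> Below is a minimal sketch of the intended semantics of Q1.1 and Q1.3. These queries use a ROWS window made of the current row plus 2 preceding rows. The sketch is written in plain Python, not against the Flink API. The function name `sum_distinct_rows_window` is hypothetical. The window is recomputed from scratch for every row, so the sketch only shows what the query should return, not how the operator should compute it.

```python
from collections import deque
from typing import Iterable, List


def sum_distinct_rows_window(values: Iterable[int], preceding: int = 2) -> List[int]:
    """Emit SUM(DISTINCT v) over the current row and up to `preceding` earlier rows."""
    # A deque with maxlen drops the oldest row once the window is full.
    window: deque = deque(maxlen=preceding + 1)
    results: List[int] = []
    for v in values:
        window.append(v)
        # Duplicates inside the window are counted only once.
        results.append(sum(set(window)))
    return results


print(sum_distinct_rows_window([1, 2, 2, 2, 5]))  # [1, 3, 3, 2, 7]
```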
> General comments:
> -   The DISTINCT operation makes sense only in the context of windows or
>     other bounded structures. Otherwise, the operation would have to keep
>     an unbounded amount of data to ensure uniqueness, and for certain
>     functions (e.g. aggregates) it would never trigger a result.
> -   Supporting DISTINCT for UNBOUNDED sliding windows could be handled as a
>     separate sub-JIRA issue. However, there would be no control over the
>     size of the data structure that keeps the already-seen values (used to
>     check that no value is processed twice). We need to decide whether we
>     want to support this case and, if so, create the appropriate JIRA issues.
> => We will open sub-JIRA issues to extend the current aggregate functionality
>    to the DISTINCT case (Q1.{1-4}). This is the main target of this JIRA.
> => Aggregations over distinct elements without any boundary (i.e.
>    within the SELECT clause) do not make sense, just as aggregations do not
>    make sense without groupings or windows.
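> A RANGE window such as Q1.2 or Q1.4 illustrates the point about bounded state. The Python sketch below makes several assumptions: the names are hypothetical, timestamps are in seconds, and rows arrive in timestamp order. Rows older than the one-hour bound are evicted, so the retained state never covers more than one hour of data. Rows that share the current timestamp (peer rows in SQL RANGE semantics) are not treated specially here.

```python
from collections import deque
from typing import Iterable, List, Tuple


def sum_distinct_range_window(events: Iterable[Tuple[int, int]], range_s: int = 3600) -> List[int]:
    """Emit SUM(DISTINCT v) over rows whose timestamp is within `range_s` seconds of the current row."""
    window: deque = deque()
    results: List[int] = []
    for ts, v in events:
        window.append((ts, v))
        # Evict rows outside [ts - range_s, ts]; this eviction is what bounds the state.
        while window[0][0] < ts - range_s:
            window.popleft()
        results.append(sum({val for _, val in window}))
    return results


print(sum_distinct_range_window([(0, 1), (1800, 2), (3600, 2), (5400, 3)]))  # [1, 3, 3, 5]
```

> Without the eviction loop, the window would grow with every new row, along with the values that must be checked for uniqueness. That is the unbounded case described above.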
> Other similar query support
> ---------------------------
> Q2. The boundaries are defined by the GROUP BY clause, and DISTINCT is applied
> to the elements of the aggregate(s).
> `SELECT MIN(DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`
> => We need to decide whether we aim to support DISTINCT aggregates for
>    GROUP BY (Q2) in this release. If so, sub-JIRA issues need to be created;
>    they can follow the same design and implementation.
> => Implementing DISTINCT for SELECT clauses could be handled as a separate
>    sub-JIRA issue. However, without a bound there is no control over the
>    growing size of the data structure, which will eventually exhaust memory.
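> In the grouped case (Q2), distinct values only need to be tracked per group. The Python sketch below uses hypothetical names and measures processing time in seconds since 10:00:00. It assigns each row to its hourly group, as `FLOOR(procTime() TO HOUR)` would. It then derives an aggregate such as `COUNT(DISTINCT b)` from the per-group set. Once a group is closed, its set can be discarded, which keeps the state bounded.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def distinct_per_group(events: Iterable[Tuple[int, str]], group_s: int = 3600) -> Dict[int, Set[str]]:
    """Collect the distinct values of b seen in each tumbling group of `group_s` seconds."""
    groups: Dict[int, Set[str]] = defaultdict(set)
    for ts, b in events:
        group_start = ts - ts % group_s  # start of the group, like FLOOR(... TO HOUR)
        groups[group_start].add(b)       # a set records each value once per group
    return dict(groups)


events = [(1, "ab"), (300, "aa"), (3780, "aa"), (4140, "aa")]
counts = {start: len(vals) for start, vals in distinct_per_group(events).items()}
print(counts)  # {0: 2, 3600: 1}
```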
> Q3. DISTINCT is applied to the collection of output rows to be selected.
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`
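> For Q3, DISTINCT applies to whole output rows rather than to an aggregate argument. The minimal Python sketch below assumes that the rows of one window are already available as hashable tuples. The name `distinct_rows` and the sample rows are hypothetical. The sketch keeps the first occurrence of each row.

```python
from typing import Hashable, Iterable, List


def distinct_rows(rows: Iterable[Hashable]) -> List[Hashable]:
    """Return each row of one window's output once, in first-seen order."""
    seen = set()
    out: List[Hashable] = []
    for row in rows:
        # The whole row is compared, not a single column.
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out


window_rows = [("10:00", "p1"), ("10:00", "p2"), ("10:00", "p1")]
print(distinct_rows(window_rows))  # [('10:00', 'p1'), ('10:00', 'p2')]
```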
> Description:
> ------------
> The DISTINCT operator requires processing the elements to ensure
> uniqueness. Whether the operation is used to select all distinct
> elements or to apply typical aggregation functions over a set of
> elements, a collection of elements must be formed first.
> This requires windows or grouping methods. Therefore, the
> DISTINCT function will be implemented within windows. Depending on the
> type of window definition, there are several options:
> -   Main scope: If DISTINCT is applied to window aggregations as in Q1,
>     then we either extend the aggregate functions with DISTINCT variants
>     (less preferred) or extend the sliding-window aggregation in the
>     ProcessFunction with support for identifying distinct elements
>     (preferred). The latter option is preferred because a query can contain
>     multiple aggregates, several of which may use the DISTINCT keyword.
>     Identifying distinct elements in the ProcessFunction avoids duplicating
>     the data structure that marks what was seen for every aggregate. It also
>     makes the implementation more robust and resilient, because the data
>     structure that marks the seen elements can be kept in state (MapState).
> -   If DISTINCT is applied to grouped elements as in Q2, then we either
>     define a new implementation (if the selection is general) or extend
>     the current implementation of grouped aggregates with distinct group
>     aggregates.
> -   If DISTINCT is applied to all selected elements as in Q3, then a new
>     implementation needs to be defined. It would work over a specific
>     window, and the window function would ensure the uniqueness of the
>     results.
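> The preferred option for the main scope can be sketched as a single operator that deduplicates once and feeds several DISTINCT aggregates. The Python class below is a hypothetical stand-in for a ProcessFunction over a ROWS window, and its dictionary plays the role of the MapState. One design choice is an assumption of this sketch: it stores an occurrence count per value instead of a seen/not-seen flag. In a sliding window, a value can leave the window while a duplicate of it is still inside, and only a count tells the operator when to retract the value from the aggregates.

```python
from collections import deque
from typing import Deque, Dict, Tuple


class DistinctOverRows:
    """Hypothetical ROWS-bounded OVER operator that deduplicates once for SUM and COUNT DISTINCT."""

    def __init__(self, preceding: int) -> None:
        self.size = preceding + 1
        self.rows: Deque[int] = deque()
        self.seen: Dict[int, int] = {}  # value -> occurrences in window (MapState analogue)
        self.sum_distinct = 0
        self.count_distinct = 0

    def process(self, v: int) -> Tuple[int, int]:
        self.rows.append(v)
        self._add(v)
        if len(self.rows) > self.size:  # the oldest row leaves the window
            self._retract(self.rows.popleft())
        return self.sum_distinct, self.count_distinct

    def _add(self, v: int) -> None:
        n = self.seen.get(v, 0)
        if n == 0:  # first occurrence: every DISTINCT aggregate sees the value
            self.sum_distinct += v
            self.count_distinct += 1
        self.seen[v] = n + 1

    def _retract(self, v: int) -> None:
        n = self.seen[v] - 1
        if n == 0:  # last occurrence left: retract from every DISTINCT aggregate
            del self.seen[v]
            self.sum_distinct -= v
            self.count_distinct -= 1
        else:
            self.seen[v] = n


op = DistinctOverRows(preceding=2)
results = [op.process(v) for v in [1, 2, 2, 2, 5]]
print(results)  # [(1, 1), (3, 2), (3, 2), (2, 1), (7, 2)]
```

> Each row updates the aggregates incrementally. The distinct check runs once per row, however many DISTINCT aggregates the query contains.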
> Functionality example
> ---------------------
> The following example illustrates the functionality of DISTINCT when working
> with streams.
> `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> `Q3: SELECT SUM(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> ||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||Q3||
> |10:00:01| |(ab,1)| | |1|
> |10:05:00| |(aa,2)| | |3|
> |11:00:00| | |ab,aa|2| |
> |11:03:00| |(aa,2)| | |3|
> |11:09:00| |(aa,2)| | |2|
> |12:00:00| | |aa|1| |
> |...| | | | | |
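> The table can be reproduced with a small Python simulation. It assumes that each Stream1 tuple is `(b, a)`, which matches the Q1 to Q3 columns, and it measures processing time in seconds since 10:00:00. The helper names are hypothetical. The grouped results are keyed by hour index rather than by the moment they would be emitted (11:00:00 and 12:00:00).

```python
from collections import deque
from typing import Dict, List, Tuple

# Stream1 events from the table: (seconds since 10:00:00, b, a)
EVENTS: List[Tuple[int, str, int]] = [
    (1, "ab", 1),     # 10:00:01
    (300, "aa", 2),   # 10:05:00
    (3780, "aa", 2),  # 11:03:00
    (4140, "aa", 2),  # 11:09:00
]


def hourly_distinct_b(events: List[Tuple[int, str, int]]) -> Dict[int, List[str]]:
    """Distinct b per hour index (0 = 10:00 to 11:00), in first-seen order."""
    groups: Dict[int, List[str]] = {}
    for ts, b, _ in events:
        values = groups.setdefault(ts // 3600, [])
        if b not in values:
            values.append(b)
    return groups


def sum_distinct_a_last_rows(events: List[Tuple[int, str, int]], preceding: int = 2) -> List[int]:
    """SUM(DISTINCT a) over the current row and up to `preceding` earlier rows."""
    window: deque = deque(maxlen=preceding + 1)
    out: List[int] = []
    for _, _, a in events:
        window.append(a)
        out.append(sum(set(window)))
    return out


q1 = hourly_distinct_b(EVENTS)                   # {0: ['ab', 'aa'], 1: ['aa']}
q2 = {hour: len(bs) for hour, bs in q1.items()}  # {0: 2, 1: 1}
q3 = sum_distinct_a_last_rows(EVENTS)            # [1, 3, 3, 2]
```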
> Implementation option
> ---------------------
> Because the behavior depends on the OVER window behavior, the
> implementation will reuse the existing implementation of the OVER window
> functions, which is based on ProcessFunction. As mentioned in the
> Description section, there are two options to consider:
> 1)  Use DISTINCT within the aggregate implementations, by extending the
>     current aggregates in Flink with DISTINCT variants. For this, we define
>     additional JIRA issues for each aggregate implementation that needs to
>     support the DISTINCT keyword.
> 2)  Use DISTINCT for element selection within the process logic that calls
>     the aggregates. This requires a new implementation of the ProcessFunction
>     used to compute the aggregates; the ProcessFunction also carries the
>     logic that takes each element into account only once. Two sub-options are
>     possible. Sub-option (a), to be used within the ProcessFunction, trades
>     memory: it requires a hash map (e.g. MapState) with boolean values that
>     mark whether an event was seen before. The map is created once per window
>     and reused across multiple DISTINCT aggregates. Sub-option (b) trades
>     computation: it sorts the window contents and eliminates identical
>     elements. The sorting can be based on hash values if the events are
>     non-numeric, composite, or lack an id that marks their uniqueness.
>     Sub-option (b) is not preferred for incremental aggregates and should be
>     considered only if certain aggregates require a window implementation
>     that recomputes everything from scratch.
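> The two alternatives within option 2) can be contrasted in a short Python sketch with hypothetical names. The first variant deduplicates in a single pass with a hash set and trades memory. The second sorts the window contents by hash value and removes identical elements, trading computation. Distinct elements can share a hash value, so the sorting variant still compares elements for equality within each run of equal hashes.

```python
from itertools import groupby
from typing import Hashable, List


def dedup_by_map(window: List[Hashable]) -> List[Hashable]:
    """Memory-trading variant: one pass, remembering seen elements in a hash set."""
    seen = set()
    unique: List[Hashable] = []
    for x in window:
        if x not in seen:
            seen.add(x)
            unique.append(x)
    return unique


def dedup_by_sort(window: List[Hashable]) -> List[Hashable]:
    """Computation-trading variant: sort by hash, then drop identical elements per run."""
    unique: List[Hashable] = []
    for _, run in groupby(sorted(window, key=hash), key=hash):
        run_unique: List[Hashable] = []
        for x in run:
            # Equal hashes do not imply equal elements, so compare explicitly.
            if x not in run_unique:
                run_unique.append(x)
        unique.extend(run_unique)
    return unique


window = [1, 2, 2, 5, 1]
print(sum(dedup_by_map(window)))   # 8, i.e. SUM(DISTINCT) over the window
print(sum(dedup_by_sort(window)))  # 8
```

> The sorting variant keeps no extra state between rows, but it re-sorts the window on every recomputation. That is why it only fits aggregates that recompute the whole window anyway.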



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
