[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853647#comment-15853647
 ] 

ASF GitHub Bot commented on FLINK-3475:
---------------------------------------

Github user docete commented on the issue:

    https://github.com/apache/flink/pull/3111
  
    @fhueske Yes, I have checked the execution plan. It is very similar to your description.
    
    Take the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr" as an example, where expr is a table with three fields: a, b, and c.
    
    The explain output for the query is:
    ```
    == Abstract Syntax Tree ==
    LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
      LogicalTableScan(table=[[expr]])
    
    == Optimized Logical Plan ==
    DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
      DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
        DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
          DataSetAggregate(select=[SUM(c) AS EXPR$2])
            DataSetUnion(union=[a, b, c])
              DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
              DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(a) AS EXPR$0])
            DataSetUnion(union=[a])
              DataSetValues(tuples=[[{ null }]], values=[a])
              DataSetAggregate(groupBy=[a], select=[a])
                DataSetCalc(select=[a])
                  DataSetScan(table=[[_DataSetTable_0]])
        DataSetAggregate(select=[SUM(b) AS EXPR$1])
          DataSetUnion(union=[b])
            DataSetValues(tuples=[[{ null }]], values=[b])
            DataSetAggregate(groupBy=[b], select=[b])
              DataSetCalc(select=[b])
                DataSetScan(table=[[_DataSetTable_0]])
    
    == Physical Execution Plan ==
    Stage 8 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
    
    Stage 14 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
    
        Stage 13 : Map
                content : from: (a, b, c)
                ship_strategy : Forward
                exchange_mode : BATCH
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
    
                Stage 12 : FlatMap
                        content : select: (a)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : FlatMap
                        Partitioning : RANDOM_PARTITIONED
    
                        Stage 11 : Map
                                content : prepare select: (a)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
                                driver_strategy : Map
                                Partitioning : RANDOM_PARTITIONED
    
                                Stage 10 : GroupCombine
                                        content : groupBy: (a), select: (a)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Sorted Combine
                                        Partitioning : RANDOM_PARTITIONED
    
                                        Stage 9 : GroupReduce
                                                content : groupBy: (a), select: (a)
                                                ship_strategy : Hash Partition on [0]
                                                exchange_mode : PIPELINED
                                                driver_strategy : Sorted Group Reduce
                                                Partitioning : RANDOM_PARTITIONED
    
                                                Stage 7 : Union
                                                        content : 
                                                        ship_strategy : Redistribute
                                                        exchange_mode : PIPELINED
                                                        Partitioning : RANDOM_PARTITIONED
    
                                                        Stage 6 : Map
                                                                content : prepare select: (SUM(a) AS EXPR$0)
                                                                ship_strategy : Forward
                                                                exchange_mode : PIPELINED
                                                                driver_strategy : Map
                                                                Partitioning : RANDOM_PARTITIONED
    
                                                                Stage 5 : GroupCombine
                                                                        content : select:(SUM(a) AS EXPR$0)
                                                                        ship_strategy : Forward
                                                                        exchange_mode : PIPELINED
                                                                        driver_strategy : Group Reduce All
                                                                        Partitioning : RANDOM_PARTITIONED
    
                                                                        Stage 4 : GroupReduce
                                                                                content : select:(SUM(a) AS EXPR$0)
                                                                                ship_strategy : Redistribute
                                                                                exchange_mode : PIPELINED
                                                                                driver_strategy : Group Reduce All
                                                                                Partitioning : RANDOM_PARTITIONED
    
    Stage 19 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
    
        Stage 20 : Map
                content : from: (a, b, c)
                ship_strategy : Forward
                exchange_mode : BATCH
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
    
                Stage 18 : Union
                        content : 
                        ship_strategy : Redistribute
                        exchange_mode : PIPELINED
                        Partitioning : RANDOM_PARTITIONED
    
                        Stage 17 : Map
                                content : prepare select: (SUM(c) AS EXPR$2)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
                                driver_strategy : Map
                                Partitioning : RANDOM_PARTITIONED
    
                                Stage 16 : GroupCombine
                                        content : select:(SUM(c) AS EXPR$2)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Group Reduce All
                                        Partitioning : RANDOM_PARTITIONED
    
                                        Stage 15 : GroupReduce
                                                content : select:(SUM(c) AS EXPR$2)
                                                ship_strategy : Redistribute
                                                exchange_mode : PIPELINED
                                                driver_strategy : Group Reduce All
                                                Partitioning : RANDOM_PARTITIONED
    
                                                Stage 3 : FlatMap
                                                        content : where: (true), join: (EXPR$2, EXPR$0)
                                                        ship_strategy : Forward
                                                        exchange_mode : PIPELINED
                                                        driver_strategy : FlatMap
                                                        Partitioning : RANDOM_PARTITIONED
    
    Stage 25 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED
    
        Stage 30 : Map
                content : from: (a, b, c)
                ship_strategy : Forward
                exchange_mode : BATCH
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED
    
                Stage 29 : FlatMap
                        content : select: (b)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : FlatMap
                        Partitioning : RANDOM_PARTITIONED
    
                        Stage 28 : Map
                                content : prepare select: (b)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
                                driver_strategy : Map
                                Partitioning : RANDOM_PARTITIONED
    
                                Stage 27 : GroupCombine
                                        content : groupBy: (b), select: (b)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Sorted Combine
                                        Partitioning : RANDOM_PARTITIONED
    
                                        Stage 26 : GroupReduce
                                                content : groupBy: (b), select: (b)
                                                ship_strategy : Hash Partition on [0]
                                                exchange_mode : PIPELINED
                                                driver_strategy : Sorted Group Reduce
                                                Partitioning : RANDOM_PARTITIONED
    
                                                Stage 24 : Union
                                                        content : 
                                                        ship_strategy : Redistribute
                                                        exchange_mode : PIPELINED
                                                        Partitioning : RANDOM_PARTITIONED
    
                                                        Stage 23 : Map
                                                                content : prepare select: (SUM(b) AS EXPR$1)
                                                                ship_strategy : Forward
                                                                exchange_mode : PIPELINED
                                                                driver_strategy : Map
                                                                Partitioning : RANDOM_PARTITIONED
    
                                                                Stage 22 : GroupCombine
                                                                        content : select:(SUM(b) AS EXPR$1)
                                                                        ship_strategy : Forward
                                                                        exchange_mode : PIPELINED
                                                                        driver_strategy : Group Reduce All
                                                                        Partitioning : RANDOM_PARTITIONED
    
                                                                        Stage 21 : GroupReduce
                                                                                content : select:(SUM(b) AS EXPR$1)
                                                                                ship_strategy : Redistribute
                                                                                exchange_mode : PIPELINED
                                                                                driver_strategy : Group Reduce All
                                                                                Partitioning : RANDOM_PARTITIONED
    
                                                                                Stage 2 : FlatMap
                                                                                        content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
                                                                                        ship_strategy : Forward
                                                                                        exchange_mode : PIPELINED
                                                                                        driver_strategy : FlatMap
                                                                                        Partitioning : RANDOM_PARTITIONED
    
                                                                                        Stage 1 : FlatMap
                                                                                                content : select: (EXPR$0, EXPR$1, EXPR$2)
                                                                                                ship_strategy : Forward
                                                                                                exchange_mode : PIPELINED
                                                                                                driver_strategy : FlatMap
                                                                                                Partitioning : RANDOM_PARTITIONED
    
                                                                                                Stage 0 : Data Sink
                                                                                                        content : org.apache.flink.api.java.io.DiscardingOutputFormat
                                                                                                        ship_strategy : Forward
                                                                                                        exchange_mode : PIPELINED
                                                                                                        Partitioning : RANDOM_PARTITIONED
    
    ```
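
The optimized logical plan above can be read as a three-branch rewrite: each DISTINCT column is first deduplicated with a group-by, each branch is reduced to a single row, and the single rows are then joined. The following is a minimal Python sketch of that reading, not Flink code. The names `Row`, `sql_sum`, and `distinct_aggregate_plan` are hypothetical, and the comment about why a null row is unioned in before each global aggregate (so that an empty input still yields one row) is an assumption rather than something stated in the plan.

```python
from typing import Iterable, NamedTuple, Optional, Tuple


class Row(NamedTuple):
    """Hypothetical row of table `expr` with fields a, b, c."""
    a: Optional[int]
    b: Optional[int]
    c: Optional[int]


def sql_sum(values: Iterable[Optional[int]]) -> Optional[int]:
    """SQL-style SUM: skip NULLs, return NULL (None) if nothing is left."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


def distinct_aggregate_plan(rows: Iterable[Row]) -> Tuple[Optional[int], ...]:
    rows = list(rows)

    # Branch 1: DataSetAggregate(SUM(c)) over DataSetUnion(null row, scan).
    # Assumption: the null row guarantees one output row for empty input.
    expr2 = sql_sum([None] + [r.c for r in rows])

    # Branch 2: DataSetCalc(a) -> DataSetAggregate(groupBy a) removes
    # duplicates, then union with a null row and SUM(a).
    distinct_a = list(dict.fromkeys(r.a for r in rows))
    expr0 = sql_sum([None] + distinct_a)

    # Branch 3: the same shape for column b.
    distinct_b = list(dict.fromkeys(r.b for r in rows))
    expr1 = sql_sum([None] + distinct_b)

    # DataSetSingleRowJoin twice: (EXPR$2, EXPR$0), then (..., EXPR$1).
    joined = (expr2, expr0)
    joined = joined + (expr1,)

    # Final DataSetCalc reorders to (EXPR$0, EXPR$1, EXPR$2).
    return (joined[1], joined[2], joined[0])


sample = [Row(1, 10, 100), Row(1, 20, 200), Row(2, 20, 300), Row(None, 10, None)]
result = distinct_aggregate_plan(sample)
empty_result = distinct_aggregate_plan([])
```

With `sample`, the distinct values of `a` are 1, 2, and NULL, and the distinct values of `b` are 10 and 20, so each value is counted once in its sum, while `c` is summed over all rows.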


> DISTINCT aggregate function support for SQL queries
> ---------------------------------------------------
>
>                 Key: FLINK-3475
>                 URL: https://issues.apache.org/jira/browse/FLINK-3475
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Chengxiang Li
>            Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
