[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853647#comment-15853647 ]
ASF GitHub Bot commented on FLINK-3475:
---------------------------------------

Github user docete commented on the issue:

https://github.com/apache/flink/pull/3111

@fhueske Yes, I have checked the execution plan. It is very similar to your description.

Take the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr" as an example, where expr is a table with 3 fields: a, b, c. The explanation for the query is:

```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
  LogicalTableScan(table=[[expr]])

== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
  DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
    DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
      DataSetAggregate(select=[SUM(c) AS EXPR$2])
        DataSetUnion(union=[a, b, c])
          DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
          DataSetScan(table=[[_DataSetTable_0]])
      DataSetAggregate(select=[SUM(a) AS EXPR$0])
        DataSetUnion(union=[a])
          DataSetValues(tuples=[[{ null }]], values=[a])
          DataSetAggregate(groupBy=[a], select=[a])
            DataSetCalc(select=[a])
              DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(select=[SUM(b) AS EXPR$1])
      DataSetUnion(union=[b])
        DataSetValues(tuples=[[{ null }]], values=[b])
        DataSetAggregate(groupBy=[b], select=[b])
          DataSetCalc(select=[b])
            DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 8 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 14 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 13 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 12 : FlatMap
    content : select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 11 : Map
    content : prepare select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 10 : GroupCombine
    content : groupBy: (a), select: (a)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 9 : GroupReduce
    content : groupBy: (a), select: (a)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 7 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 6 : Map
    content : prepare select: (SUM(a) AS EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 5 : GroupCombine
    content : select:(SUM(a) AS EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 4 : GroupReduce
    content : select:(SUM(a) AS EXPR$0)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 19 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 20 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 18 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 17 : Map
    content : prepare select: (SUM(c) AS EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 16 : GroupCombine
    content : select:(SUM(c) AS EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 15 : GroupReduce
    content : select:(SUM(c) AS EXPR$2)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 3 : FlatMap
    content : where: (true), join: (EXPR$2, EXPR$0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 25 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

Stage 30 : Map
    content : from: (a, b, c)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 29 : FlatMap
    content : select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 28 : Map
    content : prepare select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 27 : GroupCombine
    content : groupBy: (b), select: (b)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

Stage 26 : GroupReduce
    content : groupBy: (b), select: (b)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

Stage 24 : Union
    content :
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

Stage 23 : Map
    content : prepare select: (SUM(b) AS EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

Stage 22 : GroupCombine
    content : select:(SUM(b) AS EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 21 : GroupReduce
    content : select:(SUM(b) AS EXPR$1)
    ship_strategy : Redistribute
    exchange_mode : PIPELINED
    driver_strategy : Group Reduce All
    Partitioning : RANDOM_PARTITIONED

Stage 2 : FlatMap
    content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 1 : FlatMap
    content : select: (EXPR$0, EXPR$1, EXPR$2)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : FlatMap
    Partitioning : RANDOM_PARTITIONED

Stage 0 : Data Sink
    content : org.apache.flink.api.java.io.DiscardingOutputFormat
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED
```

> DISTINCT aggregate function support for SQL queries
> ---------------------------------------------------
>
>                 Key: FLINK-3475
>                 URL: https://issues.apache.org/jira/browse/FLINK-3475
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Chengxiang Li
>            Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function
> instead of separate implementation, and let Flink runtime take care of
> duplicate records.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
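
For readers following the optimized logical plan above, here is a minimal Python sketch of what the rewrite appears to compute. It is not Flink code: the function names `sum_or_none` and `distinct_sums` are hypothetical, and the reading that the union with a single all-null row exists to guarantee one output row on empty input is an assumption drawn from the plan, not something the comment states.

```python
def sum_or_none(values):
    """SQL-style SUM: NULL (None) inputs are ignored; no non-NULL input yields None.

    Assumption: this stands in for DataSetUnion with the all-null
    DataSetValues row, so each aggregate branch always emits exactly one row.
    """
    present = [v for v in values if v is not None]
    return sum(present) if present else None


def distinct_sums(rows):
    """Evaluate sum(distinct a), sum(distinct b), sum(c) over (a, b, c) tuples."""
    # Branch for EXPR$2: plain SUM(c) over every row.
    expr2 = sum_or_none(c for _, _, c in rows)
    # Branch for EXPR$0: group by a to drop duplicates, then an ordinary SUM(a).
    expr0 = sum_or_none({a for a, _, _ in rows})
    # Branch for EXPR$1: group by b to drop duplicates, then an ordinary SUM(b).
    expr1 = sum_or_none({b for _, b, _ in rows})
    # Each branch yields a single row, so the two single-row joins
    # (where = true) just concatenate fields in the order EXPR$2, EXPR$0, EXPR$1.
    joined = (expr2, expr0, expr1)
    # The final DataSetCalc reorders the fields to EXPR$0, EXPR$1, EXPR$2.
    return (joined[1], joined[2], joined[0])


print(distinct_sums([(1, 10, 100), (1, 20, 200), (2, 20, 300)]))
```

The point of the sketch is that each DISTINCT aggregate reuses the ordinary SUM after a deduplicating group-by, which matches the approach suggested in the issue description.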