Github user docete commented on the issue: https://github.com/apache/flink/pull/3111 @fhueske Yes, I have checked the execution plan. It's very similar to your description: Take example for SQL "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table, and it has 3 fields: a, b, c. The explaination for the query is: ``` == Abstract Syntax Tree == LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)]) LogicalTableScan(table=[[expr]]) == Optimized Logical Plan == DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2]) DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin]) DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin]) DataSetAggregate(select=[SUM(c) AS EXPR$2]) DataSetUnion(union=[a, b, c]) DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c]) DataSetScan(table=[[_DataSetTable_0]]) DataSetAggregate(select=[SUM(a) AS EXPR$0]) DataSetUnion(union=[a]) DataSetValues(tuples=[[{ null }]], values=[a]) DataSetAggregate(groupBy=[a], select=[a]) DataSetCalc(select=[a]) DataSetScan(table=[[_DataSetTable_0]]) DataSetAggregate(select=[SUM(b) AS EXPR$1]) DataSetUnion(union=[b]) DataSetValues(tuples=[[{ null }]], values=[b]) DataSetAggregate(groupBy=[b], select=[b]) DataSetCalc(select=[b]) DataSetScan(table=[[_DataSetTable_0]]) == Physical Execution Plan == Stage 8 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Stage 14 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Stage 13 : Map content : from: (a, b, c) ship_strategy : Forward exchange_mode : BATCH driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 12 : FlatMap content : select: (a) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 11 : Map content : prepare select: (a) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 10 : GroupCombine content : groupBy: (a), select: (a) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Sorted Combine Partitioning : RANDOM_PARTITIONED Stage 9 : GroupReduce content : groupBy: (a), select: (a) ship_strategy : Hash Partition on [0] exchange_mode : PIPELINED driver_strategy : Sorted Group Reduce Partitioning : RANDOM_PARTITIONED Stage 7 : Union content : ship_strategy : Redistribute exchange_mode : PIPELINED Partitioning : RANDOM_PARTITIONED Stage 6 : Map content : prepare select: (SUM(a) AS EXPR$0) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 5 : GroupCombine content : select:(SUM(a) AS EXPR$0) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 4 : GroupReduce content : select:(SUM(a) AS EXPR$0) ship_strategy : Redistribute exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 19 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Stage 20 : Map content : from: (a, b, c) ship_strategy : Forward exchange_mode : BATCH driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 18 : Union content : ship_strategy : Redistribute exchange_mode : PIPELINED Partitioning : RANDOM_PARTITIONED Stage 17 : Map content : prepare select: (SUM(c) AS EXPR$2) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 16 : GroupCombine content : select:(SUM(c) AS EXPR$2) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 15 : GroupReduce content : select:(SUM(c) AS EXPR$2) ship_strategy : Redistribute exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 3 : FlatMap content : where: (true), join: (EXPR$2, EXPR$0) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 25 : Data Source content : collect elements with CollectionInputFormat Partitioning : RANDOM_PARTITIONED Stage 30 : Map content : from: (a, b, c) ship_strategy : Forward exchange_mode : BATCH driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 29 : FlatMap content : select: (b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 28 : Map content : prepare select: (b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 27 : GroupCombine content : groupBy: (b), select: (b) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Sorted Combine Partitioning : RANDOM_PARTITIONED Stage 26 : GroupReduce content : groupBy: (b), select: (b) ship_strategy : Hash Partition on [0] exchange_mode : PIPELINED driver_strategy : Sorted Group Reduce Partitioning : RANDOM_PARTITIONED Stage 24 : Union content : ship_strategy : Redistribute exchange_mode : PIPELINED Partitioning : RANDOM_PARTITIONED Stage 23 : Map content : prepare select: (SUM(b) AS EXPR$1) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Map Partitioning : RANDOM_PARTITIONED Stage 22 : GroupCombine content : select:(SUM(b) AS EXPR$1) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 21 : GroupReduce content : select:(SUM(b) AS EXPR$1) ship_strategy : Redistribute exchange_mode : PIPELINED driver_strategy : Group Reduce All Partitioning : RANDOM_PARTITIONED Stage 2 : FlatMap content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 1 : FlatMap content : select: (EXPR$0, EXPR$1, EXPR$2) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 0 : Data Sink content : org.apache.flink.api.java.io.DiscardingOutputFormat ship_strategy : Forward exchange_mode : PIPELINED Partitioning : RANDOM_PARTITIONED ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---