[ https://issues.apache.org/jira/browse/FLINK-21923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young updated FLINK-21923:
-------------------------------
    Fix Version/s:     (was: 1.13.0)
                   1.13.1

> SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the same time
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-21923
>                 URL: https://issues.apache.org/jira/browse/FLINK-21923
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.1
>
>
> SplitAggregateRule rewrites a one-level aggregation into a two-level
> aggregation to improve performance under data skew.
> In the partial phase, AVG is translated into SUM and COUNT. If the original
> SQL already contains the same COUNT, the engine removes the duplicate COUNT
> and then adds a Project on top of the aggregate to compute and restore the
> original COUNT result.
> {code:java}
> relBuilder.aggregate(
>   relBuilder.groupKey(fullGroupSet,
>     ImmutableList.of[ImmutableBitSet](fullGroupSet)),
>   newPartialAggCalls)
> // If duplicate calls were removed, the node on top of the builder stack is
> // the added Project (a FlinkLogicalCalc), not the aggregate, so this cast fails.
> relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
>   .setPartialFinalType(PartialFinalType.PARTIAL)
> {code}
> As a result, `relBuilder.peek()` returns a `FlinkLogicalCalc` instead of a
> `FlinkLogicalAggregate`, and the cast throws an exception like:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
>     at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
>     at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
> {code}
> The problem reproduces reliably with the following test case added to
> `SplitAggregateRuleTest`:
> {code:java}
> @Test
> def testAggFilterClauseBothWithAvgAndCount(): Unit = {
>   util.tableEnv.getConfig.getConfiguration.setBoolean(
>     OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
>   val sqlQuery =
>     s"""
>        |SELECT
>        |  a,
>        |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
>        |  SUM(b) FILTER (WHERE NOT b = 5),
>        |  COUNT(b),
>        |  AVG(b),
>        |  SUM(b)
>        |FROM MyTable
>        |GROUP BY a
>        |""".stripMargin
>   util.verifyRelPlan(sqlQuery)
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
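The failure mode described in the issue can be illustrated with a small, self-contained Scala model. Everything below is hypothetical: `Rel`, `Scan`, `Aggregate`, `Project` and the helpers `aggregate`, `castToAggregate` and `markPartial` are simplified stand-ins, not Flink or Calcite APIs. `aggregate` only mimics the behaviour reported above (duplicate aggregate calls are merged and a Project is stacked on top to restore one output field per requested call), and unwrapping the Project in `markPartial` is one possible defensive approach, not necessarily the change that was merged for FLINK-21923.

```scala
import scala.util.Try

object DedupSketch {
  // Hypothetical, simplified plan nodes (NOT Flink/Calcite classes).
  sealed trait Rel
  final case class Scan(table: String) extends Rel
  final case class Aggregate(
      input: Rel,
      groupKey: Seq[String],
      calls: Seq[String],
      partialFinalType: String = "NONE") extends Rel
  // Maps each output position to a field of the input (group keys first, then calls).
  final case class Project(input: Rel, fields: Seq[Int]) extends Rel

  // Mimics the reported behaviour: identical calls are merged, and a Project is
  // added on top so the caller still sees one output field per requested call.
  def aggregate(input: Rel, groupKey: Seq[String], calls: Seq[String]): Rel = {
    val distinctCalls = calls.distinct
    val agg = Aggregate(input, groupKey, distinctCalls)
    if (distinctCalls.size == calls.size) agg
    else {
      val mapping = groupKey.indices ++
        calls.map(c => groupKey.size + distinctCalls.indexOf(c))
      Project(agg, mapping)
    }
  }

  // Analogous to relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]:
  // throws ClassCastException when the top node is a Project.
  def castToAggregate(top: Rel): Aggregate = top.asInstanceOf[Aggregate]

  // Defensive variant: mark the aggregate whether or not a Project sits on top.
  def markPartial(top: Rel): Rel = top match {
    case a: Aggregate => a.copy(partialFinalType = "PARTIAL")
    case Project(a: Aggregate, fields) =>
      Project(a.copy(partialFinalType = "PARTIAL"), fields)
    case other => throw new IllegalStateException(s"Unexpected node: $other")
  }

  def main(args: Array[String]): Unit = {
    // Simplified partial-phase calls: AVG(b) contributes SUM(b) and COUNT(b),
    // which duplicate the COUNT(b) and SUM(b) already in the query.
    val calls = Seq("COUNT(b)", "SUM(b)", "COUNT(b)", "SUM(b)")
    val top = aggregate(Scan("MyTable"), Seq("a"), calls)
    println(s"direct cast fails: ${Try(castToAggregate(top)).isFailure}")
    println(markPartial(top))
  }
}
```

Running `main` shows that the naive cast fails as soon as any call is deduplicated, while matching on the node type lets the rule reach the aggregate underneath the Project.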