[ https://issues.apache.org/jira/browse/FLINK-23353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382014#comment-17382014 ]
Wenlong Lyu commented on FLINK-23353: ------------------------------------- hi, [~hayden zhou], Table Aggregate only supported in streaming mode by designed, as you can see in FLINK-13471 / FLIP-29 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739) > UDTAGG can't execute in Batch mode > ---------------------------------- > > Key: FLINK-23353 > URL: https://issues.apache.org/jira/browse/FLINK-23353 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.1 > Reporter: hayden zhou > Priority: Major > > {code:java} > public class Top2Test { > public static void main(String[] args) { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode()build(); > TableEnvironment tEnv = TableEnvironment.create(settings); > Table sourceTable = tEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("id", DataTypes.INT()), > DataTypes.FIELD("name",DataTypes.STRING()), > DataTypes.FIELD("price", DataTypes.INT()) > ), > row(1, "hayden", 18), > row(3, "hayden", 19), > row(4, "hayden", 20), > row(2, "jaylin", 20) > ); > tEnv.createTemporaryView("source", sourceTable); > Table rT = tEnv.from("source") > .groupBy($("name")) > .flatAggregate(call(Top2.class, $("price")).as("price", > "rank")) > .select($("name"), $("price"), $("rank")); > rT.execute().print(); > } > public static class Top2Accumulator { > public Integer first; > public Integer second; > } > public static class Top2 extends TableAggregateFunction<Tuple2<Integer, > Integer>, Top2Accumulator> { > @Override > public Top2Accumulator createAccumulator() { > Top2Accumulator acc = new Top2Accumulator(); > acc.first = Integer.MIN_VALUE; > acc.second = Integer.MIN_VALUE; > return acc; > } > public void accumulate(Top2Accumulator acc, Integer value) { > if (value > acc.first) { > acc.second = acc.first; > acc.first = value; > } else if (value > acc.second) { > acc.second = value; > } > } > public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) { > for (Top2Accumulator otherAcc : it) { > accumulate(acc, otherAcc.first); > accumulate(acc, otherAcc.second); > } > } > public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, > Integer>> out) { > if (acc.first != Integer.MIN_VALUE) { > out.collect(Tuple2.of(acc.first, 1)); > } > if (acc.second != Integer.MIN_VALUE) { > out.collect(Tuple2.of(acc.second, 2)); > } > } > } > } > {code} > got errors as below: > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot > generate a valid execution plan for the given query: > LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], > fields=[name, price, rank]) > +- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, > _UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')]) > +- LogicalTableAggregate(group=[{1}], > tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]]) > +- LogicalUnion(all=[true]) > :- LogicalProject(id=[CAST(1):INTEGER], > name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(18):INTEGER]) > : +- LogicalValues(tuples=[[{ 0 }]]) > :- LogicalProject(id=[CAST(3):INTEGER], > name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(19):INTEGER]) > : +- LogicalValues(tuples=[[{ 0 }]]) > :- LogicalProject(id=[CAST(4):INTEGER], > name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(20):INTEGER]) > : +- LogicalValues(tuples=[[{ 0 }]]) > +- LogicalProject(id=[CAST(2):INTEGER], > name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(20):INTEGER]) > +- LogicalValues(tuples=[[{ 0 }]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225) > at > org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577) > at flinktest.Top2Test.main(Top2Test.java:37) > Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There > are not enough rules to produce a node with desired properties: > convention=LOGICAL, FlinkRelDistributionTraitDef=any, sort=[]. > Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL] > There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant > part of the original plan is as follows > 409:LogicalTableAggregate(group=[{1}], > tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]]) > 407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true]) > 399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], > id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(18):INTEGER]) > 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 > }]]) > 401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], > id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(19):INTEGER]) > 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 > }]]) > 403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], > id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(20):INTEGER]) > 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 > }]]) > 405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], > id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) > CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], > price=[CAST(20):INTEGER]) > 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 > }]]) > {code} > if delete inBatchMode() method in > {code:java} > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > {code} > then it will running normaly -- This message was sent by Atlassian Jira (v8.3.4#803005)