[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703655#comment-17703655 ]
Jane Chan commented on FLINK-31165:
-----------------------------------

The behavior under Flink batch mode is the same as under streaming mode, since this rewrite rule is applied during LogicalWindow creation. My concern is mainly with the implementation. The root cause of this problem is the constant-folding optimization applied when creating the LogicalWindow, which leaves orderByKey empty by the time it is passed to FlinkLogicalOverAggregateConverter. There are two possible solutions. The first is to remove the constant-folding optimization, or to add a guard here, such as giving up the optimization when it would leave orderByKey empty. The second is to remove the orderByKey check in FlinkLogicalOverAggregateConverter, but then the problem becomes how to distinguish an ORDER BY on constants from a missing ORDER BY clause.

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
> Key: FLINK-31165
> URL: https://issues.apache.org/jira/browse/FLINK-31165
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.16.0
> Reporter: P Rohan Kumar
> Priority: Major
>
> {code:java}
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen")
>   .option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*, cast('2022-01-01' as date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select NAME, ROLLNO, row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
> {code}
>
> I get the error below if I run the above code, even though I have already provided an ORDER BY column.
> If I change the ORDER BY column to some other column, such as "SUBJECT", the job runs fine.
>
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
> 	at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> 	at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
> 	at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
> 	at scala.collection.Iterator.foreach(Iterator.scala:943)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> 	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
> 	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> 	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
> 	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
> 	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
> 	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
> 	at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
> 	at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
> 	at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
> 	at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
> 	at scala.collection.Iterator.foreach(Iterator.scala:943)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> 	at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
> 	at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
> 	at scala.collection.Iterator.foreach(Iterator.scala:943)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> 	at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
> 	at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
> 	... 27 more
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
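A minimal sketch of the guard described in the first solution above: give up the constant-folding optimization whenever it would leave orderByKey empty. The names `OverWindowSpec` and `foldOrderKeys` below are hypothetical and not part of the Flink planner API; this only illustrates the decision logic, under the assumption that the folding step drops ORDER BY keys known to be constant.

```scala
// Hypothetical model of an over window's keys; not the Flink planner's types.
case class OverWindowSpec(partitionKeys: Seq[String], orderKeys: Seq[String])

def foldOrderKeys(spec: OverWindowSpec, constants: Set[String]): OverWindowSpec = {
  // The folding step: drop ORDER BY keys that are known constants.
  val folded = spec.orderKeys.filterNot(constants.contains)
  // Guard: if folding would empty a non-empty ORDER BY clause, give up the
  // optimization and keep the original keys, so the downstream check in
  // FlinkLogicalOverAggregateConverter still sees an ordering.
  if (folded.isEmpty && spec.orderKeys.nonEmpty) spec
  else spec.copy(orderKeys = folded)
}
```

In the reported query, ORDER BY SRC_NO corresponds to an orderKeys of just the constant SRC_NO; with this guard the key is kept, so the rank-function validation would not fail, while a genuine mixed case (e.g. ORDER BY SRC_NO, DOB) would still be folded.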