[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703643#comment-17703643 ]
lincoln lee commented on FLINK-31165: ------------------------------------- [~qingyue] can you verify the behavior of this case under flink batch? Personally I prefer to keep a unified behavior on streaming and batch; non-determinism should not be the only reason to reject the query, because similar proctime-based computations on streaming are also mostly non-deterministic, WDYT? > Over Agg: The window rank function without order by error in top N query > ------------------------------------------------------------------------ > > Key: FLINK-31165 > URL: https://issues.apache.org/jira/browse/FLINK-31165 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.16.0 > Reporter: P Rohan Kumar > Priority: Major > > > {code:java} > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > val tableEnv = StreamTableEnvironment.create(env) > val td = TableDescriptor.forConnector("datagen").option("rows-per-second", > "10") > .option("number-of-rows", "10") > .schema(Schema > .newBuilder() > .column("NAME", DataTypes.VARCHAR(2147483647)) > .column("ROLLNO", DataTypes.DECIMAL(5, 0)) > .column("DOB", DataTypes.DATE()) > .column("CLASS", DataTypes.DECIMAL(2, 0)) > .column("SUBJECT", DataTypes.VARCHAR(2147483647)) > .build()) > .build() > val table = tableEnv.from(td) > tableEnv.createTemporaryView("temp_table", table) > val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as > date) SRC_NO from temp_table") > tableEnv.createTemporaryView("temp_table2", newTable) > val newTable2 = tableEnv.sqlQuery("select * from (select > NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum > from temp_table2 a) where rownum <= 1") > tableEnv.toChangelogStream(newTable2).print() > env.execute() > {code} > > > I am getting the below error if I run the above code. > I have already provided an order by column. 
> If I change the order by column to some other column, such as "SUBJECT", then > the job runs fine. > > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Error while applying > rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args > [rel#245:LogicalWindow.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows > between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187) > at > scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) > at > org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) > at > org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160) > at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39) > at org.example.OverAggregateBug.main(OverAggregateBug.scala) > Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The > window rank function without order by. please re-check the over window > statement. 
> at > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95) > at > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92) > at > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at > org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89) > at > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229) > ... 27 more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)