[ https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703655#comment-17703655 ]

Jane Chan commented on FLINK-31165:
-----------------------------------

The current behavior under Flink batch mode is the same as under streaming mode, 
since this rewrite rule is applied during LogicalWindow creation. 

My concern mainly comes from the implementation side. The root cause of this 
problem is the constant-folding optimization applied when creating the 
LogicalWindow, which leaves the orderByKey empty by the time it is passed to 
FlinkLogicalOverAggregateConverter.
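
To make the folding concrete, here is a minimal self-contained Scala model (hypothetical types, not Flink's actual RexNode or planner classes) of why an ORDER BY on a constant such as SRC_NO collapses to an empty key list:

```scala
// Hypothetical model, not Flink planner code: a constant sort key imposes
// no ordering, so constant folding prunes it from the over-window collation.
sealed trait SortKey
final case class FieldKey(name: String) extends SortKey
final case class ConstantKey(value: Any) extends SortKey

def foldConstants(orderKeys: List[SortKey]): List[SortKey] =
  orderKeys.filterNot(_.isInstanceOf[ConstantKey])

// ORDER BY SRC_NO, where SRC_NO is cast('2022-01-01' as date), i.e. a constant:
val folded = foldConstants(List(ConstantKey(java.time.LocalDate.parse("2022-01-01"))))
// folded is empty, which is what later trips the orderByKey check
// in FlinkLogicalOverAggregateConverter
```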

There are two possible solutions. The first is to remove the constant-folding 
optimization, or to add a guard here, such as giving up the optimization when 
orderByKey would become empty after folding. The second is to remove the 
orderByKey check in FlinkLogicalOverAggregateConverter, but then the problem 
becomes how to distinguish an ORDER BY on constants from a missing ORDER BY 
clause. 
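
A minimal sketch of the first option (hypothetical helper names, not the actual Flink change): fold constants but fall back to the original keys when folding would empty them, so the converter's check still passes.

```scala
// Sketch of the first proposed fix, with hypothetical names: give up the
// constant-folding optimization when it would leave orderByKey empty.
sealed trait SortKey
final case class FieldKey(name: String) extends SortKey
final case class ConstantKey(value: Any) extends SortKey

def foldWithFallback(orderKeys: List[SortKey]): List[SortKey] = {
  val folded = orderKeys.filterNot(_.isInstanceOf[ConstantKey])
  if (folded.isEmpty) orderKeys // give up: keep the ORDER BY on constants as-is
  else folded
}
```

This keeps an explicit ORDER BY on a constant distinguishable from a missing ORDER BY clause, which is exactly the ambiguity the second option would otherwise have to resolve inside the converter.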

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
>                 Key: FLINK-31165
>                 URL: https://issues.apache.org/jira/browse/FLINK-31165
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Priority: Major
>
>  
> {code:scala}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen")
>   .option("rows-per-second", "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*, cast('2022-01-01' as date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select NAME, ROLLNO, row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
> {code}
>  
>  
> I get the error below when I run the code above, even though I have provided 
> an order-by column.
> If I change the order-by column to another column, such as "SUBJECT", the job 
> runs fine.
>  
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args [rel#245:LogicalWindow.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
>     at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
>     at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 27 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
