chuncheng wu created FLINK-26051: ------------------------------------ Summary: row_number =1 and Subsequent SQL has "case when" and "where" statement : The window can only be ordered in ASCENDING mode Key: FLINK-26051 URL: https://issues.apache.org/jira/browse/FLINK-26051 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.12.2 Reporter: chuncheng wu
hello, i have 2 sqls. One sql has rn=1 and the Subsequent SQL has "case when" and "where".it results the exception as follow. It happen in the occasion when logical plan turn into physical plan : {quote}_org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode._ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)_ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)_ _at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)_ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)_ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)_ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)_ _at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)_ _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)_ _at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)_ _at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)_ _at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)_ _at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)_ _at scala.collection.Iterator$class.foreach(Iterator.scala:891)_ _at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)_ _at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)_ _at scala.collection.AbstractIterable.foreach(Iterable.scala:54)_ _at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)_ _at scala.collection.AbstractTraversable.map(Traversable.scala:104)_ _at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)_ _at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)_ _at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)_ _at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)_ _at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)_ _at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)_ _at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_ _at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)_ _at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_ _at java.base/java.lang.reflect.Method.invoke(Method.java:568)_ _at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)_ _at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)_ _at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)_ _at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)_ _at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)_ _at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)_ _at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)_ _at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)_ _at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)_ _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)_ _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)_ _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)_ _at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)_ _at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)_ _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)_ _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)_ _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_ _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_ _at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)_ _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_ _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_ _at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)_ _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_ _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_ _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_ _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_ _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)_ _at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)_ _at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)_ _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)_ _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)_ _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)_ _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)_ _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)_ _at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)_ _at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)_ _at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)_ _at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)_ _at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)_ _at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)_ _at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)_ _at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)_ {quote} example code : {quote}import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.sql.Timestamp; import static org.apache.flink.table.api.Expressions.$; public class BugTest { @Test public void testRowNumber() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings mySetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting); env.setParallelism(1); DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> oriStream = env.addSource(new CustomSourceRowNumber()); Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), $("task_type"), $("task_mode"), $("parent_task_no"), $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), $("sowing_task_detail_id")); tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT `biz_bill_no`\n" + ",task_type\n" + ",task_mode\n" + ",parent_task_no\n"+ ",total_stage_num\n"+ ",current_stage_index\n"+ ",use_pre_task_owner\n"+ ",poi_type\n"+ ",biz_origin_bill_type\n"+ ",sowing_task_no\n"+ " FROM (\n" + " SELECT *,\n" + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" + " FROM wosOutSowingTaskDetail\n" + " ) tmp\n" + " WHERE rn = 1"); System.out.println("SQL 0 Plan: "); System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST)); System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE)); tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest); DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(wosOutSowingTaskDetailLatest, Row.class); // retractStream.print(); Table resultTable = tableEnv.sqlQuery("SELECT\n" + "biz_bill_no\n" + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN parent_task_no\n" + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" + " ELSE sowing_task_no END AS parent_task_no_cw\n" + ",parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index,use_pre_task_owner \n" + "FROM wosOutSowingTaskDetailLatest\n" + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + "AND poi_type = 2\n"); System.out.println("SQL 1 Plan: "); System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST)); System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); // resultStream.print(); env.execute(); } static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> { private boolean isRuning = true; @Override public void run(SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> sourceContext) throws Exception { while (isRuning) { sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb")); sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb")); Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { isRuning = false; } } } {quote} the logical plan in System.out.println is {quote}SQL 0 Plan: == Abstract Syntax Tree == LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9]) +- LogicalFilter(condition=[=($12, 1)]) +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]]) == Optimized Logical Plan == Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]): rowcount = 1.0E7, cumulative cost = \{3.1E8 rows, 1.78E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory} +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]): rowcount = 1.0E7, cumulative cost = \{3.0E8 rows, 1.78E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory} +- Exchange(distribution=[hash[dt, sowing_task_detail_id]]): rowcount = 1.0E8, cumulative cost = \{2.0E8 rows, 1.77E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory} +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]): rowcount = 1.0E8, cumulative cost = \{1.0E8 rows, 1.0E8 cpu, 8.8E9 io, 0.0 network, 0.0 memory} == Physical Execution Plan == Stage 1 : Data Source content : Source: Custom Source Stage 2 : Operator content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : FORWARD Stage 4 : Operator content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : HASH Stage 5 : Operator content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]) ship_strategy : FORWARD == Abstract Syntax Tree == LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9]) +- LogicalFilter(condition=[=($12, 1)]) +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]]) == Optimized Logical Plan == Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I]) +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Custom Source Stage 7 : Operator content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : FORWARD Stage 9 : Operator content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : HASH Stage 10 : Operator content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]) ship_strategy : FORWARD SQL 1 Plan: org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. {quote} -- This message was sent by Atlassian Jira (v8.20.1#820001)