[ https://issues.apache.org/jira/browse/FLINK-26461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500627#comment-17500627 ]
lincoln lee commented on FLINK-26461: ------------------------------------- [~SpongebobZ] Thanks for reporting this! Could you offer more details about the test query? It may help. > Throw CannotPlanException in TableFunction > ------------------------------------------ > > Key: FLINK-26461 > URL: https://issues.apache.org/jira/browse/FLINK-26461 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.14.3 > Reporter: Spongebob > Priority: Major > > I got a CannotPlanException when I changed the isDeterministic option to false. > For details, see this code: > {code:java} > //代码占位符 > public class GetDayTimeEtlSwitch extends TableFunction<Integer> { > private boolean status = false; > @Override > public boolean isDeterministic() { > return false; > } > public void eval() { > if (status) { > collect(1); > } else { > if (System.currentTimeMillis() > 1646298908000L) { > status = true; > collect(1); > } else { > collect(0); > } > } > } > } {code} > Exception stack... > {code:java} > //代码占位符 > Exception in thread "main" org.apache.flink.table.api.TableException: Cannot > generate a valid execution plan for the given query: > FlinkLogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], > fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]) > +- FlinkLogicalJoin(condition=[true], joinType=[left]) > :- FlinkLogicalCalc(select=[STUNAME, SUBJECT, SCORE, > PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME]) > : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]) > +- FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], > rowType=[RecordType(INTEGER EXPR$0)])This exception indicates that the query > uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. 
> at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:76) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742) > at TestSwitch.main(TestSwitch.java:33) > Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There > are not enough rules to produce a node with desired properties: > convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, > MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], > UpdateKindTraitDef=[NONE]. > Missing conversion is FlinkLogicalTableFunctionScan[convention: LOGICAL -> > STREAM_PHYSICAL, FlinkRelDistributionTraitDef: any -> single] > There is 1 empty subset: rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE], the relevant part of the original plan is as follows > 168:FlinkLogicalTableFunctionScan(invocation=[GET_SWITCH()], > rowType=[RecordType(INTEGER EXPR$0)])Root: > rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE] > Original rel: > FlinkLogicalSink(subset=[rel#140:RelSubset#4.LOGICAL.any.None: > 0.[NONE].[NONE]], > table=[default_catalog.default_database.Unregistered_Collect_Sink_1], > fields=[STUNAME, SUBJECT, SCORE, PROC_TIME, EXPR$0]): rowcount = 1.0E8, > cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, > id = 151 > FlinkLogicalJoin(subset=[rel#150:RelSubset#3.LOGICAL.any.None: > 0.[NONE].[NONE]], condition=[true], joinType=[left]): rowcount = 1.0E8, > cumulative cost = {1.0E8 rows, 1.00000001E8 cpu, 4.800000001E9 io, 0.0 > network, 0.0 memory}, id = 149 > FlinkLogicalCalc(subset=[rel#148:RelSubset#1.LOGICAL.any.None: > 0.[NONE].[NONE]], select=[STUNAME, SUBJECT, SCORE, PROCTIME() AS PROC_TIME]): > rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 > network, 0.0 memory}, id = 153 > > FlinkLogicalTableSourceScan(subset=[rel#143:RelSubset#0.LOGICAL.any.None: > 0.[NONE].[NONE]], table=[[default_catalog, default_database, > 
DAILY_SCORE_CDC]], fields=[STUNAME, SUBJECT, SCORE]): rowcount = 1.0E8, > cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, > id = 142 > > FlinkLogicalTableFunctionScan(subset=[rel#146:RelSubset#2.LOGICAL.any.None: > 0.[NONE].[NONE]], invocation=[GET_SWITCH()], rowType=[RecordType(INTEGER > EXPR$0)]): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 > network, 0.0 memory}, id = 145Sets: > Set#5, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) > SUBJECT, DECIMAL(10, 0) SCORE) > rel#172:RelSubset#5.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#142 > rel#142:FlinkLogicalTableSourceScan.LOGICAL.any.None: > 0.[NONE].[NONE](table=[default_catalog, default_database, > DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative > cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} > rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], > best=rel#183 > rel#183:StreamPhysicalTableSourceScan.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](table=[default_catalog, default_database, > DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, SCORE), rowcount=1.0E8, cumulative > cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} > Set#6, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) > SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME) > rel#174:RelSubset#6.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#173 > rel#173:FlinkLogicalCalc.LOGICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#172,select=STUNAME, SUBJECT, SCORE, > PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative > cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} > rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], > best=rel#185 > rel#185:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#184,select=STUNAME, SUBJECT, SCORE, > PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME), rowcount=1.0E8, cumulative > 
cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} > rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: > 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, > cumulative cost={inf} > rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, > cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory} > rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], > best=rel#194 > rel#188:AbstractConverter.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE](input=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: > 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, > cumulative cost={inf} > rel#194:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE](input=RelSubset#186,distribution=single), rowcount=1.0E8, > cumulative cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 memory} > Set#7, type: RecordType(INTEGER EXPR$0) > rel#175:RelSubset#7.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#168 > rel#168:FlinkLogicalTableFunctionScan.LOGICAL.any.None: > 0.[NONE].[NONE](invocation=GET_SWITCH(),rowType=RecordType(INTEGER EXPR$0)), > rowcount=1.0, cumulative cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 > memory} > rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: 0.[NONE].[NONE], > best=null > Set#8, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) > SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, > INTEGER EXPR$0) > rel#177:RelSubset#8.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#176 > rel#176:FlinkLogicalJoin.LOGICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#174,right=RelSubset#175,condition=true,joinType=left), > rowcount=1.0E8, cumulative 
cost={3.00000001E8 rows, 3.00000002E8 cpu, > 8.400000001E9 io, 0.0 network, 0.0 memory} > rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null > rel#190:StreamPhysicalJoin.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](left=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, > SUBJECT, SCORE, PROC_TIME, > EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey), rowcount=1.0E8, > cumulative cost={inf} > Set#9, type: RecordType(VARCHAR(2147483647) STUNAME, VARCHAR(2147483647) > SUBJECT, DECIMAL(10, 0) SCORE, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, > INTEGER EXPR$0) > rel#179:RelSubset#9.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#178 > rel#178:FlinkLogicalSink.LOGICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, > SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative > cost={4.00000001E8 rows, 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 > memory} > rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null > rel#182:AbstractConverter.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: > 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, > cumulative cost={inf} > rel#192:StreamPhysicalSink.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE](input=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, > SUBJECT, SCORE, PROC_TIME, EXPR$0), rowcount=1.0E8, cumulative > cost={inf}Graphviz: > digraph G { > root [style=filled,label="Root"]; > subgraph cluster5{ > label="Set 5 RecordType(VARCHAR(2147483647) STUNAME, > VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE)"; > rel142 > [label="rel#142:FlinkLogicalTableSourceScan\ntable=[default_catalog, > default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, > SCORE\nrows=1.0E8, cost={1.0E8 
rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > rel183 > [label="rel#183:StreamPhysicalTableSourceScan\ntable=[default_catalog, > default_database, DAILY_SCORE_CDC],fields=STUNAME, SUBJECT, > SCORE\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > subset172 [label="rel#172:RelSubset#5.LOGICAL.any.None: > 0.[NONE].[NONE]"] > subset184 [label="rel#184:RelSubset#5.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE]"] > } > subgraph cluster6{ > label="Set 6 RecordType(VARCHAR(2147483647) STUNAME, > VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, > TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME)"; > rel173 > [label="rel#173:FlinkLogicalCalc\ninput=RelSubset#172,select=STUNAME, > SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, > cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > rel185 > [label="rel#185:StreamPhysicalCalc\ninput=RelSubset#184,select=STUNAME, > SUBJECT, SCORE, PROCTIME_MATERIALIZE(PROCTIME()) AS PROC_TIME\nrows=1.0E8, > cost={2.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > rel188 > [label="rel#188:AbstractConverter\ninput=RelSubset#186,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=single,MiniBatchIntervalTraitDef=None: > 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, > cost={inf}",shape=box] > rel194 > [label="rel#194:StreamPhysicalExchange\ninput=RelSubset#186,distribution=single\nrows=1.0E8, > cost={3.0E8 rows, 1.63E10 cpu, 3.6E9 io, 4.8E9 network, 0.0 > memory}",color=blue,shape=box] > subset174 [label="rel#174:RelSubset#6.LOGICAL.any.None: > 0.[NONE].[NONE]"] > subset186 [label="rel#186:RelSubset#6.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE]"] > subset187 [label="rel#187:RelSubset#6.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE]"] > subset186 -> subset187; } > subgraph cluster7{ > label="Set 7 RecordType(INTEGER EXPR$0)"; > rel168 > 
[label="rel#168:FlinkLogicalTableFunctionScan\ninvocation=GET_SWITCH(),rowType=RecordType(INTEGER > EXPR$0)\nrows=1.0, cost={1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > subset175 [label="rel#175:RelSubset#7.LOGICAL.any.None: > 0.[NONE].[NONE]"] > subset189 [label="rel#189:RelSubset#7.STREAM_PHYSICAL.single.None: > 0.[NONE].[NONE]",color=red] > } > subgraph cluster8{ > label="Set 8 RecordType(VARCHAR(2147483647) STUNAME, > VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, > TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)"; > rel176 > [label="rel#176:FlinkLogicalJoin\nleft=RelSubset#174,right=RelSubset#175,condition=true,joinType=left\nrows=1.0E8, > cost={3.00000001E8 rows, 3.00000002E8 cpu, 8.400000001E9 io, 0.0 network, > 0.0 memory}",color=blue,shape=box] > rel190 > [label="rel#190:StreamPhysicalJoin\nleft=RelSubset#187,right=RelSubset#189,joinType=LeftOuterJoin,where=true,select=STUNAME, > SUBJECT, SCORE, PROC_TIME, > EXPR$0,leftInputSpec=NoUniqueKey,rightInputSpec=NoUniqueKey\nrows=1.0E8, > cost={inf}",shape=box] > subset177 [label="rel#177:RelSubset#8.LOGICAL.any.None: > 0.[NONE].[NONE]"] > subset191 [label="rel#191:RelSubset#8.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE]"] > } > subgraph cluster9{ > label="Set 9 RecordType(VARCHAR(2147483647) STUNAME, > VARCHAR(2147483647) SUBJECT, DECIMAL(10, 0) SCORE, > TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) PROC_TIME, INTEGER EXPR$0)"; > rel178 > [label="rel#178:FlinkLogicalSink\ninput=RelSubset#177,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, > SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={4.00000001E8 rows, > 4.00000002E8 cpu, 8.400000001E9 io, 0.0 network, 0.0 > memory}",color=blue,shape=box] > rel182 > [label="rel#182:AbstractConverter\ninput=RelSubset#179,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: > 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, > 
cost={inf}",shape=box] > rel192 > [label="rel#192:StreamPhysicalSink\ninput=RelSubset#191,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=STUNAME, > SUBJECT, SCORE, PROC_TIME, EXPR$0\nrows=1.0E8, cost={inf}",shape=box] > subset179 [label="rel#179:RelSubset#9.LOGICAL.any.None: > 0.[NONE].[NONE]"] > subset181 [label="rel#181:RelSubset#9.STREAM_PHYSICAL.any.None: > 0.[NONE].[NONE]"] > } > root -> subset181; > subset172 -> rel142[color=blue]; > subset184 -> rel183[color=blue]; > subset174 -> rel173[color=blue]; rel173 -> subset172[color=blue]; > subset186 -> rel185[color=blue]; rel185 -> subset184[color=blue]; > subset187 -> rel188; rel188 -> subset186; > subset187 -> rel194[color=blue]; rel194 -> subset186[color=blue]; > subset175 -> rel168[color=blue]; > subset177 -> rel176[color=blue]; rel176 -> > subset174[color=blue,label="0"]; rel176 -> subset175[color=blue,label="1"]; > subset191 -> rel190; rel190 -> subset187[label="0"]; rel190 -> > subset189[label="1"]; > subset179 -> rel178[color=blue]; rel178 -> subset177[color=blue]; > subset181 -> rel182; rel182 -> subset179; > subset181 -> rel192; rel192 -> subset191; > } > at > org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:742) > at > org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:365) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:520) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69) > ... 23 more > {code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)