[ https://issues.apache.org/jira/browse/FLINK-28986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580082#comment-17580082 ]
godfrey he commented on FLINK-28986: ------------------------------------ [~qingyue] Thanks for reporting this; I have assigned it to you. > UNNEST function with nested fails to generate plan > -------------------------------------------------- > > Key: FLINK-28986 > URL: https://issues.apache.org/jira/browse/FLINK-28986 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.16.0 > Reporter: Jane Chan > Assignee: Jane Chan > Priority: Major > Attachments: image-2022-08-16-14-36-07-061.png > > > h3. How to reproduce > add the following case to TableEnvironmentITCase > {code:scala} > @Test > def debug(): Unit = { > tEnv.executeSql( > s""" > |CREATE TEMPORARY TABLE source_kafka_wip_his_all ( > | GUID varchar, > | OPERATION varchar, > | PRODUCTID varchar, > | LOTNO varchar, > | SERIALNO varchar, > | QUERYSERIALNO varchar, > | SERIALNO1 varchar, > | SERIALNO2 varchar, > | WIPORDERNO varchar, > | WIPORDERTYPE varchar, > | VIRTUALLOT varchar, > | PREOPERATION varchar, > | NORMALPREOPERATION varchar, > | PROCESSID varchar, > | EQUIPMENT varchar, > | INBOUNDDATE varchar, > | OUTBOUNDDATE varchar, > | REWORK varchar, > | REWORKPROCESSID varchar, > | CONTAINER varchar, > | WIPCONTENTCLASSID varchar, > | STATUSCODE varchar, > | WIPSTATUS varchar, > | TESTPROCESSID varchar, > | TESTORDERTYPE varchar, > | TESTORDER varchar, > | TEST varchar, > | SORTINGPROCESSID varchar, > | SORTINGORDERTYPE varchar, > | SORTINGORDER varchar, > | SORTING varchar, > | MINO varchar, > | GROUPCODE varchar, > | HIGHLOWGROUP varchar, > | PRODUCTNO varchar, > | FACILITY varchar, > | WIPLINE varchar, > | CHILDEQUCODE varchar, > | STATION varchar, > | QTY varchar, > | PASS_FLAG varchar, > | DEFECTCODELIST varchar, > | ISFIRST varchar, > | PARALIST ARRAY<ROW(GUID string,WIP_HIS_GUID string,QUERYSERIALNO > string,OPERATION string,REWORKPROCESSID string,CHARACTERISTIC > string,CHARACTERISTICREVISION string,CHARACTERISTICTYPE > string,CHARACTERISTICCLASS string,UPPERCONTROLLIMIT 
string,TARGETVALUE > string,LOWERCONTROLLIMIT string,TESTVALUE string,TESTATTRIBUTE > string,TESTINGSTARTDATE string,TESTFINISHDATE string,UOMCODE > string,DEFECTCODE string,SPECPARAMID string,STATION string,GP_TIME > string,REFERENCEID string,LASTUPDATEON string,LASTUPDATEDBY string,CREATEDON > string,CREATEDBY string,ACTIVE string,LASTDELETEON string,LASTDELETEDBY > string,LASTREACTIVATEON string,LASTREACTIVATEDBY string,ARCHIVEID > string,LASTARCHIVEON string,LASTARCHIVEDBY string,LASTRESTOREON > string,LASTRESTOREDBY string,ROWVERSIONSTAMP string)>, > | REFERENCEID varchar, > | LASTUPDATEON varchar, > | LASTUPDATEDBY varchar, > | CREATEDON varchar, > | CREATEDBY varchar, > | ACTIVE varchar, > | LASTDELETEON varchar, > | LASTDELETEDBY varchar, > | LASTREACTIVATEON varchar, > | LASTREACTIVATEDBY varchar, > | ARCHIVEID varchar, > | LASTARCHIVEON varchar, > | LASTARCHIVEDBY varchar, > | LASTRESTOREON varchar, > | LASTRESTOREDBY varchar, > | ROWVERSIONSTAMP varchar, > | proctime as PROCTIME() > | ) with ( > | 'connector' = 'datagen' > |) > |""".stripMargin) > tEnv.executeSql( > s""" > |create TEMPORARY view transform_main_data as > |select > | r.GUID as wip_his_guid, > | r.EQUIPMENT as equipment, > | r.WIPLINE as wipline, > | r.STATION as station, > | cast(r.PROCESSID as decimal) as processid, > | r.PRODUCTNO as productno, > | t.TESTFINISHDATE as testfinishdate, > | t.OPERATION as operation, > | t.CHARACTERISTIC as characteristic, > | t.LOWERCONTROLLIMIT as lowercontrollimit, > | t.UPPERCONTROLLIMIT as uppercontrollimit, > | t.TARGETVALUE as targetvalue, > | t.DEFECTCODE as defectcode, > | t.TESTVALUE as testvalue, > | t.CHARACTERISTICTYPE as characteristictype, > | proctime > | from > | (select > | GUID, > | EQUIPMENT, > | WIPLINE, > | STATION, > | PROCESSID, > | PRODUCTNO, > | PARALIST, > | proctime > | FROM source_kafka_wip_his_all) r > | cross join > | unnest(PARALIST) as t > 
(GUID,WIP_HIS_GUID,QUERYSERIALNO,OPERATION,REWORKPROCESSID,CHARACTERISTIC,CHARACTERISTICREVISION,CHARACTERISTICTYPE,CHARACTERISTICCLASS,UPPERCONTROLLIMIT,TARGETVALUE,LOWERCONTROLLIMIT,TESTVALUE,TESTATTRIBUTE,TESTINGSTARTDATE,TESTFINISHDATE,UOMCODE,DEFECTCODE,SPECPARAMID,STATION,GP_TIME,REFERENCEID,LASTUPDATEON,LASTUPDATEDBY,CREATEDON,CREATEDBY,ACTIVE,LASTDELETEON,LASTDELETEDBY,LASTREACTIVATEON,LASTREACTIVATEDBY,ARCHIVEID,LASTARCHIVEON,LASTARCHIVEDBY,LASTRESTOREON,LASTRESTOREDBY,ROWVERSIONSTAMP) > | where t.CHARACTERISTICTYPE = '2' > |""".stripMargin) > tEnv.executeSql( > s""" > |explain plan for > |select * from transform_main_data > |where operation not in > ('G1208','G1209','G1211','G1213','G1206','G1207','G1214','G1215','G1282','G1292','G1216') > |""".stripMargin).print() > } {code} > Stacktrace > {code:java} > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: LogicalProject(inputs=[0..3], > exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, $13, $19, $17, $18, $25, $20, > $15, $7]]) > +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], > requiredColumns=[{6}]) > :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, > PROCTIME()]]) > : +- LogicalTableScan(table=[[default_catalog, default_database, > source_kafka_wip_his_all]]) > +- LogicalFilter(condition=[AND(SEARCH($7, > Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, > Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'), > (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'), > (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'), > (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'), > (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'), > (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5) > CHARACTER SET "UTF-16LE"))]) > +- Uncollect > +- 
LogicalProject(exprs=[[$cor1.PARALIST]]) > +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 > }]]) This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalProject(inputs=[0..3], exprs=[[CAST($4):DECIMAL(10, 0), $5, $23, $11, > $13, $19, $17, $18, $25, $20, $15, $7]]) > +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], > requiredColumns=[{6}]) > :- LogicalProject(inputs=[0], exprs=[[$14, $36, $38, $13, $34, $43, > PROCTIME()]]) > : +- LogicalTableScan(table=[[default_catalog, default_database, > source_kafka_wip_his_all]]) > +- LogicalFilter(condition=[AND(SEARCH($7, > Sarg[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET > "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH($3, > Sarg[(-∞.._UTF-16LE'G1206'), (_UTF-16LE'G1206'.._UTF-16LE'G1207'), > (_UTF-16LE'G1207'.._UTF-16LE'G1208'), (_UTF-16LE'G1208'.._UTF-16LE'G1209'), > (_UTF-16LE'G1209'.._UTF-16LE'G1211'), (_UTF-16LE'G1211'.._UTF-16LE'G1213'), > (_UTF-16LE'G1213'.._UTF-16LE'G1214'), (_UTF-16LE'G1214'.._UTF-16LE'G1215'), > (_UTF-16LE'G1215'.._UTF-16LE'G1216'), (_UTF-16LE'G1216'.._UTF-16LE'G1282'), > (_UTF-16LE'G1282'.._UTF-16LE'G1292'), (_UTF-16LE'G1292'..+∞)]:CHAR(5) > CHARACTER SET "UTF-16LE"))]) > +- Uncollect > +- LogicalProject(exprs=[[$cor1.PARALIST]]) > +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 > }]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. 
> at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:70) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) > at > org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:527) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:96) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51) > at > 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:695) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1356) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:733) > at > org.apache.flink.table.api.TableEnvironmentITCase.debug(TableEnvironmentITCase.scala:695) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There > are not enough rules to produce a node with desired properties: > convention=LOGICAL, FlinkRelDistributionTraitDef=any, > MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], > UpdateKindTraitDef=[NONE]. 
> Missing conversion is Uncollect[convention: NONE -> LOGICAL] > There is 1 empty subset: rel#485:RelSubset#4.LOGICAL.any.None: > 0.[NONE].[NONE], the relevant part of the original plan is as follows > 460:Uncollect > 458:LogicalProject(subset=[rel#459:RelSubset#3.NONE.any.None: > 0.[NONE].[NONE]], PARALIST=[$cor1.PARALIST]) > 17:LogicalValues(subset=[rel#457:RelSubset#2.NONE.any.None: > 0.[NONE].[NONE]], tuples=[[{ 0 }]]){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)