[ https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365238#comment-17365238 ]
Wenlong Lyu commented on FLINK-22954:
-------------------------------------

[~jark][~godfreyhe], I think ConstantTableFunctionScanRule is not the right way to handle a constant table function: when the function is not deterministic, it must be invoked once per input record, not just once in total. How about adding a new rule that converts LogicalJoin(xxx, TableFunctionScan) into LogicalCorrelate(xxx, TableFunctionScan) instead?
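For illustration only, a rough, untested sketch of what such a rule could look like, written against Calcite's RelOptRule API. The class name, matching conditions, and all details here are mine, not an actual Flink implementation:

{code:java}
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.util.ImmutableBitSet;

/**
 * Hypothetical sketch: rewrites LogicalJoin(left, LogicalTableFunctionScan)
 * into LogicalCorrelate(left, LogicalTableFunctionScan), so that the table
 * function is evaluated once per input record (which matters when it is
 * not deterministic).
 */
public class JoinTableFunctionScanToCorrelateRule extends RelOptRule {

    public static final JoinTableFunctionScanToCorrelateRule INSTANCE =
            new JoinTableFunctionScanToCorrelateRule();

    private JoinTableFunctionScanToCorrelateRule() {
        super(operand(LogicalJoin.class,
                        operand(RelNode.class, any()),
                        operand(LogicalTableFunctionScan.class, none())),
                "JoinTableFunctionScanToCorrelateRule");
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        LogicalJoin join = call.rel(0);
        // Correlate only supports INNER and LEFT. The lateral joins in this
        // report have a trivial condition (ON TRUE); a real rule would also
        // have to verify that the scan references no fields of the left input.
        return (join.getJoinType() == JoinRelType.INNER
                || join.getJoinType() == JoinRelType.LEFT)
                && join.getCondition().isAlwaysTrue();
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalJoin join = call.rel(0);
        RelNode left = call.rel(1);
        LogicalTableFunctionScan scan = call.rel(2);
        LogicalCorrelate correlate = LogicalCorrelate.create(
                left,
                scan,
                join.getCluster().createCorrel(),
                // the function takes only constant arguments, so it needs
                // no columns from the left input
                ImmutableBitSet.of(),
                join.getJoinType());
        call.transformTo(correlate);
    }
}
{code}

With a Correlate in place, the planner would evaluate the table function per record, matching the plan Flink already produces when the function references a table field (see the second "optimize result" below).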
> Don't support consuming update and delete changes when using a table function that does not reference a table field
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22954
>                 URL: https://issues.apache.org/jira/browse/FLINK-22954
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: hehuiyuan
>            Priority: Major
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafkaTableSink' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:92)
> {code}
>
> UDF code:
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.annotation.FunctionHint;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> @FunctionHint(output = @DataTypeHint("ROW<word INT>"))
> public class GenerateSeriesUdf extends TableFunction<Row> {
>
>     // Emits one row per value in [from, to); the upper bound is exclusive.
>     public void eval(int from, int to) {
>         for (int i = from; i < to; i++) {
>             Row row = new Row(1);
>             row.setField(0, i);
>             collect(row);
>         }
>     }
>
>     @Override
>     public TypeInformation<Row> getResultType() {
>         return Types.ROW(Types.INT());
>     }
> }
> {code}
>
> A plain `JOIN` works fine; the `LEFT JOIN` variant fails with the exception above.
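> (Note: the report does not show how GENERATE_SERIES is registered. Presumably it happens along these lines, where `tEnv` is the table environment used for the statements below; this snippet is reconstructed, not from the report:)
>
> {code:java}
> // Hypothetical registration: expose the UDF to SQL under the name
> // used in the failing query below.
> tEnv.createTemporarySystemFunction("GENERATE_SERIES", GenerateSeriesUdf.class);
> {code}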
>
> {code:java}
> CREATE TABLE kafkaTableSource (
>   name string,
>   age int,
>   sex string,
>   address string,
>   pt as PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'hehuiyuan1',
>   'scan.startup.mode' = 'latest-offset',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.client.id' = 'test-consumer-group',
>   'properties.group.id' = 'test-consumer-group',
>   'format' = 'json'
> );
>
> CREATE TABLE kafkaTableSink (
>   name string,
>   sname string,
>   sno string,
>   sclass string,
>   address string
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'hehuiyuan2',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'json'
> );
>
> INSERT INTO kafkaTableSink
> SELECT name, name, name, name, word
> FROM kafkaTableSource
> LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1,5)) AS T(word) ON TRUE;
> {code}
>
> {code:java}
> // UDF called with constant arguments only: the plan has two inputs,
> // joined by a regular Join (the Correlate is fed by a single-row Values).
> optimize result:
> Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, name0, name1, name2, word])
> +- Calc(select=[name, name AS name0, name AS name1, name AS name2, word])
>    +- Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>       :- Exchange(distribution=[single])
>       :  +- Calc(select=[name])
>       :     +- TableSourceScan(table=[[default_catalog, default_database, kafkaTableSource]], fields=[name, age, sex, address])
>       +- Exchange(distribution=[single])
>          +- Correlate(invocation=[GENERATE_SERIES(1, 5)], correlate=[table(GENERATE_SERIES(1,5))], select=[word], rowType=[RecordType:peek_no_expand(INTEGER word)], joinType=[INNER])
>             +- Values(tuples=[[{ }]])
>
> // UDF that references a table field: the plan has one input,
> // a Correlate directly over the source, so no Join is involved.
> optimize result:
> Sink(table=[default_catalog.default_database.kafkaTableSink], fields=[name, name0, name1, name2, province])
> +- Calc(select=[name, name AS name0, name AS name1, name AS name2, word AS province])
>    +- Correlate(invocation=[JSON_TUPLE($cor0.address, _UTF-16LE'province')], correlate=[table(JSON_TUPLE($cor0.address,_UTF-16LE'province'))], select=[name,age,sex,address,pt,word], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) sex, VARCHAR(2147483647) address, TIME ATTRIBUTE(PROCTIME) pt, VARCHAR(2147483647) word)], joinType=[LEFT])
>       +- Calc(select=[name, age, sex, address, PROCTIME() AS pt])
>          +- TableSourceScan(table=[[default_catalog, default_database, kafkaTableSource]], fields=[name, age, sex, address])
> {code}
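For reference: the bottom frames of the trace above are StatementSetImpl.explain and TableEnvironmentImpl.explainInternal, so the exception is thrown while the statement set is explained and optimized, before any job runs. A minimal driver sketch that should reproduce it (reconstructed from the report; the class name and setup are illustrative, not code from the report):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink22954Repro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // UDF from the report, registered under the name used in the query.
        tEnv.createTemporarySystemFunction("GENERATE_SERIES", GenerateSeriesUdf.class);

        // Source and sink DDL as quoted above.
        tEnv.executeSql(
                "CREATE TABLE kafkaTableSource ("
                        + " name string, age int, sex string, address string,"
                        + " pt as PROCTIME()"
                        + ") WITH ("
                        + " 'connector' = 'kafka', 'topic' = 'hehuiyuan1',"
                        + " 'scan.startup.mode' = 'latest-offset',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'properties.client.id' = 'test-consumer-group',"
                        + " 'properties.group.id' = 'test-consumer-group',"
                        + " 'format' = 'json')");
        tEnv.executeSql(
                "CREATE TABLE kafkaTableSink ("
                        + " name string, sname string, sno string, sclass string, address string"
                        + ") WITH ("
                        + " 'connector' = 'kafka', 'topic' = 'hehuiyuan2',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'json')");

        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(
                "INSERT INTO kafkaTableSink"
                        + " SELECT name, name, name, name, word"
                        + " FROM kafkaTableSource"
                        + " LEFT JOIN LATERAL TABLE(GENERATE_SERIES(1,5)) AS T(word) ON TRUE");

        // The TableException from the trace above is thrown here, during
        // changelog-mode inference in the planner; nothing is executed.
        System.out.println(stmtSet.explain());
    }
}
{code}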