Hi Folks:
I am using the 'kafka' connector and joining its data with data from a JDBC 
source (using the 'jdbc' connector). I am on Flink v1.14.3. If I do a left 
outer join between the Kafka source and the JDBC source and try to write the 
result to another Kafka sink using the connector API, I get the following 
exception:
Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.mymocktable' doesn't support consuming 
update and delete changes which is produced by node 
Join(joinType=[LeftOuterJoin], where=[($f6 = fieldxxx)], select=[a, b, c, d, e, 
f, $f6, h, i, k, l, fieldxxx], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:396)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:272)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:353)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:342)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:341)

Note: I have renamed the columns to simplify the query above.
If I change the join to an inner join, the query works, but I don't get the 
rows that have no match in the JDBC source.
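
To make the difference between the two joins concrete, here is a minimal 
sketch in plain Python (not Flink code) of how I understand a streaming join 
over two append-only inputs to emit its changelog. The function name 
streaming_join, the event tuples, and the sample values are all hypothetical 
and only for illustration; the real planner is more involved.

```python
from collections import defaultdict


def streaming_join(events, left_outer):
    """Simulate the changelog of a streaming equi-join.

    events: list of (side, key, value) tuples, side is "L" or "R",
            processed in arrival order.
    Returns a list of (op, left_value, right_value) where op is
    "+I" (insert) or "-D" (delete/retract).
    """
    left_rows = defaultdict(list)
    right_rows = defaultdict(list)
    out = []
    for side, key, value in events:
        if side == "L":
            left_rows[key].append(value)
            matches = right_rows[key]
            if matches:
                for r in matches:
                    out.append(("+I", value, r))
            elif left_outer:
                # No match yet: a left outer join emits a null-padded row now.
                out.append(("+I", value, None))
        else:
            first_match = not right_rows[key]
            right_rows[key].append(value)
            for l in left_rows[key]:
                if left_outer and first_match:
                    # The null-padded row emitted earlier must be retracted.
                    out.append(("-D", l, None))
                out.append(("+I", l, value))
    return out


# A Kafka-side row arrives before its matching JDBC-side row.
events = [("L", 1, "kafka-row"), ("R", 1, "jdbc-row")]
inner = streaming_join(events, left_outer=False)
outer = streaming_join(events, left_outer=True)
```

In this sketch the inner join only ever produces "+I" rows, while the left 
outer join also produces a "-D" row, which seems to correspond to the "update 
and delete changes" that the exception says the sink cannot consume.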
At the bottom of the issue 
(https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17435312#comment-17435312),
there is a similar question in the comments, but I could not find an answer.

BTW, the upsert-kafka connector works with the left outer join, but I would 
like to understand whether the left join can be made to work with the kafka 
sink connector.
Thanks
