Hi Folks,

I am using the 'kafka' connector and joining its data with data from a JDBC source (also read through a connector). I am on Flink v1.14.3. If I do a left outer join between the Kafka source and the JDBC source and try to write the result to another Kafka sink using the connectors API, I get the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.mymocktable' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[($f6 = fieldxxx)], select=[a, b, c, d, e, f, $f6, h, i, k, l, fieldxxx], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:396)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:272)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:353)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:342)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:341)
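For what it's worth, here is my rough mental model of why the planner reports update and delete changes for the outer join but not for the inner join. This is only a toy Python simulation, not Flink code: the function name join_changelog, the event tuples, and the "+I" / "-D" labels are made up for illustration, and the real operator may behave differently in its details.

```python
from collections import defaultdict


def join_changelog(events, outer=True):
    """Toy model of a non-windowed streaming join (NOT Flink code).

    events: list of (side, key, value) in arrival order, side is "L" or "R".
    Returns a list of (op, left_value, right_value) change records,
    where op is "+I" (insert) or "-D" (delete).
    """
    left_rows = defaultdict(list)   # join key -> left values seen so far
    right_rows = defaultdict(list)  # join key -> right values seen so far
    out = []
    for side, key, value in events:
        if side == "L":
            left_rows[key].append(value)
            matches = right_rows[key]
            if matches:
                for r in matches:
                    out.append(("+I", value, r))
            elif outer:
                # No match yet: a left outer join emits a null-padded row.
                out.append(("+I", value, None))
        else:
            first_match = not right_rows[key]
            right_rows[key].append(value)
            for l in left_rows[key]:
                if outer and first_match:
                    # The earlier null-padded row is now wrong: take it back.
                    out.append(("-D", l, None))
                out.append(("+I", l, value))
    return out


if __name__ == "__main__":
    events = [("L", 1, "a"), ("R", 1, "x"), ("L", 2, "b")]
    for record in join_changelog(events, outer=True):
        print(record)
```

In this model the left outer join first emits ("+I", "a", None), then has to delete that row once the matching right row arrives, while the inner join (outer=False) only ever emits inserts. If that is roughly what happens inside Flink, it would explain why an append-only sink rejects the outer join.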
Note: I have renamed the columns to simplify the query above.

If I change the join to an inner join, the query works, but I don't get the rows that have no match in the JDBC source. There is a similar question in the comments at the bottom of this issue (https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17435312#comment-17435312), but I could not find an answer there.

BTW, the upsert-kafka connector works with the left outer join, but I would like to understand whether the left join can be made to work with the regular kafka sink connector.

Thanks