Gunnar Morling created FLINK-37691:
--------------------------------------

             Summary: Can't consume changelog stream with upsert-kafka connector
                 Key: FLINK-37691
                 URL: https://issues.apache.org/jira/browse/FLINK-37691
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.0.0
            Reporter: Gunnar Morling
I read data change events from a Kafka topic with the Kafka SQL connector (changelog semantics) and write them to another Kafka topic with the Upsert Kafka SQL connector. This works as expected with Flink 1.20 and 2.0.0 (the Debezium events on the source topic are emitted as flat upsert-style records on the sink topic), but fails as of 2.1-SNAPSHOT:

```
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported to visit node StreamPhysicalDropUpdateBefore. The node either should not be pushed through the changelog normalize or is not supported yet.
	at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:119)
	at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:90)
	at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.isRequired(ChangelogNormalizeRequirementResolver.java:74)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1164)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitChildren$4(FlinkChangelogModeInferenceProgram.scala:1208)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:1207)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitSink$2(FlinkChangelogModeInferenceProgram.scala:1253)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitSink(FlinkChangelogModeInferenceProgram.scala:1253)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1031)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.$anonfun$optimize$2(FlinkChangelogModeInferenceProgram.scala:103)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:101)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:47)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1373)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:951)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1181)
	at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
	at dev.morling.demos.cdcingest.KafkaChangelogToUpsertJob.main(KafkaChangelogToUpsertJob.java:59)
```

Here's my job definition:

```
public class KafkaChangelogToUpsertJob {

    public static void main(String[] args) {
        Configuration configuration = Configuration.fromMap(
                Map.of("table.exec.source.cdc-events-duplicate", "true"));

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .withConfiguration(configuration)
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        tableEnv.executeSql("""
                CREATE TABLE authors_source (
                    id BIGINT,
                    first_name STRING,
                    last_name STRING,
                    biography STRING,
                    registered BIGINT,
                    PRIMARY KEY (id) NOT ENFORCED
                ) WITH (
                    'connector' = 'kafka',
                    'topic' = 'dbserver1.inventory.authors',
                    'properties.bootstrap.servers' = 'localhost:9092',
                    'scan.startup.mode' = 'earliest-offset',
                    'value.format' = 'debezium-json'
                );
                """);

        tableEnv.executeSql("""
                CREATE TABLE authors_sink (
                    id BIGINT,
                    first_name STRING,
                    last_name STRING,
                    biography STRING,
                    registered BIGINT,
                    PRIMARY KEY (id) NOT ENFORCED
                ) WITH (
                    'connector' = 'upsert-kafka',
                    'topic' = 'authors_processed',
                    'properties.bootstrap.servers' = 'localhost:9092',
                    'key.format' = 'json',
                    'value.format' = 'json'
                );
                """);

        Table authors = tableEnv.sqlQuery(
                "SELECT id, first_name, last_name, biography, registered FROM authors_source");

        authors.insertInto("authors_sink").execute();
        authors.execute().print();
    }
}
```

This might be a regression caused by FLINK-37475.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)