[ https://issues.apache.org/jira/browse/FLINK-37691 ]
Dawid Wysakowicz reassigned FLINK-37691:
----------------------------------------

    Assignee: Dawid Wysakowicz

> Can't consume changelog stream with upsert-kafka connector
> ----------------------------------------------------------
>
>                 Key: FLINK-37691
>                 URL: https://issues.apache.org/jira/browse/FLINK-37691
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.0
>            Reporter: Gunnar Morling
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> I read data change events from a Kafka topic with the Kafka SQL connector, using changelog semantics (the debezium-json format), and write those events to another Kafka topic with the Upsert Kafka SQL connector. This works as expected with Flink 1.20 and 2.0.0 (the Debezium events on the source topic are emitted as flat upsert-style records on the sink topic), but it fails as of 2.1-SNAPSHOT:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported to visit node StreamPhysicalDropUpdateBefore. The node either should not be pushed through the changelog normalize or is not supported yet.
>     at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:119)
>     at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:90)
>     at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.isRequired(ChangelogNormalizeRequirementResolver.java:74)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1164)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitChildren$4(FlinkChangelogModeInferenceProgram.scala:1208)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:1207)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitSink$2(FlinkChangelogModeInferenceProgram.scala:1253)
>     at scala.collection.immutable.List.flatMap(List.scala:366)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitSink(FlinkChangelogModeInferenceProgram.scala:1253)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1031)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.$anonfun$optimize$2(FlinkChangelogModeInferenceProgram.scala:103)
>     at scala.collection.immutable.List.flatMap(List.scala:366)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:101)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:47)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1373)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:951)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1181)
>     at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
>     at dev.morling.demos.cdcingest.KafkaChangelogToUpsertJob.main(KafkaChangelogToUpsertJob.java:59)
> {code}
> Here's my job definition:
> {code}
> import java.util.Map;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class KafkaChangelogToUpsertJob {
>
>     public static void main(String[] args) {
>         // Enable deduplication of potentially duplicated CDC events
>         Configuration configuration = Configuration.fromMap(
>                 Map.of("table.exec.source.cdc-events-duplicate", "true"));
>
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 .inStreamingMode()
>                 .withConfiguration(configuration)
>                 .build();
>
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>         // Source table: Debezium change events, read with changelog semantics
>         tableEnv.executeSql("""
>                 CREATE TABLE authors_source (
>                     id BIGINT,
>                     first_name STRING,
>                     last_name STRING,
>                     biography STRING,
>                     registered BIGINT,
>                     PRIMARY KEY (id) NOT ENFORCED
>                 ) WITH (
>                     'connector' = 'kafka',
>                     'topic' = 'dbserver1.inventory.authors',
>                     'properties.bootstrap.servers' = 'localhost:9092',
>                     'scan.startup.mode' = 'earliest-offset',
>                     'value.format' = 'debezium-json'
>                 );
>                 """);
>
>         // Sink table: flat upsert-style records, keyed by the primary key
>         tableEnv.executeSql("""
>                 CREATE TABLE authors_sink (
>                     id BIGINT,
>                     first_name STRING,
>                     last_name STRING,
>                     biography STRING,
>                     registered BIGINT,
>                     PRIMARY KEY (id) NOT ENFORCED
>                 ) WITH (
>                     'connector' = 'upsert-kafka',
>                     'topic' = 'authors_processed',
>                     'properties.bootstrap.servers' = 'localhost:9092',
>                     'key.format' = 'json',
>                     'value.format' = 'json'
>                 );
>                 """);
>
>         Table authors = tableEnv.sqlQuery(
>                 "SELECT id, first_name, last_name, biography, registered FROM authors_source");
>
>         authors.insertInto("authors_sink").execute();
>         authors.execute().print();
>     }
> }
> {code}
> This might be a regression caused by FLINK-37475.
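>
> For reference, here is the shape of the records involved; the concrete field values are made up for illustration. A change event on the source topic, as consumed by the debezium-json format:
> {code}
> {
>   "before": null,
>   "after": {
>     "id": 1001,
>     "first_name": "Sally",
>     "last_name": "Thomas",
>     "biography": null,
>     "registered": 1713200000000
>   },
>   "op": "c"
> }
> {code}
> With 1.20/2.0.0, the sink topic then receives the corresponding flat upsert-style record, keyed by the primary key serialized as JSON ({"id": 1001}):
> {code}
> {"id": 1001, "first_name": "Sally", "last_name": "Thomas", "biography": null, "registered": 1713200000000}
> {code}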
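>
> Judging from the stack trace, the planner fails while checking whether the ChangelogNormalize node is required; to my understanding, that node is only added here because of 'table.exec.source.cdc-events-duplicate' = 'true'. An (untested) way to narrow this down would be to run the same job without that option, at the cost of the event deduplication it provides:
> {code}
> // Untested sketch for narrowing down the issue: same job as above, but
> // without 'table.exec.source.cdc-events-duplicate', so the planner should
> // not insert the ChangelogNormalize node that the resolver fails on.
> EnvironmentSettings settings = EnvironmentSettings
>         .newInstance()
>         .inStreamingMode()
>         .build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
> // ... same DDL statements and insert-into pipeline as above ...
> {code}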