[ https://issues.apache.org/jira/browse/FLINK-37691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned FLINK-37691:
----------------------------------------

    Assignee: Dawid Wysakowicz

> Can't consume changelog stream with upsert-kafka connector
> ----------------------------------------------------------
>
>                 Key: FLINK-37691
>                 URL: https://issues.apache.org/jira/browse/FLINK-37691
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.1.0
>            Reporter: Gunnar Morling
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> I read data change events from a Kafka topic with the Kafka SQL connector, using changelog semantics, and write those events to another Kafka topic with the Upsert Kafka SQL connector. This works as expected with Flink 1.20 and 2.0.0: the Debezium events on the source topic are emitted as flat upsert-style records on the sink topic (see the sample records after the job definition below). As of 2.1-SNAPSHOT, however, it fails:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported to visit node StreamPhysicalDropUpdateBefore. The node either should not be pushed through the changelog normalize or is not supported yet.
>       at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:119)
>       at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:90)
>       at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.isRequired(ChangelogNormalizeRequirementResolver.java:74)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1164)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitChildren$4(FlinkChangelogModeInferenceProgram.scala:1208)
>       at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:1207)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitSink$2(FlinkChangelogModeInferenceProgram.scala:1253)
>       at scala.collection.immutable.List.flatMap(List.scala:366)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitSink(FlinkChangelogModeInferenceProgram.scala:1253)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1031)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.$anonfun$optimize$2(FlinkChangelogModeInferenceProgram.scala:103)
>       at scala.collection.immutable.List.flatMap(List.scala:366)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:101)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:47)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>       at scala.collection.immutable.Range.foreach(Range.scala:158)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>       at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>       at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>       at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>       at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
>       at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
>       at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
>       at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>       at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1373)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:951)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1181)
>       at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
>       at dev.morling.demos.cdcingest.KafkaChangelogToUpsertJob.main(KafkaChangelogToUpsertJob.java:59)
> {code}
> Here's my job definition:
> {code}
> package dev.morling.demos.cdcingest;
>
> import java.util.Map;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class KafkaChangelogToUpsertJob {
>
>     public static void main(String[] args) {
>         // Tell the planner the CDC source may emit duplicate events,
>         // which makes it add a ChangelogNormalize node after the source
>         Configuration configuration = Configuration.fromMap(
>                 Map.of("table.exec.source.cdc-events-duplicate", "true"));
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 .inStreamingMode()
>                 .withConfiguration(configuration)
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>         // Source table: Debezium change events, read with changelog semantics
>         tableEnv.executeSql("""
>                 CREATE TABLE authors_source (
>                     id BIGINT,
>                     first_name STRING,
>                     last_name STRING,
>                     biography STRING,
>                     registered BIGINT,
>                     PRIMARY KEY (id) NOT ENFORCED
>                 ) WITH (
>                   'connector' = 'kafka',
>                   'topic' = 'dbserver1.inventory.authors',
>                   'properties.bootstrap.servers' = 'localhost:9092',
>                   'scan.startup.mode' = 'earliest-offset',
>                   'value.format' = 'debezium-json'
>                 );
>                 """);
>
>         // Sink table: flat upsert-style records, keyed by the primary key
>         tableEnv.executeSql("""
>                 CREATE TABLE authors_sink (
>                     id BIGINT,
>                     first_name STRING,
>                     last_name STRING,
>                     biography STRING,
>                     registered BIGINT,
>                     PRIMARY KEY (id) NOT ENFORCED
>                 ) WITH (
>                   'connector' = 'upsert-kafka',
>                   'topic' = 'authors_processed',
>                   'properties.bootstrap.servers' = 'localhost:9092',
>                   'key.format' = 'json',
>                   'value.format' = 'json'
>                 );
>                 """);
>
>         Table authors = tableEnv.sqlQuery(
>                 "SELECT id, first_name, last_name, biography, registered FROM authors_source");
>         authors.insertInto("authors_sink").execute();
>         authors.execute().print();
>     }
> }
> {code}
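> For illustration, this is the shape of the conversion I see on 1.20 and 2.0.0. The records below are a hypothetical sketch: the field values are made up, the Debezium envelope is abbreviated to its before/after/op fields, and only the Kafka key/value pairs are shown:
> {code}
> // value on dbserver1.inventory.authors (debezium-json), hypothetical insert event:
> {"before": null, "after": {"id": 1, "first_name": "Ada", "last_name": "Lovelace", "biography": "...", "registered": 1735689600}, "op": "c"}
>
> // corresponding record on authors_processed (upsert-kafka, flat value keyed by id):
> key:   {"id": 1}
> value: {"id": 1, "first_name": "Ada", "last_name": "Lovelace", "biography": "...", "registered": 1735689600}
>
> // a delete event for the same key becomes a tombstone record:
> key:   {"id": 1}
> value: null
> {code}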
> This might be a regression caused by FLINK-37475.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
