Gunnar Morling created FLINK-37691:
--------------------------------------

             Summary: Can't consume changelog stream with upsert-kafka connector
                 Key: FLINK-37691
                 URL: https://issues.apache.org/jira/browse/FLINK-37691
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.0.0
            Reporter: Gunnar Morling


I read data change events from a Kafka topic using the Kafka SQL connector 
with changelog semantics and write those events to another Kafka topic using 
the Upsert Kafka SQL connector. This works as expected with Flink 1.20 and 
2.0.0 (the Debezium events on the source topic are emitted as flat upsert-style 
records on the sink topic), but fails as of 2.1-SNAPSHOT:

```
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported to visit node StreamPhysicalDropUpdateBefore. The node either should not be pushed through the changelog normalize or is not supported yet.
        at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:119)
        at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.visit(ChangelogNormalizeRequirementResolver.java:90)
        at org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver.isRequired(ChangelogNormalizeRequirementResolver.java:74)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1164)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitChildren$4(FlinkChangelogModeInferenceProgram.scala:1208)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:1207)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.$anonfun$visitSink$2(FlinkChangelogModeInferenceProgram.scala:1253)
        at scala.collection.immutable.List.flatMap(List.scala:366)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visitSink(FlinkChangelogModeInferenceProgram.scala:1253)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyDeleteKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:1031)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.$anonfun$optimize$2(FlinkChangelogModeInferenceProgram.scala:103)
        at scala.collection.immutable.List.flatMap(List.scala:366)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:101)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:47)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.immutable.Range.foreach(Range.scala:158)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1373)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:951)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1181)
        at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:59)
        at dev.morling.demos.cdcingest.KafkaChangelogToUpsertJob.main(KafkaChangelogToUpsertJob.java:59)
```

Here's my job definition:

```
public class KafkaChangelogToUpsertJob {

        public static void main(String[] args) {
                Configuration configuration = Configuration.fromMap(Map.of("table.exec.source.cdc-events-duplicate", "true"));

                EnvironmentSettings settings = EnvironmentSettings
                                .newInstance()
                                .inStreamingMode()
                                .withConfiguration(configuration)
                                .build();

                TableEnvironment tableEnv = TableEnvironment.create(settings);

                tableEnv.executeSql("""
                                CREATE TABLE authors_source (
                                        id BIGINT,
                                        first_name STRING,
                                        last_name STRING,
                                        biography STRING,
                                        registered BIGINT,
                                        PRIMARY KEY (id) NOT ENFORCED
                                ) WITH (
                                  'connector' = 'kafka',
                                  'topic' = 'dbserver1.inventory.authors',
                                  'properties.bootstrap.servers' = 'localhost:9092',
                                  'scan.startup.mode' = 'earliest-offset',
                                  'value.format' = 'debezium-json'
                                );
                                """);

                tableEnv.executeSql("""
                                CREATE TABLE authors_sink (
                                        id BIGINT,
                                        first_name STRING,
                                        last_name STRING,
                                        biography STRING,
                                        registered BIGINT,
                                        PRIMARY KEY (id) NOT ENFORCED
                                ) WITH (
                                  'connector' = 'upsert-kafka',
                                  'topic' = 'authors_processed',
                                  'properties.bootstrap.servers' = 'localhost:9092',
                                  'key.format' = 'json',
                                  'value.format' = 'json'
                                );
                                """);

                Table authors = tableEnv.sqlQuery("SELECT id, first_name, last_name, biography, registered FROM authors_source");
                authors.insertInto("authors_sink").execute();
                authors.execute().print();
        }
}
```
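For what it's worth, the same pipeline can presumably also be reproduced without the Table API, e.g. from the SQL client, by running the two DDL statements above followed by a plain INSERT (I have only verified the failure via the Java job above):

```
INSERT INTO authors_sink
SELECT id, first_name, last_name, biography, registered
FROM authors_source;
```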

This might be a regression caused by FLINK-37475.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)