[ https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yubin Li updated FLINK-36547:
-----------------------------
    Description: 
As the official docs state, `RowKind` semantics are changed when changelog rows are encoded: -U -> -D, +U -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages, and emit to external systems like Kafka. However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Debezium messages.
{code}
In practice, many scenarios need the `RowKind` semantics to stay consistent, for example those that process -U/-D and +U/+I differently. We have used the difference between UPDATE_BEFORE and UPDATE_AFTER to implement the feature, and it runs well in our business workloads.
{code:sql}
create table datagen1 (id int, name string) with
('connector'='datagen', 'number-of-rows'='5', 'fields.id.min'='1', 'fields.id.max'='2');

-- add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2',
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'xx',
    'properties.group.id' = 'test',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601',
    'debezium-json.schema-include' = 'false',
    'debezium-json.retain.rowkind' = 'true'
);

-- assumed sink definition: print1 is not defined in the original report
create table print1 (id int, name string, num bigint) with ('connector' = 'print');

-- write an updating aggregate into Kafka, then read it back and print the row kinds
insert into t2 select id, max(name) as name, count(1) as num from datagen1 group by id;
insert into print1 select * from t2;
{code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` semantics for cdc formats
> --------------------------------------------------------
>
>                 Key: FLINK-36547
>                 URL: https://issues.apache.org/jira/browse/FLINK-36547
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 2.0.0
>            Reporter: Yubin Li
>            Assignee: Yubin Li
>            Priority: Major
>         Attachments: image-2024-10-16-11-01-54-790.png, image-2024-10-16-11-02-34-406.png
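
To make the information loss described in the issue concrete, here is a minimal Java sketch. It assumes the default `debezium-json` encoding writes the Debezium `op` codes `"c"` (create) and `"d"` (delete), as the quoted docs imply. The `Kind` enum is a stand-in for Flink's `RowKind` so the example is self-contained, and `retainedTag` is purely hypothetical: the issue does not say how `debezium-json.retain.rowkind` would represent the row kind on the wire.

```java
// Sketch only: Kind mirrors Flink's RowKind but is redefined here so the
// example compiles without Flink on the classpath.
final class RowKindEncodingSketch {

    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // Default behaviour per the docs quote: UPDATE_BEFORE is written as a
    // DELETE message and UPDATE_AFTER as an INSERT message.
    static String defaultOp(Kind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "c";
            case UPDATE_BEFORE:
            case DELETE:
                return "d";
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    // Hypothetical: one way a "retain" option could keep all four kinds
    // distinguishable. The real option may encode this differently.
    static String retainedTag(Kind kind) {
        return kind.shortString;
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + ": default op=" + defaultOp(kind)
                    + ", retained tag=" + retainedTag(kind));
        }
    }
}
```

With the default mapping, a downstream reader sees the same `op` for -U and -D (and for +U and +I), so it cannot process them differently, which is the gap the proposed option is meant to close.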