[ 
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-36547:
-----------------------------
    Description: 
As the official docs say, `RowKind` semantics are changed: -U -> -D, +U -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also need to keep the `RowKind` semantics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. We have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature, and it runs well in our business.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
    'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!

  was:
As official docs said, `RowKind` sematics have been changed: -U -> -D, +D -> +I
{code:java}
Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL as 
Debezium JSON or Avro messages, and emit to external systems like Kafka. 
However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UDPATE_AFTER 
as DELETE and INSERT Debezium messages. {code}
In fact, we also have a demand to make the `RowKind` sematics consistent in 
many scenarios, such as those that require different processing of -U/-D and 
+U/+I. we have taken advantage of the difference between UPDATE_BEFORE and 
UPDATE_AFTER to implement the feature and made it run well in bussiness.
{code:java}
create table datagen1 (id int, name string) with 
('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
'fields.id.max'='2');

// add 'debezium-json.retain.rowkind' = 'true'
create table t2 (id int, name string, num bigint) WITH (
    'topic' = 't2', 
    'connector' = 'kafka', 
    'properties.bootstrap.servers' = 'xx', 
    'properties.group.id' = 'test', 
    'scan.startup.mode' = 'earliest-offset', 
    'format' = 'debezium-json',
    'key.format' = 'json',
    'key.fields' = 'id',
    'debezium-json.timestamp-format.standard' = 'ISO-8601', 
    'debezium-json.schema-include' = 'false',
    'debezium-json.retain.rowkind' = 'true'
);

insert into t2 select id, max(name) as name, count(1) as num from datagen1 
group by id;
insert into print1 select * from t2;
 {code}
output result:

!image-2024-10-16-11-02-34-406.png|width=660,height=153!


> Add option to retain `RowKind` semantics for cdc formats
> --------------------------------------------------------
>
>                 Key: FLINK-36547
>                 URL: https://issues.apache.org/jira/browse/FLINK-36547
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 2.0.0
>            Reporter: Yubin Li
>            Assignee: Yubin Li
>            Priority: Major
>         Attachments: image-2024-10-16-11-01-54-790.png, 
> image-2024-10-16-11-02-34-406.png
>
>
> As the official docs say, `RowKind` semantics are changed: -U -> -D, +U -> 
> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL 
> as Debezium JSON or Avro messages, and emit to external systems like Kafka. 
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a 
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and 
> UPDATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, we also need to keep the `RowKind` semantics consistent in 
> many scenarios, such as those that require different processing of -U/-D and 
> +U/+I. We have taken advantage of the difference between UPDATE_BEFORE and 
> UPDATE_AFTER to implement the feature, and it runs well in our business.
> {code:java}
> create table datagen1 (id int, name string) with 
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1', 
> 'fields.id.max'='2');
> // add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
>     'topic' = 't2', 
>     'connector' = 'kafka', 
>     'properties.bootstrap.servers' = 'xx', 
>     'properties.group.id' = 'test', 
>     'scan.startup.mode' = 'earliest-offset', 
>     'format' = 'debezium-json',
>     'key.format' = 'json',
>     'key.fields' = 'id',
>     'debezium-json.timestamp-format.standard' = 'ISO-8601', 
>     'debezium-json.schema-include' = 'false',
>     'debezium-json.retain.rowkind' = 'true'
> );
> insert into t2 select id, max(name) as name, count(1) as num from datagen1 
> group by id;
> insert into print1 select * from t2;
>  {code}
> output result:
> !image-2024-10-16-11-02-34-406.png|width=660,height=153!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to