Hi Lei,

INSERT INTO jdbc_table SELECT * FROM changelog_table;

For the new connectors in Flink 1.11, you need to define a primary key on
jdbc_table (and your MySQL table needs the corresponding primary key as
well), because changelog_table contains "update" and "delete" records.

And then, the JDBC sink will:
- use "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to deal with "insert"
and "update" messages.
- issue a DELETE to deal with "delete" messages.
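Concretely, for a MySQL table keyed on id, the sink issues statements along
these lines (the table and column names here are just illustrative, not
taken from your schema):

```sql
-- "insert"/"update" message with key id=1:
INSERT INTO users (id, first_name) VALUES (1, 'foo')
  ON DUPLICATE KEY UPDATE first_name = 'foo';

-- "delete" message with key id=1:
DELETE FROM users WHERE id = 1;
```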

So generally speaking, with the primary key defined, this MySQL table will
stay the same as your source database table (the table generating the
changelog).
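For example, the sink table could be declared like this (the connection
options and names are placeholders, so adjust them to your setup):

```sql
-- Hypothetical JDBC sink table; the primary key enables upsert mode.
CREATE TABLE jdbc_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users'
);
```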

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>  my_table;
>
>
> What will happen after I execute the INSERT statement? For the
> update/delete messages from Kafka, will the corresponding records be
> updated or deleted in mysql_sink_table?
>
> Thanks,
>
> Lei
>
>
> ------------------------------
> wangl...@geekplus.com.cn
>
>

-- 
Best, Jingsong Lee
