Hi Lei,
Jingsong is right: you need to define a primary key for your sink table.
BTW, Flink uses `PRIMARY KEY NOT ENFORCED` to define a primary key because Flink 
doesn't own the data and therefore only supports `NOT ENFORCED` mode. This is a 
little different from a primary key in a database, which is `ENFORCED` by default. 
Both `ENFORCED` and `NOT ENFORCED` are supported in the SQL standard.
See [1][2] for more details.
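
For your case, the sink definition could look roughly like the sketch below. The JDBC URL, database name, and credentials are placeholders that I made up, and the column types are simply copied from your source table, so please adjust them to your setup.

```sql
-- Sketch only: the URL, database name, and credentials are placeholders.
CREATE TABLE mysql_sink_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  -- Flink does not validate this key; the JDBC sink only uses it
  -- to build its upsert and delete statements.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'mysql_sink_table',
  'username' = 'user',
  'password' = 'secret'
);

-- With the key defined, update/delete records from the changelog source
-- are applied to the matching row in MySQL.
INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
```

As Jingsong mentioned, the table on the MySQL side should declare the same primary key (here `id`).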

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

> On June 30, 2020, at 10:08, Jingsong Li <jingsongl...@gmail.com> wrote:
> 
> Hi Lei,
> 
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
> 
> For the new connectors in Flink 1.11, you need to define a primary key for 
> jdbc_table (and your MySQL table needs to have the corresponding primary 
> key), because changelog_table contains "update" and "delete" records.
> 
> The JDBC sink will then:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to 
> handle "insert" and "update" messages.
> - issue a DELETE to handle "delete" messages.
> 
> So generally speaking, with the primary key defined, this MySQL table will 
> hold the same data as your source database table (the table that generates 
> the changelog).
> 
> Best,
> Jingsong
> 
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn wrote:
> 
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;
> 
> What will happen after I execute the INSERT statement? For the 
> update/delete messages from Kafka, will the corresponding records be updated 
> or deleted in mysql_sink_table?
> 
> Thanks,
> Lei
> 
> wangl...@geekplus.com.cn
> 
> 
> 
> -- 
> Best, Jingsong Lee
