Hi Lei, Jingsong is right: you need to define a primary key for your sink table. BTW, Flink uses `PRIMARY KEY NOT ENFORCED` to define a primary key because Flink doesn't own the data and therefore supports only `NOT ENFORCED` mode. This is a little different from a primary key in a database, which is `ENFORCED` by default; both `ENFORCED` and `NOT ENFORCED` are supported in the SQL standard. See [1][2] for more details.
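
For illustration, a sink table definition for your case might look roughly like the sketch below. I am assuming that `id` is the key column, that the column types match your source table, and that the MySQL table has the same name as the Flink table; the URL, database name, and credentials are placeholders, not values from your setup.

```sql
-- Sketch only: 'url', 'table-name', 'username' and 'password' are placeholders,
-- and choosing `id` as the primary key is an assumption about your data.
CREATE TABLE mysql_sink_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  -- Flink does not validate the key itself, hence NOT ENFORCED.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'mysql_sink_table',
  'username' = 'your_user',
  'password' = 'your_password'
);

-- With the key declared, update and delete records from the changelog
-- source are applied to the matching row instead of being appended.
INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
```

As Jingsong noted, the table on the MySQL side also needs the corresponding primary key (here, on `id`) for this to work.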
Best,
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table

> On June 30, 2020, at 10:08, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Lei,
>
> INSERT INTO jdbc_table SELECT * FROM changelog_table;
>
> For Flink 1.11 new connectors, you need to define the primary key for
> jdbc_table (and also your mysql table needs to have the corresponding primary
> key) because changelog_table has the "update", "delete" records.
>
> And then, jdbc sink will:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal
>   with "insert" and "update" messages.
> - delete to deal with "delete" messages.
>
> So generally speaking, with the primary key, this mysql table will be the
> same to your source database table. (table for generating changelog)
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn
> <wangl...@geekplus.com.cn> wrote:
>
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>   'connector'='kafka',
>   'topic'='user_topic',
>   'properties.bootstrap.servers'='localhost:9092',
>   'scan.startup.mode'='earliest-offset',
>   'format'='debezium-json'
> );
> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
>
> What will happen after i execute the insert sql statement? For the
> update/delete message from kafka, the corresponding record will be updated or
> deleted in the mysql_sink_table?
> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
>
> Thanks,
> Lei
>
> wangl...@geekplus.com.cn
>
> --
> Best, Jingsong Lee