Hi,

Welcome to try 1.11.

There is no direct doc describing this, but I think these docs can help you [1][2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html

Best,
Jingsong

On Tue, Jun 30, 2020 at 10:25 AM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:

> Thanks Jingsong,
>
> Is there any document or example for this?
> I will build the flink-1.11 package and have a try.
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com.cn
>
> *From:* Jingsong Li <jingsongl...@gmail.com>
> *Date:* 2020-06-30 10:08
> *To:* wangl...@geekplus.com.cn
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database directly?
>
> Hi Lei,
>
> INSERT INTO jdbc_table SELECT * FROM changelog_table;
>
> For the new Flink 1.11 connectors, you need to define the primary key for
> jdbc_table (and your mysql table also needs to have the corresponding
> primary key) because changelog_table has "update" and "delete" records.
>
> The jdbc sink will then:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to
>   deal with "insert" and "update" messages.
> - delete to deal with "delete" messages.
>
> So generally speaking, with the primary key, this mysql table will be the
> same as your source database table (the table generating the changelog).
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:
>
>> CREATE TABLE my_table (
>>   id BIGINT,
>>   first_name STRING,
>>   last_name STRING,
>>   email STRING
>> ) WITH (
>>   'connector'='kafka',
>>   'topic'='user_topic',
>>   'properties.bootstrap.servers'='localhost:9092',
>>   'scan.startup.mode'='earliest-offset',
>>   'format'='debezium-json'
>> );
>>
>> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
>>
>> What will happen after I execute the insert SQL statement?
>> For the update/delete messages from kafka, will the corresponding record be
>> updated or deleted in the mysql_sink_table?
>>
>> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM my_table;
>>
>> Thanks,
>> Lei
>>
>> ------------------------------
>> wangl...@geekplus.com.cn
>
> --
> Best, Jingsong Lee

--
Best, Jingsong Lee
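
The thread never shows the definition of mysql_sink_table, so here is a minimal sketch of what it might look like with the Flink 1.11 JDBC connector, with the primary key declared as described above. The JDBC URL, database name, MySQL table name, and credentials are placeholders assumed for illustration; only the column names and the table name mysql_sink_table come from the thread.

```sql
-- Hypothetical sink definition; connection values are placeholders.
CREATE TABLE mysql_sink_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  -- Declaring the key lets the sink apply update/delete records by key.
  -- Flink does not validate it, hence NOT ENFORCED.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',  -- assumed host and database
  'table-name' = 'users',                      -- assumed MySQL table name
  'username' = 'flink_user',                   -- assumed credentials
  'password' = 'flink_password'
);

-- Same statement as in the question: changelog records read from Kafka
-- are applied to the MySQL table by primary key.
INSERT INTO mysql_sink_table
SELECT id, first_name, last_name FROM my_table;
```

The MySQL table itself (here assumed to be `users`) would also need `id` as its primary key, since the upsert statement quoted above relies on a duplicate-key conflict to turn an insert into an update.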