Hi,

Welcome to try 1.11.

There is no direct doc to describe this, but I think these docs can help
you [1][2]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html

Best,
Jingsong

On Tue, Jun 30, 2020 at 10:25 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks Jingsong,
>
> Is there any document or example to this?
> I will build the flink-1.11 package and have a try.
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com.cn
>
>
> *From:* Jingsong Li <jingsongl...@gmail.com>
> *Date:* 2020-06-30 10:08
> *To:* wangl...@geekplus.com.cn
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database
> directly?
> Hi Lei,
>
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
>
> For Flink 1.11 new connectors, you need to define the primary key for
> jdbc_table (and also your mysql table needs to have the corresponding
> primary key) because changelog_table has the "update", "delete" records.
>
> And then, jdbc sink will:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to
> deal with "insert" and "update" messages.
> - delete to deal with "delete" messages.
>
> So generally speaking, with the primary key, this mysql table will be the
> same to your source database table. (table for generating changelog)
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> CREATE TABLE my_table (
>>   id BIGINT,
>>   first_name STRING,
>>   last_name STRING,
>>   email STRING
>> ) WITH (
>>  'connector'='kafka',
>>  'topic'='user_topic',
>>  'properties.bootstrap.servers'='localhost:9092',
>>  'scan.startup.mode'='earliest-offset',
>>  'format'='debezium-json'
>> );
>>
>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>  my_table;
>>
>>
>> What will happen after  i execute the insert sql statement? For the
>> update/delete message from kafka, the corresponding record will be updated
>> or deleted in the mysql_sink_table?
>>
>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>  my_table;
>>
>>
>> Thanks,
>>
>> Lei
>>
>>
>> ------------------------------
>> wangl...@geekplus.com.cn
>>
>>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee

Reply via email to