CC: @Jark Wu <imj...@gmail.com> and @Timo Walther <t...@ververica.com>

Best,
Jingsong

On Wed, Jul 1, 2020 at 5:55 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> CREATE TABLE t_pick_order (
>     order_no VARCHAR,
>     status INT
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'example',
>     'scan.startup.mode' = 'latest-offset',
>     'properties.bootstrap.servers' = '172.19.78.32:9092',
>     'format' = 'canal-json'
> );
>
> CREATE TABLE order_status (
>     order_no VARCHAR,
>     status INT,
>     PRIMARY KEY (order_no) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://xxx:3306/flink_test',
>     'table-name' = 'order_status',
>     'username' = 'dev',
>     'password' = 'xxxx'
> );
>
>
>
> But when I execute
> INSERT INTO order_status SELECT order_no, status FROM t_pick_order;
> I get this error:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Provided trait
> [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This
> is a bug in planner, please file an issue.
> Current node is TableSourceScan(table=[[default_catalog, default_database,
> t_pick_order]], fields=[order_no, status])
>
>
> ------------------------------
> wangl...@geekplus.com.cn
>
> *From:* Danny Chan <yuzhao....@gmail.com>
> *Date:* 2020-06-30 20:25
> *To:* wangl...@geekplus.com.cn
> *Subject:* Re: Re: Flip-105 can the debezium/canal SQL sink to database
> directly?
> Hi, wanglei2 ~
>
> For the primary key syntax, see the “PRIMARY KEY” part of [1]. Note that
> currently we only support the NOT ENFORCED mode. Here is the
> reason:
>
> >SQL standard specifies that a constraint can either be ENFORCED or NOT
> ENFORCED. This controls if the constraint checks are performed on the
> incoming/outgoing data. Flink does not own the data therefore the only mode
> we want to support is the NOT ENFORCED mode. It is up to the user to
> ensure that the query enforces key integrity.
>
> For the DDL to create a JDBC table, see [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
>
> Best,
> Danny Chan
> On June 30, 2020 at 10:25 AM +0800, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> Thanks Jingsong,
>
> Is there any documentation or an example for this?
> I will build the flink-1.11 package and have a try.
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com.cn
>
>
> *From:* Jingsong Li <jingsongl...@gmail.com>
> *Date:* 2020-06-30 10:08
> *To:* wangl...@geekplus.com.cn
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database
> directly?
> Hi Lei,
>
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
>
> With the new connectors in Flink 1.11, you need to define a primary key for
> jdbc_table (and your MySQL table also needs the corresponding primary
> key), because changelog_table contains "update" and "delete" records.
>
> The JDBC sink will then:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to
> handle "insert" and "update" messages.
> - delete to handle "delete" messages.
>
> So generally speaking, with the primary key, this MySQL table will be the
> same as your source database table (the table that generates the changelog).
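
To make the insert/update/delete handling concrete, here is a minimal Python sketch of how a changelog could be applied to a table keyed by its primary key. It is an illustration under assumptions, not Flink's implementation: the function name `apply_changelog`, the change-kind strings ("+I", "-U", "+U", "-D") and the sample rows are hypothetical, and it assumes update-before rows can be skipped when writing by key.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical change kinds, loosely modelled on changelog row kinds:
# "+I" insert, "-U" update-before, "+U" update-after, "-D" delete.
Change = Tuple[str, str, int]  # (kind, order_no, status)


def apply_changelog(table: Dict[str, int], changes: Iterable[Change]) -> Dict[str, int]:
    """Apply changes to a table keyed by order_no (the primary key)."""
    for kind, order_no, status in changes:
        if kind in ("+I", "+U"):
            # Upsert: same effect as INSERT ... ON DUPLICATE KEY UPDATE.
            table[order_no] = status
        elif kind == "-D":
            # Delete by primary key; a missing key is ignored.
            table.pop(order_no, None)
        elif kind == "-U":
            # Assumption: the old row image is not needed when writing by key.
            continue
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return table


# Sample changelog: two inserts, one update (before + after), one delete.
changes = [
    ("+I", "A001", 10),
    ("+I", "A002", 10),
    ("-U", "A001", 10),
    ("+U", "A001", 20),
    ("-D", "A002", 10),
]
result = apply_changelog({}, changes)
print(result)
```

Without a primary key there would be nothing to match an "update" or "delete" against, which is why the key is needed on both the Flink table and the MySQL table.
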
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> CREATE TABLE my_table (
>>   id BIGINT,
>>   first_name STRING,
>>   last_name STRING,
>>   email STRING
>> ) WITH (
>>  'connector'='kafka',
>>  'topic'='user_topic',
>>  'properties.bootstrap.servers'='localhost:9092',
>>  'scan.startup.mode'='earliest-offset',
>>  'format'='debezium-json'
>> );
>>
>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>  my_table;
>>
>>
>> What will happen after I execute the INSERT statement? For the
>> update/delete messages from Kafka, will the corresponding records be updated
>> or deleted in mysql_sink_table?
>>
>>
>> Thanks,
>>
>> Lei
>>
>>
>> ------------------------------
>> wangl...@geekplus.com.cn
>>
>>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee
