I have created an issue [1] and a pull request to fix this. I hope it can
make this release.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18461

On Wed, 1 Jul 2020 at 18:16, Jingsong Li <jingsongl...@gmail.com> wrote:

> CC: @Jark Wu <imj...@gmail.com> and @Timo Walther <t...@ververica.com>
>
> Best,
> Jingsong
>
> On Wed, Jul 1, 2020 at 5:55 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> CREATE TABLE t_pick_order (
>>       order_no VARCHAR,
>>       status INT
>>   ) WITH (
>>       'connector' = 'kafka',
>>       'topic' = 'example',
>>       'scan.startup.mode' = 'latest-offset',
>>       'properties.bootstrap.servers' = '172.19.78.32:9092',
>>       'format' = 'canal-json'
>>    )
>> CREATE TABLE order_status (
>>           order_no VARCHAR,
>>           status INT,
>>
>>                PRIMARY KEY (order_no) NOT ENFORCED
>>       ) WITH (
>>           'connector' = 'jdbc',
>>           'url' = 'jdbc:mysql://xxx:3306/flink_test',
>>           'table-name' = 'order_status',
>>           'username' = 'dev',
>>           'password' = 'xxxx'
>>        )
>>
>>
>>
>> But when I execute INSERT INTO order_status SELECT order_no, status FROM
>> t_pick_order,
>> there's an error:
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: Provided trait
>> [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This
>> is a bug in planner, please file an issue.
>> Current node is TableSourceScan(table=[[default_catalog,
>> default_database, t_pick_order]], fields=[order_no, status])
>>
>>
>> ------------------------------
>> wangl...@geekplus.com.cn
>>
>> *From:* Danny Chan <yuzhao....@gmail.com>
>> *Date:* 2020-06-30 20:25
>> *To:* wangl...@geekplus.com.cn
>> *Subject:* Re: Re: Flip-105 can the debezium/canal SQL sink to database
>> directly?
>> Hi, wanglei2 ~
>>
>> For the primary key syntax you can refer to [1] for the "PRIMARY KEY" part;
>> note that currently we only support the NOT ENFORCED mode. Here is the
>> reason:
>>
>> >SQL standard specifies that a constraint can either be ENFORCED or NOT
>> ENFORCED. This controls if the constraint checks are performed on the
>> incoming/outgoing data. Flink does not own the data therefore the only mode
>> we want to support is the NOT ENFORCED mode. It is up to the user to
>> ensure that the query enforces key integrity.
>>
>> For the DDL to create a JDBC table, you can refer to [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
>>
>> Best,
>> Danny Chan
>> On 2020-06-30 at 10:25 AM +0800, wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> Thanks Jingsong,
>>
>> Is there any document or example for this?
>> I will build the flink-1.11 package and have a try.
>>
>> Thanks,
>> Lei
>>
>> ------------------------------
>> wangl...@geekplus.com.cn
>>
>>
>> *From:* Jingsong Li <jingsongl...@gmail.com>
>> *Date:* 2020-06-30 10:08
>> *To:* wangl...@geekplus.com.cn
>> *CC:* user <user@flink.apache.org>
>> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database
>> directly?
>> Hi Lei,
>>
>> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
>>
>> For Flink 1.11 new connectors, you need to define a primary key for
>> jdbc_table (and your MySQL table needs to have the corresponding primary
>> key) because changelog_table contains "update" and "delete" records.
>>
>> And then, the JDBC sink will:
>> - upsert using "INSERT INTO ... ON DUPLICATE KEY UPDATE ..." to deal with
>> "insert" and "update" messages.
>> - issue a DELETE to deal with "delete" messages.
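>> As a rough sketch in MySQL syntax (table and column names taken from this
>> thread; the exact statements the connector generates may differ), the
>> sink's behavior per changelog message would look like:

```sql
-- Illustrative only: what the JDBC sink effectively does per message.

-- For "insert" and "update" changelog messages on key order_no:
INSERT INTO order_status (order_no, status)
VALUES ('A001', 2)
ON DUPLICATE KEY UPDATE status = VALUES(status);

-- For "delete" changelog messages:
DELETE FROM order_status
WHERE order_no = 'A001';
```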
>>
>> So generally speaking, with the primary key, this MySQL table will be the
>> same as your source database table (the table generating the changelog).
>>
>> Best,
>> Jingsong
>>
>> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> CREATE TABLE my_table (
>>>   id BIGINT,
>>>   first_name STRING,
>>>   last_name STRING,
>>>   email STRING
>>> ) WITH (
>>>  'connector'='kafka',
>>>  'topic'='user_topic',
>>>  'properties.bootstrap.servers'='localhost:9092',
>>>  'scan.startup.mode'='earliest-offset',
>>>  'format'='debezium-json'
>>> );
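>>> For context, a debezium-json update record on that topic looks roughly
>>> like the following (values are made up for illustration; Flink's
>>> debezium-json format reads the "before", "after", and "op" fields):

```json
{
  "before": {"id": 1, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
  "after": {"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
  "op": "u"
}
```

>>> As I understand it, "op" = "c" (or "r" for snapshot reads) is decoded as
>>> an insert, "u" as an update (before/after pair), and "d" as a delete,
>>> which is what drives the upsert/delete behavior downstream.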
>>>
>>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>>  my_table;
>>>
>>>
>>> What will happen after I execute the INSERT statement? Will the
>>> corresponding records be updated or deleted in mysql_sink_table for the
>>> update/delete messages from Kafka?
>>>
>>>
>>> Thanks,
>>>
>>> Lei
>>>
>>>
>>> ------------------------------
>>> wangl...@geekplus.com.cn
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>
> --
> Best, Jingsong Lee
>
