I have created an issue [1] and a pull request to fix this. Hope we can catch up with this release.
Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-18461 On Wed, 1 Jul 2020 at 18:16, Jingsong Li <jingsongl...@gmail.com> wrote: > CC: @Jark Wu <imj...@gmail.com> and @Timo Walther <t...@ververica.com> > > Best, > Jingsong > > On Wed, Jul 1, 2020 at 5:55 PM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> CREATE TABLE t_pick_order ( >> order_no VARCHAR, >> status INT >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = 'example', >> 'scan.startup.mode' = 'latest-offset', >> 'properties.bootstrap.servers' = '172.19.78.32:9092', >> 'format' = 'canal-json' >> ) >> CREATE TABLE order_status ( >> order_no VARCHAR, >> status INT, >> >> PRIMARY KEY (order_no) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://xxx:3306/flink_test', >> 'table-name' = 'order_status', >> 'username' = 'dev', >> 'password' = 'xxxx' >> ) >> >> >> >> But when i execute insert INTO order_status SELECT order_no, status FROM >> t_pick_order >> There's error: >> >> [ERROR] Could not execute SQL statement. Reason: >> org.apache.flink.table.api.TableException: Provided trait >> [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This >> is a bug in planner, please file an issue. >> Current node is TableSourceScan(table=[[default_catalog, >> default_database, t_pick_order]], fields=[order_no, status]) >> >> >> ------------------------------ >> wangl...@geekplus.com.cn >> >> *From:* Danny Chan <yuzhao....@gmail.com> >> *Date:* 2020-06-30 20:25 >> *To:* wangl...@geekplus.com.cn >> *Subject:* Re: Re: Flip-105 can the debezium/canal SQL sink to database >> directly? >> Hi, wanglei2 ~ >> >> For primary key syntax you can reference [1] for the “PRIMARY KEY” part, >> notice that currently we only support the NOT ENFORCED mode. Here is the >> reason: >> >> >SQL standard specifies that a constraint can either be ENFORCED or NOT >> ENFORCED. This controls if the constraint checks are performed on the >> incoming/outgoing data. Flink does not own the data therefore the only mode >> we want to support is the NOT ENFORCED mode. It is up to the user to >> ensure that the query enforces key integrity. >> >> For DDL to create JDBC table, you can reference [2] >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table >> [2] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table >> >> Best, >> Danny Chan >> 在 2020年6月30日 +0800 AM10:25,wangl...@geekplus.com.cn < >> wangl...@geekplus.com.cn>,写道: >> >> Thanks Jingsong, >> >> Is there any document or example to this? >> I will build the flink-1.11 package and have a try. >> >> Thanks, >> Lei >> >> ------------------------------ >> wangl...@geekplus.com.cn >> >> >> *From:* Jingsong Li <jingsongl...@gmail.com> >> *Date:* 2020-06-30 10:08 >> *To:* wangl...@geekplus.com.cn >> *CC:* user <user@flink.apache.org> >> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database >> directly? >> Hi Lei, >> >> INSERT INTO jdbc_table SELECT * FROM changelog_table; >> >> For Flink 1.11 new connectors, you need to define the primary key for >> jdbc_table (and also your mysql table needs to have the corresponding >> primary key) because changelog_table has the "update", "delete" records. >> >> And then, jdbc sink will: >> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to >> deal with "insert" and "update" messages. >> - delete to deal with "delete" messages. >> >> So generally speaking, with the primary key, this mysql table will be the >> same to your source database table. (table for generating changelog) >> >> Best, >> Jingsong >> >> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn < >> wangl...@geekplus.com.cn> wrote: >> >>> >>> CREATE TABLE my_table ( >>> id BIGINT, >>> first_name STRING, >>> last_name STRING, >>> email STRING >>> ) WITH ( >>> 'connector'='kafka', >>> 'topic'='user_topic', >>> 'properties.bootstrap.servers'='localhost:9092', >>> 'scan.startup.mode'='earliest-offset', >>> 'format'='debezium-json' >>> ); >>> >>> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM >>> my_table; >>> >>> >>> What will happen after i execute the insert sql statement? For the >>> update/delete message from kafka, the corresponding record will be updated >>> or deleted in the mysql_sink_table? >>> >>> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM >>> my_table; >>> >>> >>> Thanks, >>> >>> Lei >>> >>> >>> ------------------------------ >>> wangl...@geekplus.com.cn >>> >>> >> >> -- >> Best, Jingsong Lee >> >> > > -- > Best, Jingsong Lee >