CC: @Jark Wu <imj...@gmail.com> and @Timo Walther <t...@ververica.com>
Best, Jingsong On Wed, Jul 1, 2020 at 5:55 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > CREATE TABLE t_pick_order ( > order_no VARCHAR, > status INT > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'example', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = '172.19.78.32:9092', > 'format' = 'canal-json' > ) > CREATE TABLE order_status ( > order_no VARCHAR, > status INT, > > PRIMARY KEY (order_no) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://xxx:3306/flink_test', > 'table-name' = 'order_status', > 'username' = 'dev', > 'password' = 'xxxx' > ) > > > > But when i execute insert INTO order_status SELECT order_no, status FROM > t_pick_order > There's error: > > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Provided trait > [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This > is a bug in planner, please file an issue. > Current node is TableSourceScan(table=[[default_catalog, default_database, > t_pick_order]], fields=[order_no, status]) > > > ------------------------------ > wangl...@geekplus.com.cn > > *From:* Danny Chan <yuzhao....@gmail.com> > *Date:* 2020-06-30 20:25 > *To:* wangl...@geekplus.com.cn > *Subject:* Re: Re: Flip-105 can the debezium/canal SQL sink to database > directly? > Hi, wanglei2 ~ > > For primary key syntax you can reference [1] for the “PRIMARY KEY” part, > notice that currently we only support the NOT ENFORCED mode. Here is the > reason: > > >SQL standard specifies that a constraint can either be ENFORCED or NOT > ENFORCED. This controls if the constraint checks are performed on the > incoming/outgoing data. Flink does not own the data therefore the only mode > we want to support is the NOT ENFORCED mode. It is up to the user to > ensure that the query enforces key integrity. > > For DDL to create JDBC table, you can reference [2] > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table > [2] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table > > Best, > Danny Chan > 在 2020年6月30日 +0800 AM10:25,wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn>,写道: > > Thanks Jingsong, > > Is there any document or example to this? > I will build the flink-1.11 package and have a try. > > Thanks, > Lei > > ------------------------------ > wangl...@geekplus.com.cn > > > *From:* Jingsong Li <jingsongl...@gmail.com> > *Date:* 2020-06-30 10:08 > *To:* wangl...@geekplus.com.cn > *CC:* user <user@flink.apache.org> > *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database > directly? > Hi Lei, > > INSERT INTO jdbc_table SELECT * FROM changelog_table; > > For Flink 1.11 new connectors, you need to define the primary key for > jdbc_table (and also your mysql table needs to have the corresponding > primary key) because changelog_table has the "update", "delete" records. > > And then, jdbc sink will: > - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to > deal with "insert" and "update" messages. > - delete to deal with "delete" messages. > > So generally speaking, with the primary key, this mysql table will be the > same to your source database table. (table for generating changelog) > > Best, > Jingsong > > On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> >> CREATE TABLE my_table ( >> id BIGINT, >> first_name STRING, >> last_name STRING, >> email STRING >> ) WITH ( >> 'connector'='kafka', >> 'topic'='user_topic', >> 'properties.bootstrap.servers'='localhost:9092', >> 'scan.startup.mode'='earliest-offset', >> 'format'='debezium-json' >> ); >> >> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM >> my_table; >> >> >> What will happen after i execute the insert sql statement? For the >> update/delete message from kafka, the corresponding record will be updated >> or deleted in the mysql_sink_table? >> >> INSERT INTO mysql_sink_table SELECT id, first_name, last_name FROM >> my_table; >> >> >> Thanks, >> >> Lei >> >> >> ------------------------------ >> wangl...@geekplus.com.cn >> >> > > -- > Best, Jingsong Lee > > -- Best, Jingsong Lee