Hi,

Maybe there is a misunderstanding about the upsert sink. Take a look at [1]: the upsert sink can handle "delete" records too.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> This can be an upsert stream [1], and JDBC has an upsert sink now [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This can be an upsert stream [1]
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> Create one table with Kafka, another table with MySQL using Flink SQL.
>>> Write a SQL statement to read from Kafka and write to MySQL.
>>>
>>> INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
>>> (SELECT order_no, LAST_VALUE(status) AS status FROM
>>> kafkaTable GROUP BY order_no)
>>> GROUP BY status
>>>
>>> I think this is a retract stream.
>>>
>>> But where can I find the Java source code for the MySQL retract table sink?
>>>
>>>
>>> Thanks,
>>>
>>> Lei
>>>
>>>
>>> ------------------------------
>>> wangl...@geekplus.com.cn
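
To make the point about "delete" records concrete, here is a rough Python sketch of what the query from the original message produces when its result is read as an upsert stream keyed by status. This is not Flink code. The function name upsert_changes, the message tuples, and the sample order events are all made up for illustration, and the logic is a simplification of what the planner actually does.

```python
from collections import Counter


def upsert_changes(events):
    """Simulate the nested query and yield upsert-stream messages keyed by status.

    events: iterable of (order_no, status) rows, standing in for kafkaTable.
    Yields ("UPSERT", status, num) or ("DELETE", status).
    """
    last_status = {}    # inner query: order_no -> LAST_VALUE(status)
    counts = Counter()  # outer query: status -> COUNT(order_no)
    for order_no, status in events:
        old = last_status.get(order_no)
        if old == status:
            continue  # the order's latest status did not change
        last_status[order_no] = status
        if old is not None:
            # The order leaves its previous status group.
            counts[old] -= 1
            if counts[old] == 0:
                del counts[old]
                yield ("DELETE", old)  # no orders left for this key
            else:
                yield ("UPSERT", old, counts[old])
        # The order joins its new status group.
        counts[status] += 1
        yield ("UPSERT", status, counts[status])


# Hypothetical sample input: two orders move from "created" to "paid".
events = [(1, "created"), (2, "created"), (1, "paid"), (2, "paid")]
changes = list(upsert_changes(events))

# Apply the messages to a dict standing in for the MySQL table keyed by status.
table = {}
for msg in changes:
    if msg[0] == "UPSERT":
        table[msg[1]] = msg[2]
    else:
        table.pop(msg[1], None)

for msg in changes:
    print(msg)
print(table)
```

In this sketch the "created" row is removed by a DELETE message once its count drops to zero, so a sink that understands upserts and deletes on the status key ends up with the correct table without needing separate retract messages.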