Hi, wanglei PLEASE use English when send mails to user(user@flink.apache.org) mail list. You should send to to user-zh(user...@flink.apache.org) mail list, and I’m pleasure to answer the question here.
Best, Leonard Xu > 在 2020年4月27日,12:14,wangl...@geekplus.com.cn 写道: > > > > INSERT INTO mysql_sink SELECT f1, count(*) FROM kafka_src GROUP BY f1 > > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。 > 但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 > RetractStream 呢? > > 我看 > https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc > > <https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc> > 没有 Retract 方式 > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗? > > > > 如若不带 group by 直接: > INSERT INTO mysql_sink SELECT f1, f2 FROM kafka_src > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢? > > > > wangl...@geekplus.com.cn <mailto:wangl...@geekplus.com.cn>