使用自定义的Table Sink就可以了啊. Luan Cooper <[email protected]> 于2020年5月7日周四 下午8:39写道:
> Hi > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL > 如下 > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > SELECT * > FROM binlog // mysql 表的 binlog > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > 但是上面的 SQL 是做不到的,只会一直 Insert > > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > 社区的 FLIP-87 > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > 可以解决这个问题吗? > > 感谢 >
