Hello, I am on Flink 1.12.1 (PyFlink) and I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here is the SQL:
CREATE TABLE stats_topic(
  `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
  `ts` BIGINT,
  `xid` BIGINT,
  row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
  WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'stats_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_group'
)

CREATE TABLE sink_table(
  `id` BIGINT,
  `account` INT,
  `upd_ts` BIGINT
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_group'
)

INSERT INTO sink_table
SELECT
  id,
  account,
  upd_ts
FROM (
  SELECT
    data.id AS id,
    data.account AS account,
    data.upd_ts AS upd_ts,
    ROW_NUMBER() OVER (PARTITION BY data.id ORDER BY data.upd_ts DESC) AS rownum
  FROM stats_topic
  GROUP BY data.id, data.account, data.upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
)
WHERE rownum = 1

Since there are many CDC records for a single id, I use ROW_NUMBER() to deduplicate and emit the result to sink_topic on a 20-minute interval. The problem is that Flink does not allow this in combination with the kafka connector:

pyflink.util.exceptions.TableException: Table sink
'default_catalog.default_database.sink_table' doesn't support consuming
update and delete changes which is produced by node
Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
select=[$f0, $f1, $f2])

If I use the upsert-kafka connector instead (a sketch of that sink DDL is at the end of this mail), everything works fine, but then I receive empty JSON records in the sink topic:

{"id": 111111, "account": 4, "upd_ts": 1612334952}
{"id": 222222, "account": 4, "upd_ts": 1612334953}
{}
{"id": 333333, "account": 4, "upd_ts": 1612334955}
{}
{"id": 444444, "account": 4, "upd_ts": 1612334956}

Thank you!
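P.S. For reference, the upsert-kafka variant of the sink is declared roughly like this (a sketch rather than a verbatim copy of my job; I am assuming a primary key on id and JSON key/value formats as the relevant pieces):

CREATE TABLE sink_table(
  `id` BIGINT,
  `account` INT,
  `upd_ts` BIGINT,
  -- upsert-kafka requires a primary key; assuming id is the key here
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- key.format / value.format replace the single 'format' option of the kafka connector
  'key.format' = 'json',
  'value.format' = 'json'
)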