Any help, please? Is there a way to use the "last row" from a deduplication in an append-only stream, or to tell upsert-kafka not to produce *null* records in the sink?
Thank you

On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:

> Hello,
> Flink 1.12.1 (pyflink)
> I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here
> is the SQL:
>
>> CREATE TABLE stats_topic(
>>   `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>   `ts` BIGINT,
>>   `xid` BIGINT,
>>   row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>   WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'json',
>>   'topic' = 'stats_topic',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'test_group'
>> )
>>
>> CREATE TABLE sink_table(
>>   `id` BIGINT,
>>   `account` INT,
>>   `upd_ts` BIGINT
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'json',
>>   'topic' = 'sink_topic',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'test_group'
>> )
>>
>> INSERT INTO sink_table
>> SELECT
>>   id,
>>   account,
>>   upd_ts
>> FROM (
>>   SELECT
>>     data.id AS id,
>>     data.account AS account,
>>     data.upd_ts AS upd_ts,
>>     ROW_NUMBER() OVER (PARTITION BY data.id ORDER BY data.upd_ts DESC) AS rownum
>>   FROM stats_topic
>>   GROUP BY data.id, data.account, data.upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> )
>> WHERE rownum = 1
>
> As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER()
> and producing them at a 20-minute interval to the sink_topic.
> The problem is that Flink doesn't allow me to use it in combination with the
> Kafka connector:
>
>> pyflink.util.exceptions.TableException: Table sink
>> 'default_catalog.default_database.sink_table' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>> select=[$f0, $f1, $f2])
>
> If I use the *upsert-kafka* connector, everything is fine, but then I
> receive empty JSON records in the sink topic:
>
>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>> {}
>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>> {}
>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>
> Thank you!
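A note for readers of this thread on why the plain `kafka` sink rejects the query: on an unbounded stream, keeping the row with the largest `upd_ts` per `id` means an earlier result can later be superseded, so it has to be retracted. The following is a simplified Python model of that behaviour, not Flink's actual Rank operator. It ignores the TUMBLE window, treats ties as "no change", and writes the retraction as a DELETE (`-D`) of the old row followed by an INSERT (`+I`) of the new one; the real operator may encode it differently (for example as UPDATE_BEFORE/UPDATE_AFTER). The names `last_row_per_id` and `is_append_only` are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int, int]  # (id, account, upd_ts), mirroring sink_table's columns
Change = Tuple[str, Row]    # (row kind, row): "+I" = insert, "-D" = delete


def last_row_per_id(rows: Iterable[Row]) -> List[Change]:
    """Simplified model of ROW_NUMBER() ... ORDER BY upd_ts DESC ... WHERE rownum = 1.

    Keeps the row with the largest upd_ts per id. Because the input never ends,
    an already emitted winner can be superseded later and must be retracted.
    Hypothetical helper: the TUMBLE window from the original query is ignored.
    """
    best: Dict[int, Row] = {}
    changes: List[Change] = []
    for row in rows:
        key, _, upd_ts = row
        current = best.get(key)
        if current is None:
            # First row for this id: it is rank 1 for now.
            best[key] = row
            changes.append(("+I", row))
        elif upd_ts > current[2]:
            # The previous "last row" is no longer rank 1: retract it, emit the new one.
            changes.append(("-D", current))
            best[key] = row
            changes.append(("+I", row))
        # Rows with an older or equal upd_ts leave the result unchanged (modelling choice).
    return changes


def is_append_only(changes: Iterable[Change]) -> bool:
    """A plain append-only sink can only represent a stream of inserts."""
    return all(kind == "+I" for kind, _ in changes)


if __name__ == "__main__":
    events = [
        (111111, 4, 1612334952),
        (222222, 4, 1612334953),
        (111111, 4, 1612334960),  # newer update for an id that was already emitted
    ]
    log = last_row_per_id(events)
    for kind, row in log:
        print(kind, row)
    print("append-only:", is_append_only(log))
```

In this model the third event makes the first `111111` row lose rank 1, so a `-D` appears in the output; that is the kind of change an append-only sink has no way to represent, which matches the "doesn't support consuming update and delete changes" error.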
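On the upsert-kafka side, the Flink documentation describes the sink as writing INSERT/UPDATE_AFTER rows as normal Kafka message values and DELETE rows as messages with a null value, i.e. a tombstone for the key. The empty records in the sink topic are presumably those tombstones as rendered by whatever tool reads the topic; that is an assumption, not something confirmed in this thread. Because tombstones are part of the upsert contract, the usual place to deal with them is the consumer: apply each one as a delete, or skip it. The sketch below only models the record layout in plain Python and calls no Kafka or Flink API; `encode_upsert`, `materialize` and `skip_tombstones` are hypothetical names, and the JSON key encoding is an assumption.

```python
import json
from typing import Dict, Iterable, List, Optional, Tuple

# A Kafka record modelled as (key bytes, value bytes or None); None is a tombstone.
Record = Tuple[bytes, Optional[bytes]]


def encode_upsert(changes: Iterable[Tuple[str, dict]], key_field: str = "id") -> List[Record]:
    """Hypothetical model of an upsert sink mapping changelog rows to Kafka records.

    "+I"/"+U" rows become key/value records and "-D" rows become tombstones.
    "-U" rows are dropped, assuming (as for an upsert sink) that the following
    UPDATE_AFTER carries the new value for the same key.
    """
    records: List[Record] = []
    for kind, row in changes:
        key = json.dumps({key_field: row[key_field]}).encode()
        if kind in ("+I", "+U"):
            records.append((key, json.dumps(row).encode()))
        elif kind == "-D":
            records.append((key, None))
    return records


def materialize(records: Iterable[Record]) -> Dict[bytes, dict]:
    """Consumer option 1: apply records in order, treating a null value as a delete."""
    table: Dict[bytes, dict] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = json.loads(value)
    return table


def skip_tombstones(records: Iterable[Record]) -> List[dict]:
    """Consumer option 2: keep only records that carry a value."""
    return [json.loads(value) for _, value in records if value is not None]


if __name__ == "__main__":
    changes = [
        ("+I", {"id": 111111, "account": 4, "upd_ts": 1612334952}),
        ("-D", {"id": 111111, "account": 4, "upd_ts": 1612334952}),
        ("+I", {"id": 111111, "account": 4, "upd_ts": 1612334960}),
    ]
    records = encode_upsert(changes)
    print(materialize(records))
    print(skip_tombstones(records))
```

Note that `skip_tombstones` keeps every value ever written for an id, including superseded ones, so a consumer that only wants the latest row per id still has to key by `id`, as `materialize` does.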