Thanks for the quick reply, Timo. I'll test with the row_ts and compaction mode suggestions. However, I've read somewhere in the archives that an append-only stream is only possible if I keep "the first" record from the ranking, which in my case is the oldest record.
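If I understood the archives correctly, the append-only variant would order ascending on the time attribute and keep row number 1. Roughly what I plan to test is the sketch below (untested, against the tables from my earlier mail; I am not sure yet whether the GROUP BY/TUMBLE part is still needed once the deduplication keeps the oldest row):

    -- Untested sketch: keep the *first* (oldest) row per id by ordering
    -- ascending on the time attribute row_ts. As far as I understand,
    -- ordering on the time attribute and keeping rownum = 1 is what lets
    -- the planner treat the result as append-only instead of updating.
    INSERT INTO sink_table
    SELECT
        id,
        account,
        upd_ts
    FROM (
        SELECT
            id,
            account,
            upd_ts,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts ASC) AS rownum
        FROM stats_topic
    )
    WHERE rownum = 1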
Regards

On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> could the problem be that you are mixing the OVER and TUMBLE windows
> with each other? The TUMBLE is correctly defined over the time
> attribute `row_ts`, but the OVER window is defined using a regular
> column `upd_ts`. This might be why the query is not append-only but
> updating.
>
> Maybe you can split the problem into sub-queries and share the plan
> with us using .explain()?
>
> The nulls in upsert-kafka should be gone once you enable compaction
> mode in Kafka.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 08.02.21 10:53, Khachatryan Roman wrote:
> > Hi,
> >
> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
> > I'm pulling in Timo and Jark who might know better.
> >
> > https://issues.apache.org/jira/browse/FLINK-19857
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
> >
> >     Any help please? Is there a way to use the "Last row" from a
> >     deduplication in an append-only stream, or tell upsert-kafka not
> >     to produce *null* records in the sink?
> >
> >     Thank you
> >
> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com>
> >     wrote:
> >
> >         Hello,
> >         Flink 1.12.1 (PyFlink)
> >         I am deduplicating CDC records coming from Maxwell in a
> >         Kafka topic. Here is the SQL:
> >
> >             CREATE TABLE stats_topic (
> >                 `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
> >                 `ts` BIGINT,
> >                 `xid` BIGINT,
> >                 row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
> >                 WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
> >             ) WITH (
> >                 'connector' = 'kafka',
> >                 'format' = 'json',
> >                 'topic' = 'stats_topic',
> >                 'properties.bootstrap.servers' = 'localhost:9092',
> >                 'properties.group.id' = 'test_group'
> >             )
> >
> >             CREATE TABLE sink_table (
> >                 `id` BIGINT,
> >                 `account` INT,
> >                 `upd_ts` BIGINT
> >             ) WITH (
> >                 'connector' = 'kafka',
> >                 'format' = 'json',
> >                 'topic' = 'sink_topic',
> >                 'properties.bootstrap.servers' = 'localhost:9092',
> >                 'properties.group.id' = 'test_group'
> >             )
> >
> >             INSERT INTO sink_table
> >             SELECT
> >                 id,
> >                 account,
> >                 upd_ts
> >             FROM (
> >                 SELECT
> >                     id,
> >                     account,
> >                     upd_ts,
> >                     ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
> >                 FROM stats_topic
> >                 GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> >             )
> >             WHERE rownum = 1
> >
> >         As there are a lot of CDC records for a single id, I am using
> >         ROW_NUMBER() and produce them on a 20-minute interval to the
> >         sink_topic.
> >         The problem is that Flink doesn't allow me to use it in
> >         combination with the kafka connector:
> >
> >             pyflink.util.exceptions.TableException: Table sink
> >             'default_catalog.default_database.sink_table' doesn't
> >             support consuming update and delete changes which is
> >             produced by node Rank(strategy=[UndefinedStrategy],
> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
> >
> >         If I use the *upsert-kafka* connector everything is fine, but
> >         then I receive empty JSON records in the sink topic:
> >
> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
> >             {}
> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
> >             {}
> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
> >
> >         Thank you!
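PS: Regarding the compaction suggestion, I assume it is the sink topic
itself that needs cleanup.policy=compact, so that the tombstone records
written by upsert-kafka eventually get cleaned up. When I retest, the
upsert-kafka sink will be defined roughly like this (a sketch based on
the documented connector options; the PRIMARY KEY and the separate
key/value formats are required by upsert-kafka, the rest mirrors my
kafka sink above):

    -- Sketch of the upsert-kafka sink: the connector requires a primary
    -- key and key/value formats. Deletes are written as tombstones,
    -- which (as far as I understand) is what topic compaction is meant
    -- to remove.
    CREATE TABLE sink_table (
        `id` BIGINT,
        `account` INT,
        `upd_ts` BIGINT,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'sink_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )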