> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?
Yes, they are tombstone records, but I cannot avoid them because a last-row
deduplication query can't produce an append-only stream for the connector.

> What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?

Yes, I want to aggregate the state to the current state. The records are
merged into a database by an ETL job every hour, so I don't need every
update, only the last one; that's why I'm using a window function, and any
future updates will be handled by the MERGE query in the ETL anyway.
I've changed the query to use max(upd_ts) instead, which does produce an
append-only stream and works, but I'm not 100% sure the result is the same:

    INSERT INTO sink_table
    SELECT DISTINCT t.id, t.account, t.upd_ts
    FROM stats_topic t, (
        SELECT id, account, MAX(upd_ts) AS maxTs
        FROM stats_topic
        GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
    ) s
    WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
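One way to check whether the two queries really behave the same at the
changelog level is to print the optimized plan before wiring up the Kafka
sink. Below is a minimal PyFlink sketch of that check (an illustration, not
the real pipeline): it assumes Flink 1.12.x with Table.explain() and
ExplainDetail available in the Python API, flattens the schema to the three
columns the query uses, and swaps the Kafka source for the built-in datagen
connector so no broker is needed.

from pyflink.table import EnvironmentSettings, ExplainDetail, StreamTableEnvironment

# Streaming environment; the Blink planner is the default in Flink 1.12.
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=settings)

# Source table flattened to the three columns the query uses, backed by the
# built-in 'datagen' connector so the plan can be inspected without a Kafka
# broker. This is a simplification for illustration, not the real source.
t_env.execute_sql("""
    CREATE TABLE stats_topic (
        id BIGINT,
        account INT,
        upd_ts BIGINT,
        row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(upd_ts)),
        WATERMARK FOR row_ts AS row_ts - INTERVAL '15' SECOND
    ) WITH (
        'connector' = 'datagen'
    )
""")

# The MAX(upd_ts) variant of the deduplication query from above.
dedup_by_max_ts = t_env.sql_query("""
    SELECT DISTINCT t.id, t.account, t.upd_ts
    FROM stats_topic t, (
        SELECT id, account, MAX(upd_ts) AS maxTs
        FROM stats_topic
        GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
    ) s
    WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
""")

# CHANGELOG_MODE annotates each operator in the optimized plan with the kind
# of changes (insert / update / delete) it emits.
print(dedup_by_max_ts.explain(ExplainDetail.CHANGELOG_MODE))

If the plan shows only insert changes for the final operator, the plain
'kafka' sink should accept the query; if it still shows update or delete
changes, it would hit the same TableException as the ROW_NUMBER() version.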
Thanks!

On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?
>
>> Is there anyone who is successfully deduplicating CDC records into either
>> kafka topic or S3 files (CSV/parquet)?
>
> What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
>
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?
>
> On Tue, Feb 9, 2021 at 10:37 AM meneldor <menel...@gmail.com> wrote:
>
>> Unfortunately using row_ts doesn't help. Setting the Kafka topic
>> cleanup.policy to compact is not a very good idea as it increases CPU and
>> memory usage and might lead to other problems.
>> So for now I'll just ignore the null records. Is there anyone who is
>> successfully deduplicating CDC records into either a Kafka topic or S3
>> files (CSV/parquet)?
>>
>> Thanks!
>>
>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <menel...@gmail.com> wrote:
>>
>>> Thanks for the quick reply, Timo. I'll test with the row_ts and
>>> compaction mode suggestions. However, I've read somewhere in the archives
>>> that an append-only stream is only possible if I extract "the first"
>>> record from the ranking, which in my case is the oldest record.
>>>
>>> Regards
>>>
>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> could the problem be that you are mixing OVER and TUMBLE windows with
>>>> each other? The TUMBLE is correctly defined over the time attribute
>>>> `row_ts`, but the OVER window is defined using a regular column
>>>> `upd_ts`. This might be why the query is not append-only but updating.
>>>>
>>>> Maybe you can split the problem into sub-queries and share the plan
>>>> with us using .explain()?
>>>>
>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>> mode in Kafka.
>>>>
>>>> I hope this helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>> > Hi,
>>>> >
>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>> > I'm pulling in Timo and Jark who might know better.
>>>> >
>>>> > [1] https://issues.apache.org/jira/browse/FLINK-19857
>>>> >
>>>> > Regards,
>>>> > Roman
>>>> >
>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
>>>> >
>>>> >     Any help please? Is there a way to use the "last row" from a
>>>> >     deduplication in an append-only stream, or to tell upsert-kafka
>>>> >     not to produce *null* records in the sink?
>>>> >
>>>> >     Thank you
>>>> >
>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>>>> >
>>>> >         Hello,
>>>> >         Flink 1.12.1 (PyFlink)
>>>> >         I am deduplicating CDC records coming from Maxwell in a Kafka
>>>> >         topic. Here is the SQL:
>>>> >
>>>> >         CREATE TABLE stats_topic (
>>>> >             `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>> >             `ts` BIGINT,
>>>> >             `xid` BIGINT,
>>>> >             row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>> >             WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>>> >         ) WITH (
>>>> >             'connector' = 'kafka',
>>>> >             'format' = 'json',
>>>> >             'topic' = 'stats_topic',
>>>> >             'properties.bootstrap.servers' = 'localhost:9092',
>>>> >             'properties.group.id' = 'test_group'
>>>> >         )
>>>> >
>>>> >         CREATE TABLE sink_table (
>>>> >             `id` BIGINT,
>>>> >             `account` INT,
>>>> >             `upd_ts` BIGINT
>>>> >         ) WITH (
>>>> >             'connector' = 'kafka',
>>>> >             'format' = 'json',
>>>> >             'topic' = 'sink_topic',
>>>> >             'properties.bootstrap.servers' = 'localhost:9092',
>>>> >             'properties.group.id' = 'test_group'
>>>> >         )
>>>> >
>>>> >         INSERT INTO sink_table
>>>> >         SELECT
>>>> >             id,
>>>> >             account,
>>>> >             upd_ts
>>>> >         FROM (
>>>> >             SELECT
>>>> >                 id,
>>>> >                 account,
>>>> >                 upd_ts,
>>>> >                 ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>>>> >             FROM stats_topic
>>>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>>> >         )
>>>> >         WHERE rownum = 1
>>>> >
>>>> >         As there are a lot of CDC records for a single id, I'm using
>>>> >         ROW_NUMBER() and producing them to the sink_topic on a
>>>> >         20-minute interval. The problem is that Flink doesn't allow me
>>>> >         to use it in combination with the kafka connector:
>>>> >
>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>> >             support consuming update and delete changes which is
>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>>>> >
>>>> >         If I use the *upsert-kafka* connector everything is fine, but
>>>> >         then I receive empty JSON records in the sink topic:
>>>> >
>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>> >             {}
>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>> >             {}
>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>> >
>>>> >         Thank you!