>
> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?

Yes, they are tombstone records, but I cannot avoid them because the
deduplication query can't produce an append-only stream when it keeps the
last row.

> What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?

Yes, I want to aggregate the state to the current state. The records are
merged into a database by an ETL job every hour, so I don't need every
update, only the last one. That's why I'm using a window function; any
future updates will be handled by the MERGE query in the ETL as well.
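
For context, the hourly MERGE in the ETL looks roughly like this. It is a
simplified sketch: stats_current and stats_staging are placeholder names,
and the real statement handles more columns.

    -- Hypothetical tables: stats_current holds the latest state per key,
    -- stats_staging holds the rows written by the Flink job since the last run.
    MERGE INTO stats_current tgt
    USING stats_staging src
      ON tgt.id = src.id AND tgt.account = src.account
    -- Only overwrite when the incoming row is newer, so late or replayed
    -- records cannot move the state backwards.
    WHEN MATCHED AND src.upd_ts > tgt.upd_ts THEN
      UPDATE SET upd_ts = src.upd_ts
    WHEN NOT MATCHED THEN
      INSERT (id, account, upd_ts) VALUES (src.id, src.account, src.upd_ts);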

I've changed the query to use max(upd_ts) instead, which produces an
append-only stream. It works, but I'm not 100% sure whether the result is
the same:

> INSERT INTO sink_table
> SELECT DISTINCT t.id, t.account, t.upd_ts
> FROM stats_topic t, (
>    SELECT id, account, MAX(upd_ts) AS maxTs
>    FROM stats_topic
>    GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> ) s
> WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account
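
To double-check that the rewritten query really stays append-only before
relying on it, my plan is to look at the query plan first, along these
lines (just a sketch; I'm assuming EXPLAIN PLAN FOR accepts an INSERT
statement in 1.12, otherwise table.explain() from pyflink, as Timo
suggested, should show the same information):

    -- Print the logical and optimized plans for the INSERT above. If the sink
    -- no longer receives update/delete changes, the plain append-only 'kafka'
    -- connector should accept it.
    EXPLAIN PLAN FOR
    INSERT INTO sink_table
    SELECT DISTINCT t.id, t.account, t.upd_ts
    FROM stats_topic t, (
       SELECT id, account, MAX(upd_ts) AS maxTs
       FROM stats_topic
       GROUP BY id, account, TUMBLE(row_ts, INTERVAL '20' MINUTE)
    ) s
    WHERE t.id = s.id AND t.upd_ts = s.maxTs AND t.account = s.account;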


Thanks!

On Thu, Feb 11, 2021 at 12:55 PM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> Are you sure that the null records are not actually tombstone records? If
> you use upsert tables you usually want to have them + compaction. Or how
> else will you deal with deletions?
>
>> Is there anyone who is successfully deduplicating CDC records into either
>> a kafka topic or S3 files (CSV/parquet)?
>>
> What do you want to achieve? CDC records should be deduplicated by
> definition.
> I'm assuming that you want to aggregate the state to the current state. If
> so, how do you decide when the record is complete (e.g. no future updates)
> and can be written?
>
> I have the feeling that you are using CDC at a place where you don't want
> to use it, so maybe it helps to first explain your use case. Is stream
> processing a good fit for you in the first place?
>
> On Tue, Feb 9, 2021 at 10:37 AM meneldor <menel...@gmail.com> wrote:
>
>> Unfortunately, using row_ts doesn't help. Setting the kafka topic
>> cleanup.policy to compact is not a very good idea, as it increases CPU and
>> memory usage and might lead to other problems.
>> So for now I'll just ignore the null records. Is there anyone who is
>> successfully deduplicating CDC records into either a kafka topic or S3
>> files (CSV/parquet)?
>>
>> Thanks!
>>
>> On Mon, Feb 8, 2021 at 7:13 PM meneldor <menel...@gmail.com> wrote:
>>
>>> Thanks for the quick reply, Timo. I'll test the row_ts and compaction
>>> mode suggestions. However, I've read somewhere in the archives that an
>>> append-only stream is only possible if I extract "the first" record from
>>> the ranking, which in my case is the oldest record.
>>>
>>> Regards
>>>
>>> On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> could the problem be that you are mixing OVER and TUMBLE window with
>>>> each other? The TUMBLE is correctly defined over time attribute
>>>> `row_ts`
>>>> but the OVER window is defined using a regular column `upd_ts`. This
>>>> might be the case why the query is not append-only but updating.
>>>>
>>>> Maybe you can split the problem into sub queries and share the plan
>>>> with us using .explain()?
>>>>
>>>> The nulls in upsert-kafka should be gone once you enable compaction
>>>> mode in Kafka.
>>>>
>>>> I hope this helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 08.02.21 10:53, Khachatryan Roman wrote:
>>>> > Hi,
>>>> >
>>>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>>>> > I'm pulling in Timo and Jark who might know better.
>>>> >
>>>> > https://issues.apache.org/jira/browse/FLINK-19857
>>>> >
>>>> > Regards,
>>>> > Roman
>>>> >
>>>> >
>>>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
>>>> >
>>>> >     Any help please? Is there a way to use the "Last row" from a
>>>> >     deduplication in an append-only stream or tell upsert-kafka to not
>>>> >     produce *null* records in the sink?
>>>> >
>>>> >     Thank you
>>>> >
>>>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>>>> >
>>>> >         Hello,
>>>> >         Flink 1.12.1(pyflink)
>>>> >         I am deduplicating CDC records coming from Maxwell in a kafka
>>>> >         topic.  Here is the SQL:
>>>> >
>>>> >             CREATE TABLE stats_topic(
>>>> >                        `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
>>>> >             BIGINT>,
>>>> >                        `ts` BIGINT,
>>>> >                        `xid` BIGINT ,
>>>> >                        row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>> >                        WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>>> >                      ) WITH (
>>>> >                        'connector' = 'kafka',
>>>> >                        'format' = 'json',
>>>> >                        'topic' = 'stats_topic',
>>>> >                        'properties.bootstrap.servers' = 'localhost:9092',
>>>> >                        'properties.group.id' = 'test_group'
>>>> >                      )
>>>> >
>>>> >             CREATE TABLE sink_table(
>>>> >                        `id` BIGINT,
>>>> >                        `account` INT,
>>>> >                        `upd_ts` BIGINT
>>>> >                      ) WITH (
>>>> >                        'connector' = 'kafka',
>>>> >                        'format' = 'json',
>>>> >                        'topic' = 'sink_topic',
>>>> >                        'properties.bootstrap.servers' = 'localhost:9092',
>>>> >                        'properties.group.id' = 'test_group'
>>>> >                      )
>>>> >
>>>> >
>>>> >             INSERT INTO sink_table
>>>> >             SELECT
>>>> >             id,
>>>> >             account,
>>>> >             upd_ts
>>>> >             FROM (
>>>> >             SELECT
>>>> >               id,
>>>> >               account,
>>>> >               upd_ts,
>>>> >               ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
>>>> >             AS rownum
>>>> >             FROM stats_topic
>>>> >             GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
>>>> >             MINUTE)
>>>> >             )
>>>> >             WHERE rownum=1
>>>> >
>>>> >
>>>> >           As there are a lot of CDC records for a single ID, I'm using
>>>> >         ROW_NUMBER() and producing them on a 20-minute interval to the
>>>> >         sink_topic. The problem is that Flink doesn't allow me to use it
>>>> >         in combination with the kafka connector:
>>>> >
>>>> >             pyflink.util.exceptions.TableException: Table sink
>>>> >             'default_catalog.default_database.sink_table' doesn't
>>>> >             support consuming update and delete changes which is
>>>> >             produced by node Rank(strategy=[UndefinedStrategy],
>>>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>>>> >
>>>> >
>>>> >         If I use the *upsert-kafka* connector everything is fine, but
>>>> >         then I receive empty JSON records in the sink topic:
>>>> >
>>>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>>>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>>>> >             {}
>>>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>>>> >             {}
>>>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>>> >
>>>> >
>>>> >         Thank you!
>>>> >
>>>>
>>>>
