Unfortunately, using row_ts doesn't help. Setting the Kafka topic
cleanup.policy to compact is not a great option either, as it increases CPU
and memory usage and might lead to other problems.
So for now I'll just ignore the null records (a filter sketch follows the
quoted thread at the bottom of this mail). Is anyone successfully
deduplicating CDC records into either a Kafka topic or S3 files (CSV/Parquet)?
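
For reference, here is a minimal sketch of what I understand to be the
append-only variant: keep-first-row deduplication, ordered on the time
attribute row_ts rather than the regular column upd_ts (as Timo pointed out
for the OVER window), and without the extra TUMBLE GROUP BY. It reuses the
stats_topic and sink_table definitions from the quoted thread below; whether
the plain kafka sink really accepts this as append-only in 1.12 is exactly
what I'd like to confirm:

    -- Sketch only: keep the *first* row per id, ordered on the rowtime
    -- attribute, which per the archive discussion should stay append-only.
    -- The id/account/upd_ts fields live inside the `data` ROW, so they are
    -- flattened in the innermost query first.
    INSERT INTO sink_table
    SELECT id, account, upd_ts
    FROM (
      SELECT
        id,
        account,
        upd_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts ASC) AS rownum
      FROM (
        SELECT `data`.id AS id, `data`.account AS account,
               `data`.upd_ts AS upd_ts, row_ts
        FROM stats_topic
      )
    )
    WHERE rownum = 1

This keeps the oldest record per id, which is not what I ultimately want (I
need the latest one), but it should at least show whether the append-only
path works end to end with the plain kafka sink.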

Thanks!

On Mon, Feb 8, 2021 at 7:13 PM meneldor <menel...@gmail.com> wrote:

> Thanks for the quick reply, Timo. I'll test the row_ts and compaction-mode
> suggestions. However, I've read somewhere in the archives that an append-only
> stream is only possible if I extract "the first" record from the ranking,
> which in my case is the oldest record.
>
> Regards
>
> On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:
>
>> Hi,
>>
>> could the problem be that you are mixing an OVER and a TUMBLE window with
>> each other? The TUMBLE is correctly defined over the time attribute `row_ts`,
>> but the OVER window is defined using the regular column `upd_ts`. This might
>> be the reason why the query is not append-only but updating.
>>
>> Maybe you can split the problem into subqueries and share the plan with
>> us using .explain()?
>>
>> The nulls in upsert-kafka should be gone once you enable compaction mode
>> in Kafka.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 08.02.21 10:53, Khachatryan Roman wrote:
>> > Hi,
>> >
>> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
>> > I'm pulling in Timo and Jark who might know better.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-19857
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
>> >
>> >     Any help, please? Is there a way to use the "Last row" from a
>> >     deduplication in an append-only stream, or to tell upsert-kafka not
>> >     to produce *null* records in the sink?
>> >
>> >     Thank you
>> >
>> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
>> >
>> >         Hello,
>> >         Flink 1.12.1 (PyFlink)
>> >         I am deduplicating CDC records coming from Maxwell in a Kafka
>> >         topic. Here is the SQL:
>> >
>> >             CREATE TABLE stats_topic(
>> >               `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>> >               `ts` BIGINT,
>> >               `xid` BIGINT,
>> >               row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>> >               WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>> >             ) WITH (
>> >               'connector' = 'kafka',
>> >               'format' = 'json',
>> >               'topic' = 'stats_topic',
>> >               'properties.bootstrap.servers' = 'localhost:9092',
>> >               'properties.group.id' = 'test_group'
>> >             )
>> >
>> >             CREATE TABLE sink_table(
>> >               `id` BIGINT,
>> >               `account` INT,
>> >               `upd_ts` BIGINT
>> >             ) WITH (
>> >               'connector' = 'kafka',
>> >               'format' = 'json',
>> >               'topic' = 'sink_topic',
>> >               'properties.bootstrap.servers' = 'localhost:9092',
>> >               'properties.group.id' = 'test_group'
>> >             )
>> >
>> >
>> >             INSERT INTO sink_table
>> >             SELECT
>> >               id,
>> >               account,
>> >               upd_ts
>> >             FROM (
>> >               SELECT
>> >                 id,
>> >                 account,
>> >                 upd_ts,
>> >                 ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>> >               FROM stats_topic
>> >               GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> >             )
>> >             WHERE rownum = 1
>> >
>> >
>> >         As there are a lot of CDC records for a single id, I'm using
>> >         ROW_NUMBER() and producing them on a 20-minute interval to the
>> >         sink_topic. The problem is that Flink doesn't allow me to use it
>> >         in combination with the kafka connector:
>> >
>> >             pyflink.util.exceptions.TableException: Table sink
>> >             'default_catalog.default_database.sink_table' doesn't
>> >             support consuming update and delete changes which is
>> >             produced by node Rank(strategy=[UndefinedStrategy],
>> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
>> >
>> >
>> >         If I use the *upsert-kafka* connector everything is fine, but
>> >         then I receive empty JSON records in the sink topic:
>> >
>> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
>> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
>> >             {}
>> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
>> >             {}
>> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
>> >
>> >
>> >         Thank you!
>> >
>>
>>
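
And the "ignore the null records" workaround I mentioned above is roughly the
following sketch (illustrative only: the reader table name and consumer group
are made up, and it assumes the empty {} values shown in the quoted output
above decode to all-NULL rows with the json format):

    -- Hypothetical reader over the sink topic that drops the empty records.
    CREATE TABLE sink_topic_reader (
      `id` BIGINT,
      `account` INT,
      `upd_ts` BIGINT
    ) WITH (
      'connector' = 'kafka',
      'format' = 'json',
      'topic' = 'sink_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'dedup_reader',   -- hypothetical group id
      'json.ignore-parse-errors' = 'true'       -- assumption: also skips values that fail to parse
    )

    -- The empty {} records come back as all-NULL rows, so filtering on the
    -- key column removes them before any further processing.
    SELECT id, account, upd_ts
    FROM sink_topic_reader
    WHERE id IS NOT NULL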
