Any help, please? Is there a way to get the "last row" from a deduplication
as an append-only stream, or to tell upsert-kafka not to produce *null*
records in the sink?
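
For context, the changelog I'm trying to avoid comes from the Rank node
below. A plain event-time window aggregation should stay append-only; a
minimal sketch of that alternative (it assumes account is constant per id,
which seems to hold in my data):

    INSERT INTO sink_table
    SELECT
      id,
      MAX(account),  -- assumption: account never changes for a given id
      MAX(upd_ts)    -- latest change timestamp within the window
    FROM stats_topic
    GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)

That loses the exact "last row" semantics, though, which is why I'm asking.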

Thank you

On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:

> Hello,
> Flink 1.12.1(pyflink)
> I am deduplicating CDC records coming from Maxwell in a kafka topic.  Here
> is the SQL:
>
>> CREATE TABLE stats_topic(
>>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>           `ts` BIGINT,
>>           `xid` BIGINT,
>>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>           WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
>>         ) WITH (
>>           'connector' = 'kafka',
>>           'format' = 'json',
>>           'topic' = 'stats_topic',
>>           'properties.bootstrap.servers' = 'localhost:9092',
>>           'properties.group.id' = 'test_group'
>>         )
>>
>> CREATE TABLE sink_table(
>>           `id` BIGINT,
>>           `account` INT,
>>           `upd_ts` BIGINT
>>         ) WITH (
>>           'connector' = 'kafka',
>>           'format' = 'json',
>>           'topic' = 'sink_topic',
>>           'properties.bootstrap.servers' = 'localhost:9092',
>>           'properties.group.id' = 'test_group'
>>         )
>>
>>
>> INSERT INTO sink_table
>> SELECT
>> id,
>> account,
>> upd_ts
>> FROM (
>> SELECT
>>  id,
>>  account,
>>  upd_ts,
>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
>> FROM stats_topic
>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> )
>> WHERE rownum=1
>>
>
>  As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER()
> and producing them to the sink_topic on a 20-minute interval. The problem
> is that Flink doesn't allow me to use it in combination with the kafka
> connector:
>
>> pyflink.util.exceptions.TableException: Table sink
>> 'default_catalog.default_database.sink_table' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>> select=[$f0, $f1, $f2])
>>
>
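> For reference, a sketch of the upsert sink DDL (upsert-kafka needs a
> PRIMARY KEY and separate key/value formats; the json formats are my
> setup):
>
>> CREATE TABLE sink_table(
>>           `id` BIGINT,
>>           `account` INT,
>>           `upd_ts` BIGINT,
>>           PRIMARY KEY (`id`) NOT ENFORCED
>>         ) WITH (
>>           'connector' = 'upsert-kafka',
>>           'topic' = 'sink_topic',
>>           'properties.bootstrap.servers' = 'localhost:9092',
>>           'key.format' = 'json',
>>           'value.format' = 'json'
>>         )
>>
>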
> If I use the *upsert-kafka* connector everything is fine, but then I
> receive empty JSON records in the sink topic:
>
>> {"id": 111111, "account": 4, "upd_ts": 1612334952}
>> {"id": 222222, "account": 4, "upd_ts": 1612334953}
>> {}
>> {"id": 333333, "account": 4, "upd_ts": 1612334955}
>> {}
>> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>>
>
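> I assume those empty objects are how my consumer renders the tombstone
> (null value) records that upsert-kafka writes for the DELETE changes
> coming out of the Rank node. The only insert-only deduplication I know of
> keeps the *first* row per key, ordered ascending on processing time,
> which inverts my "last row" semantics; a sketch for completeness (proc_ts
> is an assumed column, proc_ts AS PROCTIME(), added to stats_topic):
>
>> INSERT INTO sink_table
>> SELECT id, account, upd_ts
>> FROM (
>>   SELECT
>>     id,
>>     account,
>>     upd_ts,
>>     ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_ts ASC) AS rownum
>>   FROM stats_topic
>> )
>> WHERE rownum = 1
>>
>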
> Thank you!
>
