Hi,

could the problem be that you are mixing an OVER window and a TUMBLE window with each other? The TUMBLE is correctly defined over the time attribute `row_ts`, but the OVER window is defined on the regular column `upd_ts`. This might be the reason why the query is not append-only but updating.
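
Just to illustrate what I mean, here is a rough, untested sketch of the deduplication with the OVER window ordered by the time attribute `row_ts` instead of the regular column `upd_ts` (leaving the TUMBLE aside for a moment):

  SELECT id, account, upd_ts
  FROM (
    SELECT
      id,
      account,
      upd_ts,
      -- ORDER BY uses the time attribute, not the regular BIGINT column
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts DESC) AS rownum
    FROM stats_topic
  )
  WHERE rownum = 1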

Maybe you can split the problem into subqueries and share the plans with us using .explain()?
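
For example (again only an untested sketch, the view names are made up), the two steps could be registered as views and explained separately:

  CREATE VIEW windowed AS
  SELECT id, account, upd_ts
  FROM stats_topic
  GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE);

  CREATE VIEW deduplicated AS
  SELECT id, account, upd_ts
  FROM (
    SELECT
      id,
      account,
      upd_ts,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
    FROM windowed
  )
  WHERE rownum = 1;

Explaining each view (and the final INSERT) separately should make it easier to see which step produces the updating stream.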

The nulls in upsert-kafka should be gone once you enable log compaction for the sink topic in Kafka.

I hope this helps.

Regards,
Timo


On 08.02.21 10:53, Khachatryan Roman wrote:
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

[1] https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:

    Any help please? Is there a way to use the "Last row" from a
    deduplication in an append-only stream or tell upsert-kafka to not
    produce *null* records in the sink?

    Thank you

    On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:

        Hello,
        Flink 1.12.1 (PyFlink)
        I am deduplicating CDC records coming from Maxwell in a Kafka
        topic. Here is the SQL:

            CREATE TABLE stats_topic (
              `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
              `ts` BIGINT,
              `xid` BIGINT,
              row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
              WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
            ) WITH (
              'connector' = 'kafka',
              'format' = 'json',
              'topic' = 'stats_topic',
              'properties.bootstrap.servers' = 'localhost:9092',
              'properties.group.id' = 'test_group'
            )

            CREATE TABLE sink_table (
              `id` BIGINT,
              `account` INT,
              `upd_ts` BIGINT
            ) WITH (
              'connector' = 'kafka',
              'format' = 'json',
              'topic' = 'sink_topic',
              'properties.bootstrap.servers' = 'localhost:9092',
              'properties.group.id' = 'test_group'
            )


            INSERT INTO sink_table
            SELECT
              id,
              account,
              upd_ts
            FROM (
              SELECT
                id,
                account,
                upd_ts,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
              FROM stats_topic
              GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
            )
            WHERE rownum = 1

        As there are a lot of CDC records for a single id, I'm using
        ROW_NUMBER() and produce them to the sink_topic on a 20-minute
        interval. The problem is that Flink doesn't allow me to use it
        in combination with the kafka connector:

            pyflink.util.exceptions.TableException: Table sink
            'default_catalog.default_database.sink_table' doesn't
            support consuming update and delete changes which is
            produced by node Rank(strategy=[UndefinedStrategy],
            rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
            partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])


        If I use the *upsert-kafka* connector everything is fine, but
        then I receive empty JSON records in the sink topic:

            {"id": 111111, "account": 4, "upd_ts": 1612334952}
            {"id": 222222, "account": 4, "upd_ts": 1612334953}
            {}
            {"id": 333333, "account": 4, "upd_ts": 1612334955}
            {}
            {"id": 444444, "account": 4, "upd_ts": 1612334956}


        Thank you!

