Thanks for the quick reply, Timo. I'll test with the row_ts and compaction
mode suggestions. However, I've read somewhere in the archives that an
append-only stream is only possible if I extract "the first" record from
the ranking, which in my case is the oldest record.
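
For reference, this is roughly the first-row variant I take the docs to
mean (same tables and columns as in my query below, but ordering ascending
on the time attribute row_ts instead of upd_ts, and leaving the 20-minute
TUMBLE aside for a moment):

    INSERT INTO sink_table
    SELECT id, account, upd_ts
    FROM (
      SELECT
        id,
        account,
        upd_ts,
        -- ORDER BY the time attribute ASC keeps the first (oldest) row per id,
        -- which as far as I can tell is the only dedup form that stays append-only
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts ASC) AS rownum
      FROM stats_topic
    )
    WHERE rownum = 1

What I actually need is the latest record per id, not the oldest, so it
seems I'd still end up with an updating result.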

Regards

On Mon, Feb 8, 2021, 18:56 Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> could the problem be that you are mixing an OVER and a TUMBLE window with
> each other? The TUMBLE is correctly defined over the time attribute `row_ts`,
> but the OVER window is defined using a regular column `upd_ts`. This
> might be the reason why the query is not append-only but updating.
>
> Maybe you can split the problem into sub queries and share the plan with
> us using .explain()?
>
> The nulls in upsert-kafka should be gone once you enable compaction mode
> in Kafka.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 08.02.21 10:53, Khachatryan Roman wrote:
> > Hi,
> >
> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
> > I'm pulling in Timo and Jark who might know better.
> >
> > https://issues.apache.org/jira/browse/FLINK-19857
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
> >
> >     Any help please? Is there a way to use the "Last row" from a
> >     deduplication in an append-only stream or tell upsert-kafka to not
> >     produce *null* records in the sink?
> >
> >     Thank you
> >
> >     On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
> >
> >         Hello,
> >         Flink 1.12.1 (pyflink)
> >         I am deduplicating CDC records coming from Maxwell in a Kafka
> >         topic. Here is the SQL:
> >
> >             CREATE TABLE stats_topic(
> >               `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
> >               `ts` BIGINT,
> >               `xid` BIGINT,
> >               row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
> >               WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
> >             ) WITH (
> >               'connector' = 'kafka',
> >               'format' = 'json',
> >               'topic' = 'stats_topic',
> >               'properties.bootstrap.servers' = 'localhost:9092',
> >               'properties.group.id' = 'test_group'
> >             )
> >
> >             CREATE TABLE sink_table(
> >               `id` BIGINT,
> >               `account` INT,
> >               `upd_ts` BIGINT
> >             ) WITH (
> >               'connector' = 'kafka',
> >               'format' = 'json',
> >               'topic' = 'sink_topic',
> >               'properties.bootstrap.servers' = 'localhost:9092',
> >               'properties.group.id' = 'test_group'
> >             )
> >
> >
> >             INSERT INTO sink_table
> >             SELECT
> >               id,
> >               account,
> >               upd_ts
> >             FROM (
> >               SELECT
> >                 id,
> >                 account,
> >                 upd_ts,
> >                 ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
> >               FROM stats_topic
> >               GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> >             )
> >             WHERE rownum = 1
> >
> >
> >         As there are a lot of CDC records for a single id, I'm using
> >         ROW_NUMBER() and producing the results to the sink_topic on a
> >         20-minute interval. The problem is that Flink doesn't allow me
> >         to use it in combination with the kafka connector:
> >
> >             pyflink.util.exceptions.TableException: Table sink
> >             'default_catalog.default_database.sink_table' doesn't
> >             support consuming update and delete changes which is
> >             produced by node Rank(strategy=[UndefinedStrategy],
> >             rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
> >             partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
> >
> >
> >         If I use the *upsert-kafka* connector everything is fine, but
> >         then I receive empty JSON records in the sink topic:
> >
> >             {"id": 111111, "account": 4, "upd_ts": 1612334952}
> >             {"id": 222222, "account": 4, "upd_ts": 1612334953}
> >             {}
> >             {"id": 333333, "account": 4, "upd_ts": 1612334955}
> >             {}
> >             {"id": 444444, "account": 4, "upd_ts": 1612334956}
> >
> >
> >         Thank you!
> >
>
>
