Hello,
Flink 1.12.1 (PyFlink)
I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here
is the SQL:

> CREATE TABLE stats_topic(
>           `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>           `ts` BIGINT,
>           `xid` BIGINT ,
>           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>           WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>         ) WITH (
>           'connector' = 'kafka',
>           'format' = 'json',
>           'topic' = 'stats_topic',
>           'properties.bootstrap.servers' = 'localhost:9092',
>           'properties.group.id' = 'test_group'
>         )
>
> CREATE TABLE sink_table(
>           `id` BIGINT,
>           `account` INT,
>           `upd_ts` BIGINT
>         ) WITH (
>           'connector' = 'kafka',
>           'format' = 'json',
>           'topic' = 'sink_topic',
>           'properties.bootstrap.servers' = 'localhost:9092',
>           'properties.group.id' = 'test_group'
>         )
>
>
> INSERT INTO sink_table
> SELECT
> id,
> account,
> upd_ts
> FROM (
> SELECT
>  id,
>  account,
>  upd_ts,
>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
> FROM stats_topic
> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
> )
> WHERE rownum=1
>

As there are a lot of CDC records for a single ID, I'm using ROW_NUMBER()
to keep only the latest one and produce the results to the sink_topic on a
20-minute interval. The problem is that Flink doesn't allow me to use it in
combination with the kafka connector:

> pyflink.util.exceptions.TableException: Table sink
> 'default_catalog.default_database.sink_table' doesn't support consuming
> update and delete changes which is produced by node
> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
> select=[$f0, $f1, $f2])
>
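
To illustrate the "update and delete changes" mentioned in the error, here is a minimal plain-Python sketch (not Flink code) of the changelog that a "keep the newest row per id" query has to emit. The function name and the tuple layout are hypothetical; the `+I` / `-U` / `+U` labels follow Flink's changelog notation. Ties on `upd_ts` are simply dropped here, which is a simplification.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int, int]  # (id, account, upd_ts)


def latest_per_id_changelog(rows: Iterable[Row]) -> List[Tuple[str, Row]]:
    """Hypothetical helper: emit the changes produced by keeping the newest row per id."""
    latest: Dict[int, Row] = {}           # current "rownum = 1" row for each id
    changes: List[Tuple[str, Row]] = []   # emitted changelog entries
    for row in rows:
        key, upd_ts = row[0], row[2]
        current = latest.get(key)
        if current is None:
            # First row seen for this id: a plain insert.
            changes.append(("+I", row))
        elif upd_ts > current[2]:
            # A newer row replaces the old top-1 row: retract the old one, then emit the new one.
            changes.append(("-U", current))
            changes.append(("+U", row))
        else:
            # Older or equal timestamp: the top-1 row does not change, nothing is emitted.
            continue
        latest[key] = row
    return changes
```

An append-only sink can only take the `+I` entries, so as soon as a second record for the same id arrives, the `-U` / `+U` pair has nowhere to go.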

If I use the upsert-kafka connector everything is fine, but then I receive
empty JSON records in the sink topic:

> {"id": 111111, "account": 4, "upd_ts": 1612334952}
> {"id": 222222, "account": 4, "upd_ts": 1612334953}
> {}
> {"id": 333333, "account": 4, "upd_ts": 1612334955}
> {}
> {"id": 444444, "account": 4, "upd_ts": 1612334956}
>

Thank you!
