Hi,
could the problem be that you are mixing OVER and TUMBLE windows with
each other? The TUMBLE is correctly defined over the time attribute
`row_ts`, but the OVER window is defined using the regular column
`upd_ts`. This might be the reason why the query is not append-only but
updating.
Maybe you can split the problem into sub-queries and share the plan with
us using .explain()?
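For example, a rough PyFlink sketch (not tested; `table_env` and the
registered `stats_topic` table are assumed from your snippets further
down), isolating the deduplication sub-query and ordering the OVER
window by the time attribute `row_ts` instead of `upd_ts`:

    # Assumes `table_env` is the TableEnvironment in which the
    # `stats_topic` table from your DDL is already registered.
    dedup = table_env.sql_query("""
        SELECT id, account, upd_ts
        FROM (
            SELECT
                id,
                account,
                upd_ts,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY row_ts DESC) AS rownum
            FROM stats_topic
        )
        WHERE rownum = 1
    """)

    # The printed plan shows which operators are generated and whether
    # the result is append-only or updating.
    print(dedup.explain())

Whether this plan ends up append-only depends on what the planner
generates, but the explain output should make that visible.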
The nulls in upsert-kafka should be gone once you enable log compaction
for the sink topic in Kafka.
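For reference, a rough sketch of enabling log compaction on the sink
topic via the kafka-python admin client (untested; the topic name is
taken from your DDL, and the standard kafka-configs CLI can be used
instead):

    # Rough sketch, assuming the kafka-python package is available.
    from kafka.admin import ConfigResource, ConfigResourceType, KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

    # Switch the sink topic to log compaction so that only the latest
    # record per key is retained once compaction runs.
    admin.alter_configs([
        ConfigResource(ConfigResourceType.TOPIC, "sink_topic",
                       configs={"cleanup.policy": "compact"})
    ])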
I hope this helps.
Regards,
Timo
On 08.02.21 10:53, Khachatryan Roman wrote:
Hi,
AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.
[1] https://issues.apache.org/jira/browse/FLINK-19857
Regards,
Roman
On Mon, Feb 8, 2021 at 9:14 AM meneldor <menel...@gmail.com> wrote:
Any help please? Is there a way to use the "Last row" from a
deduplication in an append-only stream or tell upsert-kafka to not
produce *null* records in the sink?
Thank you
On Thu, Feb 4, 2021 at 1:22 PM meneldor <menel...@gmail.com> wrote:
Hello,
Flink 1.12.1 (pyflink)
I am deduplicating CDC records coming from Maxwell in a Kafka topic.
Here is the SQL:
CREATE TABLE stats_topic (
    `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
    `ts` BIGINT,
    `xid` BIGINT,
    row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
    WATERMARK FOR `row_ts` AS `row_ts` - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'stats_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group'
)
CREATE TABLE sink_table (
    `id` BIGINT,
    `account` INT,
    `upd_ts` BIGINT
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'sink_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test_group'
)
INSERT INTO sink_table
SELECT
    id,
    account,
    upd_ts
FROM (
    SELECT
        id,
        account,
        upd_ts,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts DESC) AS rownum
    FROM stats_topic
    GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
)
WHERE rownum = 1
As there are a lot of CDC records for a single id, I'm using
ROW_NUMBER() and producing them to the sink_topic on a 20-minute
interval. The problem is that Flink doesn't allow me to use it in
combination with the kafka connector:
    pyflink.util.exceptions.TableException: Table sink
    'default_catalog.default_database.sink_table' doesn't support
    consuming update and delete changes which is produced by node
    Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
    rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0],
    orderBy=[$f2 DESC], select=[$f0, $f1, $f2])
If I use the *upsert-kafka* connector everything is fine, but then
I receive empty JSON records in the sink topic:
{"id": 111111, "account": 4, "upd_ts": 1612334952}
{"id": 222222, "account": 4, "upd_ts": 1612334953}
{}
{"id": 333333, "account": 4, "upd_ts": 1612334955}
{}
{"id": 444444, "account": 4, "upd_ts": 1612334956}
Thank you!