Re: FlinkSQL kafka->dedup->kafka

2020-12-15 Thread Konstantin Knauf
Hi Laurent, Did you manage to find the error in your MATCH_RECOGNIZE statement? If I had to take a guess, I'd say it's because you are accessing A, but due to the quantifier of * there might actually be no event A. Cheers, Konstantin On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens < laurent
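
A minimal sketch of the point made here, not taken from the original mail: with PATTERN (A* B), as in Laurent's query further down this listing, a match may contain zero A rows, so any reference to A has to tolerate that case. Either guard the access, or require at least one preceding row:

    -- guard the access so it also works when no A row was matched
    PATTERN (A* B)
    DEFINE
        B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)

    -- or require at least one preceding row, so A always exists
    -- (at the cost of never matching the very first row of a partition)
    PATTERN (A+ B)
    DEFINE
        B AS B.address <> LAST(A.address, 1)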

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, > From my understanding, your case is not a pure deduplication case but > you want to keep both the previous record and the current record, thus the > deduplication query cannot satisfy your requirement. > Indeed, that's what I came to realise during our discussion on this email chain. I'm

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Konstantin Knauf
Hi Laurent, With respect to Ververica Platform, we will support Flink 1.12 and add "upsert-kafka" as a packaged connector in our next minor release which we target for February. Cheers, Konstantin On Thu, Nov 12, 2020 at 3:43 AM Jark Wu wrote: > Hi Laurent, > > 1. Deduplicate with keeping the

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Leonard Xu
Hi, Laurent > > I'm not sure that would do what I want though. As far as I understand, the > deduplication query will always remember any values it has seen. So if I > have, for a specific primary key, the following values in another field: "a", > "a", "b", "b", "a", "a", the deduplication quer

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, thank you for your answer. I'm not sure that would do what I want though. As far as I understand, the deduplication query will always remember any values it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the ded
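
To make the semantics concrete, an illustration added for this listing (not part of the original mail): for one client_number receiving the address values below in order,

    incoming values:                    a  a  b  b  a  a
    desired output (emit on change):    a     b     a
    dedup on (client_number, address):  a     b
    -- the later switch back to "a" is dropped, because that
    -- (key, value) pair was already seen once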

Re: FlinkSQL kafka->dedup->kafka

2020-11-26 Thread Leonard Xu
Hi, Laurent > Basically, I need to deduplicate, but only keeping in the deduplication state > the latest value of the changed column to compare with. While here it seems > to keep all previous values… You can use `ORDER BY proctime() DESC` in the deduplication query, it will keep the last row,
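
For reference, a sketch of the deduplication query Leonard describes (the standard Flink SQL ROW_NUMBER pattern, written with the column names used elsewhere in the thread); ordering by proctime() DESC keeps the last row per key, which makes the result an updating stream:

    SELECT client_number, address
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY client_number
                                  ORDER BY proctime() DESC) AS row_num
        FROM customers
    )
    WHERE row_num = 1;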

Re: FlinkSQL kafka->dedup->kafka

2020-11-26 Thread Laurent Exsteens
Hello, seems like LAG would probably be the right function to use. However, I get unexpected results from it: --- -- Deduplicate addresses --- --CREATE TEMPORARY VIEW dedup_address as SELECT * FROM ( SELECT client_number , address ,
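
A sketch of the kind of LAG-based change detection being attempted here, assuming the customers table and column names used in the rest of the thread (Laurent reports that his actual query returned unexpected results):

    SELECT client_number, address
    FROM (
        SELECT client_number,
               address,
               LAG(address) OVER (PARTITION BY client_number
                                  ORDER BY proctime()) AS prev_address
        FROM customers
    )
    WHERE prev_address IS NULL OR address <> prev_address;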

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I'm now trying with a MATCH_RECOGNIZE:

    SELECT *
    FROM customers
    MATCH_RECOGNIZE (
        PARTITION BY client_number
        ORDER BY proctime()
        MEASURES
            LAST(B.client_number) as client_number,
            LAST(B.address) as address
        PATTERN (A* B)
        DEFINE
            B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
    ) as

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I see what was my mistake: I was using a field in my ORDER BY, while it only supports proctime() for now. That allows me to create an append-only stream, thanks a lot! However, it still does not allow me to do what I need: *If I use both my primary key and changing column in PARTITION BY, then i

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Jark Wu
Hi Laurent, 1. Currently, it's impossible to convert deduplicate with last row into an append-only stream. 2. Yes, I think Ververica Platform doesn't support the 'changelog-json' format natively. However, regarding your case, I think you can keep the first row on the client_number+address key. SELECT *
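
A sketch of the keep-first-row variant Jark suggests, partitioning on both columns so that each (client_number, address) pair is emitted exactly once; the actual query in the original mail is cut off in this preview:

    SELECT client_number, address
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY client_number, address
                                  ORDER BY proctime() ASC) AS row_num
        FROM customers
    )
    WHERE row_num = 1;

Keeping the first row yields an append-only stream, but, as discussed elsewhere in the thread, a value that reappears after a change (a, b, a) is then never re-emitted.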

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
Hi Jark, thanks again for your quick response! I tried multiple variants of my query by: - specifying only the primary key in the PARTITION BY clause - changing the order to DESC to keep the last row --> I unfortunately always get the same error message. If I try to make a simple select on the r

Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Jark Wu
Hi Laurent, 1. Deduplicating while keeping the first row will generate an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format in this repo [1], it will convert the updating stream int
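
For reference, a sketch of how the 'changelog-json' format from that repository is typically combined with the regular 'kafka' connector; table name, topic, and broker address below are placeholders:

    CREATE TABLE dedup_changelog_sink (
        client_number BIGINT,
        address       STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dedup_address',                          -- placeholder
        'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
        'format' = 'changelog-json'  -- encodes the row kind (insert/update/delete) together with the data
    );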

Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Laurent Exsteens
Hi Jark, thanks for your quick reply. I was indeed expecting it. But that triggers the following questions: 1. Is there another way to do this deduplication and generate an append-only stream? Match Recognize? UDF? ...? 2. If I would put Postgres as a sink, what would happen? Will the e

Re: FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Jark Wu
Hi Laurent, This is because the deduplicate node generates an updating stream, while Kafka currently only supports append-only streams. This can be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.
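
For reference, a sketch of the 'upsert-kafka' sink DDL introduced in Flink 1.12; table name, topic, and broker address are placeholders, and the declared primary key becomes the Kafka record key used for compaction:

    CREATE TABLE dedup_upsert_sink (
        client_number BIGINT,
        address STRING,
        PRIMARY KEY (client_number) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'dedup_address',                          -- placeholder
        'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
        'key.format' = 'json',
        'value.format' = 'json'
    );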

FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Laurent Exsteens
Hello, I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka. The behavior I want is the following:

*input:*
| client_number | address |
| --- | --- |
| 1 | addr1 |
| 1