Hi Laurent,
Did you manage to find the error in your MATCH_RECOGNIZE statement? If I
had to take a guess, I'd say it's because you are accessing A, but due to
the * quantifier there might actually be no A event.
Cheers,
Konstantin
On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <laurent
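A minimal sketch of what this means in practice, using the table and column
names that appear later in this thread (the alias T is just a placeholder):
with PATTERN (A* B) the variable A may match zero events, so
LAST(A.address, 1) can be NULL and every condition that references A has to
tolerate that explicitly.

SELECT *
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proctime()
    MEASURES
        LAST(B.client_number) AS client_number,
        LAST(B.address) AS address
    PATTERN (A* B)
    DEFINE
        -- the IS NULL branch covers the case where A matched no event at all
        B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) AS T;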
Hi Leonard,
> From my understanding, your case is not a pure deduplication case but one
> where you want to keep both the previous record and the current record,
> thus the deduplication query cannot satisfy your requirement.
>
Indeed, that's what I came to realise during our discussion on this email
chain. I'm
Hi Laurent,
With respect to Ververica Platform, we will support Flink 1.12 and add
"upsert-kafka" as a packaged connector in our next minor release which we
target for February.
Cheers,
Konstantin
On Thu, Nov 12, 2020 at 3:43 AM Jark Wu wrote:
> Hi Laurent,
>
> 1. Deduplicate with keeping the
Hi, Laurent
>
> I'm not sure that would do what I want though. As far as I understand, the
> deduplication query will always remember any values it has seen. So if I
> have, for a specific primary key, the following values in another field: "a",
> "a", "b", "b", "a", "a", the deduplication quer
Hi Leonard,
thank you for your answer.
I'm not sure that would do what I want though. As far as I understand, the
deduplication query will always remember any values it has seen. So if I
have, for a specific primary key, the following values in another field:
"a", "a", "b", "b", "a", "a", the ded
Hi, Laurent
> Basically, I need to deduplicate, but only keeping in the deduplication state
> the latest value of the changed column to compare with. While here it seems
> to keep all previous values…
You can use `ORDER BY proctime() DESC` in the deduplication query; it will
keep the last row,
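A minimal sketch of this suggestion, with table and column names taken from
the rest of the thread: the usual Flink SQL deduplication pattern, ordered
by proctime() DESC so that row number 1 is the latest row per client_number.

SELECT client_number, address
FROM (
    SELECT client_number, address,
           ROW_NUMBER() OVER (
               PARTITION BY client_number
               ORDER BY proctime() DESC) AS row_num
    FROM customers
)
WHERE row_num = 1;

As discussed elsewhere in the thread, keeping the last row like this
produces an updating stream rather than an append-only one.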
Hello,
It seems like LAG would probably be the right function to use.
However, I get unexpected results from it:
---
-- Deduplicate addresses
---
--CREATE TEMPORARY VIEW dedup_address as
SELECT *
FROM (
SELECT client_number
, address
,
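For reference, a hedged sketch of how the cut-off LAG query above might look
when written out in full; the view name, the WHERE condition, and the
processing-time attribute column (here called proc_time, assumed to be
declared in the source DDL) are assumptions, not the original query:

--CREATE TEMPORARY VIEW dedup_address AS
SELECT client_number, address
FROM (
    SELECT client_number,
           address,
           LAG(address) OVER (
               PARTITION BY client_number
               ORDER BY proc_time) AS prev_address
    FROM customers
)
WHERE prev_address IS NULL OR address <> prev_address;

As noted above, LAG returned unexpected results in this setup, so this only
captures the intent of the approach.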
I'm now trying with a MATCH_RECOGNIZE:
SELECT *
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proctime()
    MEASURES
        LAST(B.client_number) AS client_number,
        LAST(B.address) AS address
    PATTERN (A* B)
    DEFINE
        B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) as
I see what my mistake was: I was using a field in my ORDER BY, while it
only supports proctime() for now.
That allows me to create an append-only stream, thanks a lot!
However, it still does not allow me to do what I need:
*If I use both my primary key and changing column in PARTITION BY, then i
Hi Laurent,
1. Currently, it's impossible to convert deduplication with last row into an
append-only stream.
2. Yes, I think Ververica platform doesn't support 'changelog-json' format
natively.
However, regarding your case, I think you can keep the first row on the
client_number+address key.
SELECT *
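A sketch of that suggestion, continuing the cut-off SELECT above with names
assumed from the thread: deduplicating on the combination of client_number
and address while keeping the first row, which yields an append-only stream.

SELECT client_number, address
FROM (
    SELECT client_number, address,
           ROW_NUMBER() OVER (
               PARTITION BY client_number, address
               ORDER BY proctime() ASC) AS row_num
    FROM customers
)
WHERE row_num = 1;

Note that this keeps only the first occurrence of each (client_number,
address) pair, so a value that changes back to a previously seen one (the
"a", "a", "b", "b", "a", "a" case discussed above) is not emitted again.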
Hi Jark,
thanks again for your quick response!
I tried multiple variants of my query by:
- specifying only the primary key in the PARTITION BY clause
- changing the order to DESC to keep the last row
--> I unfortunately always get the same error message.
If I try to make a simple select on the r
Hi Laurent,
1. Deduplicate with keeping the first row will generate an append-only
stream. But I guess you are expecting to keep the last row, which generates
an updating stream. An alternative way is to use the "changelog-json"
format in this repo [1]; it will convert the updating stream int
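A hedged sketch of that alternative: a Kafka sink table using the
'changelog-json' format from the repository referenced as [1]; the table
name, topic, and broker address are placeholders, and the exact option names
depend on the version of that format you install.

CREATE TABLE dedup_addresses_sink (
    client_number BIGINT,
    address STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dedup-addresses',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'changelog-json'
);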
Hi Jark,
thanks for your quick reply. I was indeed expecting it.
But that triggers the following questions:
1. Is there another way to do this deduplication and generate an
append-only stream? Match Recognize? UDF? ...?
2. If I were to put Postgres as a sink, what would happen? Will the e
Hi Laurent,
This is because the deduplicate node generates an updating stream; however,
Kafka currently only supports append-only streams.
This can be addressed in release-1.12, because we introduce a new connector
"upsert-kafka" which supports writing updating
streams into Kafka compacted topics.
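A minimal sketch of such an upsert-kafka sink as introduced in Flink 1.12
(topic, broker address, and formats are placeholders): the declared primary
key becomes the Kafka record key, so a compacted topic retains the latest
value per key.

CREATE TABLE customers_compacted (
    client_number BIGINT,
    address STRING,
    PRIMARY KEY (client_number) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'customers-compacted',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);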
Hello,
I'm getting an error in Flink SQL when reading from Kafka, deduplicating
records, and sending them back to Kafka.
The behavior I want is the following:
*input:*
| client_number | address |
| --- | --- |
| 1 | addr1 |
| 1