Hi, Laurent
> 
> I'm not sure that would do what I want though. As far as I understand, the 
> deduplication query will always remember any values it has seen. So if I 
> have, for a specific primary key, the following values in another field: "a", 
> "a", "b", "b", "a", "a", the deduplication query could provide me with "a", 
> "b" as a result. But never with "a", "b", "a" (possibility to come back to a 
> previous value), which is what I need.

From  my understanding, your case is not a pure deduplication case but want to 
both keep the previous record and current record, thus the deduplication query 
can not satisfy your requirement.


> Moreover, I tried putting procttime() DESC, and I get the message: The 
> submitted query is not an append-only query. Only queries producing 
> exclusively new rows over time are supported at the moment. I do want an 
> append only query.
Keeping last row in Deduplication always produces a changelog stream, because 
we need to retract the previous last value and sent the new last value. You 
could use a connector that supports upsert sink like HBase, JDBC or 
upsert-kafka connector when sink a changelog stream, the kafka connector can 
only accept append-only stream and thus you got the message.


> The LAG function makes complete sense to me here, since it would only compare 
> with the previous record. I just don't understand why it does not get the 
> value of the previous record, whatever offset I give it. Any idea what I 
> might be doing wrong?

The LAG function is used in over window aggregation and should work in your 
case, but unfortunately look like the LAG function does not implements 
correctly, I create an issue[1] to fix this.

 
 
Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20405

> On Fri, 27 Nov 2020 at 03:28, Leonard Xu <xbjt...@gmail.com 
> <mailto:xbjt...@gmail.com>> wrote:
> Hi, Laurent
> 
>> Basically, I need to deduplicate, but only keeping in the deduplication 
>> state the latest value of the changed column to compare with. While here it 
>> seems to keep all previous values…
> 
> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it will 
> keep last row, I think that’s what you want.
> 
> BTW, the deduplication has supported event time in 1.12, this will be 
> available soon.
> 
> Best,
> Leonard
> 
> 
> 
> -- 
> Laurent Exsteens
> Data Engineer
> (M) +32 (0) 486 20 48 36
> 
> EURA NOVA
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00 <tel:%2B32%2010%2075%2002%2000>
> 
> euranova.eu <http://euranova.eu/>
> research.euranova.eu <http://research.euranova.eu/>
> ♻ Be green, keep it on the screen

Reply via email to