Hi, Laurent

> I'm not sure that would do what I want though. As far as I understand, the
> deduplication query will always remember any values it has seen. So if I
> have, for a specific primary key, the following values in another field: "a",
> "a", "b", "b", "a", "a", the deduplication query could provide me with "a",
> "b" as a result. But never with "a", "b", "a" (possibility to come back to a
> previous value), which is what I need.
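To make the difference concrete, here is a minimal plain-Python model of the two behaviors discussed in this thread. It is not Flink code. `keep_first_dedup` stands in for a deduplication whose state remembers every value ever seen for a key, and `emit_on_change` stands in for comparing each record with the previous record of the same key, which is what the LAG-based query is meant to do. The function names, the `(key, value)` record shape and the choice to always emit the first record of a key are assumptions made for illustration.

```python
from typing import Dict, Hashable, Iterable, List, Tuple

# Hypothetical record shape: (primary key, value of the watched field).
Record = Tuple[Hashable, str]


def keep_first_dedup(records: Iterable[Record]) -> List[Record]:
    """Emit a (key, value) pair only the first time it is ever seen.

    The state grows with every distinct value, so a value that comes
    back later (a -> b -> a) is never emitted again.
    """
    seen = set()
    out: List[Record] = []
    for key, value in records:
        if (key, value) not in seen:
            seen.add((key, value))
            out.append((key, value))
    return out


def emit_on_change(records: Iterable[Record]) -> List[Record]:
    """Emit a record whenever its value differs from the previous one for its key.

    Only the last value per key is kept in state, so returning to an
    earlier value is detected as a change.
    """
    last: Dict[Hashable, str] = {}
    out: List[Record] = []
    for key, value in records:
        # last.get(key) is None for the first record of a key, so it is emitted.
        if last.get(key) != value:
            out.append((key, value))
        last[key] = value
    return out
```

With the values "a", "a", "b", "b", "a", "a" for a single key, `keep_first_dedup` returns only "a", "b", while `emit_on_change` returns "a", "b", "a", which is the output asked for above.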

From my understanding, your case is not a pure deduplication case: you want to keep both the previous record and the current record, so the deduplication query cannot satisfy your requirement.

> Moreover, I tried putting proctime() DESC, and I get the message: The
> submitted query is not an append-only query. Only queries producing
> exclusively new rows over time are supported at the moment. I do want an
> append-only query.

Keeping the last row in deduplication always produces a changelog stream, because we need to retract the previous last value and send the new last value. When sinking a changelog stream, you could use a connector that supports upsert sinks, such as the HBase, JDBC or upsert-kafka connector. The Kafka connector can only accept an append-only stream, which is why you got that message.

> The LAG function makes complete sense to me here, since it would only compare
> with the previous record. I just don't understand why it does not get the
> value of the previous record, whatever offset I give it. Any idea what I
> might be doing wrong?

The LAG function is used in over window aggregation and should work in your case, but unfortunately it looks like the LAG function is not implemented correctly. I created an issue[1] to fix this.

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-20405

> On Fri, 27 Nov 2020 at 03:28, Leonard Xu <xbjt...@gmail.com> wrote:
> Hi, Laurent
>
>> Basically, I need to deduplicate, but only keeping in the deduplication
>> state the latest value of the changed column to compare with. While here it
>> seems to keep all previous values…
>
> You can use `ORDER BY proctime() DESC` in the deduplication query; it will
> keep the last row, which I think is what you want.
>
> BTW, deduplication supports event time as of 1.12, which will be
> available soon.
>
> Best,
> Leonard
>
> --
> Laurent Exsteens
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> EURA NOVA
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
>
> euranova.eu
> research.euranova.eu
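Leonard's point in the reply above, that keeping the last row makes deduplication emit a changelog rather than an append-only stream, can be modeled in a few lines of plain Python. This is a simplified sketch, not Flink's actual operator: the function name `keep_last_row` is hypothetical, and it assumes every later row for a key is emitted as a retraction of the previous row (`-U`) followed by the new row (`+U`), using the short change labels Flink prints for changelog rows (`+I` marks an insert).

```python
from typing import Dict, Hashable, Iterable, List, Tuple

# (change kind, primary key, value); kinds: "+I" insert,
# "-U" update-before (retraction), "+U" update-after.
Change = Tuple[str, Hashable, str]


def keep_last_row(records: Iterable[Tuple[Hashable, str]]) -> List[Change]:
    """Simplified model of keep-last-row deduplication: one current row per key.

    The first row of a key is a plain insert. Every later row must first
    retract the row emitted before it, so the output is an updating
    changelog, not an append-only stream.
    """
    current: Dict[Hashable, str] = {}
    out: List[Change] = []
    for key, value in records:
        if key in current:
            out.append(("-U", key, current[key]))  # retract the old last row
            out.append(("+U", key, value))  # emit the new last row
        else:
            out.append(("+I", key, value))  # first row seen for this key
        current[key] = value
    return out
```

Because the output contains `-U` rows, the sink must be able to apply updates by primary key. That is why the append-only Kafka connector rejects the query, while upsert-capable sinks such as HBase, JDBC or upsert-kafka can consume it.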