Hi Laurent,

Did you manage to find the error in your MATCH_RECOGNIZE statement? If I had to guess, I'd say it's because the DEFINE clause for B accesses A, but with the * quantifier A may match zero events, so there might be no event A at all.
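For reference, the behaviour asked for further down in this thread ("deduplicate but let previous values come back after a different value has appeared") can be sketched outside Flink. The following is a minimal Python sketch, not Flink code; the function name `emit_on_change` and the `(client_number, address)` row shape are assumptions made for illustration, and addresses are assumed to be non-null. The first row of each client is the case with no preceding event, which has to be handled explicitly.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical row shape for illustration: (client_number, address)
Row = Tuple[str, str]


def emit_on_change(rows: Iterable[Row]) -> Iterator[Row]:
    """Yield a row only if its address differs from the previous row of the same client."""
    last_address: Dict[str, str] = {}  # only the latest address per client is kept
    for client_number, address in rows:
        previous: Optional[str] = last_address.get(client_number)
        # previous is None for the first row of a client (no preceding event).
        if previous is None or previous != address:
            yield (client_number, address)
        last_address[client_number] = address


if __name__ == "__main__":
    events = [("c1", "a"), ("c1", "a"), ("c1", "b"), ("c1", "a")]
    for row in emit_on_change(events):
        print(row)
```

Only the most recent address per client is held in state, so a value that reappears after a different one is emitted again.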
Cheers,

Konstantin

On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <laurent.exste...@euranova.eu> wrote:

> Hi Leonard,
>
>> From my understanding, your case is not a pure deduplication case: you
>> want to keep both the previous record and the current record, so the
>> deduplication query cannot satisfy your requirement.
>
> Indeed, that's what I came to realise during our discussion on this email
> chain. I'm sorry if it caused confusion. I'm still not sure how to express
> this requirement in a concise way: "the need to deduplicate but let
> previous values come back after a different value has appeared"...
>
>> Keeping the last row in deduplication always produces a changelog stream,
>> because we need to retract the previous last value and send the new last
>> value. You could use a connector that supports upsert sinks, such as HBase,
>> JDBC or the upsert-kafka connector, when sinking a changelog stream. The
>> kafka connector can only accept an append-only stream, and thus you got
>> the message.
>
> That's what I understood indeed. But in my case I really do want to insert
> and not upsert.
> Just for information: the goal is to be able to historize kafka messages
> in real-time. Each message could potentially be split to store
> information in multiple tables (in my example: name and address would be
> inserted in 2 different tables), and the history should be kept and
> enriched with the ingestion date. The fact that the kafka message can be
> split to be stored in multiple tables creates that "deduplication"
> requirement (in my example the address could have changed but not the name,
> and we don't want to add a record with no business value in the table
> containing the names). And of course, a field can be changed twice and as a
> result have the same value again, and that's business information we do
> want to keep.
>
>> The LAG function is used in over window aggregation and should work in
>> your case, but unfortunately it looks like the LAG function is not
>> implemented correctly. I created an issue [1] to fix this.
>
> Thanks a lot! I'll follow the issue.
> I would love to try to fix it... but quickly looking at that code, I'm not
> sure it's the best way to start contributing. I don't understand what
> should be changed in that code, let alone find what generated that code and
> how it should be fixed...
>
> In the meantime, I guess the only other option would be
> MATCH_RECOGNIZE?
> Do you think you could help me find what I did wrong in this query:
>
> SELECT *
> FROM customers
> MATCH_RECOGNIZE (
>   PARTITION BY client_number
>   ORDER BY proctime()
>   MEASURES
>     B.client_number as client_number,
>     B.address as address
>   PATTERN (A* B)
>   DEFINE
>     B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
> ) as T;
>
> I get the following error:
> SQL validation failed. Index 0 out of bounds for length 0
>
> Thanks a lot for your help!
>
> Laurent.
>
>> Best,
>> Leonard
>> [1] https://issues.apache.org/jira/browse/FLINK-20405
>>
>> On Fri, 27 Nov 2020 at 03:28, Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Laurent
>>>
>>> Basically, I need to deduplicate, *but only keeping in the
>>> deduplication state the latest value of the changed column* to compare
>>> with. While here it seems to keep all previous values…
>>>
>>> You can use `ORDER BY proctime() DESC` in the deduplication query;
>>> it will keep the last row. I think that's what you want.
>>>
>>> BTW, deduplication supports event time in 1.12, which will be
>>> available soon.
>>>
>>> Best,
>>> Leonard
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>> *euranova.eu <http://euranova.eu/>*
>> *research.euranova.eu* <http://research.euranova.eu/>
>>
>> ♻ Be green, keep it on the screen

--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk