Ayoub,

thanks for resurrecting this KIP. I think a built-in de-duplication operator will be very useful.


Couple of questions:



100: `deduplicationKeySelector`

Is this the best name? It might indicate that we select a "key", which is an overloaded term... Maybe we could use `Field` or `Id` or `Attribute` instead of `Key` in the name? Just brainstorming. If we think `Key` is the best word, I am also ok with it.



101: `Deduplicated` class

You propose to add static methods `keySerde()` and `valueSerde()` -- in other config classes, we use only `with(keySerde, valueSerde)` as we try to use the "builder" pattern, and avoid too many overloads. I would prefer to omit both methods you suggest and just use a single `with` for both serdes.

Similarly, I think we don't want to add a `with(...)` overload which takes all parameters at once (which should only be 3 parameters, not 4 as it's currently in the KIP)?
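To make the suggestion concrete, here is a hypothetical sketch of what the `Deduplicated` config class could look like with a single `with(keySerde, valueSerde)` entry point, following the builder pattern of existing config classes. The `Serde` interface is stubbed out so the sketch is self-contained; all names beyond `Deduplicated` and `with` are assumptions, not the KIP's actual API.

```java
// Hypothetical sketch, NOT the KIP's proposed API.
// Serde is a stand-in for org.apache.kafka.common.serialization.Serde.
class Deduplicated<K, V> {

    interface Serde<T> { }

    final Serde<K> keySerde;
    final Serde<V> valueSerde;

    private Deduplicated(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // single entry point for both serdes, instead of separate
    // static keySerde() / valueSerde() methods
    static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde,
                                          final Serde<V> valueSerde) {
        return new Deduplicated<>(keySerde, valueSerde);
    }

    public static void main(final String[] args) {
        final Deduplicated<String, Long> config = Deduplicated.with(null, null);
        // both serdes land in one config object via a single call
        System.out.println(config.keySerde == null && config.valueSerde == null);
    }
}
```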



102: Usage of `WindowedStore`:

Would this be efficient? The physical byte layout is "<dataKey,ts>" for the store key, so it would be difficult to do an efficient lookup for a given "de-duplication key" to discard duplicates, as we don't know the timestamp of the first record with the same "de-duplication key".

This boils down to the actual de-duplication logic (some more comments below), but what you propose seems to require expensive range scans, which could be cost-prohibitive in practice. I think we need to find a way to use efficient point lookups on the key to make this work.
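A toy model of the lookup-cost concern (plain Java, not Kafka Streams code): a `WindowedStore`-like layout keys entries by "<dataKey,ts>", modeled here as a sorted map over composite string keys. Since the timestamp of the first record with a given de-duplication key is unknown, finding it requires a range scan over all timestamps for that key, while a plain key-value layout keyed by the de-duplication key alone allows a single point lookup. The key encodings are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class StoreLookupSketch {

    public static void main(final String[] args) {
        // WindowedStore-like layout: composite key "<dataKey>@<ts>"
        final TreeMap<String, String> windowed = new TreeMap<>();
        windowed.put("k1@100", "v1");
        windowed.put("k1@250", "v1-dup");
        windowed.put("k2@120", "v2");

        // to find "k1" without knowing its ts, we must scan a sub-range
        final int scanned = windowed.subMap("k1@", "k1@\uffff").size();
        System.out.println("range scan touched " + scanned + " entries");

        // KeyValueStore-like layout: de-duplication key only -> point lookup
        final Map<String, Long> keyed = new HashMap<>();
        keyed.put("k1", 100L);
        System.out.println("point lookup found ts=" + keyed.get("k1"));
    }
}
```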



103: "Processing logic":

Might need some updates (cf. comment 102). I am not sure if I fully understand the logic: cf. 105 below.



104:

If no entries found → forward the record + save the record in the store

This part is critical, and we should discuss it in detail. In the end, de-duplication only makes sense when EOS is used, and we might want to call this out (e.g., in the JavaDocs)? But if used with ALOS, it's very difficult to ensure that we never lose data... Your proposal to forward first goes in the right direction, but does not really solve the problem entirely:

Even if we forward the message first, all downstream processing happens, `context.forward()` returns, and we update the state store, we could now crash w/o committing offsets. In this case, we have no guarantee that the result records were published (as we did not flush the producer yet), but when re-reading from the input topic, we would find the record in the store and incorrectly drop it as a duplicate...
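The failure window can be simulated with a toy sketch (plain Java, no Kafka): forward first, then update the store, then crash before the output and offsets are committed. On re-delivery under ALOS, the record is found in the store and wrongly dropped, even though its original output was never flushed. The lists standing in for "uncommitted" and "committed" output are modeling assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AlosCrashSketch {

    // dedup store; assume its update survived the crash (e.g. via changelog)
    static final Map<String, Long> store = new HashMap<>();
    static final List<String> uncommittedOutput = new ArrayList<>();
    static final Set<String> committedOutput = new HashSet<>();

    static void process(final String key, final long ts) {
        if (!store.containsKey(key)) {
            uncommittedOutput.add(key);  // context.forward() -- not flushed yet
            store.put(key, ts);          // state store update
        }
        // else: considered a duplicate and dropped
    }

    public static void main(final String[] args) {
        process("k1", 100L);
        // CRASH before commit: un-flushed output is lost, store update survived
        uncommittedOutput.clear();

        // under ALOS the input record is re-delivered after restart...
        process("k1", 100L);

        // ...but it is now dropped as a "duplicate", so the record is lost:
        System.out.println(committedOutput.contains("k1") || !uncommittedOutput.isEmpty());
    }
}
```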

I think the only solution to make this work would be to use TX-state stores in combination with ALOS as proposed via KIP-892?

Using an in-memory store won't help much either? The producer could have sent the write into the changelog topic, but not into the result topic, and thus we could still not guarantee ALOS...?

How do we want to go about this? We could also say this new operator only works with EOS. Would this be too restrictive? -- At least for now, until KIP-892 lands, and we could relax it then?



105: "How to detect late records"

In the end, it seems to boil down to determining which record to forward and which record to drop, for (1) the regular case and (2) the out-of-order data case.

Regular case (no out-of-order data): For this case, offset and ts order are the same, and we can forward the first record we get. All later records within the "de-duplication period" with the same "de-duplication key" would be dropped. If a record with the same "de-duplication key" arrives after the "de-duplication period" has passed, we cannot drop it any longer, but would still forward it, as per the contract of the operator / de-duplication period.
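The regular case above can be sketched in a few lines (plain Java stand-in, not the actual processor): a point lookup on the de-duplication key, drop within the period, forward and re-record once the period has passed. The period value and naming are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InOrderDedupSketch {

    static final long DEDUP_PERIOD_MS = 1000L;
    static final Map<String, Long> firstSeenTs = new HashMap<>();
    static final List<String> forwarded = new ArrayList<>();

    static void process(final String dedupKey, final long ts) {
        final Long seen = firstSeenTs.get(dedupKey);   // efficient point lookup
        if (seen == null || ts - seen > DEDUP_PERIOD_MS) {
            forwarded.add(dedupKey + "@" + ts);        // first (or expired) -> forward
            firstSeenTs.put(dedupKey, ts);
        }                                              // else: duplicate -> drop
    }

    public static void main(final String[] args) {
        process("k1", 100L);    // first record -> forwarded
        process("k1", 600L);    // within period (600-100 <= 1000) -> dropped
        process("k1", 1200L);   // period passed (1200-100 > 1000) -> forwarded
        System.out.println(forwarded);
    }
}
```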

For the out-of-order case: The first question we need to answer is, do we want to forward the record with the smallest offset or the record with the smallest ts? Logically, forwarding the record with the smallest ts might be "more correct"; however, it implies we could only forward it after the "de-duplication period" has passed, which might add undesired latency? Would this be desired/acceptable?

In contrast, if we forward the record with the smallest offset (this is what you seem to propose), we don't have a latency issue, but of course the question of which records to drop is trickier to answer: it seems you propose to compare the time difference between the stored record and the current record, but I am wondering why? Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store? It would be good to get some more motivation and tradeoffs discussed about the different strategies we could use.

You also propose to drop _any_ late record: I am also not sure if that's desired? Could this not lead to data loss? Assume we get a late record, but in fact there never was a duplicate: why would we want to drop it? If there is a late record which is indeed a duplicate, but we already purged the original record from the store, it seems to be the same case as the "no out-of-order" case: after we purge we cannot de-duplicate, and thus it's a regular case we can just accept?



-Matthias




On 5/29/24 4:58 AM, Ayoub Omari wrote:
Hi everyone,

I've just made a (small) change to this KIP about an implementation detail.
Please let me know your thoughts.

Thank you,
Ayoub

Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a écrit :

Hello,

Following a discussion on community slack channel, I would like to revive
the discussion on the KIP-655, which is about adding a deduplication
processor in kafka-streams.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

Even though the motivation is not quite the same as the initial one, I
updated the KIP rather than creating a new one, as I believe the end goal
is the same.

Thanks,
Ayoub



