Ayoub,
thanks for resurrecting this KIP. I think a built-in de-duplication
operator will be very useful.
Couple of questions:
100: `deduplicationKeySelector`
Is this the best name? It might indicate that we select a "key", which
is an overloaded term... Maybe we could use `Field` or `Id` or
`Attribute` instead of `Key` in the name? Just brainstorming. If we
think `Key` is the best word, I am also ok with it.
101: `Deduplicated` class
You propose to add static methods `keySerde()` and `valueSerde()` -- in
other config classes, we use only `with(keySerde, valueSerde)` as we try
to use the "builder" pattern, and avoid too many overloads. I would
prefer to omit both methods you suggest and just use a single `with` for
both serdes.
Similarly, I think we don't want to add a `with(...)` overload that
takes all parameters at once (which should only be 3 parameters, not 4,
as it currently is in the KIP)?
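To make this concrete, here is a minimal sketch of the config class
following the existing builder convention (cf. `Grouped` or `Joined`);
the field names and factory shape are my assumption, not necessarily
the final API:

```java
import org.apache.kafka.common.serialization.Serde;

public class Deduplicated<K, V> {
    protected final Serde<K> keySerde;
    protected final Serde<V> valueSerde;

    private Deduplicated(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // single static factory for both serdes, instead of separate
    // keySerde()/valueSerde() methods:
    public static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde,
                                                 final Serde<V> valueSerde) {
        return new Deduplicated<>(keySerde, valueSerde);
    }
}
```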
102: Usage of `WindowedStore`:
Would this be efficient? The physical byte layout is "<dataKey,ts>" for
the store key, so it would be difficult to do an efficient lookup for a
given "de-duplication key" to discard duplicates, as we don't know the
timestamp of the first record with the same "de-duplication key".
This boils down to the actual de-duplication logic (some more comments
below), but what you propose seems to require expensive range scans,
which could be cost-prohibitive in practice. I think we need to find a
way to use efficient key point-lookups to make this work.
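To make the cost difference concrete, here is a sketch of both lookup
patterns (illustrative only, not meant as the KIP's implementation):

```java
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

class LookupSketch<KR, V> {

    // WindowStore keys are laid out as <dataKey,ts>; without knowing the
    // first record's ts, detecting a duplicate requires a range scan over
    // the whole de-duplication period:
    boolean isDuplicateRangeScan(final WindowStore<KR, V> store, final KR dedupKey,
                                 final Instant now, final Duration period) {
        try (final WindowStoreIterator<V> iter =
                 store.fetch(dedupKey, now.minus(period), now)) {
            return iter.hasNext();
        }
    }

    // A plain KeyValueStore keyed by the de-duplication key (value =
    // first-seen ts) answers the same question with a single point lookup:
    boolean isDuplicatePointLookup(final KeyValueStore<KR, Long> store,
                                   final KR dedupKey) {
        return store.get(dedupKey) != null;
    }
}
```

Of course, a plain key-value store would need its own purging mechanism
(e.g., via punctuation) to expire entries after the de-duplication
period; that is a tradeoff to discuss, too.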
103: "Processing logic":
Might need some updates (cf. comment 102). I am not sure if I fully
understand the logic: cf. 105 below.
104:
If no entries found → forward the record + save the record in the store
This part is critical, and we should discuss it in detail. In the end,
de-duplication only makes sense when EOS is used, and we might want
to call this out (e.g., in the JavaDocs)? But if used with ALOS, it's
very difficult to ensure that we never lose data... Your proposal to
forward first goes in the right direction, but does not really solve
the problem entirely:
Even if we forward the message first, all downstream processing
happens, `context.forward()` returns, and we update the state store; we
could now crash w/o committing offsets. In this case, we have no
guarantee that the result records were published (as we did not flush
the producer yet), but when re-reading from the input topic, we would
find the record in the store and incorrectly drop it as a duplicate...
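To pin down the failure window, here is a sketch of the forward-first
processor with the problematic crash point marked (the processor shape
and the store name are my assumptions, not the KIP's code):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

class ForwardFirstSketch<K, V> implements Processor<K, V, K, V> {
    private ProcessorContext<K, V> context;
    private KeyValueStore<K, Long> store;

    @Override
    public void init(final ProcessorContext<K, V> context) {
        this.context = context;
        this.store = context.getStateStore("dedup-store"); // hypothetical name
    }

    @Override
    public void process(final Record<K, V> record) {
        if (store.get(record.key()) == null) { // record key as dedup key, for simplicity
            context.forward(record);                     // (1) handed to the producer
            store.put(record.key(), record.timestamp()); // (2) store + changelog updated
            // (3) a crash here, before the offset commit, is the problem:
            // on restart we re-read the input record, find it in the store,
            // and drop it, even though (1) may never have reached the output
            // topic under ALOS.
        }
    }
}
```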
I think the only solution to make this work would be to use TX-state
stores in combination with ALOS, as proposed via KIP-892?
Using an in-memory store won't help much either? The producer could
have sent the write into the changelog topic, but not into the result
topic, and thus we could still not guarantee ALOS...?
How do we want to go about this? We could also say this new operator
only works with EOS. Would this be too restrictive? -- At least for
now, until KIP-892 lands, and we could relax it then?
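If we go the EOS-only route, a guard along these lines could fail fast
at startup (purely illustrative; where and how to enforce this would
itself need discussion):

```java
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyException;

class GuaranteeCheck {
    // simplified: the deprecated EXACTLY_ONCE / EXACTLY_ONCE_BETA values
    // are ignored here
    static void requireEos(final Map<String, Object> config) {
        final Object guarantee = config.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
        if (!StreamsConfig.EXACTLY_ONCE_V2.equals(guarantee)) {
            throw new TopologyException(
                "deduplicate() requires exactly-once processing (cf. KIP-892 for ALOS)");
        }
    }
}
```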
105: "How to detect late records"
In the end, it seems to boil down to determine which of the records to
forward and which record to drop, for (1) the regular case and (2) the
out-of-order data case.
Regular case (no out-of-order data): For this case, offset and ts order
is the same, and we can forward the first record we get. All later
records within the "de-duplication period" with the same
"de-duplication key" would be dropped. If a record with the same
"de-duplication key" arrives after the "de-duplication period" has
passed, we cannot drop it any longer, but would still forward it, as
per the contract of the operator / de-duplication period.
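As a sketch, the regular case would then reduce to a single point
lookup (building on the key-value store layout from comment 102; names
are mine, not the KIP's):

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

class InOrderDedupSketch<K, V> {
    void handle(final Record<K, V> record, final K dedupKey,
                final KeyValueStore<K, Long> store,           // dedupKey -> first-seen ts
                final ProcessorContext<K, V> context, final Duration period) {
        final Long firstSeenTs = store.get(dedupKey);
        if (firstSeenTs == null) {
            context.forward(record);                          // first occurrence
            store.put(dedupKey, record.timestamp());
        } else if (record.timestamp() - firstSeenTs <= period.toMillis()) {
            // duplicate within the de-duplication period -> drop
        } else {
            context.forward(record);                          // period passed -> forward
            store.put(dedupKey, record.timestamp());          // and restart the period
        }
    }
}
```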
For the out-of-order case: The first question we need to answer is, do
we want to forward the record with the smallest offset or the record
with the smallest ts? Logically, forwarding the record with the
smallest ts might be "more correct"; however, it implies we could only
forward it after the "de-duplication period" has passed, which might
introduce undesired latency. Would this be desired/acceptable?
In contrast, if we forward the record with the smallest offset (this is
what you seem to propose), we don't have a latency issue, but of course
the question of which records to drop is trickier to answer: it seems
you propose to compare the time difference of the stored record to the
current record, but I am wondering why? Would it not be desired to drop
all duplicates independent of their ts, as long as we find a record in
the store? It would be good to get some more motivation and tradeoffs
discussed for the different strategies we could use.
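To frame that question, the two drop conditions could be compared like
this (a sketch; variable names are mine, not the KIP's):

```java
import java.time.Duration;

class DropStrategySketch {
    // the KIP (as I read it): drop only if the ts difference to the stored
    // record is within the de-duplication period:
    static boolean dropPerKip(final Long storedTs, final long recordTs,
                              final Duration period) {
        return storedTs != null
            && Math.abs(recordTs - storedTs) <= period.toMillis();
    }

    // alternative: drop any record whose de-duplication key is still in the
    // store, independent of its ts:
    static boolean dropAlways(final Long storedTs) {
        return storedTs != null;
    }
}
```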
You also propose to drop _any_ late record: I am not sure that's
desired either? Could this not lead to data loss? Assume we get a late
record, but in fact there was never a duplicate -- why would we want to
drop it? If there is a late record which is indeed a duplicate, but we
already purged the original record from the store, it seems to be the
same case as the "no out-of-order" case: after we purge, we cannot
de-duplicate any longer, and thus it's a regular case we can just
accept?
-Matthias
On 5/29/24 4:58 AM, Ayoub Omari wrote:
Hi everyone,
I've just made a (small) change to this KIP about an implementation detail.
Please let me know your thoughts.
Thank you,
Ayoub
On Mon, May 20, 2024 at 9:13 PM, Ayoub <ayoubomar...@gmail.com> wrote:
Hello,
Following a discussion on the community Slack channel, I would like to
revive the discussion on KIP-655, which is about adding a deduplication
processor to kafka-streams.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
Even though the motivation is not quite the same as the initial one, I
updated the KIP rather than creating a new one, as I believe the end goal
is the same.
Thanks,
Ayoub