106:

For the use-case of deduplicating a "at least once written" stream,
we are sure that the duplicate record has the same key as the
original one, and will land on the same task. Here, a user would
want to specify a deduplication key different from the topic's key
in case the topic's key is not a unique identifier

For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id` . Here, the application
wants to deduplicate transactions. It knows that a transaction id
maps to a single userId. Any duplicate of that record would be received
by the task which processes this userId.

This is an interesting point.

My concern is to some extend, that it seems (on the surface) to not follow the established pattern of auto-repartitioning in the DSL. Of course, given that the current proposal says we use an "id extractor" and not a "key extractor" it might be ok (but it might be somewhat subtle). Of course, JavaDocs always help to explain in detail. Would this be enough?

Would be good to hear from others about this point. I am personally not sure which approach I would prefer personally at this point.

The problem reminds me on https://issues.apache.org/jira/browse/KAFKA-10844 which is not resolve directly either. We do have KIP-759 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling) which is WIP and helps with KAFKA-10844, but not sure if it would be a viable solution for the de-duplication case?



-Matthias


On 6/11/24 2:31 PM, Ayoub Omari wrote:
Hi Sebastien & Matthias,

For 106.
My idea was to deduplicate on a per-task basis. If the user wants
to do a global deduplication over all partitions, I think it's better to
have him explicitly repartition and then call the deduplication processor.

For the use-case of deduplicating a "at least once written" stream,
we are sure that the duplicate record has the same key as the
original one, and will land on the same task. Here, a user would
want to specify a deduplication key different from the topic's key
in case the topic's key is not a unique identifier.

For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id` . Here, the application
wants to deduplicate transactions. It knows that a transaction id
maps to a single userId. Any duplicate of that record would be received
by the task which processes this userId.

One other thought I have when writing the KIP about global deduplication,
is that it will require to map twice the key of the stream (first map to
change the key to deduplication key, and second map to get
back the initial key). Second map may imply a second repartitioning.

However, if we do a per-task deduplication, the user may adapt to his
specific use-case.

Let me know what you think
Ayoub



Le mar. 11 juin 2024 à 20:21, Matthias J. Sax <mj...@apache.org> a écrit :

Thanks Sebastien,

that's a good point. Thanks for raising it. -- I like your proposal.

An alternative would be to have two overloads of `deduplicate()` one w/
and one w/o the "id extractor" parameter. This would be less explicit
though.


-Matthias

On 6/11/24 2:30 AM, Sebastien Viale wrote:
Hi,

I am really interested in this KIP.

106:
I hope I am not talking nonsense, but if you do not deduplicate based on
the key, the input stream has to be repartitioned.
Otherwise, different stream tasks may handle records that need to be
deduplicated, and thus duplicates will not be detected.

This is why I would have created two different methods, as is done for
GroupBy:

deduplicateByKey(...)
deduplicate(...)

If deduplicateByKey is used, the input stream does not need to be
repartitioned.

thanks

Sébastien
________________________________
De : Matthias J. Sax <mj...@apache.org>
Envoyé : mardi 11 juin 2024 01:54
À : dev@kafka.apache.org <dev@kafka.apache.org>
Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
kafka-streams

Warning External sender Do not click on any links or open any
attachments unless you trust the sender and know the content is safe.

Thanks for the update Ayoub.


101: you say:

But I am not sure if we don't want to have them for this processor ?

What is your reasoning to move off the established pattern? Would be
good to understand, why `Deduplicated` class needs a different
"structure" compared to existing classes.



102: Creating iterators is very expensive. For other work, we actually
hit 100x (?) throughput degradation by creating an (for most cases
empty) iterator for every input record, and needed to find other ways to
avoid creating an iterator per record. It really kills performance.

I see the point about data expiration. We could experiment with
punctuation to expire old data, or add a second "time-ordered store"
(which we already have at hand) which acts as an index into the main
store. -- Another possibility would be to add a new version of segmented
store with a different key-layout (ie, just store the plain key). I
think with some refactoring, we might be able to re-use a lot of
existing code.



104:

This gets me wondering if this is a limitation of stateful processors
in ALOS. For example a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing,
then the record will be again reconsidered for aggregation). Is this
correct ?

Yes, this is correct, but it does not violate ALOS, because we did not
lose the input record -- of course, the aggregation would contain the
input record twice (eg, over count), but this is ok under ALOS.
Unfortunately, for de-duplication this pattern breaks, because
de-duplication operator does a different "aggregation logic" depending
on its state (emit if no key found, but not emit if key found). For
counting as an example, we increment the count and emit unconditionally
though.


As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has been already seen or
not.
If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the record
is processed twice and we can go ahead and forward it. If the offset
is different, it means it's a duplicate record, so we ignore it.

Great idea. This might work... If we store the input record offset, we
can actually avoid that the "aggregation logic" changes for the same
input record. -- And yes, with ALOS potentially emitting a duplicate is
the-name-of-the-game, so no concerns on this part from my side.



105: picking the first offset with smallest ts sound good to me. The KIP
should be explicit about it. But as discussed above, it might be
simplest to not really have a window lookup, but just a plain key-lookup
and drop if the key exists in the store? -- For late records, it might
imply that they are not de-duplicated, but this is also the case for
in-order records if they are further apart than the de-duplication
window size, right? Thus I would believe this is "more natural" compared
to discarding late records pro-actively, which would lead to missing
result records?

We could also make this configurable if we are not sure what users
really need -- or add such a configuration later in case the semantics
we pick don't work for some users.

Another line of thinking, that did serve us well in the past: in doubt
keep a record -- users can add operators to drop record (in case they
don't want to keep it), but if we drop a record, users have no way to
resurrect it (thus, building a workaround to change semantica is
possible for users if we default to keep records, but not the other way
around).

Would be good to get input from the broader community on this question
thought. In the end, it must be a use-case driven decision?



-Matthias

This email was screened for spam and malicious content but exercise
caution anyway.





On 6/6/24 5:02 AM, Ayoub Omari wrote:
Hi Matthias,

Thank you for your review !

100.
I agree. I changed the name of the parameter to "idSelector".
Because this id may be computed, It is better to call it "id" rather
than
field or attribute.

101.
The reason I added the methods `keySerde()` and `valueSerde()` was to
have the same capabilities as other serde classes (such as Grouped
or Joined). As a Kafka-streams user, I usually use `with(keySerde,
valueSerde)`
as you suggested. But I am not sure if we don't want to have them for
this
processor ?

102.
That's a good point ! Because we know that the window store will contain
at most one instance of a given key, I am not sure how the range fetch
on WindowStore compares to a KeyValueStore `get()` in this case.
Wondering if the fact that the record's key is the prefix of the
underlying
keyValueStore's key ("<dataKey,ts>") may provide comparable performance
to the random access of KeyValueStore ? Of course, the WindowStore
fetch()
would be less efficient because it may fetch from more than 1 segment,
and
because of some iterator overhead.

The purpose of using a WindowStore is to automatically purge old data.
For example, deduplicating a topic written at least once wouldn't
require
keeping a large history. This is not the case of using a KeyValueStore
which would require scanning regularly to remove expired records.
That might cause a sudden increase of latency whenever the cleanup
is triggered.

It would be good to hear from anyone who has done some analysis
on RocksDB's range fetch.

103.
Sure, I can update it once we agree on underlying semantics.

104.
Another good point !

In the end, de-duplication does only make sense when EOS is used

I agree with that. And for me, the use case of deduplicating a topic
written ALOS inside an EOS application might be the top 1 use case
of deduplication.

all downstream processing happens, `context.forward()` returns
and we update the state store, we could now crash w/o committing
offsets

This gets me wondering if this is a limitation of stateful processors
in ALOS. For example a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing,
then the record will be again reconsidered for aggregation). Is this
correct ?

As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has been already seen or
not.
If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the record
is processed twice and we can go ahead and forward it. If the offset
is different, it means it's a duplicate record, so we ignore it.

As you said, we don't have any guarantee whether the initial record was
forwarded or not in case of a crash before commit. In this solution
we would forward the record twice, which is against deduplication.
But, this is still an ALOS application, so it has the same semantics
as any other such application. With this, I am not sure we can
have "strict" deduplication for ALOS applications.

105.
For me, if there are two duplicate records, it means they are
the same in the application's point of view, so it can choose
either one. Thus, I would go with forwarding the record with
the least offset.

Would it not be desired to drop all duplicates independent
of their ts, as long as we find a record in the store?

This is actually related to the (suggested) windowed nature
of deduplication. As in 102. we don't want to do a "forever"
deduplication, which may be impossible for huge workloads
where all records should be kept in the store. Hence, the fetch
of timestamp between [ts-deduplicationInterval,
ts+deduplicationInterval]

About late records, this is again due to the windowed nature.
Because the store won't save those late (i.e. expired) records,
we have two options. Either, we do not apply deduplication
on them, thus the deduplication doesn't work on late records,
or we discard them (which is the option I suggest).
In the second case, It would be up to the user to choose
any deduplicationInterval that may sufficiently cover all his late data.
What do you think ?

Thanks,
Ayoub








Le mar. 4 juin 2024 à 23:58, Matthias J. Sax <mj...@apache.org> a
écrit :

Ayoub,

thanks for resurrecting this KIP. I think a built-in de-duplication
operator will be very useful.


Couple of questions:



100: `deduplicationKeySelector`

Is this the best name? It might indicate that we select a "key" what is
an overloaded term... Maybe we could use `Field` or `Id` or `Attribute`
instead of `Key` in the name? Just brainstorming. If we think `Key` is
the best word, I am also ok with it.



101: `Deduplicated` class

You propose to add static methods `keySerde()` and `valueSerde()` -- in
other config classes, we use only `with(keySerde, valueSerde)` as we
try
to use the "builder" pattern, and avoid too many overloads. I would
prefer to omit both methods you suggest and just use a single `with`
for
both serdes.

Similarly, I thing we don't want to add `with(...)` which takes all
parameters at once (which should only be 3 parameters, not 4 as it's
currently in the KIP)?



102: Usage of `WindowedStore`:

Would this be efficient? The physical byte layout it "<dataKey,ts>" for
the store key, so it would be difficult to do an efficient lookup for a
given "de-duplication key" to discard duplicates, as we don't know the
timestamp of the first record with the same "de-duplication key".

This boils down to the actual de-duplication logic (some more comments
below), but what you propose seems to require expensive range-scans
what
could be cost prohibitive in practice. I think we need to find a way to
use efficient key-point-lookups to make this work.



103: "Processing logic":

Might need some updates (Cf 102 comment). I am not sure if I fully
understand the logic: cf 105 below.



104:

If no entries found → forward the record + save the record in the
store

This part is critical, and we should discuss in detail. In the end,
de-duplication does only make sense when EOS is used, and we might want
to call this out (eg, on the JavaDocs)? But if used with ALOS, it's
very
difficult to ensure that we never lose data... Your proposal to
first-forward goes into the right direction, but does not really solve
the problem entirely:

Even if we forward the message first, all downstream processing
happens,
`context.forward()` returns and we update the state store, we could now
crash w/o committing offsets. For this case, we have no guarantee that
the result records where published (as we did not flush the producer
yet), but when re-reading from the input topic, we would find the
record
in the store and incorrectly drop as duplicate...

I think the only solution to make this work would be to use TX-state
stores in combination with ALOS as proposed via KIP-892?

Using an in-memory store won't help much either? The producer could
have
send the write into the changelog topic, but not into the result topic,
and thus we could still not guarantee ALOS...?

How do we want to go about this? We could also say, this new operator
only works with EOS. Would this be too restrictive? -- At lest for
know,
until KIP-892 lands, and we could relax it?



105: "How to detect late records"

In the end, it seems to boil down to determine which of the records to
forward and which record to drop, for (1) the regular case and (2) the
out-of-order data case.

Regular case (no out-of-order data): For this case, offset and ts order
is the same, and we can forward the first record we get. All later
record within "de-duplication period" with the same "de-duplication
key"
would be dropped. If a record with the same "de-duplication key"
arrives
after "de-duplication period" passed, we cannot drop it any longer, but
would still forward it, as by the contract of the operator /
de-duplication period.

For the out-of-order case: The first question we need to answer is, do
we want to forward the record with the smallest offset or the record
with the smallest ts? Logically, forwarding with the smallest ts might
be "more correct", however, it implies we could only forward it after
"de-duplication period" passed, what might be undesired latency? Would
this be desired/acceptable?

In contrast, if we forward record with the smallest offset (this is
what
you seem to propose) we don't have a latency issue, but of course the
question what records to drop is more tricky to answer: it seems you
propose to compare the time difference of the stored record to the
current record, but I am wondering why? Would it not be desired to drop
all duplicates independent of their ts, as long as we find a record in
the store? Would be good to get some more motivation and tradeoffs
discussed about the different strategies we could use.

You also propose to drop _any_ late record: I am also not sure if
that's
desired? Could this not lead to data loss? Assume we get a late record,
but in fact there was never a duplicate? Why would we want to drop it?
If there is a late record which is indeed a duplicate, but we purged
the
original record from the store already, it seems to be the same case as
for the "no out-of-order case": after we purged we cannot de-duplicate
and thus it's a regular case we can just accept?



-Matthias




On 5/29/24 4:58 AM, Ayoub Omari wrote:
Hi everyone,

I've just made a (small) change to this KIP about an implementation
detail.
Please let me know your thoughts.

Thank you,
Ayoub

Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a écrit :

Hello,

Following a discussion on community slack channel, I would like to
revive
the discussion on the KIP-655, which is about adding a deduplication
processor in kafka-streams.




https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API


Even though the motivation is not quite the same as the initial one,
I
updated the KIP rather than creating a new one, as I believe the end
goal
is the same.

Thanks,
Ayoub








Reply via email to