Thanks!
Maybe the discussion about punctuations is more a question of
wording/phrasing? I am ok with always applying de-duplication if there
is no offset (in the end, using deduplicateXxx() should usually only
have an upstream input topic, and no punctuations anyway).
Wondering to what extent we need to add details about "offsets" and
ALOS / punctuations in the JavaDocs? Maybe it does become overkill to
add it there, and it would be enough to have these details in the docs
only? Thoughts?
One thing I would add to the JavaDocs of `deduplicateByKeyValue()`
though is that it is not a key-changing operation, and thus does not
always repartition (only if the key was changed upstream), and would
not trigger a downstream repartitioning either (if a key-dependent
operation follows). The current proposed JavaDocs might be too subtle,
and one could easily (but incorrectly) assume that we would repartition
by the (key,id) pair. Might be good to explicitly say something like:
This operator does not repartition by (key,id).
If a key changing operator was used before this operation,
an internal repartitioning topic will be created in Kafka
to repartition by the key only.
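To make the suggested wording concrete, a hypothetical usage sketch (the
`Order` type and its accessors are made up; `deduplicateByKeyValue()` is the
operator proposed in the KIP):
```java
// Hypothetical usage sketch; the Order type and its accessors are made up.

// No upstream key change: no repartition topic is created at all.
builder.<String, Order>stream("orders")
       .deduplicateByKeyValue((key, order) -> order.getId());

// Upstream key change: a repartition topic is created, but it is
// partitioned by the (new) key only -- never by the (key, id) pair.
builder.<String, Order>stream("orders")
       .selectKey((key, order) -> order.getCustomerId())
       .deduplicateByKeyValue((key, order) -> order.getId());
```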
Overall LGTM.
Thanks a ton for pushing this forward. It was a complicated
discussion, and I am glad you stayed on it!
-Matthias
On 11/19/24 3:33 AM, Ayoub Omari wrote:
Lucas,
Thanks for your remarks.
It seems we agree to remove the `.deduplicate()` api that
does the repartitioning. So I removed it from the KIP.
I also added more details about the semantics to the JavaDocs; please
mention anything that should be added or removed.
I will open a vote thread for this KIP.
Best,
Ayoub
On Wed, Nov 6, 2024 at 5:29 PM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
Hi Ayoub,
I see that there was already a very long discussion thread, I won't
start nitpicking names here. I think we are ready to move on to a
vote. The agreed-upon solutions sound very reasonable to me; I
especially like how you handled the ALOS case. Not deduplicating late
records is a defensive choice - it's the lesser evil, even if it can
lead to surprises for the user (data duplication surprises are
certainly better than data loss surprises). Let's just make sure that
we write a long description of all those subtleties in the
documentation, and not ask people to find it in the KIP.
My main point is that I'd indeed vote to remove `deduplicate`. As
stated before, the `deduplicate` operator does many things at once,
and it's easy for users to accidentally do too much (repartitioning)
by using it. It's probably a not-so-common use case which is not so
hard to replicate using `deduplicateByKey`.
Side note on expiring records using punctuation - I see that this is,
performance-wise, the best option we have here, so I agree with going
with this approach in the KIP. Just want to throw out there that the
C++ RocksDB API seems to have a compaction filter, which should give
us the ability to expire records during compaction (and otherwise
ignore expired records during processing) - this may be a nice
long-term improvement for these kinds of things, but it would require
extending the RocksDB JNI interface to allow implementing custom
CompactionFilters, IIUC. So it doesn't directly affect your KIP.
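For context, a minimal sketch of the punctuation-based expiry approach the
KIP relies on (store name, value layout, and the interval constant are
assumptions, not from the KIP):
```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: store name, value layout, and interval are assumptions.
class DeduplicationProcessor implements Processor<String, String, String, String> {
    private static final Duration DEDUP_INTERVAL = Duration.ofHours(1);
    private KeyValueStore<String, Long> store; // value: ts of first occurrence
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("dedup-store");
        // Periodically (stream-time driven) scan the store and evict entries
        // older than the deduplication interval.
        context.schedule(DEDUP_INTERVAL, PunctuationType.STREAM_TIME, streamTime -> {
            try (final KeyValueIterator<String, Long> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    if (entry.value < streamTime - DEDUP_INTERVAL.toMillis()) {
                        store.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // deduplication logic elided
    }
}
```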
Thanks for the hard work and for not giving up on this KIP!
Cheers,
Lucas
On Tue, Nov 5, 2024 at 11:45 AM Ayoub Omari <ayoubomar...@gmail.com>
wrote:
Hi Matthias,
Thanks for your review, and sorry for this late reply.
(102) Ok, I keep it open here.
(104)
Well, but if we say punctuation records are "inherently distinct",
there is nothing to be de-duplicated?
We still need to deduplicate punctuation records even if they are
distinct. A functional deduplication acts essentially on distinct
records having the same functional id. It's the same as for input
records: the records are distinct, but we keep only one of those
having the same deduplication id.
(105)
even if they never use a smaller interval, because in the end,
the question is always which ms is included and which one is excluded.
Agreed, the KIP now specifies that "Deduplication interval ends are
inclusive."
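For illustration, under the semantics discussed in this thread (dedup applies
into the past as well, and late records are forwarded), the inclusive ends
might translate into a check like this sketch (all names are illustrative):
```java
// Sketch of the inclusive-interval semantics; all names are illustrative.
final long intervalMs = deduplicationInterval.toMillis();

// A record older than stream-time minus the interval is "late" and is
// forwarded (not deduplicated), per the KIP's handling of late records.
final boolean isLate = recordTs < streamTime - intervalMs;

// Otherwise it is a duplicate if the stored first occurrence is within the
// interval in either direction, with both interval ends inclusive.
final boolean isDuplicate = !isLate
        && stored != null
        && Math.abs(recordTs - stored.timestamp()) <= intervalMs;
```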
Why do you point out that `This is a duplicate of a2` -- why does it
matter? We did drop a2 on the floor already?
The reason why we forward a3 is because it is outside of a1's
deduplicationInterval, right? The fact that we did drop a2 on the
floor seems irrelevant?
Right, fixed in the KIP.
Thanks for your opinions on the semantics! If no points are raised in
the next few days, I will start a voting thread.
We can always continue discussion in this thread after that :)
Best,
Ayoub
On Fri, Oct 4, 2024 at 2:37 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks Ayoub!
(102) let's hear what others think.
(103) Keeping it internal is fine with me. Was just curious.
(104)
The offset checking was specifically added for the case of
reprocessing the same input record.
Yes.
Punctuation records are generated by the application. And I assume
we can consider that any two punctuated records are inherently
distinct ?
I agree. And if they are "inherently distinct" we should never
de-duplicate them, right?
Also, for me, we should apply deduplication on punctuated records,
because if the user defines a punctuator before calling deduplication,
it means they want to deduplicate them as well, otherwise they could
define it later in the pipeline.
Well, but if we say punctuation records are "inherently distinct",
there
is nothing to be de-duplicated?
(105)
However, I don't see practical use cases of a tiny deduplication
interval.
I agree, but that's not the point. We need to clearly define it, to
avoid "off by one" errors, to be able to implement it (and write bound
tests), and to give users a chance to reason about it, even if they
never use a smaller interval, because in the end, the question is
always which ms is included and which one is excluded.
Assume one has a window of 1 hour, and gets a record at Y ms first,
and later one at Y + 1 hour -- whether we forward or drop the second
record is a question we need to answer. We cannot leave this question
open :)
For the "how to handle later records", your example make sense to me,
and I think it's reasonable semantics. Again: to me it's just important
that the KIP is very explicit with such edge cases and that it does not
leave any room for interpretation.
From the KIP:
event a3 at t+11s → This is a duplicate of a2 (i.e. within the
deduplicationInterval) which has not been forwarded; we will forward
it as if there was never an event a2
Why do you point out that `This is a duplicate of a2` -- why does it
matter? We did drop a2 on the floor already?
The reason why we forward a3 is because it is outside of a1's
deduplicationInterval, right? The fact that we did drop a2 on the
floor seems irrelevant?
From the KIP:
The same thing applies for deduplicating out-of-order records:
event a1 @t=15s → Forwarded
event a2 @t=5s → Dropped
event a3 @t=4s → Forwarded
This is interesting. So you propose to also apply the
"deduplicationInterval" into the past. I am ok with this. Just want to
point it out, as it's an important semantic piece.
Same for the examples with stream-time and different keys and state
content. Love these examples. Makes it very clear what's going on. I am
personally happy with these semantics (I guess I would also be happy
if we had chosen different semantics, though).
-Matthias
On 9/4/24 7:49 AM, Ayoub Omari wrote:
Hi Matthias,
Thanks for your comments!
(100)
Yes, there will be repartitioning if a key-changing operation was used
before. Updated the JavaDocs.
(101)
Yes, KIP updated.
(102)
My idea was that a user may think of `.deduplicate()` as
a black-box which only rejects records, without changing
the stream's structure.
I am also concerned by the auto-back-repartitioning. If it is fine not
to provide the two APIs (like in groupByKey vs groupBy), I agree to
remove it.
(103)
I think the store should be kept internal as it will have
some internal information. For example, for the eviction algorithm
we could have a time-index which is part of the same state store
(like in DualSchema stores where the first bit of each entry
determines if the entry belongs to the base or to the index store).
Also, it should hold the offset and timestamp of each record, which
are technical details that an application may not need to know.
WDYT?
(104)
The offset checking was specifically added for the case of
reprocessing the same input record.
Punctuation records are generated by the application. And I assume we
can consider that any two punctuated records are inherently distinct?
Also, for me, we should apply deduplication on punctuated records,
because if the user defines a punctuator before calling deduplication,
it means they want to deduplicate them as well, otherwise they could
define it later in the pipeline.
(105)
Thanks for these points! I added more details and examples to the KIP.
Some of the examples you mentioned were buried in the text, so I added
some highlights and examples to make the semantics clearer.
About the lower bound of deduplicationInterval, I think it's more
intuitive to make the interval ends inclusive, which gives a lower
bound of 0. However, I don't see practical use cases for a tiny
deduplication interval. Ideally, the id chosen by the user as a
deduplication id should be unique (at least for an extended period of
time). If the deduplication interval is chosen to be a few
milliseconds, then I suppose it's more rate limiting than
deduplication?
The current semantics are more adapted to a longer
dedup-interval. Small dedup-intervals make it more
likely that an out-of-order record is a late record, in which case
records are most likely to be forwarded without dedup.
As an example, if we consider a case of de-dup interval=0,
and following events:
K1 @ t=10ms
K2 @ t=11ms
K1 @ t=10ms
Then the current semantics would detect the second event K1 as late,
and would forward it.
(Note: the first event here may still exist in the store if eviction
has not yet been triggered; however, it is an expired entry. In order
to have consistent general behavior, we simulate that all entries
expire once the dedup-interval has elapsed: if we find an entry in the
store which is already expired, we ignore it.)
I am open to further discussion on this.
If we agree on these semantics, I will add a description of the
processor's behavior on late records to the JavaDocs, which would make
it clearer that longer dedup-intervals are more suitable.
Best,
Ayoub
On Sat, Aug 31, 2024 at 3:50 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for updating the KIP.
A few comments/questions:
(100): For `deduplicateByKey(...)`, the input stream might still be
repartitioned if the key was changed upstream, right (similar to how
`groupByKey()` and `join()` work)?
builder.stream(...).deduplicateByKey();          // no repartitioning
builder.stream(...).map(...).deduplicateByKey(); // repartitioning
If my understanding is correct, it should be reflected in the
JavaDocs.
(101): I guess the same as (100) applies to
`deduplicateByKeyValue(...)`?
(102): I am not 100% sure if `deduplicate()` is the best API, and
if we
even need it? It seems we could express `deduplicate()` also as:
stream.map(...).deduplicateByKey(...).map(...).repartition();
I am also not sure how common it is to de-duplicate on a value field
_and_ expect/want the result stream to still be partitioned on the
original key? I could also imagine that the semantics of
`deduplicate()` would actually be the same as (and this might be more
intuitive for users?):
stream.map(...).deduplicateByKey(...)
I.e., there would be no automatic "back-partitioning" on the original
key. Is there really demand for auto-back-partitioning?
Atm, I tend to think that `deduplicate()` might not be required to get
added at all, as it's only syntactic sugar anyway (no matter which
semantics, w/ or w/o auto-back-partitioning, we pick). [And if there
is user demand down the road, it seems simple enough to add it later.]
(103) `Deduplicated`: Right now it only takes a name for the state
store, which means it's not possible to plug in a different store. Is
this intentional, or should we allow passing in `DslStoreSuppliers`
and/or `KeyValueBytesStoreSupplier`? We could also defer this to a
follow-up KIP.
(104) Punctuation:
For punctuated records - whose offset value is null - deduplication
is always performed.
Why are we doing it this way? We could also say we never de-duplicate
if there is no offset (similar to null-key and/or null-id). (Or maybe
even make it configurable -- of course, we can also defer a config to
a follow-up KIP in case there is demand.)
In general, keeping stuff and letting users add additional filtering
is the better default. If we filter, users have no way to "resurrect"
the data, but if we keep it, adding an additional filter is possible.
(Similar to what is proposed for late records -- we forward, not
drop.)
(105) Deduplication logic:
I am not 100% sure if I understand the proposed logic with regard to
time-semantics and out-of-order data.
Assume there is no out-of-order data (de-dup interval 10 sec):
Example A:
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@25 -> dropped as duplicate stream-time=25
k1@35 -> forwarded and inserted into store -> stream-time=35 (we did
discard k1@20 already and thus k1@35 is not considered a duplicate)
This example is clear; however, we still need to define the exact
cut-off point, ie, what is the first record which is not dropped any
longer? k1@29 is clearly dropped, and k1@31 would clearly be
forwarded; I am not sure about k1@30 -- would it purge k1@20 already
and thus be forwarded, or would it be dropped?
The question boils down to whether a de-dup interval of 0 is allowed
(ie, whether we define the upper bound of the de-duplication interval
as inclusive or exclusive). If we allow an interval size of zero
(meaning the upper bound is inclusive), interval 0 means we only
de-duplicate records with the same timestamp:
Example B:
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@20 -> dropped as duplicate stream-time=20
k1@21 -> forwarded and inserted into store -> stream-time=21 (we did
discard k1@20 already and thus k1@21 is not considered a duplicate)
For this case, k1@30 in Example A above would be dropped.
However, if we think interval zero should not be a valid interval, and
1 is the allowed minimum (and thus the upper bound is exclusive),
interval 1 says to de-duplicate records with the same timestamp:
Example C (same as Example B, even if interval is 1 now, due to
different interval bound semantics):
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@20 -> dropped as duplicate stream-time=20
k1@21 -> forwarded and inserted into store -> stream-time=21 (we did
discard k1@20 already and thus k1@21 is not considered a duplicate)
For this case, k1@30 in Example A above would be forwarded.
What about out-of-order (same key):
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@9 -> ?
I think we should forward; time diff is larger than de-dup interval
Also: record is "late" with regard to stream-time, so we should
forward as "late"
k1@11 -> ?
we could de-dup; we have k1@20 in the store
ts is within de-dup-interval
ts is not older than stream-time minus de-dup interval either (ie,
not "late")
k1@25 -> dropped as duplicate stream-time=25
k1@23 -> dropped as duplicate stream-time=25
k1@11 -> ?
we could de-dup; we have k1@20 in the store
ts is within de-dup-interval
ts IS OLDER than stream-time minus de-dup interval though; we could
also forward as "late"
What about different keys? What about different keys in combination
with "late" records? Overall, I could not implement the logic based on
the KIP, as there are too many unspecified cases.
-Matthias
On 8/30/24 2:08 AM, Ayoub Omari wrote:
Hi Bill,
Thanks for your answer.
Can you update the KIP to state that deduplication by value will
result in auto-repartitioning by Kafka Streams
Updated the KIP to the latest proposal.
For option 3, what about `deduplicateByKeyValue`? (If during the PR
phase we stumble across a better name, we can update the KIP then too.)
Yes, better than the first suggested name. Reflected in the KIP.
I also added a note about records with null deduplication key.
If no new points are raised in the next few days, I will start a
VOTE
thread.
Thanks,
Ayoub
On Tue, Aug 27, 2024 at 5:47 PM Bill Bejeck <bbej...@gmail.com> wrote:
Hi Ayoub,
Thanks for the update.
Your latest proposal looks good to me, but I have a couple of
comments.
in (2) we repartition by the id field
I agree with this approach, but the KIP still states "No
repartitioning is done by this processor. In case a user wants to
deduplicate an id over multiple partitions, they should first
repartition the topic on this id."
Can you update the KIP to state that deduplication by value will
result in auto-repartitioning by Kafka Streams, unless the developer
adds a manual repartitioning operator themselves?
For option 3, what about `deduplicateByKeyValue`? (If during the PR
phase we stumble across a better name, we can update the KIP then too.)
This latest update has been open for a while now; I'd say it's good to
start a VOTE thread in 1-2 days.
Thanks,
Bill
On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
Hi Bill,
I think all the points raised are now handled in the KIP.
We still have point 106, on which we need an agreement. I gave a
proposal in my last mail, and I am waiting for others' opinions.
My proposal is to have 3 methods of deduplication:
1) `deduplicateByKey()`
2) `deduplicate((k, v) -> v.id)`
3) `deduplicateByKeyAndId((k, v) -> v.id)` (Name to be chosen)
The point of having both (2) and (3) is that in (2) we repartition by
the id field, whereas in (3) we are not repartitioning.
(3) is needed in case the current partitioning ensures that a given
id will always land on the same task. Here is an example I gave above:
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id`. Here, the application
wants to deduplicate transactions. It knows that a transaction id maps
to a single userId. Any duplicate of that record would be received by
the task which processes this userId.
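A hypothetical sketch of that use case (the method name was still under
discussion, and the `Transaction` type is illustrative):
```java
// Hypothetical sketch; the method name ("deduplicateByKeyAndId") was still
// under discussion, and the Transaction type is illustrative.
final KStream<String, Transaction> transactions = builder.stream("transactions");

// A transaction id maps to a single userId (the key), so every duplicate
// already lands on the task owning that userId: no repartitioning needed.
final KStream<String, Transaction> deduplicated =
        transactions.deduplicateByKeyAndId((userId, tx) -> tx.getId());
```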
Let me know what you think.
Thanks,
Ayoub
On Thu, Aug 22, 2024 at 9:37 PM Bill Bejeck <bbej...@apache.org> wrote:
Hi Ayoub,
What's the status of this KIP?
Regards,
Bill
On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
Hi Bill,
Thanks for your comments!
I am prefixing your points with B.
B1.
That's a good catch. I think punctuated records aren't affected by
this reprocessing problem because they are not issued twice.
We will still need to make a difference between a punctuated and a
source record, though. The logic (when the deduplication id is already
in the store) becomes:
if record.offset is not null && record.offset == offset_stored:
    forward(record)
else:
    // duplicate, do not forward
I will update the KIP on this point.
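A Java sketch of that logic, assuming a `StoredEntry` value layout holding
the offset and timestamp of the first occurrence (names are illustrative;
`recordMetadata()` is empty for punctuated records):
```java
// Sketch of the dedup decision with the offset check; StoredEntry is an
// assumed value layout holding the offset and timestamp of the first
// occurrence of a given deduplication id.
final Long offset = context.recordMetadata()       // empty for punctuated records
                           .map(RecordMetadata::offset)
                           .orElse(null);
final StoredEntry stored = store.get(dedupId);
if (stored == null) {
    store.put(dedupId, new StoredEntry(offset, record.timestamp()));
    context.forward(record);
} else if (offset != null && offset.equals(stored.offset())) {
    // Same source record reprocessed after a crash (ALOS): forward it, as we
    // cannot know whether the first processing attempt was committed.
    context.forward(record);
}
// else: a true duplicate (different offset, or a punctuated record) -> drop
```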
B2. (i.e. 106)
That's the point discussed in 106. The main idea is to avoid
repartitioning when we don't have to. But let's say we want to
repartition when deduplication is on an id that is not the key.
The user code may look something like this:
```
kstream
    .deduplicate((k, v) -> v.id)
    .map((k, v) -> ...)
```
The user expects that the key in the map processor which follows the
deduplicate is the same as the initial key.
Internally, if we decide to repartition, the deduplication processor
may do the following:
```
kstream
    .map((k, v) -> (v.id, ...))          // For repartitioning on id
    .deduplicateByKey()
    .map((id, v) -> (initialKey, ...))   // To recover the initial key
```
In this case, there is some internal complexity to this processor:
I) We would repartition twice, before and after deduplication. The
second map triggers repartitioning if there are following stateful
processors in the pipeline.
II) The initial key should be kept somewhere. One way is to wrap the
value to add the key, and then unwrap it after the `deduplicateByKey`.
As mentioned in one use case above (deduplicating transactions -
written at least once - by id), repartitioning is not needed for that
case, so this added complexity won't be useful there.
TLDR, I think one solution is to have a separate API to cover such a
use case. In total, we can have 3 deduplication APIs:
1) `deduplicateByKey()` // No repartitioning
2) `deduplicateByKey((k, v) -> v.id)` or `deduplicateByKeyAndId((k, v) -> v.id)` // No repartitioning
3) `deduplicate((k, v) -> v.id)` // causes repartitioning
The second API is equivalent to deduplicating by the (key, id) pair.
No repartitioning is required because records with the same key are
already together in the same partition. cc @Matthias @Sebastien.
B3.
I think here we can do what some existing processors do for periodic
work: we can track the last cleanup time and the current stream time;
if the delta exceeds the deduplication period, we trigger the cleanup.
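A minimal sketch of that pattern (assuming `lastCleanupMs` is a processor
field and `purgeExpiredEntries()` an internal helper):
```java
// Sketch: lastCleanupMs is a processor field; purgeExpiredEntries() is an
// assumed internal helper that iterates the store and deletes expired ids.
@Override
public void process(final Record<String, String> record) {
    final long streamTime = context.currentStreamTimeMs();
    if (streamTime - lastCleanupMs >= deduplicationInterval.toMillis()) {
        purgeExpiredEntries(streamTime);
        lastCleanupMs = streamTime;
    }
    // ... deduplication logic ...
}
```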
Best,
Ayoub
On Thu, Jul 11, 2024 at 1:17 AM Bill Bejeck <bbej...@apache.org> wrote:
Hi All,
Thanks Ayoub for getting this KIP going again; I think a deduplication
operator will be very useful. My apologies for being late to the
discussion.
Overall I agree with the direction of the KIP, but I have a couple of
questions.
1. Regarding using offsets for tracking the first arrival: I think
there could be cases where offsets are not available, for example when
records are forwarded by punctuation.
2. I'm not sure about using something other than the key for
identifying duplicate records. By doing so, one could end up missing a
de-duplication due to records with the same id characteristic in the
value but having different keys, so the records land on different
partitions. I guess we could enforce a repartition if one chooses to
use a de-duplication id other than the key.
3. I think a punctuation scheduled to run at the de-duplication period
to clean out older records would be a clean approach for purging. I'm
not taking a hard stance on this approach and am willing to discuss
different methods.
Thanks,
Bill
On 2024/06/25 18:44:06 Ayoub Omari wrote:
Hi Matthias,
Here are my updates on your points.
101.
You propose to add static methods `keySerde()` and `valueSerde()` --
in other config classes, we use only `with(keySerde, valueSerde)` as
we try to use the "builder" pattern, and avoid too many overloads. I
would prefer to omit both methods you suggest and just use a single
`with` for both serdes.
I was actually inspired by the other config classes; for example,
`Joined` and `Grouped` both have the static methods `keySerde()` and
`valueSerde()`. I think we don't want to add `with(...)` which takes
all parameters at once.
Done.
102.
Thanks, your suggestion sounds good to me. The trade-off of having an
index, alongside the keyValue store, that allows efficiently purging
expired records makes sense. I've been looking into the code, and I
think a similar idea was implemented for other processors (for example
in DualSchemaRocksDBSegmentedBytesStore).
As you said, I think we would benefit from some existing code here.
KIP updated!
104.
Updated the KIP to consider records' offsets.
105
picking the first offset with smallest ts sounds good to me. The KIP
should be explicit about it
Done.
But as discussed above, it might be simplest to not really have a
window lookup, but just a plain key-lookup, and drop if the key exists
in the store?
KIP updated: we will be `.get()`ing from a keyValueStore instead of
`.fetch()`ing from a WindowStore.
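For illustration, the two access patterns side by side (a sketch; store and
value types are assumptions):
```java
// Before: windowed range fetch (an iterator, possibly spanning segments).
final Instant from = Instant.ofEpochMilli(ts - interval.toMillis());
final Instant to = Instant.ofEpochMilli(ts + interval.toMillis());
try (final WindowStoreIterator<StoredEntry> iter =
         windowStore.fetch(dedupId, from, to)) {
    final boolean duplicate = iter.hasNext();
}

// After: a single point lookup on a plain key-value store.
final boolean duplicate = kvStore.get(dedupId) != null;
```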
Another line of thinking, that did serve us well in the past: in
doubt, keep a record -- users can add operators to drop records (in
case they don't want to keep them), but if we drop a record, users
have no way to resurrect it (thus, building a workaround to change
semantics is possible for users if we default to keeping records, but
not the other way around).
Makes total sense! I updated the KIP to forward late records instead
of dropping them.
106.
For the moment, I highlighted in the JavaDocs that we are
deduplicating per partition. If there is a better name to carry this
information in the API name itself, that would be good.
Best,
Ayoub
On Thu, Jun 13, 2024 at 9:03 AM Sebastien Viale <sebastien.vi...@michelin.com> wrote:
Hi,
106:
Thanks for the clarification. Actually, this is not what I expected,
but I now better understand the performance issues regarding the state
store iteration.
If this is how it should be designed, it is fine for me as long as it
is clear that the repartition must be done before the deduplication.
Sébastien
________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Thursday, June 13, 2024 02:51
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
kafka-streams
106:
For the use-case of deduplicating an "at least once written" stream,
we are sure that the duplicate record has the same key as the original
one, and will land on the same task. Here, a user would want to
specify a deduplication key different from the topic's key in case the
topic's key is not a unique identifier
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id`. Here, the application
wants to deduplicate transactions. It knows that a transaction id maps
to a single userId. Any duplicate of that record would be received by
the task which processes this userId.
This is an interesting point.
My concern is, to some extent, that it seems (on the surface) to not
follow the established pattern of auto-repartitioning in the DSL. Of
course, given that the current proposal says we use an "id extractor"
and not a "key extractor", it might be ok (but it might be somewhat
subtle). Of course, JavaDocs always help to explain in detail. Would
this be enough?
Would be good to hear from others about this point. I am personally
not sure which approach I would prefer at this point.
The problem reminds me of
https://issues.apache.org/jira/browse/KAFKA-10844, which is not
resolved directly either. We do have KIP-759
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
repartition canceling), which is WIP and helps with KAFKA-10844, but I
am not sure if it would be a viable solution for the de-duplication
case?
-Matthias
On 6/11/24 2:31 PM, Ayoub Omari wrote:
Hi Sebastien & Matthias,
For 106.
My idea was to deduplicate on a per-task basis. If the user wants to
do a global deduplication over all partitions, I think it's better to
have them explicitly repartition and then call the deduplication
processor.
For the use-case of deduplicating an "at least once written" stream,
we are sure that the duplicate record has the same key as the original
one, and will land on the same task. Here, a user would want to
specify a deduplication key different from the topic's key in case the
topic's key is not a unique identifier.
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id`. Here, the application
wants to deduplicate transactions. It knows that a transaction id maps
to a single userId. Any duplicate of that record would be received by
the task which processes this userId.
One other thought I had when writing the KIP about global
deduplication is that it would require mapping the stream's key twice
(a first map to change the key to the deduplication key, and a second
map to get back the initial key). The second map may imply a second
repartitioning.
However, if we do a per-task deduplication, the user may adapt it to
their specific use-case.
Let me know what you think
Ayoub
On Tue, Jun 11, 2024 at 8:21 PM Matthias J. Sax <mj...@apache.org> wrote:
Thanks Sebastien,
that's a good point. Thanks for raising it. -- I like your proposal.
An alternative would be to have two overloads of `deduplicate()`, one
w/ and one w/o the "id extractor" parameter. This would be less
explicit though.
-Matthias
On 6/11/24 2:30 AM, Sebastien Viale wrote:
Hi,
I am really interested in this KIP.
106:
I hope I am not talking nonsense, but if you do not deduplicate based
on the key, the input stream has to be repartitioned.
Otherwise, different stream tasks may handle records that need to be
deduplicated, and thus duplicates will not be detected.
This is why I would have created two different methods, as is done
for GroupBy:
deduplicateByKey(...)
deduplicate(...)
If deduplicateByKey is used, the input stream does not need to be
repartitioned.
thanks
Sébastien
________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Tuesday, June 11, 2024 01:54
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
kafka-streams
Thanks for the update Ayoub.
101: you say:
But I am not sure if we don't want to have them for this processor?
What is your reasoning to move off the established pattern? Would be
good to understand why the `Deduplicated` class needs a different
"structure" compared to existing classes.
102: Creating iterators is very expensive. For other work, we actually
hit a ~100x throughput degradation by creating a (for most cases
empty) iterator for every input record, and needed to find other ways
to avoid creating an iterator per record. It really kills performance.
I see the point about data expiration. We could experiment with
punctuation to expire old data, or add a second "time-ordered store"
(which we already have at hand) which acts as an index into the main
store. -- Another possibility would be to add a new version of the
segmented store with a different key layout (ie, just store the plain
key). I think with some refactoring, we might be able to re-use a lot
of existing code.
104:
This gets me wondering if this is a limitation of stateful processors
in ALOS. For example, a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing, then the record will again be considered for aggregation).
Is this correct?
Yes, this is correct, but it does not violate ALOS, because we did not
lose the input record -- of course, the aggregation would contain the
input record twice (eg, over-count), but this is ok under ALOS.
Unfortunately, for de-duplication this pattern breaks, because the
de-duplication operator does a different "aggregation logic" depending
on its state (emit if no key found, but not emit if key found). For
counting, as an example, we increment the count and emit
unconditionally though.
As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has already been seen or
not. If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the record is
processed twice and we can go ahead and forward it. If the offset is
different, it means it's a duplicate record, so we ignore it.
Great idea. This might work... If we store the input record offset,
we can actually avoid that the "aggregation logic" changes for the
same input record. -- And yes, with ALOS, potentially emitting a
duplicate is the-name-of-the-game, so no concerns on this part from my
side.
105: picking the first offset with the smallest ts sounds good to me.
The KIP should be explicit about it. But as discussed above, it might
be simplest to not really have a window lookup, but just a plain
key-lookup, and drop if the key exists in the store? -- For late
records, it might imply that they are not de-duplicated, but this is
also the case for in-order records if they are further apart than the
de-duplication window size, right? Thus I would believe this is "more
natural" compared to discarding late records pro-actively, which would
lead to missing result records?
We could also make this configurable if we are not sure what users
really need -- or add such a configuration later in case the semantics
we pick don't work for some users.
Another line of thinking, that did serve us well in the past: in
doubt, keep a record -- users can add operators to drop records (in
case they don't want to keep them), but if we drop a record, users
have no way to resurrect it (thus, building a workaround to change
semantics is possible for users if we default to keeping records, but
not the other way around).
Would be good to get input from the broader community on this question
though. In the end, it must be a use-case driven decision?
-Matthias
On 6/6/24 5:02 AM, Ayoub Omari wrote:
Hi Matthias,
Thank you for your review!
100.
I agree. I changed the name of the parameter to "idSelector". Because
this id may be computed, it is better to call it "id" rather than
"field" or "attribute".
101.
The reason I added the methods `keySerde()` and `valueSerde()` was to
have the same capabilities as other serde classes (such as Grouped or
Joined). As a Kafka-streams user, I usually use `with(keySerde,
valueSerde)` as you suggested. But I am not sure if we don't want to
have them for this processor?
102.
That's a good point! Because we know that the window store will
contain at most one instance of a given key, I am not sure how the
range fetch on WindowStore compares to a KeyValueStore `get()` in this
case. Wondering if the fact that the record's key is the prefix of the
underlying keyValueStore's key ("<dataKey,ts>") may provide comparable
performance to the random access of a KeyValueStore? Of course, the
WindowStore fetch() would be less efficient because it may fetch from
more than 1 segment, and because of some iterator overhead.
The purpose of using a WindowStore is to automatically purge old data.
For example, deduplicating a topic written at least once wouldn't
require keeping a large history. This is not the case with a
KeyValueStore, which would require scanning regularly to remove
expired records. That might cause a sudden increase in latency
whenever the cleanup is triggered.
It would be good to hear from anyone who has done some analysis on
RocksDB's range fetch.
103.
Sure, I can update it once we agree on underlying
semantics.
104.
Another good point!
In the end, de-duplication does only make sense when EOS is used
I agree with that. And for me, the use case of deduplicating a topic
written with ALOS inside an EOS application might be the top use case
of deduplication.
all downstream processing happens, `context.forward()` returns and we
update the state store, we could now crash w/o committing offsets
This gets me wondering if this is a limitation of stateful processors
in ALOS. For example, a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing, then the record will again be considered for aggregation).
Is this correct?
As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has already been seen or
not. If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the record is
processed twice and we can go ahead and forward it. If the offset is
different, it means it's a duplicate record, so we ignore it.
As you said, we don't have any guarantee whether the initial record
was forwarded or not in case of a crash before commit. In this
solution we would forward the record twice, which is against
deduplication. But this is still an ALOS application, so it has the
same semantics as any other such application. With this, I am not sure
we can have "strict" deduplication for ALOS applications.
105.
For me, if there are two duplicate records, it means they are the same
from the application's point of view, so it can choose either one.
Thus, I would go with forwarding the record with the lowest offset.
Would it not be desired to drop all duplicates independent of their
ts, as long as we find a record in the store?
This is actually related to the (suggested) windowed nature of
deduplication. As in 102, we don't want to do a "forever"
deduplication, which may be impossible for huge workloads where all
records would have to be kept in the store. Hence the fetch of
timestamps between [ts-deduplicationInterval, ts+deduplicationInterval].
About late records, this is again due to the windowed nature. Because
the store won't save those late (i.e. expired) records, we have two
options: either we do not apply deduplication on them, so
deduplication doesn't work on late records, or we discard them (which
is the option I suggest). In the second case, it would be up to the
user to choose a deduplicationInterval that sufficiently covers all
their late data.
What do you think?
Thanks,
Ayoub
On Tue, Jun 4, 2024 at 11:58 PM Matthias J. Sax <mj...@apache.org> wrote:
Ayoub,
thanks for resurrecting this KIP. I think a built-in
de-duplication
operator will be very useful.
Couple of questions:
100: `deduplicationKeySelector`
Is this the best name? It might indicate that we select a "key", which
is an overloaded term... Maybe we could use `Field` or `Id` or
`Attribute` instead of `Key` in the name? Just brainstorming. If we
think `Key` is the best word, I am also ok with it.
101: `Deduplicated` class
You propose to add static methods `keySerde()` and `valueSerde()` --
in other config classes, we use only `with(keySerde, valueSerde)`, as
we try to use the "builder" pattern and avoid too many overloads. I
would prefer to omit both methods you suggest and just use a single
`with` for both serdes.
Similarly, I think we don't want to add a `with(...)` which takes all
parameters at once (which should only be 3 parameters, not 4, as it
currently is in the KIP)?
102: Usage of `WindowedStore`:
Would this be efficient? The physical byte layout is "<dataKey,ts>"
for the store key, so it would be difficult to do an efficient lookup
for a given "de-duplication key" to discard duplicates, as we don't
know the timestamp of the first record with the same "de-duplication
key".
This boils down to the actual de-duplication logic (some more comments
below), but what you propose seems to require expensive range-scans,
which could be cost prohibitive in practice. I think we need to find a
way to use efficient key-point-lookups to make this work.
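To illustrate the point (a sketch; the prefix and key-building helpers are
assumptions, not existing APIs):
```java
// With key layout <dedupId, ts>, a lookup must range-scan all entries that
// share the dedupId prefix (prefixStart/prefixEnd are assumed helpers):
try (final KeyValueIterator<Bytes, byte[]> iter =
         segmentedStore.range(prefixStart(dedupId), prefixEnd(dedupId))) {
    while (iter.hasNext()) {
        iter.next(); // inspect each <dedupId, ts> entry for a match
    }
}

// With the plain dedup id as the store key, a single point lookup suffices:
final byte[] stored = keyValueStore.get(keyBytes(dedupId));
```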
103: "Processing logic":
Might need some updates (Cf 102 comment). I am not sure
if
I
fully
understand the logic: cf 105 below.
104:
If no entries found → forward the record + save the record in the
store
This part is critical, and we should discuss it in detail. In the end,
de-duplication only makes sense when EOS is used, and we might want to
call this out (eg, in the JavaDocs)? But if used with ALOS, it's very
difficult to ensure that we never lose data... Your proposal to
forward first goes in the right direction, but does not really solve
the problem entirely:
Even if we forward the message first, all downstream processing
happens, `context.forward()` returns, and we update the state store --
we could now crash w/o committing offsets. For this case, we have no
guarantee that the result records were published (as we did not flush
the producer yet), but when re-reading from the input topic, we would
find the record in the store and incorrectly drop it as a duplicate...
I think the only solution to make this work would be to use TX-state
stores in combination with ALOS, as proposed via KIP-892?
Using an in-memory store won't help much either? The producer could
have sent the write into the changelog topic, but not into the result
topic, and thus we could still not guarantee ALOS...?
How do we want to go about this? We could also say this new operator
only works with EOS. Would this be too restrictive? -- At least for
now, until KIP-892 lands, when we could relax it?
105: "How to detect late records"
In the end, it seems to boil down to determine which of
the
records
to
forward and which record to drop, for (1) the regular
case
and
(2)
the
out-of-order data case.
Regular case (no out-of-order data): For this case, offset and ts
order is the same, and we can forward the first record we get. All
later records within the "de-duplication period" with the same
"de-duplication key" would be dropped. If a record with the same
"de-duplication key" arrives after the "de-duplication period" has
passed, we cannot drop it any longer, but would still forward it, as
per the contract of the operator / de-duplication period.
For the out-of-order case: The first question we need to answer is,
do we want to forward the record with the smallest offset or the
record with the smallest ts? Logically, forwarding with the smallest
ts might be "more correct"; however, it implies we could only forward
it after the "de-duplication period" has passed, which might be
undesired latency? Would this be desired/acceptable?
In contrast, if we forward the record with the smallest offset (this
is what you seem to propose), we don't have a latency issue, but of
course the question of which records to drop is trickier to answer: it
seems you propose to compare the time difference of the stored record
to the current record, but I am wondering why? Would it not be desired
to drop all duplicates independent of their ts, as long as we find a
record in the store? Would be good to get some more motivation and
tradeoffs discussed about the different strategies we could use.
You also propose to drop _any_ late record: I am not sure if that's
desired? Could this not lead to data loss? Assume we get a late
record, but in fact there never was a duplicate -- why would we want
to drop it?
If there is a late record which is indeed a duplicate, but we purged
the original record from the store already, it seems to be the same
case as for the "no out-of-order case": after we purged, we cannot
de-duplicate, and thus it's a regular case we can just accept?
-Matthias
On 5/29/24 4:58 AM, Ayoub Omari wrote:
Hi everyone,
I've just made a (small) change to this KIP about an implementation
detail.
Please let me know your thoughts.
Thank you,
Ayoub
On Mon, May 20, 2024 at 9:13 PM Ayoub <ayoubomar...@gmail.com> wrote:
Hello,
Following a discussion on the community Slack channel, I would like to
revive the discussion on KIP-655, which is about adding a
deduplication processor in kafka-streams.