Thanks for updating the KIP.
A few comments/question:
(100): For `deduplicateByKey(...)` the input stream might still be
repartitioned if the key was changed upstream, right (similar to how
`groupByKey()` and `join()` work)?
builder.stream(...).deduplicateByKey(); // no repartitioning
builder.stream(...).map(...).deduplicateByKey(); // repartitioning
If my understanding is correct, it should be reflected in the JavaDocs.
(101): I guess the same as (100) applies to `deduplicateByKeyValue(...)`?
(102): I am not 100% sure if `deduplicate()` is the best API, and if we
even need it? It seems we could express `deduplicate()` also as:
stream.map(...).deduplicateByKey(...).map(...).repartition();
I am also not sure how common it is to de-duplicate on a value field
_and_ expect/want the result stream to still be partitioned on the
original key? I could also imagine that the semantics of `deduplicate()`
would actually be the same as (and this might be more intuitive for users?):
stream.map(...).deduplicateByKey(...)
Ie, there would be no automatic "back partitioning" on the original key.
Is there really demand for auto-back-partitioning?
Atm, I tend to think that `deduplicate()` might not be required to get
added at all, as it's only syntactic sugar anyway (no matter which
semantics, w/ or w/o auto-back-partitioning we pick). [And if there is
user demand down the road, it seems simple enough to add it later.]
(103) `Deduplicated`: Right now it only takes a name for the state
store, which means it's not possible to plug in a different store. Is
this intentional, or should we allow passing in `DslStoreSuppliers`
and/or `KeyValueBytesStoreSupplier`? We could also defer this to a
follow-up KIP.
(104) Punctuation:
For punctuated records - whose offset value is null - deduplication is always
performed.
Why are we doing it this way? We could also say, we never de-duplicate
if there is no offset (similar to null-key and/or null-id). (Or maybe
even make it configurable -- of course, we can also defer a config to a
follow-up KIP in case there is demand.)
In general, keeping stuff and letting users add additional filtering is
the better default. If we filter, users have no way to "resurrect" the
data, but if we keep it, adding an additional filter is possible.
(Similar to what is proposed for late records -- we forward, not drop.)
(105) Deduplication logic:
I am not 100% sure if I understand the proposed logic with regard to
time-semantics and out-of-order data.
Assume there is no out-of-order data (de-dup interval 10 sec):
Example A:
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@25 -> dropped as duplicate stream-time=25
k1@35 -> forwarded and inserted into store -> stream-time=35 (we did
discard k1@20 already and thus k1@35 is not considered a duplicate)
This example is clear, however, we still need to define the exact
cut-off point, ie, what is the first record which is not dropped any
longer? k1@29 is clearly dropped, and k1@31 would clearly be forwarded;
I am not sure about k1@30 -> would it purge k1@20 already and thus be
forwarded, or would it be dropped?
The question boils down to whether a de-dup interval of 0 is allowed (ie,
whether we define the upper bound of the de-duplication interval as
inclusive or exclusive)? If we allow an interval size of zero (meaning the
upper bound is inclusive), then for interval 0 we only de-duplicate
records with the same timestamp:
Example B:
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@20 -> dropped as duplicate stream-time=20
k1@21 -> forwarded and inserted into store -> stream-time=21 (we did
discard k1@20 already and thus k1@21 is not considered a duplicate)
For this case, k1@30 in Example A above would be dropped.
However, if we think interval zero should not be a valid interval, and 1
is the allowed minimum (and thus, the upper bound is exclusive), then
interval 1 means to de-duplicate records with the same timestamp:
Example C (same as Example B, even if interval is 1 now, due to
different interval bound semantics):
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@20 -> dropped as duplicate stream-time=20
k1@21 -> forwarded and inserted into store -> stream-time=21 (we did
discard k1@20 already and thus k1@21 is not considered a duplicate)
For this case, k1@30 in Example A above would be forwarded.
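To make the two candidate bound semantics concrete, here is a tiny
illustration (not from the KIP; timestamps in seconds to match Example A):
```java
long storedTs = 20L;      // k1@20 is in the store
long intervalSec = 10L;   // de-dup interval of 10 sec
long newTs = 30L;         // the ambiguous record k1@30

// Option 1: inclusive upper bound (interval 0 allowed, de-dups equal timestamps only)
boolean droppedInclusive = (newTs - storedTs) <= intervalSec;  // true  -> k1@30 dropped

// Option 2: exclusive upper bound (minimum interval is 1)
boolean droppedExclusive = (newTs - storedTs) < intervalSec;   // false -> k1@30 forwarded
```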
What about out-of-order (same key):
k1@20 -> forwarded and inserted into store -> stream-time=20
k1@9 -> ?
I think we should forward; time diff is larger than de-dup interval
Also: record is "late" with regard to stream-time, so we should
forward as "late"
k1@11 -> ?
we could de-dup; we have k1@20 in the store
ts is within de-dup-interval
ts is not older than stream-time minus de-dup interval either (ie,
not "late")
k1@25 -> dropped as duplicate stream-time=25
k1@23 -> dropped as duplicate stream-time=25
k1@11 -> ?
we could de-dup; we have k1@20 in the store
ts is within de-dup-interval
ts IS OLDER than stream-time minus de-dup interval though, we could
also forward as "late"
What about different keys? What about different keys in combination with
"late" records? Overall, I could not implement the logic based on the KIP
as there are too many unspecified cases.
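Just to make the open questions concrete, here is a hypothetical decision
skeleton (plain Java, all names made up, single partition) with the cases
the KIP would need to pin down marked as comments -- it is meant to
enumerate questions, not to propose an implementation:
```java
import java.util.HashMap;
import java.util.Map;

class DedupSketch {
    record Entry(long ts, long offset) {}

    final Map<String, Entry> store = new HashMap<>();
    final long dedupInterval = 10;      // seconds, to match the examples above
    long streamTime = Long.MIN_VALUE;

    /** Returns true if the record should be forwarded. */
    boolean onRecord(String id, long ts, long offset) {
        streamTime = Math.max(streamTime, ts);
        Entry prev = store.get(id);
        if (prev == null) {
            store.put(id, new Entry(ts, offset));
            return true;                                  // no entry -> forward
        }
        if (ts < streamTime - dedupInterval) {
            // CASE TO SPECIFY: "late" record with an existing entry --
            // forward as late, or drop as duplicate? (falls through for now)
        }
        if (Math.abs(ts - prev.ts()) <= dedupInterval) {  // CASE TO SPECIFY: inclusive vs exclusive bound
            // CASE TO SPECIFY: should an out-of-order record *older* than the
            // stored one (e.g. k1@11 after k1@20) be de-duplicated at all?
            return false;                                 // drop as duplicate
        }
        store.put(id, new Entry(ts, offset));             // CASE TO SPECIFY: replace or keep the first entry?
        return true;
    }
}
```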
-Matthias
On 8/30/24 2:08 AM, Ayoub Omari wrote:
Hi Bill,
Thanks for your answer.
Can you update the KIP to state that deduplication by value will result in
auto-repartitioning by Kafka Streams
Updated the KIP to the latest proposal.
For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
we stumble across a better name we can update the KIP then too).
Yes, better than the first suggested name. Reflected in the KIP.
I also added a note about records with null deduplication key.
If no new points are raised in the next few days, I will start a VOTE
thread.
Thanks,
Ayoub
On Tue, Aug 27, 2024 at 5:47 PM Bill Bejeck <bbej...@gmail.com> wrote:
Hi Ayoub,
Thanks for the update.
Your latest proposal looks good to me, but I have a couple of comments.
in (2) we repartition by the id field
I agree with this approach, but the KIP still states "No repartitioning is
done by this processor. In case a user wants to deduplicate an id over
multiple partitions, they should first repartition the topic on this id."
Can you update the KIP to state that deduplication by value will result in
auto-repartitioning by Kafka Streams, unless the developer adds a manual
repartitioning operator themselves.
For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
we stumble across a better name we can update the KIP then too).
This latest update has been open for a while now, I'd say it's good to
start a VOTE thread in 1-2 days.
Thanks,
Bill
On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com>
wrote:
Hi Bill,
I think all the points raised are now handled in the KIP.
We just still have (106) on which we should have an agreement. I made a
proposal in my last mail, and I am waiting for others' opinions.
My proposition is to have 3 methods of deduplication:
1) `deduplicateByKey()`
2) `deduplicate((k, v) -> v.id)`
3) `deduplicateByKeyAndId((k, v) -> v.id)` (Name to be chosen)
The point of having both (2) and (3) is that in (2) we repartition
by the id field, whereas in (3) we are not repartitioning.
(3) is needed in case the current partitioning ensures that the same
id will always land on the same task. Here is an example I gave above:
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id` . Here, the application
wants to deduplicate transactions. It knows that a transaction id
maps to a single userId. Any duplicate of that record would be received
by the task which processes this userId.
Let me know what you think.
Thanks,
Ayoub
On Thu, Aug 22, 2024 at 9:37 PM Bill Bejeck <bbej...@apache.org> wrote:
Hi Ayoub,
What's the status of this KIP?
Regards,
Bill
On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com>
wrote:
Hi Bill,
Thanks for your comments !
I am prefixing your points with B.
B1.
That's a good catch. I think punctuated records aren't affected by
this problem of reprocessing because they are not issued twice.
We will still need to distinguish between a punctuated record
and a source record. The logic becomes:
if record.offset is not null && record.offset == stored_offset:
    forward(record)
else:
    // do not forward
I will update the KIP on this point.
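In Processor API terms, the check could look roughly like the sketch below
(a fragment only; keeping the stored offset in the store value is the
layout assumed above):
```java
import java.util.Optional;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// storedOffset = offset previously written into the store value for this dedup id
static <K, V> void maybeForward(ProcessorContext<K, V> context, Record<K, V> record, long storedOffset) {
    Optional<RecordMetadata> meta = context.recordMetadata();  // empty for punctuation-generated records
    if (meta.isPresent() && meta.get().offset() == storedOffset) {
        context.forward(record);  // same source record reprocessed after a crash -> forward again
    }
    // otherwise: punctuated record (no offset) or different offset -> duplicate, do not forward
}
```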
B2. (i.e. 106)
That's the point discussed in 106. The main idea is to avoid
repartitioning when we don't have to. But let's say we want to
repartition when deduplication is on an id that is not the key.
The user code may look something like this:
```kstream
.deduplicate((k, v) -> v.id)
.map((k, v) -> ...)
```
The user expects that the key in the map processor which follows the
deduplicate is the same as the initial key.
Internally, if we decide to repartition, the deduplication processor may
do the following:
```kstream
.map((k, v) -> (v.id, ...)) // For repartitioning on id
.deduplicateByKey()
.map((id, v) -> (initialKey, ...)) // To recover the initial key
```
In this case, there is some internal complexity to this processor:
I) We should repartition twice, before and after deduplication. The
second map triggers repartitioning if there are following stateful
processors in the pipeline.
II) The initial key should be kept somewhere. One way is to wrap the
value to add the key, and then unwrap it after the `deduplicateByKey`.
As mentioned in one use case above (deduplicating transactions - written
at least once - by id), repartitioning is not needed for that case. This
added complexity won't be useful.
*TLDR,* I think one solution is to have a separate api to cover such a
use case. In total, we can have 3 apis of deduplication:
1) `deduplicateByKey()` *// No repartitioning*
2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v)
-> v.id)` *// No repartitioning*
3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
The second api is equivalent to deduplicating by the couple (key, id).
No repartitioning is required because the records with the same key are
already together in the same partition. cc @Matthias @Sebastien.
B3.
I think here we can do what some existing processors do for periodic work:
we can track the last cleanup time and the current stream time, and if
the delta exceeds the deduplication period, we trigger the cleanup.
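A minimal sketch of that idea (the store layout <dedupId, firstSeenTimestamp>
is a simplification; the real value would also carry the offset):
```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class DedupCleanup {
    private long lastCleanupTime = 0L;

    void maybeCleanup(final KeyValueStore<String, Long> store,
                      final long streamTime,
                      final long dedupIntervalMs) {
        if (streamTime - lastCleanupTime < dedupIntervalMs) {
            return;                          // scan at most once per de-duplication interval
        }
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, Long> entry = it.next();
                if (entry.value < streamTime - dedupIntervalMs) {
                    store.delete(entry.key); // first occurrence is outside the interval
                }
            }
        }
        lastCleanupTime = streamTime;
    }
}
```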
Best,
Ayoub
On Thu, Jul 11, 2024 at 1:17 AM Bill Bejeck <bbej...@apache.org> wrote:
Hi All,
Thanks Ayoub for getting this KIP going again, I think a deduplication
operator will be very useful. My apologies for being late to the
discussion.
Overall - I agree with the direction of the KIP but I have a couple of
questions.
1. Regarding using offsets for tracking the first arrival. I think
there could be a case when offsets would not be available, when records
are forwarded by punctuation for example.
2. I'm not sure about using something other than the key for identifying
duplicate records. By doing so, one could end up missing a de-duplication
due to records with the same id characteristic in the value but having
different keys, so the records land on different partitions. I guess we
could enforce a repartition if one chooses to use a de-duplication id
other than the key.
3. I think a punctuation scheduled to run at the de-duplication period
to clean out older records would be a clean approach for purging older
records. I'm not taking a hard stance on this approach and I'm willing
to discuss different methods.
Thanks,
Bill
On 2024/06/25 18:44:06 Ayoub Omari wrote:
Hi Matthias,
Here are my updates on your points.
101.
You propose to add static methods `keySerde()` and `valueSerde()` --
in other config classes, we use only `with(keySerde, valueSerde)` as we
try to use the "builder" pattern, and avoid too many overloads. I would
prefer to omit both methods you suggest and just use a single `with`
for both serdes.
I was actually inspired by the other config classes, for example `Joined`
and `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
I think we don't want to add `with(...)` which takes all parameters at once.
Done.
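For illustration, the two styles side by side (shapes are assumptions only,
just to contrast the Grouped/Joined-style statics with a single `with()`
entry point -- not the final API):
```java
import org.apache.kafka.common.serialization.Serdes;

// single "builder" entry point, as now proposed
Deduplicated<String, String> viaWith =
    Deduplicated.with(Serdes.String(), Serdes.String());

// Grouped/Joined style with per-serde statics, as in the first draft
Deduplicated<String, String> viaStatics =
    Deduplicated.<String, String>keySerde(Serdes.String())
                .withValueSerde(Serdes.String());
```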
102.
Thanks, your suggestion sounds good to me. The trade-off of having an
index that allows to efficiently purge expired records besides the
keyValue store makes sense. I've been looking into the code, and I think
a similar idea was implemented for other processors (for example with
DualSchemaRocksDBSegmentedBytesStore).
As you said, I think we would benefit from some existing code here.
KIP updated!
104.
Updated the KIP to consider records' offsets.
105.
picking the first offset with smallest ts sounds good to me. The KIP
should be explicit about it
Done.
But as discussed above, it might be simplest to not really have a
window lookup, but just a plain key-lookup and drop if the key exists
in the store?
KIP updated, we will be `.get()`ing from a keyValueStore instead of
`.fetch()`ing from a WindowStore.
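For illustration, the plain key lookup could look like this (a sketch only;
the encoded value layout is an assumption):
```java
import org.apache.kafka.streams.state.KeyValueStore;

// Returns true if the record should be dropped as a duplicate.
static boolean isDuplicate(final KeyValueStore<String, byte[]> store,
                           final String dedupId,
                           final byte[] encodedEntry) {
    final byte[] existing = store.get(dedupId);
    if (existing == null) {
        store.put(dedupId, encodedEntry);  // first occurrence: remember it and forward
        return false;
    }
    return true;                           // id already present -> drop as duplicate
}
```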
Another line of thinking, that did serve us well in the past: in doubt
keep a record -- users can add operators to drop records (in case they
don't want to keep them), but if we drop a record, users have no way to
resurrect it (thus, building a workaround to change semantics is
possible for users if we default to keep records, but not the other way
around).
Makes total sense! I updated the KIP to forward late records instead of
dropping them.
106.
For the moment, I highlighted in the Javadocs that we are deduplicating
by partition. If there is a better name to have this information in the
name of the api itself, it would be good.
Best,
Ayoub
On Thu, Jun 13, 2024 at 9:03 AM Sebastien Viale <sebastien.vi...@michelin.com> wrote:
Hi,
106:
Thanks for the clarification. Actually, this is not what I expected, but
I better understand the performance issues regarding the state store
iteration.
If this is how it should be designed, it is fine for me as long as it is
clear that the repartition must be done before the deduplication.
Sébastien
________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Thursday, June 13, 2024 2:51 AM
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
kafka-streams
106:
For the use-case of deduplicating an "at least once written" stream,
we are sure that the duplicate record has the same key as the
original one, and will land on the same task. Here, a user would
want to specify a deduplication key different from the topic's key
in case the topic's key is not a unique identifier.
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id`. Here, the application
wants to deduplicate transactions. It knows that a transaction id
maps to a single userId. Any duplicate of that record would be received
by the task which processes this userId.
This is an interesting point.
My concern is to some extent that it seems (on the surface) to not
follow the established pattern of auto-repartitioning in the DSL. Of
course, given that the current proposal says we use an "id extractor"
and not a "key extractor" it might be ok (but it might be somewhat
subtle). Of course, JavaDocs always help to explain in detail. Would
this be enough?
Would be good to hear from others about this point. I am personally not
sure which approach I would prefer at this point.
The problem reminds me of
https://issues.apache.org/jira/browse/KAFKA-10844 which is not resolved
directly either. We do have KIP-759
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
repartition canceling) which is WIP and helps with KAFKA-10844, but not
sure if it would be a viable solution for the de-duplication case?
-Matthias
On 6/11/24 2:31 PM, Ayoub Omari wrote:
Hi Sebastien & Matthias,
For 106.
My idea was to deduplicate on a per-task basis. If the user wants to do
a global deduplication over all partitions, I think it's better to have
him explicitly repartition and then call the deduplication processor.
For the use-case of deduplicating an "at least once written" stream,
we are sure that the duplicate record has the same key as the
original one, and will land on the same task. Here, a user would
want to specify a deduplication key different from the topic's key
in case the topic's key is not a unique identifier.
For example, we have a topic with keyValue (`userId`, `transaction`)
and deduplication is done on `transaction`.`id`. Here, the application
wants to deduplicate transactions. It knows that a transaction id
maps to a single userId. Any duplicate of that record would be received
by the task which processes this userId.
One other thought I have when writing the KIP about global deduplication
is that it will require mapping the key of the stream twice (a first map
to change the key to the deduplication key, and a second map to get back
the initial key). The second map may imply a second repartitioning.
However, if we do a per-task deduplication, the user may adapt to his
specific use-case.
Let me know what you think
Ayoub
On Tue, Jun 11, 2024 at 8:21 PM Matthias J. Sax <mj...@apache.org> wrote:
Thanks Sebastien,
that's a good point. Thanks for raising it. -- I like your proposal.
An alternative would be to have two overloads of `deduplicate()`, one w/
and one w/o the "id extractor" parameter. This would be less explicit
though.
-Matthias
On 6/11/24 2:30 AM, Sebastien Viale wrote:
Hi,
I am really interested in this KIP.
106:
I hope I am not talking nonsense, but if you do not deduplicate based on
the key, the input stream has to be repartitioned.
Otherwise, different stream tasks may handle records that need to be
deduplicated, and thus duplicates will not be detected.
This is why I would have created two different methods, as is done for
GroupBy:
deduplicateByKey(...)
deduplicate(...)
If deduplicateByKey is used, the input stream does not need to be
repartitioned.
thanks
Sébastien
________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Tuesday, June 11, 2024 1:54 AM
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
kafka-streams
Thanks for the update Ayoub.
101: you say:
But I am not sure if we don't want to have them for this processor?
What is your reasoning to move off the established pattern? Would be
good to understand why the `Deduplicated` class needs a different
"structure" compared to existing classes.
102: Creating iterators is very expensive. For other work, we actually
hit 100x (?) throughput degradation by creating an (for most cases
empty) iterator for every input record, and needed to find other ways
to avoid creating an iterator per record. It really kills performance.
I see the point about data expiration. We could experiment with
punctuation to expire old data, or add a second "time-ordered store"
(which we already have at hand) which acts as an index into the main
store. -- Another possibility would be to add a new version of
segmented store with a different key-layout (ie, just store the plain
key). I think with some refactoring, we might be able to re-use a lot
of existing code.
104:
This gets me wondering if this is a limitation of stateful processors
in ALOS. For example a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing, then the record will be again reconsidered for
aggregation). Is this correct?
Yes, this is correct, but it does not violate ALOS, because we did not
lose the input record -- of course, the aggregation would contain the
input record twice (eg, over count), but this is ok under ALOS.
Unfortunately, for de-duplication this pattern breaks, because the
de-duplication operator does a different "aggregation logic" depending
on its state (emit if no key found, but not emit if key found). For
counting as an example, we increment the count and emit unconditionally
though.
As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has been already seen or
not. If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the record
is processed twice and we can go ahead and forward it. If the offset
is different, it means it's a duplicate record, so we ignore it.
Great idea. This might work... If we store the input record offset, we
can actually avoid that the "aggregation logic" changes for the same
input record. -- And yes, with ALOS potentially emitting a duplicate is
the-name-of-the-game, so no concerns on this part from my side.
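Spelling the idea out as a tiny sketch (the value layout and names are
assumptions, not what the KIP defines):
```java
// Value stored per de-duplication id: offset and timestamp of the first occurrence.
record DedupEntry(long offset, long timestamp) {}

// Decide whether to forward, given the store hit (null if the id was never seen)
// and the offset of the record currently being processed.
static boolean shouldForward(final DedupEntry hit, final long currentOffset) {
    if (hit == null) {
        return true;                         // first occurrence of this de-duplication id
    }
    // Same offset: the very same input record is being reprocessed after a crash
    // (offsets were not committed yet), so forward it again; any other offset is a
    // genuine duplicate and gets dropped.
    return hit.offset() == currentOffset;
}
```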
105: picking the first offset with smallest ts sounds good to me. The
KIP should be explicit about it. But as discussed above, it might be
simplest to not really have a window lookup, but just a plain
key-lookup, and drop if the key exists in the store? -- For late
records, it might imply that they are not de-duplicated, but this is
also the case for in-order records if they are further apart than the
de-duplication window size, right? Thus I would believe this is "more
natural" compared to discarding late records pro-actively, which would
lead to missing result records?
We could also make this configurable if we are not sure what users
really need -- or add such a configuration later in case the semantics
we pick don't work for some users.
Another line of thinking, that did serve us well in the past: in doubt
keep a record -- users can add operators to drop records (in case they
don't want to keep them), but if we drop a record, users have no way to
resurrect it (thus, building a workaround to change semantics is
possible for users if we default to keep records, but not the other way
around).
Would be good to get input from the broader community on this question
though. In the end, it must be a use-case driven decision?
-Matthias
On 6/6/24 5:02 AM, Ayoub Omari wrote:
Hi Matthias,
Thank you for your review!
100.
I agree. I changed the name of the parameter to "idSelector". Because
this id may be computed, it is better to call it "id" rather than field
or attribute.
101.
The reason I added the methods `keySerde()` and `valueSerde()` was to
have the same capabilities as other serde classes (such as Grouped or
Joined). As a Kafka-streams user, I usually use `with(keySerde,
valueSerde)` as you suggested. But I am not sure if we don't want to
have them for this processor?
102.
That's a good point! Because we know that the window store will contain
at most one instance of a given key, I am not sure how the range fetch
on WindowStore compares to a KeyValueStore `get()` in this case.
Wondering if the fact that the record's key is the prefix of the
underlying keyValueStore's key ("<dataKey,ts>") may provide comparable
performance to the random access of KeyValueStore? Of course, the
WindowStore fetch() would be less efficient because it may fetch from
more than 1 segment, and because of some iterator overhead.
The purpose of using a WindowStore is to automatically purge old data.
For example, deduplicating a topic written at least once wouldn't
require keeping a large history. This is not the case with a
KeyValueStore, which would require scanning regularly to remove expired
records. That might cause a sudden increase of latency whenever the
cleanup is triggered.
It would be good to hear from anyone who has done some analysis on
RocksDB's range fetch.
103.
Sure, I can update it once we agree on the underlying semantics.
104.
Another good point!
In the end, de-duplication does only make sense when EOS is used
I agree with that. And for me, the use case of deduplicating a topic
written ALOS inside an EOS application might be the top 1 use case of
deduplication.
all downstream processing happens, `context.forward()` returns
and we update the state store, we could now crash w/o committing offsets
This gets me wondering if this is a limitation of stateful processors
in ALOS. For example a windowed aggregation with `on_window_close`
emit strategy may have the same limitation today (we receive a record,
we update its aggregation result in the store, then crash before
committing, then the record will be again reconsidered for
aggregation). Is this correct?
As a workaround, I think storing the record's offset inside the store's
value can tell us whether the record has been already seen or not.
If we receive a record whose deduplication id exists in the store and
the entry in the store has the same offset, it means the record is
processed twice and we can go ahead and forward it. If the offset is
different, it means it's a duplicate record, so we ignore it.
As you said, we don't have any guarantee whether the initial record was
forwarded or not in case of a crash before commit. In this solution we
would forward the record twice, which is against deduplication. But,
this is still an ALOS application, so it has the same semantics as any
other such application. With this, I am not sure we can have "strict"
deduplication for ALOS applications.
105.
For me, if there are two duplicate records, it means they are the same
from the application's point of view, so it can choose either one.
Thus, I would go with forwarding the record with the least offset.
Would it not be desired to drop all duplicates independent
of their ts, as long as we find a record in the store?
This is actually related to the (suggested) windowed nature of
deduplication. As in 102, we don't want to do a "forever"
deduplication, which may be impossible for huge workloads where all
records should be kept in the store. Hence, the fetch of timestamp
between [ts-deduplicationInterval, ts+deduplicationInterval].
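As a sketch, that windowed lookup would be something like the following
(names are assumed; this reflects the WindowStore-based design discussed
here, before the switch to a plain keyValueStore):
```java
import java.time.Instant;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Returns true if any entry for this dedup id exists within the interval.
static boolean isDuplicate(final WindowStore<String, byte[]> store,
                           final String dedupId,
                           final long ts,
                           final long deduplicationIntervalMs) {
    try (WindowStoreIterator<byte[]> it = store.fetch(
            dedupId,
            Instant.ofEpochMilli(ts - deduplicationIntervalMs),
            Instant.ofEpochMilli(ts + deduplicationIntervalMs))) {
        return it.hasNext();  // any hit within the interval -> drop the record
    }
}
```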
About late records, this is again due to the windowed nature. Because
the store won't save those late (i.e. expired) records, we have two
options. Either we do not apply deduplication on them, thus the
deduplication doesn't work on late records, or we discard them (which
is the option I suggest).
In the second case, it would be up to the user to choose any
deduplicationInterval that may sufficiently cover all his late data.
What do you think?
Thanks,
Ayoub
On Tue, Jun 4, 2024 at 11:58 PM Matthias J. Sax <mj...@apache.org> wrote:
Ayoub,
thanks for resurrecting this KIP. I think a built-in de-duplication
operator will be very useful.
Couple of questions:
100: `deduplicationKeySelector`
Is this the best name? It might indicate that we select a "key" what is
an overloaded term... Maybe we could use `Field` or `Id` or `Attribute`
instead of `Key` in the name? Just brainstorming. If we think `Key` is
the best word, I am also ok with it.
101: `Deduplicated` class
You propose to add static methods `keySerde()` and `valueSerde()` -- in
other config classes, we use only `with(keySerde, valueSerde)` as we try
to use the "builder" pattern, and avoid too many overloads. I would
prefer to omit both methods you suggest and just use a single `with` for
both serdes.
Similarly, I think we don't want to add `with(...)` which takes all
parameters at once (which should only be 3 parameters, not 4, as it's
currently in the KIP)?
102: Usage of `WindowedStore`:
Would this be efficient? The physical byte layout is "<dataKey,ts>" for
the store key, so it would be difficult to do an efficient lookup for a
given "de-duplication key" to discard duplicates, as we don't know the
timestamp of the first record with the same "de-duplication key".
This boils down to the actual de-duplication logic (some more comments
below), but what you propose seems to require expensive range-scans,
which could be cost prohibitive in practice. I think we need to find a
way to use efficient key-point-lookups to make this work.
103: "Processing logic":
Might need some updates (cf. the 102 comment). I am not sure if I fully
understand the logic: cf. 105 below.
104:
If no entries found → forward the record + save the record in the store
This part is critical, and we should discuss in detail. In the end,
de-duplication does only make sense when EOS is used, and we might want
to call this out (eg, in the JavaDocs)? But if used with ALOS, it's very
difficult to ensure that we never lose data... Your proposal to
first-forward goes into the right direction, but does not really solve
the problem entirely:
Even if we forward the message first, all downstream processing happens,
`context.forward()` returns and we update the state store, we could now
crash w/o committing offsets. For this case, we have no guarantee that
the result records were published (as we did not flush the producer
yet), but when re-reading from the input topic, we would find the record
in the store and incorrectly drop it as a duplicate...
I think the only solution to make this work would be to use TX-state
stores in combination with ALOS as proposed via KIP-892?
Using an in-memory store won't help much either? The producer could have
sent the write into the changelog topic, but not into the result topic,
and thus we could still not guarantee ALOS...?
How do we want to go about this? We could also say, this new operator
only works with EOS. Would this be too restrictive? -- At least for now,
until KIP-892 lands, and we could relax it?
105: "How to detect late records"
In the end, it seems to boil down to determining which of the records to
forward and which record to drop, for (1) the regular case and (2) the
out-of-order data case.
Regular case (no out-of-order data): For this case, offset and ts order
is the same, and we can forward the first record we get. All later
records within the "de-duplication period" with the same
"de-duplication key" would be dropped. If a record with the same
"de-duplication key" arrives after the "de-duplication period" passed,
we cannot drop it any longer, but would still forward it, as per the
contract of the operator / de-duplication period.
For the out-of-order case: The first question we need to answer is, do
we want to forward the record with the smallest offset or the record
with the smallest ts? Logically, forwarding with the smallest ts might
be "more correct", however, it implies we could only forward it after
the "de-duplication period" passed, which might be undesired latency?
Would this be desired/acceptable?
In contrast, if we forward the record with the smallest offset (this is
what you seem to propose) we don't have a latency issue, but of course
the question what records to drop is more tricky to answer: it seems you
propose to compare the time difference of the stored record to the
current record, but I am wondering why? Would it not be desired to drop
all duplicates independent of their ts, as long as we find a record in
the store? Would be good to get some more motivation and tradeoffs
discussed about the different strategies we could use.
You also propose to drop _any_ late record: I am also not sure if that's
desired? Could this not lead to data loss? Assume we get a late record,
but in fact there was never a duplicate? Why would we want to drop it?
If there is a late record which is indeed a duplicate, but we purged the
original record from the store already, it seems to be the same case as
for the "no out-of-order case": after we purged we cannot de-duplicate
and thus it's a regular case we can just accept?
-Matthias
On 5/29/24 4:58 AM, Ayoub Omari wrote:
Hi everyone,
I've just made a (small) change to this KIP about an
implementation
detail.
Please let me know your thoughts.
Thank you,
Ayoub
On Mon, May 20, 2024 at 9:13 PM Ayoub <ayoubomar...@gmail.com> wrote:
Hello,
Following a discussion on the community slack channel, I would like to
revive the discussion on KIP-655, which is about adding a deduplication
processor in kafka-streams.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
Even though the motivation is not quite the same as the initial one, I
updated the KIP rather than creating a new one, as I believe the end
goal is the same.
Thanks,
Ayoub