Hi Matthias,

Thank you for your review!

100.
I agree. I changed the name of the parameter to "idSelector".
Because this id may be computed, it is better to call it "id" rather than
"field" or "attribute".

101.
The reason I added the methods `keySerde()` and `valueSerde()` was to
offer the same capabilities as other configuration classes that carry
serdes (such as Grouped or Joined). As a Kafka Streams user, I usually
use `with(keySerde, valueSerde)`, as you suggested, but I am not sure
whether we really want to leave the other two methods out for this
processor?

102.
That's a good point! Because we know that the window store will contain
at most one instance of a given key, I am not sure how the range fetch
on WindowStore compares to a KeyValueStore `get()` in this case.
I wonder whether the fact that the record's key is the prefix of the
underlying KeyValueStore's key ("<dataKey,ts>") may provide performance
comparable to the random access of a KeyValueStore? Of course, the
WindowStore `fetch()` would still be less efficient, because it may read
from more than one segment and because of some iterator overhead.
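
To make the prefix argument concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API: `PrefixLookupSketch`, `StoreKey` and `fetchFirst` are hypothetical names, and a `TreeMap` stands in for the ordered underlying store. It only illustrates that, with a "<dataKey,ts>" ordering, a bounded time-range fetch for one key is a seek to the key's prefix followed by a short scan. It says nothing about segments or the actual RocksDB cost.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class PrefixLookupSketch {
    // Hypothetical composite store key "<dataKey,ts>", sorted by dataKey first, then ts.
    static final class StoreKey implements Comparable<StoreKey> {
        final String dataKey;
        final long ts;

        StoreKey(String dataKey, long ts) {
            this.dataKey = dataKey;
            this.ts = ts;
        }

        @Override
        public int compareTo(StoreKey other) {
            int byKey = dataKey.compareTo(other.dataKey);
            return byKey != 0 ? byKey : Long.compare(ts, other.ts);
        }
    }

    // Sorted map standing in for the ordered underlying key-value store (assumption).
    private final NavigableMap<StoreKey, String> store = new TreeMap<>();

    void put(String dataKey, long ts, String value) {
        store.put(new StoreKey(dataKey, ts), value);
    }

    // Returns the first value stored for dataKey with a timestamp in [from, to], or null.
    String fetchFirst(String dataKey, long from, long to) {
        NavigableMap<StoreKey, String> range =
            store.subMap(new StoreKey(dataKey, from), true, new StoreKey(dataKey, to), true);
        return range.isEmpty() ? null : range.firstEntry().getValue();
    }

    public static void main(String[] args) {
        PrefixLookupSketch sketch = new PrefixLookupSketch();
        sketch.put("a", 100L, "a@100");
        sketch.put("b", 105L, "b@105");
        System.out.println(sketch.fetchFirst("a", 90L, 110L));  // entry for "a" inside the range
        System.out.println(sketch.fetchFirst("a", 101L, 110L)); // no entry for "a" in this range
    }
}
```

Because at most one entry per key is expected inside the range, the scan after the seek would touch very few entries in this simplified model.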

The purpose of using a WindowStore is to automatically purge old data.
For example, deduplicating a topic written with at-least-once semantics
wouldn't require keeping a large history. This is not the case with a
KeyValueStore, which would require regular scans to remove expired
records. That might cause a sudden increase in latency whenever the
cleanup is triggered.

It would be good to hear from anyone who has done some analysis
on RocksDB's range fetch.

103.
Sure, I can update it once we agree on the underlying semantics.

104.
Another good point!

> In the end, de-duplication does only make sense when EOS is used

I agree with that. And for me, deduplicating a topic written with ALOS
inside an EOS application might be the number one use case
of deduplication.

> all downstream processing happens, `context.forward()` returns
> and we update the state store, we could now crash w/o committing offsets

This gets me wondering whether this is a general limitation of stateful
processors in ALOS. For example, a windowed aggregation with the
`on_window_close` emit strategy may have the same limitation today: we
receive a record, we update its aggregation result in the store, then we
crash before committing, and the record is considered again for
aggregation after the restart. Is this correct?

As a workaround, I think storing the record's offset inside the
store's value can tell us whether the record has already been seen or not.
If we receive a record whose deduplication id exists in the store
and the entry in the store has the same offset, it means the same record
is being processed a second time, and we can go ahead and forward it. If
the offset is different, it means it's a duplicate record, so we ignore it.

As you said, we don't have any guarantee whether the initial record was
forwarded or not in case of a crash before commit. With this solution
we may forward the record twice, which goes against deduplication.
But this is still an ALOS application, so it has the same semantics
as any other such application. Given that, I am not sure we can
have "strict" deduplication for ALOS applications.
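
The offset-based workaround described above could be sketched roughly as follows. This is plain Java rather than a Kafka Streams processor: `OffsetAwareDeduplicator` and `shouldForward` are hypothetical names, a `HashMap` stands in for the state store, and the sketch assumes all records come from a single input partition so that offsets are comparable.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetAwareDeduplicator {
    // Stand-in for the state store (assumption):
    // deduplication id -> offset of the first record seen with that id.
    private final Map<String, Long> store = new HashMap<>();

    // Returns true if the record should be forwarded downstream.
    boolean shouldForward(String id, long offset) {
        Long storedOffset = store.get(id);
        if (storedOffset == null) {
            // First record with this id: remember its offset and forward it.
            store.put(id, offset);
            return true;
        }
        // Same offset: the very same input record is being re-read (for example
        // after a crash before commit), so forward it again.
        // Different offset: a genuine duplicate, so drop it.
        return storedOffset == offset;
    }

    public static void main(String[] args) {
        OffsetAwareDeduplicator dedup = new OffsetAwareDeduplicator();
        System.out.println(dedup.shouldForward("id-1", 10L)); // first occurrence
        System.out.println(dedup.shouldForward("id-1", 10L)); // same record re-read
        System.out.println(dedup.shouldForward("id-1", 11L)); // duplicate at another offset
    }
}
```

The second call is the replay case: the record is forwarded again, which is the "forward twice" behaviour accepted under ALOS in the discussion above.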

105.
For me, if there are two duplicate records, it means they are
the same from the application's point of view, so it can choose
either one. Thus, I would go with forwarding the record with
the smallest offset.

> Would it not be desired to drop all duplicates independent
> of their ts, as long as we find a record in the store?

This is actually related to the (suggested) windowed nature
of deduplication. As in 102, we don't want to do a "forever"
deduplication, which may be impossible for huge workloads
where all records would have to be kept in the store. Hence the fetch
over the timestamp range
[ts - deduplicationInterval, ts + deduplicationInterval].

About late records, this is again due to the windowed nature.
Because the store won't save those late (i.e. expired) records,
we have two options. Either we do not apply deduplication
to them, so deduplication doesn't work on late records,
or we discard them (which is the option I suggest).
In the second case, it would be up to the user to choose
a deduplicationInterval that sufficiently covers all their late data.
What do you think?
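
As a rough illustration of the windowed behaviour discussed in this point, here is a minimal plain-Java sketch. It is not the KIP's implementation: `WindowedDedupSketch`, `Decision` and `process` are hypothetical names, a `HashMap` stands in for the window store, purging of old entries is omitted, and the store retention is assumed to equal `deduplicationInterval`. It implements the second option (late records are discarded).

```java
import java.util.HashMap;
import java.util.Map;

class WindowedDedupSketch {
    enum Decision { FORWARD, DROP_DUPLICATE, DROP_LATE }

    private final long deduplicationInterval;
    // Stand-in for the window store (assumption):
    // deduplication id -> timestamp of the stored record.
    private final Map<String, Long> store = new HashMap<>();
    // Highest record timestamp observed so far.
    private long streamTime = Long.MIN_VALUE;

    WindowedDedupSketch(long deduplicationInterval) {
        this.deduplicationInterval = deduplicationInterval;
    }

    Decision process(String id, long ts) {
        streamTime = Math.max(streamTime, ts);
        // Late record: older than what the store would still retain
        // (assumes retention == deduplicationInterval), so discard it.
        if (ts < streamTime - deduplicationInterval) {
            return Decision.DROP_LATE;
        }
        // Look for a stored record with the same id in
        // [ts - deduplicationInterval, ts + deduplicationInterval].
        Long storedTs = store.get(id);
        if (storedTs != null && Math.abs(ts - storedTs) <= deduplicationInterval) {
            return Decision.DROP_DUPLICATE;
        }
        store.put(id, ts);
        return Decision.FORWARD;
    }

    public static void main(String[] args) {
        WindowedDedupSketch dedup = new WindowedDedupSketch(10L);
        System.out.println(dedup.process("a", 100L)); // FORWARD
        System.out.println(dedup.process("a", 105L)); // DROP_DUPLICATE
        System.out.println(dedup.process("a", 120L)); // FORWARD (outside the interval)
        System.out.println(dedup.process("b", 105L)); // DROP_LATE
    }
}
```

With the first option instead, the `DROP_LATE` branch would forward the record without consulting the store.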

Thanks,
Ayoub

On Tue, Jun 4, 2024 at 23:58, Matthias J. Sax <mj...@apache.org> wrote:

> Ayoub,
>
> thanks for resurrecting this KIP. I think a built-in de-duplication
> operator will be very useful.
>
>
> Couple of questions:
>
>
>
> 100: `deduplicationKeySelector`
>
> Is this the best name? It might indicate that we select a "key" which is
> an overloaded term... Maybe we could use `Field` or `Id` or `Attribute`
> instead of `Key` in the name? Just brainstorming. If we think `Key` is
> the best word, I am also ok with it.
>
>
>
> 101: `Deduplicated` class
>
> You propose to add static methods `keySerde()` and `valueSerde()` -- in
> other config classes, we use only `with(keySerde, valueSerde)` as we try
> to use the "builder" pattern, and avoid too many overloads. I would
> prefer to omit both methods you suggest and just use a single `with` for
> both serdes.
>
> Similarly, I think we don't want to add `with(...)` which takes all
> parameters at once (which should only be 3 parameters, not 4 as it's
> currently in the KIP)?
>
>
>
> 102: Usage of `WindowedStore`:
>
> Would this be efficient? The physical byte layout is "<dataKey,ts>" for
> the store key, so it would be difficult to do an efficient lookup for a
> given "de-duplication key" to discard duplicates, as we don't know the
> timestamp of the first record with the same "de-duplication key".
>
> This boils down to the actual de-duplication logic (some more comments
> below), but what you propose seems to require expensive range-scans which
> could be cost prohibitive in practice. I think we need to find a way to
> use efficient key-point-lookups to make this work.
>
>
>
> 103: "Processing logic":
>
> Might need some updates (Cf 102 comment). I am not sure if I fully
> understand the logic: cf 105 below.
>
>
>
> 104:
>
> > If no entries found → forward the record + save the record in the store
>
> This part is critical, and we should discuss in detail. In the end,
> de-duplication does only make sense when EOS is used, and we might want
> to call this out (eg, on the JavaDocs)? But if used with ALOS, it's very
> difficult to ensure that we never lose data... Your proposal to
> first-forward goes into the right direction, but does not really solve
> the problem entirely:
>
> Even if we forward the message first, all downstream processing happens,
> `context.forward()` returns and we update the state store, we could now
> crash w/o committing offsets. For this case, we have no guarantee that
> the result records were published (as we did not flush the producer
> yet), but when re-reading from the input topic, we would find the record
> in the store and incorrectly drop as duplicate...
>
> I think the only solution to make this work would be to use TX-state
> stores in combination with ALOS as proposed via KIP-892?
>
> Using an in-memory store won't help much either? The producer could have
> sent the write into the changelog topic, but not into the result topic,
> and thus we could still not guarantee ALOS...?
>
> How do we want to go about this? We could also say, this new operator
> only works with EOS. Would this be too restrictive? -- At least for now,
> until KIP-892 lands, and we could relax it?
>
>
>
> 105: "How to detect late records"
>
> In the end, it seems to boil down to determine which of the records to
> forward and which record to drop, for (1) the regular case and (2) the
> out-of-order data case.
>
> Regular case (no out-of-order data): For this case, offset and ts order
> is the same, and we can forward the first record we get. All later
> record within "de-duplication period" with the same "de-duplication key"
> would be dropped. If a record with the same "de-duplication key" arrives
> after "de-duplication period" passed, we cannot drop it any longer, but
> would still forward it, as by the contract of the operator /
> de-duplication period.
>
> For the out-of-order case: The first question we need to answer is, do
> we want to forward the record with the smallest offset or the record
> with the smallest ts? Logically, forwarding with the smallest ts might
> be "more correct", however, it implies we could only forward it after
> "de-duplication period" passed, which might be undesired latency? Would
> this be desired/acceptable?
>
> In contrast, if we forward record with the smallest offset (this is what
> you seem to propose) we don't have a latency issue, but of course the
> question what records to drop is more tricky to answer: it seems you
> propose to compare the time difference of the stored record to the
> current record, but I am wondering why? Would it not be desired to drop
> all duplicates independent of their ts, as long as we find a record in
> the store? Would be good to get some more motivation and tradeoffs
> discussed about the different strategies we could use.
>
> You also propose to drop _any_ late record: I am also not sure if that's
> desired? Could this not lead to data loss? Assume we get a late record,
> but in fact there was never a duplicate? Why would we want to drop it?
> If there is a late record which is indeed a duplicate, but we purged the
> original record from the store already, it seems to be the same case as
> for the "no out-of-order case": after we purged we cannot de-duplicate
> and thus it's a regular case we can just accept?
>
>
>
> -Matthias
>
>
>
>
> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > Hi everyone,
> >
> > I've just made a (small) change to this KIP about an implementation
> > detail.
> > Please let me know your thoughts.
> >
> > Thank you,
> > Ayoub
> >
> > On Mon, May 20, 2024 at 21:13, Ayoub <ayoubomar...@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> Following a discussion on the community Slack channel, I would like to
> >> revive the discussion on KIP-655, which is about adding a deduplication
> >> processor in Kafka Streams.
> >>
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>
> >> Even though the motivation is not quite the same as the initial one, I
> >> updated the KIP rather than creating a new one, as I believe the end
> >> goal is the same.
> >>
> >> Thanks,
> >> Ayoub
> >>
> >>
> >>
> >
>
