Hi Bill,

Thanks for your comments!

I am prefixing your points with B.

B1.
That's a good catch. I think punctuated records are not affected by
this reprocessing problem because they are not issued twice.
We still need to distinguish between a punctuated record
and a source record. The logic becomes:

When the deduplication id is already in the store:

    if record.offset is not null && record.offset == offset_stored:
        forward(record)      // same source record, reprocessed
    else:
        // do_not_forward


I will update the KIP on this point.
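
To make the check concrete, here is a minimal Java sketch of it. It is only an illustration under assumptions: the class `OffsetAwareDeduplicator`, the method `shouldForward`, and the in-memory map standing in for the state store are hypothetical names, not part of the KIP or of the Kafka Streams API. A `null` offset stands for a record that does not come from a source topic, such as a punctuated record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the KIP's implementation: the map plays the role
// of the state store and keeps, per deduplication id, the offset of the
// first record seen with that id (null when the record had no offset).
final class OffsetAwareDeduplicator {
    private final Map<String, Long> offsetById = new HashMap<>();

    // Returns true when the record should be forwarded downstream.
    boolean shouldForward(String id, Long offset) {
        if (!offsetById.containsKey(id)) {
            offsetById.put(id, offset); // first arrival: remember it
            return true;
        }
        Long stored = offsetById.get(id);
        // Same non-null offset: the same source record is being reprocessed
        // (for example after a crash before commit), so forward it again.
        // Any other case is treated as a duplicate and is not forwarded.
        return offset != null && offset.equals(stored);
    }
}
```

With this sketch, a source record replayed with the same offset is forwarded again, while a second punctuated record with an already known id is dropped because its offset is `null`.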



B2. (i.e. 106)
That's the point discussed in 106. The main idea is to avoid repartitioning
when we don't have to. But let's say we want to repartition when
deduplication is on an id that is not the key.

The user code may look something like this:
```
kstream
    .deduplicate((k, v) -> v.id)
    .map((k, v) -> ...)
```
The user expects that the key in the map processor which follows the
deduplicate is the same as the initial key.

Internally, if we decide to repartition, the deduplication processor may do
the following:
```
kstream
    .map((k, v) -> (v.id, ...)) // For repartitioning on id
    .deduplicateByKey()
    .map((id, v) -> (initialKey, ...)) // To recover the initial key
```

In this case, the processor carries some internal complexity:
    I) We have to repartition twice, before and after deduplication. The
second map triggers repartitioning if stateful processors follow
in the pipeline.

    II) The initial key has to be kept somewhere. One way is to wrap the
value to add the key, and then unwrap it after the `deduplicateByKey`.
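
Point II above can be sketched in plain Java as follows. This is only an illustration under assumptions: `WrapUnwrapSketch`, `Wrapped` and the list-based `deduplicate` helper are hypothetical names, the lists stand in for the stream, and no repartition topic is modelled.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: re-key on the id, deduplicate by key, restore the key.
final class WrapUnwrapSketch {
    // Carries the initial key next to the value while the record is keyed by id.
    static final class Wrapped<K, V> {
        final K initialKey;
        final V value;

        Wrapped(K initialKey, V value) {
            this.initialKey = initialKey;
            this.value = value;
        }
    }

    static <K, V, I> List<Map.Entry<K, V>> deduplicate(
            List<Map.Entry<K, V>> input, Function<V, I> idSelector) {
        // First map: the id becomes the key, the value is wrapped with the initial key.
        List<Map.Entry<I, Wrapped<K, V>>> rekeyed = new ArrayList<>();
        for (Map.Entry<K, V> e : input) {
            rekeyed.add(new SimpleEntry<I, Wrapped<K, V>>(
                    idSelector.apply(e.getValue()),
                    new Wrapped<K, V>(e.getKey(), e.getValue())));
        }
        // Deduplicate by the new key, keeping the first arrival only.
        Set<I> seen = new HashSet<>();
        List<Map.Entry<K, V>> output = new ArrayList<>();
        for (Map.Entry<I, Wrapped<K, V>> e : rekeyed) {
            if (seen.add(e.getKey())) {
                // Second map: unwrap the value to recover the initial key.
                Wrapped<K, V> w = e.getValue();
                output.add(new SimpleEntry<K, V>(w.initialKey, w.value));
            }
        }
        return output;
    }
}
```

The wrapper is what lets the second map restore the key the user started with, at the cost of a larger value during the deduplication step.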

As mentioned in one use case above (deduplicating transactions - written at
least once - by id), repartitioning is not needed for that case. This added
complexity would not be useful there.

TL;DR: I think one solution is to have a separate API to cover such a use
case. In total, we can have 3 deduplication APIs:

1) `deduplicateByKey()`  // No repartitioning
2) `deduplicateByKey((k, v) -> v.id)` or `deduplicateByKeyAndId((k, v) -> v.id)`  // No repartitioning
3) `deduplicate((k, v) -> v.id)`  // Causes repartitioning

The second API is equivalent to deduplicating by the couple (key, id).
No repartitioning is required because the records with the same key are
already together in the same partition. cc @Matthias @Sebastien.
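
To illustrate what deduplicating by the couple (key, id) means for the second API, here is a small Java sketch. The class `KeyAndIdDeduplicator` and its method are hypothetical names used only for this example, and a `HashSet` stands in for the per-task state store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-task deduplication on the couple (key, id).
final class KeyAndIdDeduplicator<K, I> {
    // SimpleEntry has value-based equals/hashCode, so it can act as a pair.
    private final Set<Map.Entry<K, I>> seen = new HashSet<>();

    // Returns true the first time a given (key, id) couple is observed.
    boolean isFirstArrival(K key, I id) {
        return seen.add(new SimpleEntry<K, I>(key, id));
    }
}
```

Two records with the same id but different keys are both kept here, which is why this variant does not need the records to be co-located by id.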



B3.
I think we can do here what some existing processors do for periodic work:
track the last cleanup time and the current stream time, and trigger the
cleanup when the delta exceeds the deduplication period.
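
A minimal Java sketch of this stream-time-driven cleanup could look as follows. It rests on assumptions: `StreamTimeCleanupSketch` and its methods are hypothetical names, the map stands in for the state store, and the purge scans every entry, whereas a real implementation would more likely rely on a time-ordered index.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: cleanup is driven by stream time, not by a punctuation.
final class StreamTimeCleanupSketch {
    private final long deduplicationPeriodMs;
    // Deduplication id -> timestamp of the first record seen with that id.
    private final Map<String, Long> firstSeenTsById = new HashMap<>();
    private long streamTime = -1L;      // highest record timestamp seen so far
    private long lastCleanupTime = -1L; // stream time of the last cleanup

    StreamTimeCleanupSketch(long deduplicationPeriodMs) {
        this.deduplicationPeriodMs = deduplicationPeriodMs;
    }

    void onRecord(String id, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        if (lastCleanupTime < 0) {
            lastCleanupTime = streamTime; // start the clock at the first record
        }
        firstSeenTsById.putIfAbsent(id, timestampMs);
        // Trigger the cleanup once the delta exceeds the deduplication period.
        if (streamTime - lastCleanupTime > deduplicationPeriodMs) {
            firstSeenTsById.values()
                    .removeIf(ts -> streamTime - ts > deduplicationPeriodMs);
            lastCleanupTime = streamTime;
        }
    }

    boolean contains(String id) {
        return firstSeenTsById.containsKey(id);
    }
}
```

Because the trigger is tied to incoming records, no cleanup happens while the task receives no data, which matches how stream time behaves.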


Best,
Ayoub

On Thu, Jul 11, 2024 at 01:17, Bill Bejeck <bbej...@apache.org> wrote:

> Hi All,
>
> Thanks Ayoub for getting this KIP going again, I think a deduplication
> operator will be very useful.  My apologies for being late to the
> discussion.
>
> Overall - I agree with the direction of the KIP but I have a couple of
> questions.
>
>  1.  Regarding using offsets for tracking the first arrival.  I think
> there could be a case when offsets would not be available, when records are
> forwarded by punctuation for example.
>
> 2. I'm not sure about using something other than the key for identifying
> duplicate records.  By doing so, one could end up missing a de-duplication
> due to records with the same id characteristic in the value but having
> different keys so the records land on different partitions.  I guess we
> could enforce a repartition if one chooses to use a de-duplication id other
> than the key.
>
> 3. I think a punctuation scheduled to run at the de-duplication period to
> clean out older records would be a clean approach for purging older
> records.  I'm not taking a hard stance on this approach and I'm willing to
> discuss different methods.
>
> Thanks,
>
> Bill
>
>
>
> On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > Hi Matthias,
> >
> > Here are my updates on your points.
> >
> > 101.
> > > You propose to add static methods `keySerde()` and `valueSerde()` --
> > > in other config classes, we use only `with(keySerde, valueSerde)` as we
> > try
> > > to use the "builder" pattern, and avoid too many overloads. I would
> > > prefer to omit both methods you suggest and just use a single `with`
> for
> > > both serdes.
> >
> > I was actually inspired by the other config classes, for example `Joined`
> > and
> > `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
> >
> > > I think we don't want to add `with(...)` which takes all
> > > parameters at once
> >
> > Done.
> >
> >
> > 102.
> > Thanks, your suggestion sounds good to me. The trade-off of having an
> index
> > that allows to efficiently purge expired records besides the keyValue
> store
> > makes sense. I've been looking into the code, and I think a similar idea
> > was implemented for other processors (for example with
> > DualSchemaRocksDBSegmentedBytesStore).
> > As you said, I think we would benefit from some existing code here.
> > KIP updated !
> >
> >
> > 104.
> > Updated the KIP to consider records' offsets.
> >
> >
> > 105
> > > picking the first offset with smallest ts sounds good to me. The KIP
> > > should be explicit about it
> >
> > Done.
> >
> > > But as discussed above, it might be
> > > simplest to not really have a window lookup, but just a plain
> key-lookup
> > > and drop if the key exists in the store?
> >
> > KIP updated, we will be `.get()`ing from a keyValueStore instead of
> > `.fetch()`ing
> > from a WindowStore.
> >
> > > Another line of thinking, that did serve us well in the past: in doubt
> > > keep a record -- users can add operators to drop record (in case they
> > > don't want to keep it), but if we drop a record, users have no way to
> > > resurrect it (thus, building a workaround to change semantics is
> > > possible for users if we default to keep records, but not the other way
> > > around).
> >
> > Makes total sense ! I updated the KIP to forward late records instead of
> > dropping them.
> >
> >
> > 106.
> > For the moment, I highlighted in Javadocs that we are deduplicating by
> > partition. If there is a better name to have this information in the name
> > of the api itself it would be good.
> >
> >
> > Best,
> > Ayoub
> >
> >
> > On Thu, Jun 13, 2024 at 09:03, Sebastien Viale <sebastien.vi...@michelin.com> wrote:
> >
> > >
> > > Hi,
> > >
> > > 106 :
> > >
> > > Thanks for the clarification. Actually, this is not what I expected,
> but I
> > > better understand the performance issues regarding the state store
> > > iteration.
> > > If this is how it should be designed, it is fine for me as long as it
> is
> > > clear that the repartition must be done before the deduplication.
> > > Sébastien
> > >
> > > ________________________________
> > > From: Matthias J. Sax <mj...@apache.org>
> > > Sent: Thursday, June 13, 2024 02:51
> > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> > > kafka-streams
> > >
> > >
> > > 106:
> > >
> > > > For the use-case of deduplicating a "at least once written" stream,
> > > > we are sure that the duplicate record has the same key as the
> > > > original one, and will land on the same task. Here, a user would
> > > > want to specify a deduplication key different from the topic's key
> > > > in case the topic's key is not a unique identifier
> > > >
> > > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > > and deduplication is done on `transaction`.`id` . Here, the
> application
> > > > wants to deduplicate transactions. It knows that a transaction id
> > > > maps to a single userId. Any duplicate of that record would be
> received
> > > > by the task which processes this userId.
> > >
> > > This is an interesting point.
> > >
> > > My concern is, to some extent, that it seems (on the surface) to not
> > > follow the established pattern of auto-repartitioning in the DSL. Of
> > > course, given that the current proposal says we use an "id extractor"
> > > and not a "key extractor" it might be ok (but it might be somewhat
> > > subtle). Of course, JavaDocs always help to explain in detail. Would
> > > this be enough?
> > >
> > > Would be good to hear from others about this point. I am personally not
> > > sure which approach I would prefer personally at this point.
> > >
> > > The problem reminds me of
> > > https://issues.apache.org/jira/browse/KAFKA-10844 which is not resolved
> > > directly either. We do have KIP-759
> > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> > > repartition canceling)
> > > which is WIP and helps with KAFKA-10844, but not sure if it would be a
> > > viable solution for the de-duplication case?
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > >
> > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > Hi Sebastien & Matthias,
> > > >
> > > > For 106.
> > > > My idea was to deduplicate on a per-task basis. If the user wants
> > > > to do a global deduplication over all partitions, I think it's
> better to
> > > > have him explicitly repartition and then call the deduplication
> > > processor.
> > > >
> > > > For the use-case of deduplicating a "at least once written" stream,
> > > > we are sure that the duplicate record has the same key as the
> > > > original one, and will land on the same task. Here, a user would
> > > > want to specify a deduplication key different from the topic's key
> > > > in case the topic's key is not a unique identifier.
> > > >
> > > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > > and deduplication is done on `transaction`.`id` . Here, the
> application
> > > > wants to deduplicate transactions. It knows that a transaction id
> > > > maps to a single userId. Any duplicate of that record would be
> received
> > > > by the task which processes this userId.
> > > >
> > > > One other thought I have when writing the KIP about global
> deduplication,
> > > > is that it will require to map twice the key of the stream (first
> map to
> > > > change the key to deduplication key, and second map to get
> > > > back the initial key). Second map may imply a second repartitioning.
> > > >
> > > > However, if we do a per-task deduplication, the user may adapt to his
> > > > specific use-case.
> > > >
> > > > Let me know what you think
> > > > Ayoub
> > > >
> > > >
> > > >
> > > > On Tue, Jun 11, 2024 at 20:21, Matthias J. Sax <mj...@apache.org> wrote:
> > > >
> > > >> Thanks Sebastien,
> > > >>
> > > >> that's a good point. Thanks for raising it. -- I like your proposal.
> > > >>
> > > >> An alternative would be to have two overloads of `deduplicate()`
> one w/
> > > >> and one w/o the "id extractor" parameter. This would be less
> explicit
> > > >> though.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > >>> Hi,
> > > >>>
> > > >>> I am really interested in this KIP.
> > > >>>
> > > >>> 106:
> > > >>> I hope I am not talking nonsense, but if you do not deduplicate
> based
> > > on
> > > >> the key, the input stream has to be repartitioned.
> > > >>> Otherwise, different stream tasks may handle records that need to
> be
> > > >> deduplicated, and thus duplicates will not be detected.
> > > >>>
> > > >>> This is why I would have created two different methods, as is done
> for
> > > >> GroupBy:
> > > >>>
> > > >>> deduplicateByKey(...)
> > > >>> deduplicate(...)
> > > >>>
> > > >>> If deduplicateByKey is used, the input stream does not need to be
> > > >> repartitioned.
> > > >>>
> > > >>> thanks
> > > >>>
> > > >>> Sébastien
> > > >>> ________________________________
> > > >>> From: Matthias J. Sax <mj...@apache.org>
> > > >>> Sent: Tuesday, June 11, 2024 01:54
> > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> > > >> kafka-streams
> > > >>>
> > > >>>
> > > >>> Thanks for the update Ayoub.
> > > >>>
> > > >>>
> > > >>> 101: you say:
> > > >>>
> > > >>>> But I am not sure if we don't want to have them for this
> processor ?
> > > >>>
> > > >>> What is your reasoning to move off the established pattern? Would
> be
> > > >>> good to understand, why `Deduplicated` class needs a different
> > > >>> "structure" compared to existing classes.
> > > >>>
> > > >>>
> > > >>>
> > > >>> 102: Creating iterators is very expensive. For other work, we
> actually
> > > >>> hit 100x (?) throughput degradation by creating an (for most cases
> > > >>> empty) iterator for every input record, and needed to find other
> ways
> > > to
> > > >>> avoid creating an iterator per record. It really kills performance.
> > > >>>
> > > >>> I see the point about data expiration. We could experiment with
> > > >>> punctuation to expire old data, or add a second "time-ordered
> store"
> > > >>> (which we already have at hand) which acts as an index into the
> main
> > > >>> store. -- Another possibility would be to add a new version of
> > > segmented
> > > >>> store with a different key-layout (ie, just store the plain key). I
> > > >>> think with some refactoring, we might be able to re-use a lot of
> > > >>> existing code.
> > > >>>
> > > >>>
> > > >>>
> > > >>> 104:
> > > >>>
> > > >>>> This gets me wondering if this is a limitation of stateful
> processors
> > > >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> > > >>>> emit strategy may have the same limitation today (we receive a
> record,
> > > >>>> we update its aggregation result in the store, then crash before
> > > >> committing,
> > > >>>> then the record will be again reconsidered for aggregation). Is
> this
> > > >>>> correct ?
> > > >>>
> > > >>> Yes, this is correct, but it does not violate ALOS, because we did
> not
> > > >>> lose the input record -- of course, the aggregation would contain
> the
> > > >>> input record twice (eg, over count), but this is ok under ALOS.
> > > >>> Unfortunately, for de-duplication this pattern breaks, because
> > > >>> de-duplication operator does a different "aggregation logic"
> depending
> > > >>> on its state (emit if no key found, but not emit if key found). For
> > > >>> counting as an example, we increment the count and emit
> unconditionally
> > > >>> though.
> > > >>>
> > > >>>
> > > >>>> As a workaround, I think storing the record's offset inside the
> > > >>>> store's value can tell us whether the record has been already
> seen or
> > > >> not.
> > > >>>> If we receive a record whose deduplication id exists in the store
> > > >>>> and the entry in the store has the same offset, it means the
> record
> > > >>>> is processed twice and we can go ahead and forward it. If the
> offset
> > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > >>>
> > > >>> Great idea. This might work... If we store the input record
> offset, we
> > > >>> can actually avoid that the "aggregation logic" changes for the
> same
> > > >>> input record. -- And yes, with ALOS potentially emitting a
> duplicate is
> > > >>> the-name-of-the-game, so no concerns on this part from my side.
> > > >>>
> > > >>>
> > > >>>
> > > >>> 105: picking the first offset with smallest ts sounds good to me.
> The
> > > KIP
> > > >>> should be explicit about it. But as discussed above, it might be
> > > >>> simplest to not really have a window lookup, but just a plain
> > > key-lookup
> > > >>> and drop if the key exists in the store? -- For late records, it
> might
> > > >>> imply that they are not de-duplicated, but this is also the case
> for
> > > >>> in-order records if they are further apart than the de-duplication
> > > >>> window size, right? Thus I would believe this is "more natural"
> > > compared
> > > >>> to discarding late records pro-actively, which would lead to
> missing
> > > >>> result records?
> > > >>>
> > > >>> We could also make this configurable if we are not sure what users
> > > >>> really need -- or add such a configuration later in case the
> semantics
> > > >>> we pick don't work for some users.
> > > >>>
> > > >>> Another line of thinking, that did serve us well in the past: in
> doubt
> > > >>> keep a record -- users can add operators to drop record (in case
> they
> > > >>> don't want to keep it), but if we drop a record, users have no way
> to
> > > >>> resurrect it (thus, building a workaround to change semantics is
> > > >>> possible for users if we default to keep records, but not the
> other way
> > > >>> around).
> > > >>>
> > > >>> Would be good to get input from the broader community on this
> question
> > > >>> though. In the end, it must be a use-case driven decision?
> > > >>>
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > >>>> Hi Matthias,
> > > >>>>
> > > >>>> Thank you for your review !
> > > >>>>
> > > >>>> 100.
> > > >>>> I agree. I changed the name of the parameter to "idSelector".
> > > >>>> Because this id may be computed, It is better to call it "id"
> rather
> > > >> than
> > > >>>> field or attribute.
> > > >>>>
> > > >>>> 101.
> > > >>>> The reason I added the methods `keySerde()` and `valueSerde()`
> was to
> > > >>>> have the same capabilities as other serde classes (such as Grouped
> > > >>>> or Joined). As a Kafka-streams user, I usually use `with(keySerde,
> > > >>>> valueSerde)`
> > > >>>> as you suggested. But I am not sure if we don't want to have them
> for
> > > >> this
> > > >>>> processor ?
> > > >>>>
> > > >>>> 102.
> > > >>>> That's a good point ! Because we know that the window store will
> > > contain
> > > >>>> at most one instance of a given key, I am not sure how the range
> fetch
> > > >>>> on WindowStore compares to a KeyValueStore `get()` in this case.
> > > >>>> Wondering if the fact that the record's key is the prefix of the
> > > >> underlying
> > > >>>> keyValueStore's key ("<dataKey,ts>") may provide comparable
> > > performance
> > > >>>> to the random access of KeyValueStore ? Of course, the WindowStore
> > > >> fetch()
> > > >>>> would be less efficient because it may fetch from more than 1
> segment,
> > > >> and
> > > >>>> because of some iterator overhead.
> > > >>>>
> > > >>>> The purpose of using a WindowStore is to automatically purge old
> data.
> > > >>>> For example, deduplicating a topic written at least once wouldn't
> > > >> require
> > > >>>> keeping a large history. This is not the case of using a
> KeyValueStore
> > > >>>> which would require scanning regularly to remove expired records.
> > > >>>> That might cause a sudden increase of latency whenever the cleanup
> > > >>>> is triggered.
> > > >>>>
> > > >>>> It would be good to hear from anyone who has done some analysis
> > > >>>> on RocksDB's range fetch.
> > > >>>>
> > > >>>> 103.
> > > >>>> Sure, I can update it once we agree on underlying semantics.
> > > >>>>
> > > >>>> 104.
> > > >>>> Another good point !
> > > >>>>
> > > >>>>> In the end, de-duplication does only make sense when EOS is used
> > > >>>>
> > > >>>> I agree with that. And for me, the use case of deduplicating a
> topic
> > > >>>> written ALOS inside an EOS application might be the top 1 use case
> > > >>>> of deduplication.
> > > >>>>
> > > >>>>> all downstream processing happens, `context.forward()` returns
> > > >>>>> and we update the state store, we could now crash w/o committing
> > > >> offsets
> > > >>>>
> > > >>>> This gets me wondering if this is a limitation of stateful
> processors
> > > >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> > > >>>> emit strategy may have the same limitation today (we receive a
> record,
> > > >>>> we update its aggregation result in the store, then crash before
> > > >> committing,
> > > >>>> then the record will be again reconsidered for aggregation). Is
> this
> > > >>>> correct ?
> > > >>>>
> > > >>>> As a workaround, I think storing the record's offset inside the
> > > >>>> store's value can tell us whether the record has been already
> seen or
> > > >> not.
> > > >>>> If we receive a record whose deduplication id exists in the store
> > > >>>> and the entry in the store has the same offset, it means the
> record
> > > >>>> is processed twice and we can go ahead and forward it. If the
> offset
> > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > >>>>
> > > >>>> As you said, we don't have any guarantee whether the initial
> record
> > > was
> > > >>>> forwarded or not in case of a crash before commit. In this
> solution
> > > >>>> we would forward the record twice, which is against deduplication.
> > > >>>> But, this is still an ALOS application, so it has the same
> semantics
> > > >>>> as any other such application. With this, I am not sure we can
> > > >>>> have "strict" deduplication for ALOS applications.
> > > >>>>
> > > >>>> 105.
> > > >>>> For me, if there are two duplicate records, it means they are
> > > >>>> the same in the application's point of view, so it can choose
> > > >>>> either one. Thus, I would go with forwarding the record with
> > > >>>> the least offset.
> > > >>>>
> > > >>>>> Would it not be desired to drop all duplicates independent
> > > >>>>> of their ts, as long as we find a record in the store?
> > > >>>>
> > > >>>> This is actually related to the (suggested) windowed nature
> > > >>>> of deduplication. As in 102. we don't want to do a "forever"
> > > >>>> deduplication, which may be impossible for huge workloads
> > > >>>> where all records should be kept in the store. Hence, the fetch
> > > >>>> of timestamp between [ts-deduplicationInterval,
> > > >> ts+deduplicationInterval]
> > > >>>>
> > > >>>> About late records, this is again due to the windowed nature.
> > > >>>> Because the store won't save those late (i.e. expired) records,
> > > >>>> we have two options. Either, we do not apply deduplication
> > > >>>> on them, thus the deduplication doesn't work on late records,
> > > >>>> or we discard them (which is the option I suggest).
> > > >>>> In the second case, It would be up to the user to choose
> > > >>>> any deduplicationInterval that may sufficiently cover all his late
> > > data.
> > > >>>> What do you think ?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Ayoub
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Jun 4, 2024 at 23:58, Matthias J. Sax <mj...@apache.org> wrote:
> > > >>>>
> > > >>>>> Ayoub,
> > > >>>>>
> > > >>>>> thanks for resurrecting this KIP. I think a built-in
> de-duplication
> > > >>>>> operator will be very useful.
> > > >>>>>
> > > >>>>>
> > > >>>>> Couple of questions:
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 100: `deduplicationKeySelector`
> > > >>>>>
> > > >>>>> Is this the best name? It might indicate that we select a "key"
> what
> > > is
> > > >>>>> an overloaded term... Maybe we could use `Field` or `Id` or
> > > `Attribute`
> > > >>>>> instead of `Key` in the name? Just brainstorming. If we think
> `Key`
> > > is
> > > >>>>> the best word, I am also ok with it.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 101: `Deduplicated` class
> > > >>>>>
> > > >>>>> You propose to add static methods `keySerde()` and
> `valueSerde()` --
> > > in
> > > >>>>> other config classes, we use only `with(keySerde, valueSerde)`
> as we
> > > >> try
> > > >>>>> to use the "builder" pattern, and avoid too many overloads. I
> would
> > > >>>>> prefer to omit both methods you suggest and just use a single
> `with`
> > > >> for
> > > >>>>> both serdes.
> > > >>>>>
> > > >>>>> Similarly, I think we don't want to add `with(...)` which takes
> all
> > > >>>>> parameters at once (which should only be 3 parameters, not 4 as
> it's
> > > >>>>> currently in the KIP)?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 102: Usage of `WindowedStore`:
> > > >>>>>
> > > >>>>> Would this be efficient? The physical byte layout is
> "<dataKey,ts>"
> > > for
> > > >>>>> the store key, so it would be difficult to do an efficient lookup
> > > for a
> > > >>>>> given "de-duplication key" to discard duplicates, as we don't
> know
> > > the
> > > >>>>> timestamp of the first record with the same "de-duplication key".
> > > >>>>>
> > > >>>>> This boils down to the actual de-duplication logic (some more
> > > comments
> > > >>>>> below), but what you propose seems to require expensive
> range-scans
> > > >> what
> > > >>>>> could be cost prohibitive in practice. I think we need to find a
> way
> > > to
> > > >>>>> use efficient key-point-lookups to make this work.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 103: "Processing logic":
> > > >>>>>
> > > >>>>> Might need some updates (Cf 102 comment). I am not sure if I
> fully
> > > >>>>> understand the logic: cf 105 below.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 104:
> > > >>>>>
> > > >>>>>> If no entries found → forward the record + save the record in
> the
> > > >> store
> > > >>>>>
> > > >>>>> This part is critical, and we should discuss in detail. In the
> end,
> > > >>>>> de-duplication does only make sense when EOS is used, and we
> might
> > > want
> > > >>>>> to call this out (eg, on the JavaDocs)? But if used with ALOS,
> it's
> > > >> very
> > > >>>>> difficult to ensure that we never lose data... Your proposal to
> > > >>>>> first-forward goes into the right direction, but does not really
> > > solve
> > > >>>>> the problem entirely:
> > > >>>>>
> > > >>>>> Even if we forward the message first, all downstream processing
> > > >> happens,
> > > >>>>> `context.forward()` returns and we update the state store, we
> could
> > > now
> > > >>>>> crash w/o committing offsets. For this case, we have no guarantee
> > > that
> > > >>>>> the result records were published (as we did not flush the
> producer
> > > >>>>> yet), but when re-reading from the input topic, we would find the
> > > >> record
> > > >>>>> in the store and incorrectly drop as duplicate...
> > > >>>>>
> > > >>>>> I think the only solution to make this work would be to use
> TX-state
> > > >>>>> stores in combination with ALOS as proposed via KIP-892?
> > > >>>>>
> > > >>>>> Using an in-memory store won't help much either? The producer
> could
> > > >> have
> > > >>>>> send the write into the changelog topic, but not into the result
> > > topic,
> > > >>>>> and thus we could still not guarantee ALOS...?
> > > >>>>>
> > > >>>>> How do we want to go about this? We could also say, this new
> operator
> > > >>>>> only works with EOS. Would this be too restrictive? -- At least
> for
> > > >> now,
> > > >>>>> until KIP-892 lands, and we could relax it?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> 105: "How to detect late records"
> > > >>>>>
> > > >>>>> In the end, it seems to boil down to determine which of the
> records
> > > to
> > > >>>>> forward and which record to drop, for (1) the regular case and
> (2)
> > > the
> > > >>>>> out-of-order data case.
> > > >>>>>
> > > >>>>> Regular case (no out-of-order data): For this case, offset and ts
> > > order
> > > >>>>> is the same, and we can forward the first record we get. All
> later
> > > >>>>> record within "de-duplication period" with the same
> "de-duplication
> > > >> key"
> > > >>>>> would be dropped. If a record with the same "de-duplication key"
> > > >> arrives
> > > >>>>> after "de-duplication period" passed, we cannot drop it any
> longer,
> > > but
> > > >>>>> would still forward it, as by the contract of the operator /
> > > >>>>> de-duplication period.
> > > >>>>>
> > > >>>>> For the out-of-order case: The first question we need to answer
> is,
> > > do
> > > >>>>> we want to forward the record with the smallest offset or the
> record
> > > >>>>> with the smallest ts? Logically, forwarding with the smallest ts
> > > might
> > > >>>>> be "more correct", however, it implies we could only forward it
> after
> > > >>>>> "de-duplication period" passed, what might be undesired latency?
> > > Would
> > > >>>>> this be desired/acceptable?
> > > >>>>>
> > > >>>>> In contrast, if we forward record with the smallest offset (this
> is
> > > >> what
> > > >>>>> you seem to propose) we don't have a latency issue, but of
> course the
> > > >>>>> question what records to drop is more tricky to answer: it seems
> you
> > > >>>>> propose to compare the time difference of the stored record to
> the
> > > >>>>> current record, but I am wondering why? Would it not be desired
> to
> > > drop
> > > >>>>> all duplicates independent of their ts, as long as we find a
> record
> > > in
> > > >>>>> the store? Would be good to get some more motivation and
> tradeoffs
> > > >>>>> discussed about the different strategies we could use.
> > > >>>>>
> > > >>>>> You also propose to drop _any_ late record: I am also not sure if
> > > >> that's
> > > >>>>> desired? Could this not lead to data loss? Assume we get a late
> > > record,
> > > >>>>> but in fact there was never a duplicate? Why would we want to
> drop
> > > it?
> > > >>>>> If there is a late record which is indeed a duplicate, but we
> purged
> > > >> the
> > > >>>>> original record from the store already, it seems to be the same
> case
> > > as
> > > >>>>> for the "no out-of-order case": after we purged we cannot
> > > de-duplicate
> > > >>>>> and thus it's a regular case we can just accept?
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > >>>>>> Hi everyone,
> > > >>>>>>
> > > >>>>>> I've just made a (small) change to this KIP about an
> implementation
> > > >>>>> detail.
> > > >>>>>> Please let me know your thoughts.
> > > >>>>>>
> > > >>>>>> Thank you,
> > > >>>>>> Ayoub
> > > >>>>>>
> > > >>>>>> On Mon, May 20, 2024 at 21:13, Ayoub <ayoubomar...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hello,
> > > >>>>>>>
> > > >>>>>>> Following a discussion on community slack channel, I would
> like to
> > > >>>>> revive
> > > >>>>>>> the discussion on the KIP-655, which is about adding a
> > > deduplication
> > > >>>>>>> processor in kafka-streams.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > >>>>>>>
> > > >>>>>>> Even though the motivation is not quite the same as the initial
> > > one,
> > > >> I
> > > >>>>>>> updated the KIP rather than creating a new one, as I believe
> the
> > > end
> > > >>>>> goal
> > > >>>>>>> is the same.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Ayoub
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > > >
> > >
> >
>
