Hi Ayoub,

What's the status of this KIP?

Regards,
Bill


On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:

> Hi Bill,
>
> Thanks for your comments !
>
> I am prefixing your points with B.
>
> B1.
> That's a good catch. I think punctuated records aren't concerned by
> this problem of reprocessing because they are not issued twice.
> We will still need to make a difference between a punctuated
> and a source record. The logic becomes:
>
> > if record.offset is not null && record.offset = offset_stored:
>
>      forward(record)
>
> else:
>
>     // do_not_forward
>
>
> I will update the KIP on this point.
>
>
>
> B2. (i.e. 106)
> That's the point discussed in 106. The main idea is to avoid repartitioning
> when we don't have to. But let's say we want to repartition when
> deduplication is on an id that is not the key.
>
> The user code may look something like this:
>  ```kstream
>      .deduplicate((k, v) -> v.id)
>      .map((k, v) -> ...)
>   ```
> The user expects that the key in the map processor which follows the
> deduplicate is the same as the initial key.
>
> Internally, if we decide to repartition the deduplication processor may do
> the following:
> ```kstream
>      .map((k, v) -> (v.id, ...)) // For repartitioning on id
>      .deduplicateByKey()
>      .map((id, v) -> (initialKey, ...)) // To recover the initial key
>   ```
>
> In this case, there is some internal complexity of this processor:
>     I) We should repartition twice, before and after deduplication. The
> second
> map triggers repartitioning if there are following stateful processors
> in the pipeline.
>
>     II.) The initial key should be kept somewhere. One way is to wrap the
> value
> to add the key, and then unwrap it after the `deduplicateByKey`
>
> As mentioned in one use case above (deduplicating transactions - written at
> least
> once - by id), repartition is not needed for that case. This added
> complexity won't
> be useful.
>
> *TLDR,* I think one solution is to have a separate api to cover such a use
> case. In total,
> we can have 3 apis of deduplication
>
> 1) `deduplicateByKey()`  *// No repartitioning*
> 2) `deduplicateByKey((k, v) -> v.id)`  *or *`deduplicateByKeyAndId((k, v)
> -> v.id)`  *// No repartitioning*
> 3) `deduplicate((k, v) -> v.id)`  *// causes repartitioning*
>
> The second api is equivalent to deduplicate by the couple (key, id).
> No repartitioning is required because the records with the same key are
> already
> together in the same partition. cc @Matthias @Sebastien.
>
>
>
> B3.
> I think here we can do what some existing processors do for periodic work,
> we can track the last cleanup time and the current stream time,
> if the delta exceeds the deduplication period, we trigger the cleanup.
>
>
> Best,
> Ayoub
>
> Le jeu. 11 juil. 2024 à 01:17, Bill Bejeck <bbej...@apache.org> a écrit :
>
> > Hi All,
> >
> > Thanks Ayoub for getting this KIP going again, I think a deduplication
> > operator will be very useful.  My applogies for being late to the
> > discussion.
> >
> > Overall - I agree with the direction of the KIP but I have a couple of
> > questions.
> >
> >  1.  Regarding using offsets for tracking the first arrival.  I think
> > there could be a case when offsets would not be available, when records
> are
> > forwarded by punctuation for example.
> >
> > 2. I'm not sure about using something other than the key for identifying
> > dupllicate records.  By doing so, one could end up missing a
> de-duplication
> > due to records with the same id characteristic in the value but having
> > different keys so the records land on different partitions.  I guess we
> > could enforce a repartition if one choses to use a de-duplication id
> other
> > than the key.
> >
> > 3. I think a punctuation scheduled to run at the de-duplication period to
> > clean out older records would be a clean approach for purging older
> > records.  I'm not taking a hard stance on this approach and I'm willing
> to
> > discuss different methods.
> >
> > Thanks,
> >
> > Bill
> >
> >
> >
> > On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > Hi Matthias,
> > >
> > > Here are my updates on your points.
> > >
> > > 101.
> > > > You propose to add static methods `keySerde()` and `valueSerde()` --
> > > > in other config classes, we use only `with(keySerde, valueSerde)` as
> we
> > > try
> > > > to use the "builder" pattern, and avoid too many overloads. I would
> > > > prefer to omit both methods you suggest and just use a single `with`
> > for
> > > > both serdes.
> > >
> > > I was actually inspired by the other config classes, for example
> `Joined`
> > > and
> > > `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
> > >
> > > > I think we don't want to add `with(...)` which takes all
> > > > parameters at once
> > >
> > > Done.
> > >
> > >
> > > 102.
> > > Thanks, your suggestion sounds good to me. The trade-off of having an
> > index
> > > that allows to efficiently purge expired records besides the keyValue
> > store
> > > makes sense. I've been looking into the code, and I think a similar
> idea
> > > was implemented for other processors (for example with
> > > DualSchemaRocksDBSegmentedBytesStore).
> > > As you said, I think we would benefit from some existing code here.
> > > KIP updated !
> > >
> > >
> > > 104.
> > > Updated the KIP to consider records' offsets.
> > >
> > >
> > > 105
> > > > picking the first offset with smallest ts sounds good to me. The KIP
> > > > should be explicit about it
> > >
> > > Done.
> > >
> > > > But as discussed above, it might be
> > > > simplest to not really have a window lookup, but just a plain
> > key-lookup
> > > > and drop if the key exists in the store?
> > >
> > > KIP updated, we will be `.get()`ing from a keyValueStore instead of
> > > `.fetch()`ing
> > > from a WindowStore.
> > >
> > > > Another line of thinking, that did serve us well in the past: in
> doubt
> > > > keep a record -- users can add operators to drop record (in case they
> > > > don't want to keep it), but if we drop a record, users have no way to
> > > > resurrect it (thus, building a workaround to change semantica is
> > > > possible for users if we default to keep records, but not the other
> way
> > > > around).
> > >
> > > Makes total sense ! I updated the KIP to forward late records instead
> of
> > > dropping them.
> > >
> > >
> > > 106.
> > > For the moment, I highlighted in Javadocs that we are deduplicating by
> > > partition. If there is a better name to have this information in the
> name
> > > of the api itself it would be good.
> > >
> > >
> > > Best,
> > > Ayoub
> > >
> > >
> > > Le jeu. 13 juin 2024 à 09:03, Sebastien Viale <
> > sebastien.vi...@michelin.com>
> > > a écrit :
> > >
> > > >
> > > > Hi,
> > > >
> > > > 106 :
> > > >
> > > > Thanks for the clarification. Actually, this is not what I expected,
> > but I
> > > > better understand the performance issues regarding the state store
> > > > iteration.
> > > > If this is how it should be designed, it is fine for me as long as it
> > is
> > > > clear that the repartition must be done before the deduplication.
> > > > Sébastien
> > > >
> > > > ________________________________
> > > > De : Matthias J. Sax <mj...@apache.org>
> > > > Envoyé : jeudi 13 juin 2024 02:51
> > > > À : dev@kafka.apache.org <dev@kafka.apache.org>
> > > > Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> > > > kafka-streams
> > > >
> > > > Warning External sender Do not click on any links or open any
> > attachments
> > > > unless you trust the sender and know the content is safe.
> > > >
> > > > 106:
> > > >
> > > > > For the use-case of deduplicating a "at least once written" stream,
> > > > > we are sure that the duplicate record has the same key as the
> > > > > original one, and will land on the same task. Here, a user would
> > > > > want to specify a deduplication key different from the topic's key
> > > > > in case the topic's key is not a unique identifier
> > > > >
> > > > > For example, we have a topic with keyValue (`userId`,
> `transaction`)
> > > > > and deduplication is done on `transaction`.`id` . Here, the
> > application
> > > > > wants to deduplicate transactions. It knows that a transaction id
> > > > > maps to a single userId. Any duplicate of that record would be
> > received
> > > > > by the task which processes this userId.
> > > >
> > > > This is an interesting point.
> > > >
> > > > My concern is to some extend, that it seems (on the surface) to not
> > > > follow the established pattern of auto-repartitioning in the DSL. Of
> > > > course, given that the current proposal says we use an "id extractor"
> > > > and not a "key extractor" it might be ok (but it might be somewhat
> > > > subtle). Of course, JavaDocs always help to explain in detail. Would
> > > > this be enough?
> > > >
> > > > Would be good to hear from others about this point. I am personally
> not
> > > > sure which approach I would prefer personally at this point.
> > > >
> > > > The problem reminds me on
> > > > https://issues.apache.org/jira/browse/KAFKA-10844<
> > > > https://issues.apache.org/jira/browse/KAFKA-10844> which is not
> > resolve
> > > > directly either. We do have KIP-759
> > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> > > > repartition canceling<
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> > > > repartition canceling>)
> > > > which is WIP and helps with KAFKA-10844, but not sure if it would be
> a
> > > > viable solution for the de-duplication case?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > This email was screened for spam and malicious content but exercise
> > > > caution anyway.
> > > >
> > > >
> > > >
> > > >
> > > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > > Hi Sebastien & Matthias,
> > > > >
> > > > > For 106.
> > > > > My idea was to deduplicate on a per-task basis. If the user wants
> > > > > to do a global deduplication over all partitions, I think it's
> > better to
> > > > > have him explicitly repartition and then call the deduplication
> > > > processor.
> > > > >
> > > > > For the use-case of deduplicating a "at least once written" stream,
> > > > > we are sure that the duplicate record has the same key as the
> > > > > original one, and will land on the same task. Here, a user would
> > > > > want to specify a deduplication key different from the topic's key
> > > > > in case the topic's key is not a unique identifier.
> > > > >
> > > > > For example, we have a topic with keyValue (`userId`,
> `transaction`)
> > > > > and deduplication is done on `transaction`.`id` . Here, the
> > application
> > > > > wants to deduplicate transactions. It knows that a transaction id
> > > > > maps to a single userId. Any duplicate of that record would be
> > received
> > > > > by the task which processes this userId.
> > > > >
> > > > > One other thought I have when writing the KIP about global
> > deduplication,
> > > > > is that it will require to map twice the key of the stream (first
> > map to
> > > > > change the key to deduplication key, and second map to get
> > > > > back the initial key). Second map may imply a second
> repartitioning.
> > > > >
> > > > > However, if we do a per-task deduplication, the user may adapt to
> his
> > > > > specific use-case.
> > > > >
> > > > > Let me know what you think
> > > > > Ayoub
> > > > >
> > > > >
> > > > >
> > > > > Le mar. 11 juin 2024 à 20:21, Matthias J. Sax <mj...@apache.org> a
> > > > écrit :
> > > > >
> > > > >> Thanks Sebastien,
> > > > >>
> > > > >> that's a good point. Thanks for raising it. -- I like your
> proposal.
> > > > >>
> > > > >> An alternative would be to have two overloads of `deduplicate()`
> > one w/
> > > > >> and one w/o the "id extractor" parameter. This would be less
> > explicit
> > > > >> though.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> I am really interested in this KIP.
> > > > >>>
> > > > >>> 106:
> > > > >>> I hope I am not talking nonsense, but if you do not deduplicate
> > based
> > > > on
> > > > >> the key, the input stream has to be repartitioned.
> > > > >>> Otherwise, different stream tasks may handle records that need to
> > be
> > > > >> deduplicated, and thus duplicates will not be detected.
> > > > >>>
> > > > >>> This is why I would have created two different methods, as is
> done
> > for
> > > > >> GroupBy:
> > > > >>>
> > > > >>> deduplicateByKey(...)
> > > > >>> deduplicate(...)
> > > > >>>
> > > > >>> If deduplicateByKey is used, the input stream does not need to be
> > > > >> repartitioned.
> > > > >>>
> > > > >>> thanks
> > > > >>>
> > > > >>> Sébastien
> > > > >>> ________________________________
> > > > >>> De : Matthias J. Sax <mj...@apache.org>
> > > > >>> Envoyé : mardi 11 juin 2024 01:54
> > > > >>> À : dev@kafka.apache.org <dev@kafka.apache.org>
> > > > >>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor
> in
> > > > >> kafka-streams
> > > > >>>
> > > > >>> Warning External sender Do not click on any links or open any
> > > > >> attachments unless you trust the sender and know the content is
> > safe.
> > > > >>>
> > > > >>> Thanks for the update Ayoub.
> > > > >>>
> > > > >>>
> > > > >>> 101: you say:
> > > > >>>
> > > > >>>> But I am not sure if we don't want to have them for this
> > processor ?
> > > > >>>
> > > > >>> What is your reasoning to move off the established pattern? Would
> > be
> > > > >>> good to understand, why `Deduplicated` class needs a different
> > > > >>> "structure" compared to existing classes.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> 102: Creating iterators is very expensive. For other work, we
> > actually
> > > > >>> hit 100x (?) throughput degradation by creating an (for most
> cases
> > > > >>> empty) iterator for every input record, and needed to find other
> > ways
> > > > to
> > > > >>> avoid creating an iterator per record. It really kills
> performance.
> > > > >>>
> > > > >>> I see the point about data expiration. We could experiment with
> > > > >>> punctuation to expire old data, or add a second "time-ordered
> > store"
> > > > >>> (which we already have at hand) which acts as an index into the
> > main
> > > > >>> store. -- Another possibility would be to add a new version of
> > > > segmented
> > > > >>> store with a different key-layout (ie, just store the plain
> key). I
> > > > >>> think with some refactoring, we might be able to re-use a lot of
> > > > >>> existing code.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> 104:
> > > > >>>
> > > > >>>> This gets me wondering if this is a limitation of stateful
> > processors
> > > > >>>> in ALOS. For example a windowed aggregation with
> `on_window_close`
> > > > >>>> emit strategy may have the same limitation today (we receive a
> > record,
> > > > >>>> we update its aggregation result in the store, then crash before
> > > > >> committing,
> > > > >>>> then the record will be again reconsidered for aggregation). Is
> > this
> > > > >>>> correct ?
> > > > >>>
> > > > >>> Yes, this is correct, but it does not violate ALOS, because we
> did
> > not
> > > > >>> lose the input record -- of course, the aggregation would contain
> > the
> > > > >>> input record twice (eg, over count), but this is ok under ALOS.
> > > > >>> Unfortunately, for de-duplication this pattern breaks, because
> > > > >>> de-duplication operator does a different "aggregation logic"
> > depending
> > > > >>> on its state (emit if no key found, but not emit if key found).
> For
> > > > >>> counting as an example, we increment the count and emit
> > unconditionally
> > > > >>> though.
> > > > >>>
> > > > >>>
> > > > >>>> As a workaround, I think storing the record's offset inside the
> > > > >>>> store's value can tell us whether the record has been already
> > seen or
> > > > >> not.
> > > > >>>> If we receive a record whose deduplication id exists in the
> store
> > > > >>>> and the entry in the store has the same offset, it means the
> > record
> > > > >>>> is processed twice and we can go ahead and forward it. If the
> > offset
> > > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > > >>>
> > > > >>> Great idea. This might work... If we store the input record
> > offset, we
> > > > >>> can actually avoid that the "aggregation logic" changes for the
> > same
> > > > >>> input record. -- And yes, with ALOS potentially emitting a
> > duplicate is
> > > > >>> the-name-of-the-game, so no concerns on this part from my side.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> 105: picking the first offset with smallest ts sound good to me.
> > The
> > > > KIP
> > > > >>> should be explicit about it. But as discussed above, it might be
> > > > >>> simplest to not really have a window lookup, but just a plain
> > > > key-lookup
> > > > >>> and drop if the key exists in the store? -- For late records, it
> > might
> > > > >>> imply that they are not de-duplicated, but this is also the case
> > for
> > > > >>> in-order records if they are further apart than the
> de-duplication
> > > > >>> window size, right? Thus I would believe this is "more natural"
> > > > compared
> > > > >>> to discarding late records pro-actively, which would lead to
> > missing
> > > > >>> result records?
> > > > >>>
> > > > >>> We could also make this configurable if we are not sure what
> users
> > > > >>> really need -- or add such a configuration later in case the
> > semantics
> > > > >>> we pick don't work for some users.
> > > > >>>
> > > > >>> Another line of thinking, that did serve us well in the past: in
> > doubt
> > > > >>> keep a record -- users can add operators to drop record (in case
> > they
> > > > >>> don't want to keep it), but if we drop a record, users have no
> way
> > to
> > > > >>> resurrect it (thus, building a workaround to change semantica is
> > > > >>> possible for users if we default to keep records, but not the
> > other way
> > > > >>> around).
> > > > >>>
> > > > >>> Would be good to get input from the broader community on this
> > question
> > > > >>> thought. In the end, it must be a use-case driven decision?
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> This email was screened for spam and malicious content but
> exercise
> > > > >> caution anyway.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > > >>>> Hi Matthias,
> > > > >>>>
> > > > >>>> Thank you for your review !
> > > > >>>>
> > > > >>>> 100.
> > > > >>>> I agree. I changed the name of the parameter to "idSelector".
> > > > >>>> Because this id may be computed, It is better to call it "id"
> > rather
> > > > >> than
> > > > >>>> field or attribute.
> > > > >>>>
> > > > >>>> 101.
> > > > >>>> The reason I added the methods `keySerde()` and `valueSerde()`
> > was to
> > > > >>>> have the same capabilities as other serde classes (such as
> Grouped
> > > > >>>> or Joined). As a Kafka-streams user, I usually use
> `with(keySerde,
> > > > >>>> valueSerde)`
> > > > >>>> as you suggested. But I am not sure if we don't want to have
> them
> > for
> > > > >> this
> > > > >>>> processor ?
> > > > >>>>
> > > > >>>> 102.
> > > > >>>> That's a good point ! Because we know that the window store will
> > > > contain
> > > > >>>> at most one instance of a given key, I am not sure how the range
> > fetch
> > > > >>>> on WindowStore compares to a KeyValueStore `get()` in this case.
> > > > >>>> Wondering if the fact that the record's key is the prefix of the
> > > > >> underlying
> > > > >>>> keyValueStore's key ("<dataKey,ts>") may provide comparable
> > > > performance
> > > > >>>> to the random access of KeyValueStore ? Of course, the
> WindowStore
> > > > >> fetch()
> > > > >>>> would be less efficient because it may fetch from more than 1
> > segment,
> > > > >> and
> > > > >>>> because of some iterator overhead.
> > > > >>>>
> > > > >>>> The purpose of using a WindowStore is to automatically purge old
> > data.
> > > > >>>> For example, deduplicating a topic written at least once
> wouldn't
> > > > >> require
> > > > >>>> keeping a large history. This is not the case of using a
> > KeyValueStore
> > > > >>>> which would require scanning regularly to remove expired
> records.
> > > > >>>> That might cause a sudden increase of latency whenever the
> cleanup
> > > > >>>> is triggered.
> > > > >>>>
> > > > >>>> It would be good to hear from anyone who has done some analysis
> > > > >>>> on RocksDB's range fetch.
> > > > >>>>
> > > > >>>> 103.
> > > > >>>> Sure, I can update it once we agree on underlying semantics.
> > > > >>>>
> > > > >>>> 104.
> > > > >>>> Another good point !
> > > > >>>>
> > > > >>>>> In the end, de-duplication does only make sense when EOS is
> used
> > > > >>>>
> > > > >>>> I agree with that. And for me, the use case of deduplicating a
> > topic
> > > > >>>> written ALOS inside an EOS application might be the top 1 use
> case
> > > > >>>> of deduplication.
> > > > >>>>
> > > > >>>>> all downstream processing happens, `context.forward()` returns
> > > > >>>>> and we update the state store, we could now crash w/o
> committing
> > > > >> offsets
> > > > >>>>
> > > > >>>> This gets me wondering if this is a limitation of stateful
> > processors
> > > > >>>> in ALOS. For example a windowed aggregation with
> `on_window_close`
> > > > >>>> emit strategy may have the same limitation today (we receive a
> > record,
> > > > >>>> we update its aggregation result in the store, then crash before
> > > > >> committing,
> > > > >>>> then the record will be again reconsidered for aggregation). Is
> > this
> > > > >>>> correct ?
> > > > >>>>
> > > > >>>> As a workaround, I think storing the record's offset inside the
> > > > >>>> store's value can tell us whether the record has been already
> > seen or
> > > > >> not.
> > > > >>>> If we receive a record whose deduplication id exists in the
> store
> > > > >>>> and the entry in the store has the same offset, it means the
> > record
> > > > >>>> is processed twice and we can go ahead and forward it. If the
> > offset
> > > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > > >>>>
> > > > >>>> As you said, we don't have any guarantee whether the initial
> > record
> > > > was
> > > > >>>> forwarded or not in case of a crash before commit. In this
> > solution
> > > > >>>> we would forward the record twice, which is against
> deduplication.
> > > > >>>> But, this is still an ALOS application, so it has the same
> > semantics
> > > > >>>> as any other such application. With this, I am not sure we can
> > > > >>>> have "strict" deduplication for ALOS applications.
> > > > >>>>
> > > > >>>> 105.
> > > > >>>> For me, if there are two duplicate records, it means they are
> > > > >>>> the same in the application's point of view, so it can choose
> > > > >>>> either one. Thus, I would go with forwarding the record with
> > > > >>>> the least offset.
> > > > >>>>
> > > > >>>>> Would it not be desired to drop all duplicates independent
> > > > >>>>> of their ts, as long as we find a record in the store?
> > > > >>>>
> > > > >>>> This is actually related to the (suggested) windowed nature
> > > > >>>> of deduplication. As in 102. we don't want to do a "forever"
> > > > >>>> deduplication, which may be impossible for huge workloads
> > > > >>>> where all records should be kept in the store. Hence, the fetch
> > > > >>>> of timestamp between [ts-deduplicationInterval,
> > > > >> ts+deduplicationInterval]
> > > > >>>>
> > > > >>>> About late records, this is again due to the windowed nature.
> > > > >>>> Because the store won't save those late (i.e. expired) records,
> > > > >>>> we have two options. Either, we do not apply deduplication
> > > > >>>> on them, thus the deduplication doesn't work on late records,
> > > > >>>> or we discard them (which is the option I suggest).
> > > > >>>> In the second case, It would be up to the user to choose
> > > > >>>> any deduplicationInterval that may sufficiently cover all his
> late
> > > > data.
> > > > >>>> What do you think ?
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Ayoub
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> Le mar. 4 juin 2024 à 23:58, Matthias J. Sax <mj...@apache.org>
> a
> > > > >> écrit :
> > > > >>>>
> > > > >>>>> Ayoub,
> > > > >>>>>
> > > > >>>>> thanks for resurrecting this KIP. I think a built-in
> > de-duplication
> > > > >>>>> operator will be very useful.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Couple of questions:
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 100: `deduplicationKeySelector`
> > > > >>>>>
> > > > >>>>> Is this the best name? It might indicate that we select a "key"
> > what
> > > > is
> > > > >>>>> an overloaded term... Maybe we could use `Field` or `Id` or
> > > > `Attribute`
> > > > >>>>> instead of `Key` in the name? Just brainstorming. If we think
> > `Key`
> > > > is
> > > > >>>>> the best word, I am also ok with it.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 101: `Deduplicated` class
> > > > >>>>>
> > > > >>>>> You propose to add static methods `keySerde()` and
> > `valueSerde()` --
> > > > in
> > > > >>>>> other config classes, we use only `with(keySerde, valueSerde)`
> > as we
> > > > >> try
> > > > >>>>> to use the "builder" pattern, and avoid too many overloads. I
> > would
> > > > >>>>> prefer to omit both methods you suggest and just use a single
> > `with`
> > > > >> for
> > > > >>>>> both serdes.
> > > > >>>>>
> > > > >>>>> Similarly, I thing we don't want to add `with(...)` which takes
> > all
> > > > >>>>> parameters at once (which should only be 3 parameters, not 4 as
> > it's
> > > > >>>>> currently in the KIP)?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 102: Usage of `WindowedStore`:
> > > > >>>>>
> > > > >>>>> Would this be efficient? The physical byte layout it
> > "<dataKey,ts>"
> > > > for
> > > > >>>>> the store key, so it would be difficult to do an efficient
> lookup
> > > > for a
> > > > >>>>> given "de-duplication key" to discard duplicates, as we don't
> > know
> > > > the
> > > > >>>>> timestamp of the first record with the same "de-duplication
> key".
> > > > >>>>>
> > > > >>>>> This boils down to the actual de-duplication logic (some more
> > > > comments
> > > > >>>>> below), but what you propose seems to require expensive
> > range-scans
> > > > >> what
> > > > >>>>> could be cost prohibitive in practice. I think we need to find
> a
> > way
> > > > to
> > > > >>>>> use efficient key-point-lookups to make this work.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 103: "Processing logic":
> > > > >>>>>
> > > > >>>>> Might need some updates (Cf 102 comment). I am not sure if I
> > fully
> > > > >>>>> understand the logic: cf 105 below.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 104:
> > > > >>>>>
> > > > >>>>>> If no entries found → forward the record + save the record in
> > the
> > > > >> store
> > > > >>>>>
> > > > >>>>> This part is critical, and we should discuss in detail. In the
> > end,
> > > > >>>>> de-duplication does only make sense when EOS is used, and we
> > might
> > > > want
> > > > >>>>> to call this out (eg, on the JavaDocs)? But if used with ALOS,
> > it's
> > > > >> very
> > > > >>>>> difficult to ensure that we never lose data... Your proposal to
> > > > >>>>> first-forward goes into the right direction, but does not
> really
> > > > solve
> > > > >>>>> the problem entirely:
> > > > >>>>>
> > > > >>>>> Even if we forward the message first, all downstream processing
> > > > >> happens,
> > > > >>>>> `context.forward()` returns and we update the state store, we
> > could
> > > > now
> > > > >>>>> crash w/o committing offsets. For this case, we have no
> guarantee
> > > > that
> > > > >>>>> the result records where published (as we did not flush the
> > producer
> > > > >>>>> yet), but when re-reading from the input topic, we would find
> the
> > > > >> record
> > > > >>>>> in the store and incorrectly drop as duplicate...
> > > > >>>>>
> > > > >>>>> I think the only solution to make this work would be to use
> > TX-state
> > > > >>>>> stores in combination with ALOS as proposed via KIP-892?
> > > > >>>>>
> > > > >>>>> Using an in-memory store won't help much either? The producer
> > could
> > > > >> have
> > > > >>>>> send the write into the changelog topic, but not into the
> result
> > > > topic,
> > > > >>>>> and thus we could still not guarantee ALOS...?
> > > > >>>>>
> > > > >>>>> How do we want to go about this? We could also say, this new
> > operator
> > > > >>>>> only works with EOS. Would this be too restrictive? -- At lest
> > for
> > > > >> know,
> > > > >>>>> until KIP-892 lands, and we could relax it?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> 105: "How to detect late records"
> > > > >>>>>
> > > > >>>>> In the end, it seems to boil down to determine which of the
> > records
> > > > to
> > > > >>>>> forward and which record to drop, for (1) the regular case and
> > (2)
> > > > the
> > > > >>>>> out-of-order data case.
> > > > >>>>>
> > > > >>>>> Regular case (no out-of-order data): For this case, offset and
> ts
> > > > order
> > > > >>>>> is the same, and we can forward the first record we get. All
> > later
> > > > >>>>> record within "de-duplication period" with the same
> > "de-duplication
> > > > >> key"
> > > > >>>>> would be dropped. If a record with the same "de-duplication
> key"
> > > > >> arrives
> > > > >>>>> after "de-duplication period" passed, we cannot drop it any
> > longer,
> > > > but
> > > > >>>>> would still forward it, as by the contract of the operator /
> > > > >>>>> de-duplication period.
> > > > >>>>>
> > > > >>>>> For the out-of-order case: The first question we need to answer
> > is,
> > > > do
> > > > >>>>> we want to forward the record with the smallest offset or the
> > record
> > > > >>>>> with the smallest ts? Logically, forwarding with the smallest
> ts
> > > > might
> > > > >>>>> be "more correct", however, it implies we could only forward it
> > after
> > > > >>>>> "de-duplication period" passed, what might be undesired
> latency?
> > > > Would
> > > > >>>>> this be desired/acceptable?
> > > > >>>>>
> > > > >>>>> In contrast, if we forward record with the smallest offset
> (this
> > is
> > > > >> what
> > > > >>>>> you seem to propose) we don't have a latency issue, but of
> > course the
> > > > >>>>> question what records to drop is more tricky to answer: it
> seems
> > you
> > > > >>>>> propose to compare the time difference of the stored record to
> > the
> > > > >>>>> current record, but I am wondering why? Would it not be desired
> > to
> > > > drop
> > > > >>>>> all duplicates independent of their ts, as long as we find a
> > record
> > > > in
> > > > >>>>> the store? Would be good to get some more motivation and
> > tradeoffs
> > > > >>>>> discussed about the different strategies we could use.
> > > > >>>>>
> > > > >>>>> You also propose to drop _any_ late record: I am also not sure
> if
> > > > >> that's
> > > > >>>>> desired? Could this not lead to data loss? Assume we get a late
> > > > record,
> > > > >>>>> but in fact there was never a duplicate? Why would we want to
> > drop
> > > > it?
> > > > >>>>> If there is a late record which is indeed a duplicate, but we
> > purged
> > > > >> the
> > > > >>>>> original record from the store already, it seems to be the same
> > case
> > > > as
> > > > >>>>> for the "no out-of-order case": after we purged we cannot
> > > > de-duplicate
> > > > >>>>> and thus it's a regular case we can just accept?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > > >>>>>> Hi everyone,
> > > > >>>>>>
> > > > >>>>>> I've just made a (small) change to this KIP about an
> > implementation
> > > > >>>>> detail.
> > > > >>>>>> Please let me know your thoughts.
> > > > >>>>>>
> > > > >>>>>> Thank you,
> > > > >>>>>> Ayoub
> > > > >>>>>>
> > > > >>>>>> Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a
> > > > écrit :
> > > > >>>>>>
> > > > >>>>>>> Hello,
> > > > >>>>>>>
> > > > >>>>>>> Following a discussion on community slack channel, I would
> > like to
> > > > >>>>> revive
> > > > >>>>>>> the discussion on the KIP-655, which is about adding a
> > > > deduplication
> > > > >>>>>>> processor in kafka-streams.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > <
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > >
> > > > >> <
> > > > >>
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > <
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > >
> > > > >>>
> > > > >>>>>>>
> > > > >>>>>>> Even though the motivation is not quite the same as the
> initial
> > > > one,
> > > > >> I
> > > > >>>>>>> updated the KIP rather than creating a new one, as I believe
> > the
> > > > end
> > > > >>>>> goal
> > > > >>>>>>> is the same.
> > > > >>>>>>>
> > > > >>>>>>> Thanks,
> > > > >>>>>>> Ayoub
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to