Hi Bill,

Thanks for your answer.

> Can you update the KIP to state that deduplication by value will result in
> auto-repartitioning by Kafka Streams

Updated the KIP to reflect the latest proposal.

> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
> we stumble across a better name we can update the KIP then too).

Yes, that's better than the first suggested name. Reflected in the KIP.

I also added a note about records with a null deduplication key.
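
For readers following the thread, here is a rough sketch of how the three
proposed operators might look from the DSL. The signatures and the
`deduplicationInterval` parameter are illustrative only; the exact API is
defined in the KIP:

```java
KStream<String, Transaction> transactions = builder.stream("transactions");

// 1) Deduplicate by the record key; no repartitioning needed.
KStream<String, Transaction> byKey =
    transactions.deduplicateByKey(Duration.ofMinutes(10));

// 2) Deduplicate by a value-derived id; Kafka Streams auto-repartitions
//    on that id, unless a manual repartition was added upstream.
KStream<String, Transaction> byId =
    transactions.deduplicate((k, v) -> v.id(), Duration.ofMinutes(10));

// 3) Deduplicate by the (key, id) pair; per-partition, no repartitioning.
KStream<String, Transaction> byKeyAndId =
    transactions.deduplicateByKeyValue((k, v) -> v.id(), Duration.ofMinutes(10));
```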

If no new points are raised in the next few days, I will start a VOTE
thread.

Thanks,
Ayoub




On Tue, Aug 27, 2024 at 5:47 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Ayoub,
>
> Thanks for the update.
>
> Your latest proposal looks good to me, but I have a couple of comments.
>
> in (2) we repartition by the id field
> >
>
> I agree with this approach, but the KIP still states "No repartitioning is
> done by this processor. In case a user wants to deduplicate an id over
> multiple partitions, they should first repartition the topic on this id."
> Can you update the KIP to state that deduplication by value will result in
> auto-repartitioning by Kafka Streams, unless the developer adds a manual
> repartitioning operator themselves?
>
> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
> we stumble across a better name we can update the KIP then too).
>
> This latest update has been open for a while now, I'd say it's good to
> start a VOTE thread in 1-2 days.
>
> Thanks,
> Bill
>
> On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com>
> wrote:
>
> > Hi Bill,
> >
> > I think all the points raised are now handled in the KIP.
> > We still have point 106, on which we need to reach an
> > agreement. I gave a proposal in my last mail, and
> > I am waiting for others' opinions.
> >
> > My proposition is to have 3 methods of deduplication:
> > 1) `deduplicateByKey()`
> > 2) `deduplicate((k, v) -> v.id)`
> > 3) `deduplicateByKeyAndId((k, v) -> v.id)`  (Name to be chosen)
> >
> > The point of having both (2) and (3) is that in (2) we repartition
> > by the id field, whereas in (3) we do not repartition.
> >
> > (3) is needed in case the current partitioning ensures that the same
> > id will always land on the same task. Here is an example I gave above:
> >
> > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > and deduplication is done on `transaction`.`id` . Here, the application
> > > wants to deduplicate transactions. It knows that a transaction id
> > > maps to a single userId. Any duplicate of that record would be received
> > > by the task which processes this userId.
> >
> > Let me know what you think.
> >
> > Thanks,
> > Ayoub
> >
> >
> >
> >
> > On Thu, Aug 22, 2024 at 9:37 PM Bill Bejeck <bbej...@apache.org> wrote:
> >
> > > Hi Ayoub,
> > >
> > > What's the status of this KIP?
> > >
> > > Regards,
> > > Bill
> > >
> > >
> > > On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com>
> > > wrote:
> > >
> > > > Hi Bill,
> > > >
> > > > Thanks for your comments !
> > > >
> > > > I am prefixing your points with B.
> > > >
> > > > B1.
> > > > That's a good catch. I think punctuated records aren't affected by
> > > > this reprocessing problem because they are not issued twice.
> > > > We will still need to distinguish between a punctuated record
> > > > and a source record. The logic becomes:
> > > >
> > > > ```
> > > > if record.offset is not null && record.offset == stored_offset:
> > > >     forward(record)
> > > > else:
> > > >     // do not forward
> > > > ```
> > > >
> > > >
> > > > I will update the KIP on this point.
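> > > >
> > > > To make it concrete, a minimal sketch of what the processor logic
> > > > could look like (the store layout and the `ValueAndOffset` wrapper
> > > > are hypothetical, not part of the KIP text):
> > > > ```java
> > > > public void process(Record<K, V> record) {
> > > >     KD id = idSelector.apply(record.key(), record.value());
> > > >     ValueAndOffset<V> stored = store.get(id);
> > > >     Long offset = sourceOffset();  // null for punctuation-forwarded records
> > > >     if (stored == null) {
> > > >         store.put(id, new ValueAndOffset<>(record.value(), offset));
> > > >         context.forward(record);   // first occurrence
> > > >     } else if (offset != null && offset.equals(stored.offset())) {
> > > >         context.forward(record);   // same record reprocessed after a crash
> > > >     }
> > > >     // else: a true duplicate -> drop
> > > > }
> > > > ```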
> > > >
> > > >
> > > >
> > > > B2. (i.e. 106)
> > > > That's the point discussed in 106. The main idea is to avoid
> > > > repartitioning when we don't have to. But let's say we want to
> > > > repartition when deduplication is on an id that is not the key.
> > > >
> > > > The user code may look something like this:
> > > >  ```kstream
> > > >      .deduplicate((k, v) -> v.id)
> > > >      .map((k, v) -> ...)
> > > >   ```
> > > > The user expects that the key in the map processor which follows the
> > > > deduplicate is the same as the initial key.
> > > >
> > > > Internally, if we decide to repartition, the deduplication processor
> > > > may do the following:
> > > > ```kstream
> > > >      .map((k, v) -> (v.id, ...)) // For repartitioning on id
> > > >      .deduplicateByKey()
> > > >      .map((id, v) -> (initialKey, ...)) // To recover the initial key
> > > >   ```
> > > >
> > > > In this case, there is some internal complexity to this processor:
> > > >     I) We should repartition twice, before and after deduplication.
> > > >     The second map triggers repartitioning if there are following
> > > >     stateful processors in the pipeline.
> > > >
> > > >     II) The initial key should be kept somewhere. One way is to wrap
> > > >     the value to add the key, and then unwrap it after the
> > > >     `deduplicateByKey` (see the sketch below).
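> > > >
> > > > A rough sketch of point II (the `KeyAndValue` wrapper is hypothetical):
> > > > ```kstream
> > > >      .map((k, v) -> (v.id, KeyAndValue(k, v)))  // wrap the original key
> > > >      .deduplicateByKey()
> > > >      .map((id, kv) -> (kv.key, kv.value))       // unwrap it afterwards
> > > > ```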
> > > >
> > > > As mentioned in one use case above (deduplicating transactions -
> > > > written at least once - by id), repartitioning is not needed for that
> > > > case. This added complexity won't be useful there.
> > > >
> > > > *TLDR,* I think one solution is to have a separate API to cover such
> > > > a use case. In total, we can have 3 APIs for deduplication:
> > > >
> > > > 1) `deduplicateByKey()`  *// No repartitioning*
> > > > 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)`  *// No repartitioning*
> > > > 3) `deduplicate((k, v) -> v.id)`  *// causes repartitioning*
> > > >
> > > > The second API is equivalent to deduplicating by the pair (key, id).
> > > > No repartitioning is required because the records with the same key
> > > > are already together in the same partition. cc @Matthias @Sebastien.
> > > >
> > > >
> > > >
> > > > B3.
> > > > I think here we can do what some existing processors do for periodic
> > > > work: we can track the last cleanup time and the current stream time,
> > > > and if the delta exceeds the deduplication period, we trigger the
> > > > cleanup.
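> > > >
> > > > A minimal sketch of that idea (the timestamp-ordered index `tsIndex`
> > > > and its key layout are hypothetical simplifications):
> > > > ```java
> > > > private long lastCleanupMs = -1L;
> > > >
> > > > void maybePurge(long streamTimeMs) {
> > > >     if (streamTimeMs - lastCleanupMs >= deduplicationIntervalMs) {
> > > >         long cutoff = streamTimeMs - deduplicationIntervalMs;
> > > >         // scan only the timestamp-ordered index, not the main store
> > > >         try (KeyValueIterator<Long, KD> it = tsIndex.range(0L, cutoff)) {
> > > >             while (it.hasNext()) {
> > > >                 KeyValue<Long, KD> entry = it.next();
> > > >                 store.delete(entry.value);  // expired deduplication id
> > > >                 tsIndex.delete(entry.key);
> > > >             }
> > > >         }
> > > >         lastCleanupMs = streamTimeMs;
> > > >     }
> > > > }
> > > > ```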
> > > >
> > > >
> > > > Best,
> > > > Ayoub
> > > >
> > > > On Thu, Jul 11, 2024 at 1:17 AM Bill Bejeck <bbej...@apache.org> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Thanks Ayoub for getting this KIP going again, I think a
> > > > > deduplication operator will be very useful. My apologies for being
> > > > > late to the discussion.
> > > > >
> > > > > Overall, I agree with the direction of the KIP, but I have a couple
> > > > > of questions.
> > > > >
> > > > > 1. Regarding using offsets for tracking the first arrival. I think
> > > > > there could be a case when offsets would not be available, when
> > > > > records are forwarded by punctuation for example.
> > > > >
> > > > > 2. I'm not sure about using something other than the key for
> > > > > identifying duplicate records. By doing so, one could end up missing
> > > > > a de-duplication due to records with the same id characteristic in
> > > > > the value but having different keys, so the records land on
> > > > > different partitions. I guess we could enforce a repartition if one
> > > > > chooses to use a de-duplication id other than the key.
> > > > >
> > > > > 3. I think a punctuation scheduled to run at the de-duplication
> > > > > period to clean out older records would be a clean approach for
> > > > > purging older records. I'm not taking a hard stance on this approach
> > > > > and I'm willing to discuss different methods.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bill
> > > > >
> > > > >
> > > > >
> > > > > On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > > > > Hi Matthias,
> > > > > >
> > > > > > Here are my updates on your points.
> > > > > >
> > > > > > 101.
> > > > > > > You propose to add static methods `keySerde()` and `valueSerde()` --
> > > > > > > in other config classes, we use only `with(keySerde, valueSerde)`
> > > > > > > as we try to use the "builder" pattern, and avoid too many
> > > > > > > overloads. I would prefer to omit both methods you suggest and
> > > > > > > just use a single `with` for both serdes.
> > > > > >
> > > > > > I was actually inspired by the other config classes, for example
> > > > > > `Joined` and `Grouped` both have the static methods `keySerde()`
> > > > > > and `valueSerde()`.
> > > > > >
> > > > > > > I think we don't want to add `with(...)` which takes all
> > > > > > > parameters at once
> > > > > >
> > > > > > Done.
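> > > > > >
> > > > > > For illustration, usage might then look like this (operator and
> > > > > > config names are only a sketch of the current KIP draft):
> > > > > > ```java
> > > > > > stream.deduplicateByKeyValue(
> > > > > >     (k, v) -> v.id(),
> > > > > >     Duration.ofMinutes(10),
> > > > > >     Deduplicated.with(Serdes.String(), transactionSerde));
> > > > > > ```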
> > > > > >
> > > > > >
> > > > > > 102.
> > > > > > Thanks, your suggestion sounds good to me. The trade-off of
> > > > > > having an index that allows us to efficiently purge expired
> > > > > > records alongside the keyValue store makes sense. I've been
> > > > > > looking into the code, and I think a similar idea was implemented
> > > > > > for other processors (for example with
> > > > > > DualSchemaRocksDBSegmentedBytesStore).
> > > > > > As you said, I think we would benefit from some existing code here.
> > > > > > KIP updated !
> > > > > >
> > > > > >
> > > > > > 104.
> > > > > > Updated the KIP to consider records' offsets.
> > > > > >
> > > > > >
> > > > > > 105
> > > > > > > picking the first offset with smallest ts sounds good to me.
> > > > > > > The KIP should be explicit about it
> > > > > >
> > > > > > Done.
> > > > > >
> > > > > > > But as discussed above, it might be simplest to not really have
> > > > > > > a window lookup, but just a plain key-lookup and drop if the key
> > > > > > > exists in the store?
> > > > > >
> > > > > > KIP updated, we will be `.get()`ing from a keyValueStore instead
> > > > > > of `.fetch()`ing from a WindowStore.
> > > > > >
> > > > > > > Another line of thinking, that did serve us well in the past: in
> > > > > > > doubt keep a record -- users can add operators to drop a record
> > > > > > > (in case they don't want to keep it), but if we drop a record,
> > > > > > > users have no way to resurrect it (thus, building a workaround
> > > > > > > to change semantics is possible for users if we default to
> > > > > > > keeping records, but not the other way around).
> > > > > >
> > > > > > Makes total sense ! I updated the KIP to forward late records
> > > > > > instead of dropping them.
> > > > > >
> > > > > >
> > > > > > 106.
> > > > > > For the moment, I highlighted in the Javadocs that we are
> > > > > > deduplicating per partition. If there is a better way to carry
> > > > > > this information in the name of the API itself, that would be good.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Ayoub
> > > > > >
> > > > > >
> > > > > > On Thu, Jun 13, 2024 at 9:03 AM Sebastien Viale <sebastien.vi...@michelin.com> wrote:
> > > > > >
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > 106 :
> > > > > > >
> > > > > > > Thanks for the clarification. Actually, this is not what I
> > > > > > > expected, but I better understand the performance issues
> > > > > > > regarding the state store iteration.
> > > > > > > If this is how it should be designed, it is fine for me, as long
> > > > > > > as it is clear that the repartition must be done before the
> > > > > > > deduplication.
> > > > > > > Sébastien
> > > > > > >
> > > > > > > ________________________________
> > > > > > > From: Matthias J. Sax <mj...@apache.org>
> > > > > > > Sent: Thursday, June 13, 2024 02:51
> > > > > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > > Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor
> > > > > > > in kafka-streams
> > > > > > >
> > > > > > >
> > > > > > > 106:
> > > > > > >
> > > > > > > > For the use-case of deduplicating an "at least once written"
> > > > > > > > stream, we are sure that the duplicate record has the same key
> > > > > > > > as the original one, and will land on the same task. Here, a
> > > > > > > > user would want to specify a deduplication key different from
> > > > > > > > the topic's key in case the topic's key is not a unique
> > > > > > > > identifier
> > > > > > > >
> > > > > > > > For example, we have a topic with keyValue (`userId`,
> > > > > > > > `transaction`) and deduplication is done on `transaction`.`id`.
> > > > > > > > Here, the application wants to deduplicate transactions. It
> > > > > > > > knows that a transaction id maps to a single userId. Any
> > > > > > > > duplicate of that record would be received by the task which
> > > > > > > > processes this userId.
> > > > > > >
> > > > > > > This is an interesting point.
> > > > > > >
> > > > > > > My concern is, to some extent, that it seems (on the surface) to
> > > > > > > not follow the established pattern of auto-repartitioning in the
> > > > > > > DSL. Of course, given that the current proposal says we use an
> > > > > > > "id extractor" and not a "key extractor", it might be ok (but it
> > > > > > > might be somewhat subtle). Of course, JavaDocs always help to
> > > > > > > explain in detail. Would this be enough?
> > > > > > >
> > > > > > > Would be good to hear from others about this point. I am
> > > > > > > personally not sure which approach I would prefer at this point.
> > > > > > >
> > > > > > > The problem reminds me of
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10844 which is not
> > > > > > > resolved directly either. We do have KIP-759
> > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759:
> > > > > > > Unneeded repartition canceling), which is WIP and helps with
> > > > > > > KAFKA-10844, but I am not sure if it would be a viable solution
> > > > > > > for the de-duplication case?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > > > > > Hi Sebastien & Matthias,
> > > > > > > >
> > > > > > > > For 106.
> > > > > > > > My idea was to deduplicate on a per-task basis. If the user
> > > > > > > > wants to do a global deduplication over all partitions, I think
> > > > > > > > it's better to have him explicitly repartition and then call
> > > > > > > > the deduplication processor.
> > > > > > > >
> > > > > > > > For the use-case of deduplicating an "at least once written"
> > > > > > > > stream, we are sure that the duplicate record has the same key
> > > > > > > > as the original one, and will land on the same task. Here, a
> > > > > > > > user would want to specify a deduplication key different from
> > > > > > > > the topic's key in case the topic's key is not a unique
> > > > > > > > identifier.
> > > > > > > >
> > > > > > > > For example, we have a topic with keyValue (`userId`,
> > > > > > > > `transaction`) and deduplication is done on `transaction`.`id`.
> > > > > > > > Here, the application wants to deduplicate transactions. It
> > > > > > > > knows that a transaction id maps to a single userId. Any
> > > > > > > > duplicate of that record would be received by the task which
> > > > > > > > processes this userId.
> > > > > > > >
> > > > > > > > One other thought I had when writing the KIP, about global
> > > > > > > > deduplication, is that it would require mapping the key of the
> > > > > > > > stream twice (a first map to change the key to the
> > > > > > > > deduplication key, and a second map to get back the initial
> > > > > > > > key). The second map may imply a second repartitioning.
> > > > > > > >
> > > > > > > > However, if we do a per-task deduplication, the user may adapt
> > > > > > > > it to his specific use-case.
> > > > > > > >
> > > > > > > > Let me know what you think
> > > > > > > > Ayoub
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jun 11, 2024 at 8:21 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > >
> > > > > > > >> Thanks Sebastien,
> > > > > > > >>
> > > > > > > >> that's a good point. Thanks for raising it. -- I like your
> > > > > > > >> proposal.
> > > > > > > >>
> > > > > > > >> An alternative would be to have two overloads of
> > > > > > > >> `deduplicate()`, one w/ and one w/o the "id extractor"
> > > > > > > >> parameter. This would be less explicit though.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> -Matthias
> > > > > > > >>
> > > > > > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > > > > > >>> Hi,
> > > > > > > >>>
> > > > > > > >>> I am really interested in this KIP.
> > > > > > > >>>
> > > > > > > >>> 106:
> > > > > > > >>> I hope I am not talking nonsense, but if you do not
> > > > > > > >>> deduplicate based on the key, the input stream has to be
> > > > > > > >>> repartitioned.
> > > > > > > >>> Otherwise, different stream tasks may handle records that
> > > > > > > >>> need to be deduplicated, and thus duplicates will not be
> > > > > > > >>> detected.
> > > > > > > >>>
> > > > > > > >>> This is why I would have created two different methods, as
> > > > > > > >>> is done for GroupBy:
> > > > > > > >>>
> > > > > > > >>> deduplicateByKey(...)
> > > > > > > >>> deduplicate(...)
> > > > > > > >>>
> > > > > > > >>> If deduplicateByKey is used, the input stream does not need
> > > > > > > >>> to be repartitioned.
> > > > > > > >>>
> > > > > > > >>> thanks
> > > > > > > >>>
> > > > > > > >>> Sébastien
> > > > > > > >>> ________________________________
> > > > > > > >>> From: Matthias J. Sax <mj...@apache.org>
> > > > > > > >>> Sent: Tuesday, June 11, 2024 01:54
> > > > > > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > > >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication
> > > > > > > >>> processor in kafka-streams
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> Thanks for the update Ayoub.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> 101: you say:
> > > > > > > >>>
> > > > > > > >>>> But I am not sure if we don't want to have them for this
> > > > > > > >>>> processor ?
> > > > > > > >>>
> > > > > > > >>> What is your reasoning for moving off the established
> > > > > > > >>> pattern? Would be good to understand why the `Deduplicated`
> > > > > > > >>> class needs a different "structure" compared to existing
> > > > > > > >>> classes.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> 102: Creating iterators is very expensive. For other work,
> > > > > > > >>> we actually hit 100x (?) throughput degradation by creating
> > > > > > > >>> an (for most cases empty) iterator for every input record,
> > > > > > > >>> and needed to find other ways to avoid creating an iterator
> > > > > > > >>> per record. It really kills performance.
> > > > > > > >>>
> > > > > > > >>> I see the point about data expiration. We could experiment
> > > > > > > >>> with punctuation to expire old data, or add a second
> > > > > > > >>> "time-ordered store" (which we already have at hand) which
> > > > > > > >>> acts as an index into the main store. -- Another possibility
> > > > > > > >>> would be to add a new version of segmented store with a
> > > > > > > >>> different key-layout (ie, just store the plain key). I think
> > > > > > > >>> with some refactoring, we might be able to re-use a lot of
> > > > > > > >>> existing code.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> 104:
> > > > > > > >>>
> > > > > > > >>>> This gets me wondering if this is a limitation of stateful
> > > > > > > >>>> processors in ALOS. For example a windowed aggregation with
> > > > > > > >>>> `on_window_close` emit strategy may have the same limitation
> > > > > > > >>>> today (we receive a record, we update its aggregation result
> > > > > > > >>>> in the store, then crash before committing, then the record
> > > > > > > >>>> will be again reconsidered for aggregation). Is this
> > > > > > > >>>> correct ?
> > > > > > > >>>
> > > > > > > >>> Yes, this is correct, but it does not violate ALOS, because
> > > > > > > >>> we did not lose the input record -- of course, the
> > > > > > > >>> aggregation would contain the input record twice (eg, over
> > > > > > > >>> count), but this is ok under ALOS. Unfortunately, for
> > > > > > > >>> de-duplication this pattern breaks, because the
> > > > > > > >>> de-duplication operator does a different "aggregation logic"
> > > > > > > >>> depending on its state (emit if no key found, but not emit if
> > > > > > > >>> key found). For counting as an example, we increment the
> > > > > > > >>> count and emit unconditionally though.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>> As a workaround, I think storing the record's offset inside
> > > > > > > >>>> the store's value can tell us whether the record has been
> > > > > > > >>>> already seen or not.
> > > > > > > >>>> If we receive a record whose deduplication id exists in the
> > > > > > > >>>> store and the entry in the store has the same offset, it
> > > > > > > >>>> means the record is processed twice and we can go ahead and
> > > > > > > >>>> forward it. If the offset is different, it means it's a
> > > > > > > >>>> duplicate record, so we ignore it.
> > > > > > > >>>
> > > > > > > >>> Great idea. This might work... If we store the input record
> > > > > > > >>> offset, we can actually avoid that the "aggregation logic"
> > > > > > > >>> changes for the same input record. -- And yes, with ALOS
> > > > > > > >>> potentially emitting a duplicate is the-name-of-the-game, so
> > > > > > > >>> no concerns on this part from my side.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> 105: picking the first offset with smallest ts sounds good
> > > > > > > >>> to me. The KIP should be explicit about it. But as discussed
> > > > > > > >>> above, it might be simplest to not really have a window
> > > > > > > >>> lookup, but just a plain key-lookup and drop if the key
> > > > > > > >>> exists in the store? -- For late records, it might imply that
> > > > > > > >>> they are not de-duplicated, but this is also the case for
> > > > > > > >>> in-order records if they are further apart than the
> > > > > > > >>> de-duplication window size, right? Thus I would believe this
> > > > > > > >>> is "more natural" compared to discarding late records
> > > > > > > >>> pro-actively, which would lead to missing result records?
> > > > > > > >>>
> > > > > > > >>> We could also make this configurable if we are not sure what
> > > > > > > >>> users really need -- or add such a configuration later in
> > > > > > > >>> case the semantics we pick don't work for some users.
> > > > > > > >>>
> > > > > > > >>> Another line of thinking, that did serve us well in the
> > > > > > > >>> past: in doubt keep a record -- users can add operators to
> > > > > > > >>> drop a record (in case they don't want to keep it), but if we
> > > > > > > >>> drop a record, users have no way to resurrect it (thus,
> > > > > > > >>> building a workaround to change semantics is possible for
> > > > > > > >>> users if we default to keeping records, but not the other way
> > > > > > > >>> around).
> > > > > > > >>>
> > > > > > > >>> Would be good to get input from the broader community on
> > > > > > > >>> this question though. In the end, it must be a use-case
> > > > > > > >>> driven decision?
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> -Matthias
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > > > > > >>>> Hi Matthias,
> > > > > > > >>>>
> > > > > > > >>>> Thank you for your review !
> > > > > > > >>>>
> > > > > > > >>>> 100.
> > > > > > > >>>> I agree. I changed the name of the parameter to
> > > > > > > >>>> "idSelector". Because this id may be computed, it is better
> > > > > > > >>>> to call it an "id" rather than a field or attribute.
> > > > > > > >>>>
> > > > > > > >>>> 101.
> > > > > > > >>>> The reason I added the methods `keySerde()` and
> > > > > > > >>>> `valueSerde()` was to have the same capabilities as other
> > > > > > > >>>> serde classes (such as Grouped or Joined). As a
> > > > > > > >>>> Kafka-streams user, I usually use `with(keySerde,
> > > > > > > >>>> valueSerde)` as you suggested. But I am not sure if we don't
> > > > > > > >>>> want to have them for this processor ?
> > > > > > > >>>>
> > > > > > > >>>> 102.
> > > > > > > >>>> That's a good point ! Because we know that the window store
> > > > > > > >>>> will contain at most one instance of a given key, I am not
> > > > > > > >>>> sure how the range fetch on WindowStore compares to a
> > > > > > > >>>> KeyValueStore `get()` in this case.
> > > > > > > >>>> Wondering if the fact that the record's key is the prefix
> > > > > > > >>>> of the underlying keyValueStore's key ("<dataKey,ts>") may
> > > > > > > >>>> provide comparable performance to the random access of
> > > > > > > >>>> KeyValueStore ? Of course, the WindowStore fetch() would be
> > > > > > > >>>> less efficient because it may fetch from more than 1
> > > > > > > >>>> segment, and because of some iterator overhead.
> > > > > > > >>>>
> > > > > > > >>>> The purpose of using a WindowStore is to automatically purge
> > > > > > > >>>> old data. For example, deduplicating a topic written at
> > > > > > > >>>> least once wouldn't require keeping a large history. This is
> > > > > > > >>>> not the case when using a KeyValueStore, which would require
> > > > > > > >>>> scanning regularly to remove expired records. That might
> > > > > > > >>>> cause a sudden increase in latency whenever the cleanup is
> > > > > > > >>>> triggered.
> > > > > > > >>>>
> > > > > > > >>>> It would be good to hear from anyone who has done some
> > > > > > > >>>> analysis of RocksDB's range fetch.
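> > > > > > > >>>>
> > > > > > > >>>> For reference, the two lookup styles under discussion would
> > > > > > > >>>> roughly be (store handles are hypothetical):
> > > > > > > >>>> ```java
> > > > > > > >>>> // WindowStore: range fetch, possibly touching several segments
> > > > > > > >>>> try (WindowStoreIterator<V> it =
> > > > > > > >>>>          windowStore.fetch(id, now.minus(interval), now)) {
> > > > > > > >>>>     if (it.hasNext()) return;  // duplicate -> drop
> > > > > > > >>>> }
> > > > > > > >>>>
> > > > > > > >>>> // KeyValueStore: single point lookup
> > > > > > > >>>> if (kvStore.get(id) != null) return;  // duplicate -> drop
> > > > > > > >>>> ```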
> > > > > > > >>>>
> > > > > > > >>>> 103.
> > > > > > > >>>> Sure, I can update it once we agree on the underlying
> > > > > > > >>>> semantics.
> > > > > > > >>>>
> > > > > > > >>>> 104.
> > > > > > > >>>> Another good point !
> > > > > > > >>>>
> > > > > > > >>>>> In the end, de-duplication does only make sense when EOS
> > > > > > > >>>>> is used
> > > > > > > >>>>
> > > > > > > >>>> I agree with that. And for me, the use case of deduplicating
> > > > > > > >>>> a topic written ALOS inside an EOS application might be the
> > > > > > > >>>> top 1 use case of deduplication.
> > > > > > > >>>>
> > > > > > > >>>>> all downstream processing happens, `context.forward()`
> > > > > > > >>>>> returns and we update the state store, we could now crash
> > > > > > > >>>>> w/o committing offsets
> > > > > > > >>>>
> > > > > > > >>>> This gets me wondering if this is a limitation of stateful
> > > > > > > >>>> processors in ALOS. For example a windowed aggregation with
> > > > > > > >>>> `on_window_close` emit strategy may have the same limitation
> > > > > > > >>>> today (we receive a record, we update its aggregation result
> > > > > > > >>>> in the store, then crash before committing, then the record
> > > > > > > >>>> will be again reconsidered for aggregation). Is this
> > > > > > > >>>> correct ?
> > > > > > > >>>>
> > > > > > > >>>> As a workaround, I think storing the record's offset inside
> > > > > > > >>>> the store's value can tell us whether the record has been
> > > > > > > >>>> already seen or not.
> > > > > > > >>>> If we receive a record whose deduplication id exists in the
> > > > > > > >>>> store and the entry in the store has the same offset, it
> > > > > > > >>>> means the record is processed twice and we can go ahead and
> > > > > > > >>>> forward it. If the offset is different, it means it's a
> > > > > > > >>>> duplicate record, so we ignore it.
> > > > > > > >>>>
> > > > > > > >>>> As you said, we don't have any guarantee whether the initial
> > > > > > > >>>> record was forwarded or not in case of a crash before
> > > > > > > >>>> commit. In this solution we would forward the record twice,
> > > > > > > >>>> which is against deduplication. But, this is still an ALOS
> > > > > > > >>>> application, so it has the same semantics as any other such
> > > > > > > >>>> application. With this, I am not sure we can have "strict"
> > > > > > > >>>> deduplication for ALOS applications.
> > > > > > > >>>>
> > > > > > > >>>> 105.
> > > > > > > >>>> For me, if there are two duplicate records, it means they
> > > > > > > >>>> are the same from the application's point of view, so it can
> > > > > > > >>>> choose either one. Thus, I would go with forwarding the
> > > > > > > >>>> record with the smallest offset.
> > > > > > > >>>>
> > > > > > > >>>>> Would it not be desired to drop all duplicates independent
> > > > > > > >>>>> of their ts, as long as we find a record in the store?
> > > > > > > >>>>
> > > > > > > >>>> This is actually related to the (suggested) windowed nature
> > > > > > > >>>> of deduplication. As in 102, we don't want to do a "forever"
> > > > > > > >>>> deduplication, which may be impossible for huge workloads
> > > > > > > >>>> where all records would have to be kept in the store. Hence
> > > > > > > >>>> the fetch of timestamps between [ts-deduplicationInterval,
> > > > > > > >>>> ts+deduplicationInterval]
> > > > > > > >>>>
> > > > > > > >>>> About late records, this is again due to the windowed
> > > > > > > >>>> nature. Because the store won't save those late (i.e.
> > > > > > > >>>> expired) records, we have two options. Either we do not
> > > > > > > >>>> apply deduplication on them, so deduplication doesn't work
> > > > > > > >>>> on late records, or we discard them (which is the option I
> > > > > > > >>>> suggest).
> > > > > > > >>>> In the second case, it would be up to the user to choose a
> > > > > > > >>>> deduplicationInterval that sufficiently covers all his late
> > > > > > > >>>> data. What do you think ?
> > > > > > > >>>>
> > > > > > > >>>> Thanks,
> > > > > > > >>>> Ayoub
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>> On Tue, Jun 4, 2024 at 11:58 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> Ayoub,
> > > > > > > >>>>>
> > > > > > > >>>>> thanks for resurrecting this KIP. I think a built-in
> > > > > > > >>>>> de-duplication operator will be very useful.
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> Couple of questions:
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 100: `deduplicationKeySelector`
> > > > > > > >>>>>
> > > > > > > >>>>> Is this the best name? It might indicate that we select a
> > > > > > > >>>>> "key", which is an overloaded term... Maybe we could use
> > > > > > > >>>>> `Field` or `Id` or `Attribute` instead of `Key` in the
> > > > > > > >>>>> name? Just brainstorming. If we think `Key` is the best
> > > > > > > >>>>> word, I am also ok with it.
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 101: `Deduplicated` class
> > > > > > > >>>>>
> > > > > > > >>>>> You propose to add static methods `keySerde()` and
> > > > > > > >>>>> `valueSerde()` -- in other config classes, we use only
> > > > > > > >>>>> `with(keySerde, valueSerde)` as we try to use the "builder"
> > > > > > > >>>>> pattern, and avoid too many overloads. I would prefer to
> > > > > > > >>>>> omit both methods you suggest and just use a single `with`
> > > > > > > >>>>> for both serdes.
> > > > > > > >>>>>
> > > > > > > >>>>> Similarly, I think we don't want to add `with(...)` which
> > > > > > > >>>>> takes all parameters at once (which should only be 3
> > > > > > > >>>>> parameters, not 4 as it's currently in the KIP)?
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 102: Usage of `WindowedStore`:
> > > > > > > >>>>>
> > > > > > > >>>>> Would this be efficient? The physical byte layout is
> > > > > > > >>>>> "<dataKey,ts>" for the store key, so it would be difficult
> > > > > > > >>>>> to do an efficient lookup for a given "de-duplication key"
> > > > > > > >>>>> to discard duplicates, as we don't know the timestamp of
> > > > > > > >>>>> the first record with the same "de-duplication key".
> > > > > > > >>>>>
> > > > > > > >>>>> This boils down to the actual de-duplication logic (some
> > > > > > > >>>>> more comments below), but what you propose seems to require
> > > > > > > >>>>> expensive range-scans, which could be cost prohibitive in
> > > > > > > >>>>> practice. I think we need to find a way to use efficient
> > > > > > > >>>>> key-point-lookups to make this work.
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 103: "Processing logic":
> > > > > > > >>>>>
> > > > > > > >>>>> Might need some updates (cf. the 102 comment). I am not
> > > > > > > >>>>> sure if I fully understand the logic: cf. 105 below.
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 104:
> > > > > > > >>>>>
> > > > > > > >>>>>> If no entries found → forward the record + save the record
> > > > > > > >>>>>> in the store
> > > > > > > >>>>>
> > > > > > > >>>>> This part is critical, and we should discuss it in detail.
> > > > > > > >>>>> In the end, de-duplication does only make sense when EOS is
> > > > > > > >>>>> used, and we might want to call this out (eg, in the
> > > > > > > >>>>> JavaDocs)? But if used with ALOS, it's very difficult to
> > > > > > > >>>>> ensure that we never lose data... Your proposal to
> > > > > > > >>>>> first-forward goes into the right direction, but does not
> > > > > > > >>>>> really solve the problem entirely:
> > > > > > > >>>>>
> > > > > > > >>>>> Even if we forward the message first, all downstream
> > > > > > > >>>>> processing happens, `context.forward()` returns and we
> > > > > > > >>>>> update the state store, we could now crash w/o committing
> > > > > > > >>>>> offsets. For this case, we have no guarantee that the
> > > > > > > >>>>> result records were published (as we did not flush the
> > > > > > > >>>>> producer yet), but when re-reading from the input topic, we
> > > > > > > >>>>> would find the record in the store and incorrectly drop it
> > > > > > > >>>>> as a duplicate...
> > > > > > > >>>>>
> > > > > > > >>>>> I think the only solution to make this work would be to
> > > > > > > >>>>> use TX-state stores in combination with ALOS as proposed
> > > > > > > >>>>> via KIP-892?
> > > > > > > >>>>>
> > > > > > > >>>>> Using an in-memory store won't help much either? The
> > > > > > > >>>>> producer could have sent the write into the changelog
> > > > > > > >>>>> topic, but not into the result topic, and thus we could
> > > > > > > >>>>> still not guarantee ALOS...?
> > > > > > > >>>>>
> > > > > > > >>>>> How do we want to go about this? We could also say, this
> > > > > > > >>>>> new operator only works with EOS. Would this be too
> > > > > > > >>>>> restrictive? -- At least for now, until KIP-892 lands, and
> > > > > > > >>>>> then we could relax it?
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> 105: "How to detect late records"
> > > > > > > >>>>>
> > > > > > > >>>>> In the end, it seems to boil down to determining which of
> > > > > > > >>>>> the records to forward and which record to drop, for (1)
> > > > > > > >>>>> the regular case and (2) the out-of-order data case.
> > > > > > > >>>>>
> > > > > > > >>>>> Regular case (no out-of-order data): For this case, offset
> > > > > > > >>>>> and ts order is the same, and we can forward the first
> > > > > > > >>>>> record we get. All later records within the
> > > > > > > >>>>> "de-duplication period" with the same "de-duplication key"
> > > > > > > >>>>> would be dropped. If a record with the same
> > > > > > > >>>>> "de-duplication key" arrives after the "de-duplication
> > > > > > > >>>>> period" has passed, we cannot drop it any longer, but would
> > > > > > > >>>>> still forward it, as per the contract of the operator /
> > > > > > > >>>>> de-duplication period.
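> > > > > > > >>>>>
> > > > > > > >>>>> (A concrete timeline, assuming a 10-minute de-duplication
> > > > > > > >>>>> period: id=42 at 10:00 is forwarded; id=42 at 10:05 is
> > > > > > > >>>>> dropped; id=42 arriving at 10:12 is outside the period, so
> > > > > > > >>>>> it is forwarded again.)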
> > > > > > > >>>>>
> > > > > > > >>>>> For the out-of-order case: The first question we need to
> > > > > > > >>>>> answer is, do we want to forward the record with the
> > > > > > > >>>>> smallest offset or the record with the smallest ts?
> > > > > > > >>>>> Logically, forwarding the one with the smallest ts might be
> > > > > > > >>>>> "more correct", however, it implies we could only forward
> > > > > > > >>>>> it after the "de-duplication period" passed, which might be
> > > > > > > >>>>> undesired latency? Would this be desired/acceptable?
> > > > > > > >>>>>
> > > > > > > >>>>> In contrast, if we forward the record with the smallest
> > > > > > > >>>>> offset (this is what you seem to propose) we don't have a
> > > > > > > >>>>> latency issue, but of course the question of which records
> > > > > > > >>>>> to drop is more tricky to answer: it seems you propose to
> > > > > > > >>>>> compare the time difference of the stored record to the
> > > > > > > >>>>> current record, but I am wondering why? Would it not be
> > > > > > > >>>>> desired to drop all duplicates independent of their ts, as
> > > > > > > >>>>> long as we find a record in the store? Would be good to get
> > > > > > > >>>>> some more motivation and tradeoffs discussed about the
> > > > > > > >>>>> different strategies we could use.
> > > > > > > >>>>>
> > > > > > > >>>>> You also propose to drop _any_ late record: I am also not
> > > > > > > >>>>> sure if that's desired? Could this not lead to data loss?
> > > > > > > >>>>> Assume we get a late record, but in fact there was never a
> > > > > > > >>>>> duplicate? Why would we want to drop it? If there is a late
> > > > > > > >>>>> record which is indeed a duplicate, but we purged the
> > > > > > > >>>>> original record from the store already, it seems to be the
> > > > > > > >>>>> same case as for the "no out-of-order case": after we
> > > > > > > >>>>> purged we cannot de-duplicate, and thus it's a regular case
> > > > > > > >>>>> we can just accept?
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> -Matthias
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > > > > > >>>>>> Hi everyone,
> > > > > > > >>>>>>
> > > > > > > >>>>>> I've just made a (small) change to this KIP about an
> > > > > > > >>>>>> implementation detail.
> > > > > > > >>>>>> Please let me know your thoughts.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thank you,
> > > > > > > >>>>>> Ayoub
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Mon, May 20, 2024 at 9:13 PM Ayoub <ayoubomar...@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hello,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Following a discussion on the community slack channel, I
> > > > > > > >>>>>>> would like to revive the discussion on KIP-655, which is
> > > > > > > >>>>>>> about adding a deduplication processor in kafka-streams.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Even though the motivation is not quite the same as the
> > > > > > > >>>>>>> initial one, I updated the KIP rather than creating a new
> > > > > > > >>>>>>> one, as I believe the end goal is the same.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks,
> > > > > > > >>>>>>> Ayoub
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
