Lucas,

Thanks for your remarks.
It seems we agree to remove the `.deduplicate()` api that
does the repartitioning. So I removed it from the KIP.

I also added more details of the semantics in the java docs,
please mention anything that should be added/removed if
there is any.

I will open a vote thread for this KIP.

Best,
Ayoub

Le mer. 6 nov. 2024 à 17:29, Lucas Brutschy <lbruts...@confluent.io.invalid>
a écrit :

> Hi Ayoub,
>
> I see that there was already a very long discussion thread, I won't
> start nitpicking names here. I think we are ready to move on to a
> vote. The agreed upon solutioins sound very reasonable to me, I
> especially like how you handled the ALOS case. Not deduplicating late
> records is a defensive choice - it's the lesser evil, even if it can
> lead to surprises for the user (data duplication surprises are
> certainly better than data loss surprises). Let's just make sure that,
> we write a long description of all those subtleties in the
> documentation, and not ask people to find it in the KIP.
>
> My main point is that I'd indeed vote to remove `deduplicate`. As
> stated before, the `deduplicate` operator does many things at once and
> it's easy for users to accidentally doing to much (repartitioning) by
> using it. It's probably a not-so-common use case which is not so hard
> to replicate using `deduplicateByKey`.
>
> Side note on for expiring records using punctuation - I see that this
> is, performance-wise, the best option we have here, so I agree going
> with this approach in the KIP. Just want to throw out there that in
> C++ rocksdb seems to have compaction filter which should give us the
> ability to expire records during compaction (and otherwise ignore
> expired records during processing) - this may be a nice long-term
> improvement for these kinds of things, but it would require extending
> the RocksDB JNI interfact to allow implementing custom
> CompactionFilters, IIUC. So it doesn't directly affect your KIP.
>
> Thanks for the hard work and for not giving up on this KIP!
>
> Cheers,
> Lucas
>
> On Tue, Nov 5, 2024 at 11:45 AM Ayoub Omari <ayoubomar...@gmail.com>
> wrote:
> >
> > Hi Matthias,
> >
> > Thanks for your review, and sorry for this late reply.
> >
> > (102) Ok, I keep it open here.
> >
> >
> >
> > (104)
> > > Well, but if we say punctuation records are "inherently distinct",
> > > there is nothing to be de-duplicated?
> >
> > We still need to deduplicate punctuation records even if they are
> distinct.
> > A functional deduplication acts essentially on distinct records having
> the
> > same functional id. It's the same as for input records. The records are
> > distinct,
> > but we keep only one from those having the same deduplication id.
> >
> >
> >
> > (105)
> > > even if they never use a smaller interval, because in the end,
> > > the question is always which ms is included and which one is excluded.
> >
> > Agreed, the KIP now specified that "Deduplication interval ends are
> > inclusive."
> >
> >
> >
> > > Why do you point out that `This is a duplicate of a2` -- why does it
> > matter?
> > > We did drop a2 on the floor already?
> > > The reason why we forward a3, is because it is outside of a1's
> > > deduplicationInterval, right? The fact that we did drop a2 on the
> floor,
> > > seems irrelevant?
> >
> > Right, fixed in the KIP.
> >
> >
> > Thanks for your opinions on the semantics ! If no points are raised in
> the
> > next
> > few days, I will start a voting thread.
> > We can always continue discussion in this thread after that :)
> >
> >
> > Best,
> > Ayoub
> >
> >
> > Le ven. 4 oct. 2024, 02:37, Matthias J. Sax <mj...@apache.org> a écrit :
> >
> > > Thanks Ayoub!
> > >
> > > (102) let's hear what others think.
> > >
> > > (103) Keeping it internal if fine with me. Was just curious.
> > >
> > > (104)
> > >
> > > > The offset checking was specifically added for the case of
> reprocessing
> > > > the same input record.
> > >
> > > Yes.
> > >
> > >
> > > > Punctuation records are generated by the application. And I assume
> > > > we can consider that any two punctuated records are inherently
> distinct ?
> > >
> > > I agree. And if they are "inherently distinct" we should never
> > > de-duplicate them, right?
> > >
> > >
> > > > Also, for me, we should apply deduplication on punctuated records,
> > > > because if the user defines a punctuator before calling
> deduplication,
> > > > it means they want to deduplicate them as well, otherwise they
> > > > could define it later in the pipeline.
> > >
> > > Well, but if we say punctuation records are "inherently distinct",
> there
> > > is nothing to be de-duplicated?
> > >
> > >
> > >
> > > (105)
> > >
> > > > However, I don't see practical use cases of a tiny deduplication
> > > interval.
> > >
> > > I agree, but that's not the point. We need to clearly define it, to
> > > avoid "off by one" errors and to be able to implement it (and write
> > > bound tests), and give users a change to reason about it, even if they
> > > never use a smaller interval, because in the end, the question is
> always
> > > which ms is included and which one is excluded.
> > >
> > > Assume one has a window of 1 hour, and get a record with Y ms first,
> and
> > > later Y + 1hour -- do we forward or drop the second record is an
> > > question we need to answer. We cannot leave this question open :)
> > >
> > >
> > >
> > > For the "how to handle later records", your example make sense to me,
> > > and I think it's reasonable semantics. Again: to me it's just important
> > > that the KIP is very explicit with such edge cases and that it does not
> > > leave any room for interpretation.
> > >
> > >
> > >  From the KIP:
> > >
> > > > event a3 at t+11s → This is a duplicate of a2 (i.e. within the
> > > deduplicationInterval) which has not been forwarded, we will forward
> it as
> > > if there was never an event a2
> > >
> > > Why do you point out that `This is a duplicate of a2` -- why does it
> > > matter? We did drop a2 on the floor already?
> > >
> > > The reason why we forward a3, is because it is outside of a1's
> > > deduplicationInterval, right? The fact that we did drop a2 on the
> floor,
> > > seems irrelevant?
> > >
> > >
> > >  From the KIP:
> > >
> > > > The same thing applies for deduplicating out-of-order records:
> > > >
> > > > event a1 @t=15s  → Forwarded
> > > > event a2 @t=5s → Dropped
> > > > event a3 @t=4s → Forwarded
> > >
> > > This is interesting. So you propose to also apply the
> > > "deduplicationInterval" into the past. I am ok with this. Just want to
> > > point it out, as it's an important semantic piece.
> > >
> > > Same for the examples with stream-time and different keys and state
> > > content. Love these examples. Makes it very clear what's going on. I am
> > > personally happy with these semantics (I guess I would also be happy if
> > > we would choose different semantics thought).
> > >
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > >
> > > On 9/4/24 7:49 AM, Ayoub Omari wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for your comments !
> > > >
> > > > (100)
> > > > Yes, there will be repartitioning if a key-changing operation
> > > > was used before. Updated java docs.
> > > >
> > > >
> > > > (101)
> > > > Yes, KIP updated.
> > > >
> > > >
> > > > (102)
> > > > My idea was that a user may think of `.deduplicate()` as
> > > > a black-box which only rejects records, without changing
> > > > the stream's structure.
> > > >
> > > > I am also concerned by the auto-back-repartitioning.
> > > > If it is fine not to provide the two apis (like in groupByKey vs
> > > groupBy),
> > > > I agree to remove it.
> > > >
> > > >
> > > > (103)
> > > > I think the store should be kept internal as it will have
> > > > some internal information. For example, for the eviction algorithm
> > > > we could have a time-index which is part of the same state store
> > > > (like in DualSchema stores where the first bit of each entry
> > > > determines if the entry belongs to the base or to the index store).
> > > >
> > > > Also, it should hold the offset and timestamp of each record which
> > > > are technical information that an application may not need to know.
> > > > WDYT ?
> > > >
> > > >
> > > > (104)
> > > >
> > > > The offset checking was specifically added for the case of
> reprocessing
> > > >
> > > > the same input record.
> > > >
> > > >
> > > > Punctuation records are generated by the application. And I assume
> > > >
> > > > we can consider that any two punctuated records are inherently
> distinct ?
> > > >
> > > >
> > > > Also, for me, we should apply deduplication on punctuated records,
> > > >
> > > > because if the user defines a punctuator before calling
> deduplication,
> > > >
> > > > it means they want to deduplicate them as well, otherwise they
> > > >
> > > > could define it later in the pipeline.
> > > >
> > > >
> > > >
> > > >
> > > > (105)
> > > >
> > > > Thanks for these points ! I added more details and examples to the
> KIP.
> > > >
> > > >
> > > > Some of the examples you mentioned were buried in the text, I added
> > > >
> > > > some highlights and examples to make the semantics clearer.
> > > >
> > > >
> > > > About the lower bound of deduplicationInterval, I think it's more
> > > intuitive
> > > >
> > > > to make the interval ends inclusive, which gives a lower bound of 0.
> > > >
> > > > However, I don't see practical use cases of a tiny deduplication
> > > interval.
> > > >
> > > > Ideally, the id chosen by the user to be a deduplication id should be
> > > unique
> > > >
> > > > (at least for an extended period of time). If deduplication interval
> is
> > > > chosen
> > > >
> > > > to be some milliseconds, then I suppose it's more rate limiting than
> > > >
> > > > deduplication ?
> > > >
> > > >
> > > > The current semantics are more adapted to a longer
> > > >
> > > > dedup-interval. Small dedup-intervals make it more
> > > >
> > > > likely that an out-of-order record is a late record, in which case
> > > >
> > > > records are most likely to be forwarded without dedup.
> > > >
> > > >
> > > > As an example, if we consider a case of de-dup interval=0,
> > > >
> > > > and following events:
> > > >
> > > > K1 @ t=10ms
> > > >
> > > > K2 @ t=11ms
> > > >
> > > > K1 @ t=10ms
> > > >
> > > >
> > > > Then the current semantics would detect the second event K1 as late,
> > > >
> > > > and would forward it.
> > > >
> > > > (Note: the first event here may still exist in the store
> > > >
> > > > if eviction is not yet triggered, however it is an expired entry.
> > > >
> > > > In order to have a consistent general behavior we would simulate
> that all
> > > > entries
> > > >
> > > > should be expired once dedup-interval is elapsed, if we find an entry
> > > >
> > > > in the store which is already expired, we ignore it)
> > > >
> > > >
> > > > I am open to further discussion on this.
> > > >
> > > >
> > > >
> > > >
> > > > If we agree on these semantics, I will add a description of the
> > > processor's
> > > >
> > > > behavior on late records in java docs, which would make it clearer
> > > >
> > > > that it's more suitable to have longer dedup-intervals.
> > > >
> > > >
> > > >
> > > > Best,
> > > >
> > > > Ayoub
> > > >
> > > >
> > > >
> > > >
> > > > Le sam. 31 août 2024 à 03:50, Matthias J. Sax <mj...@apache.org> a
> > > écrit :
> > > >
> > > >> Thanks for updating the KIP.
> > > >>
> > > >> A few comments/question:
> > > >>
> > > >>
> > > >>
> > > >> (100): For `deduplicateByKey(...)` the input stream might still be
> > > >> repartitioned if the key was changed upstream, right (similar to how
> > > >> `groupByKey()` and `join()` works)?
> > > >>
> > > >>      builder.stream(...).deduplicateByKey(); // no repartitioing
> > > >>      buidler.stream(...).map(...).deduplicateByKey(); //
> repartitioning
> > > >>
> > > >> If my understanding is correct, it should be reflected in the
> JavaDocs.
> > > >>
> > > >>
> > > >>
> > > >> (101): I guess the same as (100) applies to
> > > `deduplicateByKeyValue(...)`?
> > > >>
> > > >>
> > > >>
> > > >> (102): I am not 100% sure if `deduplicate()` is the best API, and
> if we
> > > >> even need it? It seems we could express `deduplicate()` also as:
> > > >>
> > > >>       stream.map(...).deduplicateByKey(...).map(...).repartition();
> > > >>
> > > >> I am also not sure, how common it is to de-duplicate on a value
> field,
> > > >> _and_ expect/want the result stream to be still partitioned on the
> > > >> original key? I could also imagine that the semantics of
> `deduplicate()`
> > > >> would actually be the same as (and this might be more intuitive for
> > > >> users?):
> > > >>
> > > >>      stream.map(...).deduplicateByKey(...)
> > > >>
> > > >> Ie, there won't be no automatic "back partitioning" on the original
> key.
> > > >> Is there really demand for auto-back-partitioning?
> > > >>
> > > >> Atm, I tend to think that `deduplicate()` might not be required to
> get
> > > >> added at all, as it's only syntactic sugar anyway (no matter which
> > > >> semantics, w/ or w/o auto-back-partitioning we pick). [And if there
> is
> > > >> user demand down the road, it seems simple enough to add it later.]
> > > >>
> > > >>
> > > >>
> > > >> (103) `Deduplicated`: Right now it only takes a name for the state
> > > >> store, what mean, it's not possible to plug-in a different store. Is
> > > >> this intentional, or should we allow to pass in `DslStoreSuppliers`
> > > >> and/or `KeyValueBytesStoreSupplier`? We could also defer this for a
> > > >> follow up KIP.
> > > >>
> > > >>
> > > >>
> > > >> (104) Punctuation:
> > > >>
> > > >>> For punctuated records - whose offset value is null -
> deduplication is
> > > >> always performed.
> > > >>
> > > >> Why are we doing it this way? We could also say, we never
> de-duplicate
> > > >> if there is no offset (similar to null-key and/or null-id). (Or
> maybe
> > > >> even make it configurable -- of course, we can defer a config also
> for a
> > > >> follow up KIP in case there is demand.)
> > > >>
> > > >> In general, keeping stuff and letting users add additional
> filtering is
> > > >> the better default. If we filter, users have no way to "resurrect"
> the
> > > >> data, but if we keep it, adding an additional filter is possible.
> > > >> (Similar to what is propose for late record -- we forward, not
> drop).
> > > >>
> > > >>
> > > >>
> > > >> (105) Deduplication logic:
> > > >>
> > > >> I am not 100% sure if I understand the proposed logic with regard to
> > > >> time-semantics and out-of-order data.
> > > >>
> > > >> Assume there is no out-of-order data (de-dup interval 10 sec):
> > > >>
> > > >>
> > > >> Example A:
> > > >>
> > > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >> k1@25 -> dropped as duplicate stream-time=25
> > > >> k1@35 -> forwarded and inserted into store -> stream-time=35 (we
> did
> > > >> discard k1@20 already and thus k1@35 is not considered a duplicate)
> > > >>
> > > >> This example is clear, however, we still need to define the exact
> > > >> cut-off point, ie, what is the first record which is not dropped any
> > > >> longer? k1@29 is clearly dropped, and k1@31 would clearly be
> forwarded;
> > > >> I am not sure about k1@30 -> would it purge k1@20 already and thus
> be
> > > >> forwarded, or would it be dropped?
> > > >>
> > > >> The question boils down to if de-dup-interval of 0 is allowed (ie,
> if we
> > > >> define the upper-bound of the de-duplication interval as inclusive
> or
> > > >> exclusive)? If we allow interval size of zero (mean upper bound is
> > > >> inclusive), it means we only de-duplicate with the same timestamp
> for
> > > >> interval 0:
> > > >>
> > > >> Example B:
> > > >>
> > > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >> k1@20 -> dropped as duplicate stream-time=20
> > > >> k1@21 -> forwarded and inserted into store -> stream-time=21 (we
> did
> > > >> discard k1@20 already and thus k1@21 is not considered a duplicate)
> > > >>
> > > >> For this case, k1@30 in Example A above would be dropped.
> > > >>
> > > >> However, if we think interval zero should not a valid interval, and
> 1 is
> > > >> the allowed minimum (and thus, the upper bound is exclusive),
> interval 1
> > > >> says to de-duplicate with the same timestamp:
> > > >>
> > > >> Example C (same as Example B, even if interval is 1 now, due to
> > > >> different interval bound semantics):
> > > >>
> > > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >> k1@20 -> dropped as duplicate stream-time=20
> > > >> k1@21 -> forwarded and inserted into store -> stream-time=21 (we
> did
> > > >> discard k1@20 already and thus k1@21 is not considered a duplicate)
> > > >>
> > > >> For this case, k1@30 in Example A above would be forwarded.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> What about out-of-order (same key):
> > > >>
> > > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >> k1@9 -> ?
> > > >>      I think we should forward; time diff is larger than de-dup
> interval
> > > >>      Also: record is "late" with regard to stream-time, so we should
> > > >> forward as "late"
> > > >>
> > > >> k1@11 -> ?
> > > >>      we could de-dup; we have k1@20 in the store
> > > >>      ts is within de-dup-interval
> > > >>      ts is not older than stream-time minus de-dup interval either
> (ie,
> > > >> not "late")
> > > >>
> > > >> k1@25 -> dropped as duplicate stream-time=25
> > > >> k1@23 -> dropped as duplicate stream-time=25
> > > >>
> > > >> k1@11 -> ?
> > > >>     we could de-dup; we have k1@20 in the store
> > > >>     ts is within de-dup-interval
> > > >>     ts IS OLDER than stream-time minus de-dup interval though, we
> could
> > > >> also forward as "late"
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> What about different keys? What about different keys in combination
> with
> > > >> "late" records? Overall, I could not implement the logic base on
> the KIP
> > > >> as there is too many unspecified cases.
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On 8/30/24 2:08 AM, Ayoub Omari wrote:
> > > >>> Hi Bill,
> > > >>>
> > > >>> Thanks for your answer.
> > > >>>
> > > >>>> Can you update the KIP to state that deduplication by value will
> > > result
> > > >> in
> > > >>>> auto-repartiting by Kafka Streams
> > > >>>
> > > >>> Updated the KIP to the latest proposal.
> > > >>>
> > > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the
> PR
> > > >> phase
> > > >>>> we stumble across a better name we can update the KIP then too).
> > > >>>
> > > >>> Yes, better than the first suggested name. Reflected in the KIP.
> > > >>>
> > > >>> I also added a note about records with null deduplication key.
> > > >>>
> > > >>> If no new points are raised in the next few days, I will start a
> VOTE
> > > >>> thread.
> > > >>>
> > > >>> Thanks,
> > > >>> Ayoub
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> Le mar. 27 août 2024 à 17:47, Bill Bejeck <bbej...@gmail.com> a
> écrit
> > > :
> > > >>>
> > > >>>> Hi Ayoub,
> > > >>>>
> > > >>>> Thanks for the update.
> > > >>>>
> > > >>>> Your latest proposal looks good to me, but I have a couple of
> > > comments.
> > > >>>>
> > > >>>> in (2) we repartition by the id field
> > > >>>>>
> > > >>>>
> > > >>>> I agree with this approach, but the KIP still states "No
> > > repartitioning
> > > >> is
> > > >>>> done by this processor. In case a user wants to deduplicate an id
> over
> > > >>>> multiple partitions, they should first repartition the topic on
> this
> > > >> id."
> > > >>>> Can you update the KIP to state that deduplication by value will
> > > result
> > > >> in
> > > >>>> auto-repartiting by Kafka Streams, unless the developer adds a
> manual
> > > >>>> repartitioning operator themselves.
> > > >>>>
> > > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the
> PR
> > > >> phase
> > > >>>> we stumble across a better name we can update the KIP then too).
> > > >>>>
> > > >>>> This latest update has been open for a while now, I'd say it's
> good to
> > > >>>> start a VOTE thread in 1-2 days.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>> On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <
> ayoubomar...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi Bill,
> > > >>>>>
> > > >>>>> I think all the points raised are now handled in the KIP.
> > > >>>>> We just still have the 106) on which we should have an
> > > >>>>> agreement. I gave a proposition in my last mail, and
> > > >>>>> I am waiting for others' opinions.
> > > >>>>>
> > > >>>>> My proposition is to have 3 methods of deduplication:
> > > >>>>> 1) `deduplicateByKey()`
> > > >>>>> 2) `deduplicate((k, v) -> v.id)`
> > > >>>>> 3) `deduplicateByKeyAndId((k, v) -> v.id)`  (Name to be chosen)
> > > >>>>>
> > > >>>>> The point of having both (2) and (3) is that in (2) we
> repartition
> > > >>>>> by the id field, Whereas in (3) we are not repartitioning.
> > > >>>>>
> > > >>>>> (3) is needed in case the current partitioning ensures that a
> same
> > > >>>>> id will always land on the same task. Here is an example I gave
> > > above:
> > > >>>>>
> > > >>>>>> For example, we have a topic with keyValue (`userId`,
> `transaction`)
> > > >>>>>> and deduplication is done on `transaction`.`id` . Here, the
> > > >> application
> > > >>>>>> wants to deduplicate transactions. It knows that a transaction
> id
> > > >>>>>> maps to a single userId. Any duplicate of that record would be
> > > >> received
> > > >>>>>> by the task which processes this userId.
> > > >>>>>
> > > >>>>> Let me know what you think.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Ayoub
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Le jeu. 22 août 2024 à 21:37, Bill Bejeck <bbej...@apache.org> a
> > > >> écrit :
> > > >>>>>
> > > >>>>>> Hi Ayoub,
> > > >>>>>>
> > > >>>>>> What's the status of this KIP?
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Bill
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <
> ayoubomar...@gmail.com
> > > >
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Bill,
> > > >>>>>>>
> > > >>>>>>> Thanks for your comments !
> > > >>>>>>>
> > > >>>>>>> I am prefixing your points with B.
> > > >>>>>>>
> > > >>>>>>> B1.
> > > >>>>>>> That's a good catch. I think punctuated records aren't
> concerned by
> > > >>>>>>> this problem of reprocessing because they are not issued twice.
> > > >>>>>>> We will still need to make a difference between a punctuated
> > > >>>>>>> and a source record. The logic becomes:
> > > >>>>>>>
> > > >>>>>>>> if record.offset is not null && record.offset = offset_stored:
> > > >>>>>>>
> > > >>>>>>>        forward(record)
> > > >>>>>>>
> > > >>>>>>> else:
> > > >>>>>>>
> > > >>>>>>>       // do_not_forward
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> I will update the KIP on this point.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> B2. (i.e. 106)
> > > >>>>>>> That's the point discussed in 106. The main idea is to avoid
> > > >>>>>> repartitioning
> > > >>>>>>> when we don't have to. But let's say we want to repartition
> when
> > > >>>>>>> deduplication is on an id that is not the key.
> > > >>>>>>>
> > > >>>>>>> The user code may look something like this:
> > > >>>>>>>    ```kstream
> > > >>>>>>>        .deduplicate((k, v) -> v.id)
> > > >>>>>>>        .map((k, v) -> ...)
> > > >>>>>>>     ```
> > > >>>>>>> The user expects that the key in the map processor which
> follows
> > > the
> > > >>>>>>> deduplicate is the same as the initial key.
> > > >>>>>>>
> > > >>>>>>> Internally, if we decide to repartition the deduplication
> processor
> > > >>>> may
> > > >>>>>> do
> > > >>>>>>> the following:
> > > >>>>>>> ```kstream
> > > >>>>>>>        .map((k, v) -> (v.id, ...)) // For repartitioning on id
> > > >>>>>>>        .deduplicateByKey()
> > > >>>>>>>        .map((id, v) -> (initialKey, ...)) // To recover the
> initial
> > > >> key
> > > >>>>>>>     ```
> > > >>>>>>>
> > > >>>>>>> In this case, there is some internal complexity of this
> processor:
> > > >>>>>>>       I) We should repartition twice, before and after
> > > deduplication.
> > > >>>> The
> > > >>>>>>> second
> > > >>>>>>> map triggers repartitioning if there are following stateful
> > > >>>> processors
> > > >>>>>>> in the pipeline.
> > > >>>>>>>
> > > >>>>>>>       II.) The initial key should be kept somewhere. One way
> is to
> > > >> wrap
> > > >>>>> the
> > > >>>>>>> value
> > > >>>>>>> to add the key, and then unwrap it after the `deduplicateByKey`
> > > >>>>>>>
> > > >>>>>>> As mentioned in one use case above (deduplicating transactions
> -
> > > >>>>> written
> > > >>>>>> at
> > > >>>>>>> least
> > > >>>>>>> once - by id), repartition is not needed for that case. This
> added
> > > >>>>>>> complexity won't
> > > >>>>>>> be useful.
> > > >>>>>>>
> > > >>>>>>> *TLDR,* I think one solution is to have a separate api to cover
> > > such
> > > >>>> a
> > > >>>>>> use
> > > >>>>>>> case. In total,
> > > >>>>>>> we can have 3 apis of deduplication
> > > >>>>>>>
> > > >>>>>>> 1) `deduplicateByKey()`  *// No repartitioning*
> > > >>>>>>> 2) `deduplicateByKey((k, v) -> v.id)`  *or
> > > >>>> *`deduplicateByKeyAndId((k,
> > > >>>>>> v)
> > > >>>>>>> -> v.id)`  *// No repartitioning*
> > > >>>>>>> 3) `deduplicate((k, v) -> v.id)`  *// causes repartitioning*
> > > >>>>>>>
> > > >>>>>>> The second api is equivalent to deduplicate by the couple (key,
> > > id).
> > > >>>>>>> No repartitioning is required because the records with the
> same key
> > > >>>> are
> > > >>>>>>> already
> > > >>>>>>> together in the same partition. cc @Matthias @Sebastien.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> B3.
> > > >>>>>>> I think here we can do what some existing processors do for
> > > periodic
> > > >>>>>> work,
> > > >>>>>>> we can track the last cleanup time and the current stream time,
> > > >>>>>>> if the delta exceeds the deduplication period, we trigger the
> > > >>>> cleanup.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Ayoub
> > > >>>>>>>
> > > >>>>>>> Le jeu. 11 juil. 2024 à 01:17, Bill Bejeck <bbej...@apache.org>
> a
> > > >>>>> écrit
> > > >>>>>> :
> > > >>>>>>>
> > > >>>>>>>> Hi All,
> > > >>>>>>>>
> > > >>>>>>>> Thanks Ayoub for getting this KIP going again, I think a
> > > >>>>> deduplication
> > > >>>>>>>> operator will be very useful.  My applogies for being late to
> the
> > > >>>>>>>> discussion.
> > > >>>>>>>>
> > > >>>>>>>> Overall - I agree with the direction of the KIP but I have a
> > > couple
> > > >>>>> of
> > > >>>>>>>> questions.
> > > >>>>>>>>
> > > >>>>>>>>    1.  Regarding using offsets for tracking the first
> arrival.  I
> > > >>>> think
> > > >>>>>>>> there could be a case when offsets would not be available,
> when
> > > >>>>> records
> > > >>>>>>> are
> > > >>>>>>>> forwarded by punctuation for example.
> > > >>>>>>>>
> > > >>>>>>>> 2. I'm not sure about using something other than the key for
> > > >>>>>> identifying
> > > >>>>>>>> dupllicate records.  By doing so, one could end up missing a
> > > >>>>>>> de-duplication
> > > >>>>>>>> due to records with the same id characteristic in the value
> but
> > > >>>>> having
> > > >>>>>>>> different keys so the records land on different partitions.  I
> > > >>>> guess
> > > >>>>> we
> > > >>>>>>>> could enforce a repartition if one choses to use a
> de-duplication
> > > >>>> id
> > > >>>>>>> other
> > > >>>>>>>> than the key.
> > > >>>>>>>>
> > > >>>>>>>> 3. I think a punctuation scheduled to run at the
> de-duplication
> > > >>>>> period
> > > >>>>>> to
> > > >>>>>>>> clean out older records would be a clean approach for purging
> > > older
> > > >>>>>>>> records.  I'm not taking a hard stance on this approach and
> I'm
> > > >>>>> willing
> > > >>>>>>> to
> > > >>>>>>>> discuss different methods.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>>
> > > >>>>>>>> Bill
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > >>>>>>>>> Hi Matthias,
> > > >>>>>>>>>
> > > >>>>>>>>> Here are my updates on your points.
> > > >>>>>>>>>
> > > >>>>>>>>> 101.
> > > >>>>>>>>>> You propose to add static methods `keySerde()` and
> > > >>>> `valueSerde()`
> > > >>>>>> --
> > > >>>>>>>>>> in other config classes, we use only `with(keySerde,
> > > >>>> valueSerde)`
> > > >>>>>> as
> > > >>>>>>> we
> > > >>>>>>>>> try
> > > >>>>>>>>>> to use the "builder" pattern, and avoid too many overloads.
> I
> > > >>>>> would
> > > >>>>>>>>>> prefer to omit both methods you suggest and just use a
> single
> > > >>>>>> `with`
> > > >>>>>>>> for
> > > >>>>>>>>>> both serdes.
> > > >>>>>>>>>
> > > >>>>>>>>> I was actually inspired by the other config classes, for
> example
> > > >>>>>>> `Joined`
> > > >>>>>>>>> and
> > > >>>>>>>>> `Grouped` both have the static methods `keySerde()` and
> > > >>>>>> `valueSerde()`.
> > > >>>>>>>>>
> > > >>>>>>>>>> I think we don't want to add `with(...)` which takes all
> > > >>>>>>>>>> parameters at once
> > > >>>>>>>>>
> > > >>>>>>>>> Done.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 102.
> > > >>>>>>>>> Thanks, your suggestion sounds good to me. The trade-off of
> > > >>>> having
> > > >>>>> an
> > > >>>>>>>> index
> > > >>>>>>>>> that allows to efficiently purge expired records besides the
> > > >>>>> keyValue
> > > >>>>>>>> store
> > > >>>>>>>>> makes sense. I've been looking into the code, and I think a
> > > >>>> similar
> > > >>>>>>> idea
> > > >>>>>>>>> was implemented for other processors (for example with
> > > >>>>>>>>> DualSchemaRocksDBSegmentedBytesStore).
> > > >>>>>>>>> As you said, I think we would benefit from some existing code
> > > >>>> here.
> > > >>>>>>>>> KIP updated !
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 104.
> > > >>>>>>>>> Updated the KIP to consider records' offsets.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 105
> > > >>>>>>>>>> picking the first offset with smallest ts sounds good to me.
> > > >>>> The
> > > >>>>>> KIP
> > > >>>>>>>>>> should be explicit about it
> > > >>>>>>>>>
> > > >>>>>>>>> Done.
> > > >>>>>>>>>
> > > >>>>>>>>>> But as discussed above, it might be
> > > >>>>>>>>>> simplest to not really have a window lookup, but just a
> plain
> > > >>>>>>>> key-lookup
> > > >>>>>>>>>> and drop if the key exists in the store?
> > > >>>>>>>>>
> > > >>>>>>>>> KIP updated, we will be `.get()`ing from a keyValueStore
> instead
> > > >>>> of
> > > >>>>>>>>> `.fetch()`ing
> > > >>>>>>>>> from a WindowStore.
> > > >>>>>>>>>
> > > >>>>>>>>>> Another line of thinking, that did serve us well in the
> past:
> > > >>>> in
> > > >>>>>>> doubt
> > > >>>>>>>>>> keep a record -- users can add operators to drop record (in
> > > >>>> case
> > > >>>>>> they
> > > >>>>>>>>>> don't want to keep it), but if we drop a record, users have
> no
> > > >>>>> way
> > > >>>>>> to
> > > >>>>>>>>>> resurrect it (thus, building a workaround to change
> semantica
> > > >>>> is
> > > >>>>>>>>>> possible for users if we default to keep records, but not
> the
> > > >>>>> other
> > > >>>>>>> way
> > > >>>>>>>>>> around).
> > > >>>>>>>>>
> > > >>>>>>>>> Makes total sense ! I updated the KIP to forward late records
> > > >>>>> instead
> > > >>>>>>> of
> > > >>>>>>>>> dropping them.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 106.
> > > >>>>>>>>> For the moment, I highlighted in Javadocs that we are
> > > >>>> deduplicating
> > > >>>>>> by
> > > >>>>>>>>> partition. If there is a better name to have this
> information in
> > > >>>>> the
> > > >>>>>>> name
> > > >>>>>>>>> of the api itself it would be good.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Ayoub
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Le jeu. 13 juin 2024 à 09:03, Sebastien Viale <
> > > >>>>>>>> sebastien.vi...@michelin.com>
> > > >>>>>>>>> a écrit :
> > > >>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi,
> > > >>>>>>>>>>
> > > >>>>>>>>>> 106 :
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for the clarification. Actually, this is not what I
> > > >>>>>> expected,
> > > >>>>>>>> but I
> > > >>>>>>>>>> better understand the performance issues regarding the state
> > > >>>>> store
> > > >>>>>>>>>> iteration.
> > > >>>>>>>>>> If this is how it should be designed, it is fine for me as
> long
> > > >>>>> as
> > > >>>>>> it
> > > >>>>>>>> is
> > > >>>>>>>>>> clear that the repartition must be done before the
> > > >>>> deduplication.
> > > >>>>>>>>>> Sébastien
> > > >>>>>>>>>>
> > > >>>>>>>>>> ________________________________
> > > >>>>>>>>>> De : Matthias J. Sax <mj...@apache.org>
> > > >>>>>>>>>> Envoyé : jeudi 13 juin 2024 02:51
> > > >>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org>
> > > >>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication
> > > >>>> processor
> > > >>>>> in
> > > >>>>>>>>>> kafka-streams
> > > >>>>>>>>>>
> > > >>>>>>>>>> Warning External sender Do not click on any links or open
> any
> > > >>>>>>>> attachments
> > > >>>>>>>>>> unless you trust the sender and know the content is safe.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 106:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written"
> > > >>>>>> stream,
> > > >>>>>>>>>>> we are sure that the duplicate record has the same key as
> the
> > > >>>>>>>>>>> original one, and will land on the same task. Here, a user
> > > >>>>> would
> > > >>>>>>>>>>> want to specify a deduplication key different from the
> > > >>>> topic's
> > > >>>>>> key
> > > >>>>>>>>>>> in case the topic's key is not a unique identifier
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`,
> > > >>>>>>> `transaction`)
> > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the
> > > >>>>>>>> application
> > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a
> > > >>>> transaction
> > > >>>>> id
> > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would
> > > >>>> be
> > > >>>>>>>> received
> > > >>>>>>>>>>> by the task which processes this userId.
> > > >>>>>>>>>>
> > > >>>>>>>>>> This is an interesting point.
> > > >>>>>>>>>>
> > > >>>>>>>>>> My concern is to some extend, that it seems (on the
> surface) to
> > > >>>>> not
> > > >>>>>>>>>> follow the established pattern of auto-repartitioning in the
> > > >>>> DSL.
> > > >>>>>> Of
> > > >>>>>>>>>> course, given that the current proposal says we use an "id
> > > >>>>>> extractor"
> > > >>>>>>>>>> and not a "key extractor" it might be ok (but it might be
> > > >>>>> somewhat
> > > >>>>>>>>>> subtle). Of course, JavaDocs always help to explain in
> detail.
> > > >>>>>> Would
> > > >>>>>>>>>> this be enough?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Would be good to hear from others about this point. I am
> > > >>>>> personally
> > > >>>>>>> not
> > > >>>>>>>>>> sure which approach I would prefer personally at this point.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The problem reminds me on
> > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844<
> > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844> which is
> > > >>>> not
> > > >>>>>>>> resolve
> > > >>>>>>>>>> directly either. We do have KIP-759
> > > >>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759:
> > > >>>>>> Unneeded
> > > >>>>>>>>>> repartition canceling<
> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759:
> > > >>>>>> Unneeded
> > > >>>>>>>>>> repartition canceling>)
> > > >>>>>>>>>> which is WIP and helps with KAFKA-10844, but not sure if it
> > > >>>> would
> > > >>>>>> be
> > > >>>>>>> a
> > > >>>>>>>>>> viable solution for the de-duplication case?
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>> This email was screened for spam and malicious content but
> > > >>>>> exercise
> > > >>>>>>>>>> caution anyway.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > >>>>>>>>>>> Hi Sebastien & Matthias,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For 106.
> > > >>>>>>>>>>> My idea was to deduplicate on a per-task basis. If the user
> > > >>>>> wants
> > > >>>>>>>>>>> to do a global deduplication over all partitions, I think
> > > >>>> it's
> > > >>>>>>>> better to
> > > >>>>>>>>>>> have him explicitly repartition and then call the
> > > >>>> deduplication
> > > >>>>>>>>>> processor.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written"
> > > >>>>>> stream,
> > > >>>>>>>>>>> we are sure that the duplicate record has the same key as
> the
> > > >>>>>>>>>>> original one, and will land on the same task. Here, a user
> > > >>>>> would
> > > >>>>>>>>>>> want to specify a deduplication key different from the
> > > >>>> topic's
> > > >>>>>> key
> > > >>>>>>>>>>> in case the topic's key is not a unique identifier.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`,
> > > >>>>>>> `transaction`)
> > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the
> > > >>>>>>>> application
> > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a
> > > >>>> transaction
> > > >>>>> id
> > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would
> > > >>>> be
> > > >>>>>>>> received
> > > >>>>>>>>>>> by the task which processes this userId.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> One other thought I have when writing the KIP about global
> > > >>>>>>>> deduplication,
> > > >>>>>>>>>>> is that it will require to map twice the key of the stream
> > > >>>>> (first
> > > >>>>>>>> map to
> > > >>>>>>>>>>> change the key to deduplication key, and second map to get
> > > >>>>>>>>>>> back the initial key). Second map may imply a second
> > > >>>>>>> repartitioning.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> However, if we do a per-task deduplication, the user may
> > > >>>> adapt
> > > >>>>> to
> > > >>>>>>> his
> > > >>>>>>>>>>> specific use-case.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Let me know what you think
> > > >>>>>>>>>>> Ayoub
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Le mar. 11 juin 2024 à 20:21, Matthias J. Sax <
> > > >>>>> mj...@apache.org>
> > > >>>>>> a
> > > >>>>>>>>>> écrit :
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks Sebastien,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> that's a good point. Thanks for raising it. -- I like your
> > > >>>>>>> proposal.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> An alternative would be to have two overloads of
> > > >>>>> `deduplicate()`
> > > >>>>>>>> one w/
> > > >>>>>>>>>>>> and one w/o the "id extractor" parameter. This would be
> less
> > > >>>>>>>> explicit
> > > >>>>>>>>>>>> though.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > >>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I am really interested in this KIP.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 106:
> > > >>>>>>>>>>>>> I hope I am not talking nonsense, but if you do not
> > > >>>>> deduplicate
> > > >>>>>>>> based
> > > >>>>>>>>>> on
> > > >>>>>>>>>>>> the key, the input stream has to be repartitioned.
> > > >>>>>>>>>>>>> Otherwise, different stream tasks may handle records that
> > > >>>>> need
> > > >>>>>> to
> > > >>>>>>>> be
> > > >>>>>>>>>>>> deduplicated, and thus duplicates will not be detected.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This is why I would have created two different methods,
> as
> > > >>>> is
> > > >>>>>>> done
> > > >>>>>>>> for
> > > >>>>>>>>>>>> GroupBy:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> deduplicateByKey(...)
> > > >>>>>>>>>>>>> deduplicate(...)
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> If deduplicateByKey is used, the input stream does not
> need
> > > >>>>> to
> > > >>>>>> be
> > > >>>>>>>>>>>> repartitioned.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> thanks
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Sébastien
> > > >>>>>>>>>>>>> ________________________________
> > > >>>>>>>>>>>>> De : Matthias J. Sax <mj...@apache.org>
> > > >>>>>>>>>>>>> Envoyé : mardi 11 juin 2024 01:54
> > > >>>>>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org>
> > > >>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication
> > > >>>>>> processor
> > > >>>>>>> in
> > > >>>>>>>>>>>> kafka-streams
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Warning External sender Do not click on any links or open
> > > >>>> any
> > > >>>>>>>>>>>> attachments unless you trust the sender and know the
> content
> > > >>>>> is
> > > >>>>>>>> safe.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the update Ayoub.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 101: you say:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> But I am not sure if we don't want to have them for this
> > > >>>>>>>> processor ?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> What is your reasoning to move off the established
> pattern?
> > > >>>>>> Would
> > > >>>>>>>> be
> > > >>>>>>>>>>>>> good to understand, why `Deduplicated` class needs a
> > > >>>>> different
> > > >>>>>>>>>>>>> "structure" compared to existing classes.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 102: Creating iterators is very expensive. For other
> work,
> > > >>>> we
> > > >>>>>>>> actually
> > > >>>>>>>>>>>>> hit 100x (?) throughput degradation by creating an (for
> > > >>>> most
> > > >>>>>>> cases
> > > >>>>>>>>>>>>> empty) iterator for every input record, and needed to
> find
> > > >>>>>> other
> > > >>>>>>>> ways
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>> avoid creating an iterator per record. It really kills
> > > >>>>>>> performance.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I see the point about data expiration. We could
> experiment
> > > >>>>> with
> > > >>>>>>>>>>>>> punctuation to expire old data, or add a second
> > > >>>> "time-ordered
> > > >>>>>>>> store"
> > > >>>>>>>>>>>>> (which we already have at hand) which acts as an index
> into
> > > >>>>> the
> > > >>>>>>>> main
> > > >>>>>>>>>>>>> store. -- Another possibility would be to add a new
> version
> > > >>>>> of
> > > >>>>>>>>>> segmented
> > > >>>>>>>>>>>>> store with a different key-layout (ie, just store the
> plain
> > > >>>>>>> key). I
> > > >>>>>>>>>>>>> think with some refactoring, we might be able to re-use a
> > > >>>> lot
> > > >>>>>> of
> > > >>>>>>>>>>>>> existing code.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 104:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of
> stateful
> > > >>>>>>>> processors
> > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with
> > > >>>>>>> `on_window_close`
> > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we
> > > >>>>> receive a
> > > >>>>>>>> record,
> > > >>>>>>>>>>>>>> we update its aggregation result in the store, then
> crash
> > > >>>>>> before
> > > >>>>>>>>>>>> committing,
> > > >>>>>>>>>>>>>> then the record will be again reconsidered for
> > > >>>> aggregation).
> > > >>>>>> Is
> > > >>>>>>>> this
> > > >>>>>>>>>>>>>> correct ?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Yes, this is correct, but it does not violate ALOS,
> because
> > > >>>>> we
> > > >>>>>>> did
> > > >>>>>>>> not
> > > >>>>>>>>>>>>> lose the input record -- of course, the aggregation would
> > > >>>>>> contain
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> input record twice (eg, over count), but this is ok under
> > > >>>>> ALOS.
> > > >>>>>>>>>>>>> Unfortunately, for de-duplication this pattern breaks,
> > > >>>>> because
> > > >>>>>>>>>>>>> de-duplication operator does a different "aggregation
> > > >>>> logic"
> > > >>>>>>>> depending
> > > >>>>>>>>>>>>> on its state (emit if no key found, but not emit if key
> > > >>>>> found).
> > > >>>>>>> For
> > > >>>>>>>>>>>>> counting as an example, we increment the count and emit
> > > >>>>>>>> unconditionally
> > > >>>>>>>>>>>>> though.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset
> > > >>>> inside
> > > >>>>>> the
> > > >>>>>>>>>>>>>> store's value can tell us whether the record has been
> > > >>>>> already
> > > >>>>>>>> seen or
> > > >>>>>>>>>>>> not.
> > > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in
> > > >>>> the
> > > >>>>>>> store
> > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means
> > > >>>> the
> > > >>>>>>>> record
> > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it.
> If
> > > >>>>> the
> > > >>>>>>>> offset
> > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we
> > > >>>> ignore
> > > >>>>>> it.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Great idea. This might work... If we store the input
> record
> > > >>>>>>>> offset, we
> > > >>>>>>>>>>>>> can actually avoid that the "aggregation logic" changes
> for
> > > >>>>> the
> > > >>>>>>>> same
> > > >>>>>>>>>>>>> input record. -- And yes, with ALOS potentially emitting
> a
> > > >>>>>>>> duplicate is
> > > >>>>>>>>>>>>> the-name-of-the-game, so no concerns on this part from my
> > > >>>>> side.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 105: picking the first offset with smallest ts sound good
> > > >>>> to
> > > >>>>>> me.
> > > >>>>>>>> The
> > > >>>>>>>>>> KIP
> > > >>>>>>>>>>>>> should be explicit about it. But as discussed above, it
> > > >>>> might
> > > >>>>>> be
> > > >>>>>>>>>>>>> simplest to not really have a window lookup, but just a
> > > >>>> plain
> > > >>>>>>>>>> key-lookup
> > > >>>>>>>>>>>>> and drop if the key exists in the store? -- For late
> > > >>>> records,
> > > >>>>>> it
> > > >>>>>>>> might
> > > >>>>>>>>>>>>> imply that they are not de-duplicated, but this is also
> the
> > > >>>>>> case
> > > >>>>>>>> for
> > > >>>>>>>>>>>>> in-order records if they are further apart than the
> > > >>>>>>> de-duplication
> > > >>>>>>>>>>>>> window size, right? Thus I would believe this is "more
> > > >>>>> natural"
> > > >>>>>>>>>> compared
> > > >>>>>>>>>>>>> to discarding late records pro-actively, which would lead
> > > >>>> to
> > > >>>>>>>> missing
> > > >>>>>>>>>>>>> result records?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> We could also make this configurable if we are not sure
> > > >>>> what
> > > >>>>>>> users
> > > >>>>>>>>>>>>> really need -- or add such a configuration later in case
> > > >>>> the
> > > >>>>>>>> semantics
> > > >>>>>>>>>>>>> we pick don't work for some users.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Another line of thinking, that did serve us well in the
> > > >>>> past:
> > > >>>>>> in
> > > >>>>>>>> doubt
> > > >>>>>>>>>>>>> keep a record -- users can add operators to drop record
> (in
> > > >>>>>> case
> > > >>>>>>>> they
> > > >>>>>>>>>>>>> don't want to keep it), but if we drop a record, users
> have
> > > >>>>> no
> > > >>>>>>> way
> > > >>>>>>>> to
> > > >>>>>>>>>>>>> resurrect it (thus, building a workaround to change
> > > >>>> semantica
> > > >>>>>> is
> > > >>>>>>>>>>>>> possible for users if we default to keep records, but not
> > > >>>> the
> > > >>>>>>>> other way
> > > >>>>>>>>>>>>> around).
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Would be good to get input from the broader community on
> > > >>>> this
> > > >>>>>>>> question
> > > >>>>>>>>>>>>> thought. In the end, it must be a use-case driven
> decision?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This email was screened for spam and malicious content
> but
> > > >>>>>>> exercise
> > > >>>>>>>>>>>> caution anyway.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > >>>>>>>>>>>>>> Hi Matthias,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you for your review !
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 100.
> > > >>>>>>>>>>>>>> I agree. I changed the name of the parameter to
> > > >>>>> "idSelector".
> > > >>>>>>>>>>>>>> Because this id may be computed, It is better to call it
> > > >>>>> "id"
> > > >>>>>>>> rather
> > > >>>>>>>>>>>> than
> > > >>>>>>>>>>>>>> field or attribute.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 101.
> > > >>>>>>>>>>>>>> The reason I added the methods `keySerde()` and
> > > >>>>> `valueSerde()`
> > > >>>>>>>> was to
> > > >>>>>>>>>>>>>> have the same capabilities as other serde classes (such
> as
> > > >>>>>>> Grouped
> > > >>>>>>>>>>>>>> or Joined). As a Kafka-streams user, I usually use
> > > >>>>>>> `with(keySerde,
> > > >>>>>>>>>>>>>> valueSerde)`
> > > >>>>>>>>>>>>>> as you suggested. But I am not sure if we don't want to
> > > >>>> have
> > > >>>>>>> them
> > > >>>>>>>> for
> > > >>>>>>>>>>>> this
> > > >>>>>>>>>>>>>> processor ?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 102.
> > > >>>>>>>>>>>>>> That's a good point ! Because we know that the window
> > > >>>> store
> > > >>>>>> will
> > > >>>>>>>>>> contain
> > > >>>>>>>>>>>>>> at most one instance of a given key, I am not sure how
> the
> > > >>>>>> range
> > > >>>>>>>> fetch
> > > >>>>>>>>>>>>>> on WindowStore compares to a KeyValueStore `get()` in
> this
> > > >>>>>> case.
> > > >>>>>>>>>>>>>> Wondering if the fact that the record's key is the
> prefix
> > > >>>> of
> > > >>>>>> the
> > > >>>>>>>>>>>> underlying
> > > >>>>>>>>>>>>>> keyValueStore's key ("<dataKey,ts>") may provide
> > > >>>> comparable
> > > >>>>>>>>>> performance
> > > >>>>>>>>>>>>>> to the random access of KeyValueStore ? Of course, the
> > > >>>>>>> WindowStore
> > > >>>>>>>>>>>> fetch()
> > > >>>>>>>>>>>>>> would be less efficient because it may fetch from more
> > > >>>> than
> > > >>>>> 1
> > > >>>>>>>> segment,
> > > >>>>>>>>>>>> and
> > > >>>>>>>>>>>>>> because of some iterator overhead.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> The purpose of using a WindowStore is to automatically
> > > >>>> purge
> > > >>>>>> old
> > > >>>>>>>> data.
> > > >>>>>>>>>>>>>> For example, deduplicating a topic written at least once
> > > >>>>>>> wouldn't
> > > >>>>>>>>>>>> require
> > > >>>>>>>>>>>>>> keeping a large history. This is not the case of using a
> > > >>>>>>>> KeyValueStore
> > > >>>>>>>>>>>>>> which would require scanning regularly to remove expired
> > > >>>>>>> records.
> > > >>>>>>>>>>>>>> That might cause a sudden increase of latency whenever
> the
> > > >>>>>>> cleanup
> > > >>>>>>>>>>>>>> is triggered.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> It would be good to hear from anyone who has done some
> > > >>>>>> analysis
> > > >>>>>>>>>>>>>> on RocksDB's range fetch.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 103.
> > > >>>>>>>>>>>>>> Sure, I can update it once we agree on underlying
> > > >>>> semantics.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 104.
> > > >>>>>>>>>>>>>> Another good point !
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> In the end, de-duplication does only make sense when
> EOS
> > > >>>> is
> > > >>>>>>> used
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I agree with that. And for me, the use case of
> > > >>>>> deduplicating a
> > > >>>>>>>> topic
> > > >>>>>>>>>>>>>> written ALOS inside an EOS application might be the top
> 1
> > > >>>>> use
> > > >>>>>>> case
> > > >>>>>>>>>>>>>> of deduplication.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> all downstream processing happens, `context.forward()`
> > > >>>>>> returns
> > > >>>>>>>>>>>>>>> and we update the state store, we could now crash w/o
> > > >>>>>>> committing
> > > >>>>>>>>>>>> offsets
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of
> stateful
> > > >>>>>>>> processors
> > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with
> > > >>>>>>> `on_window_close`
> > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we
> > > >>>>> receive a
> > > >>>>>>>> record,
> > > >>>>>>>>>>>>>> we update its aggregation result in the store, then
> crash
> > > >>>>>> before
> > > >>>>>>>>>>>> committing,
> > > >>>>>>>>>>>>>> then the record will be again reconsidered for
> > > >>>> aggregation).
> > > >>>>>> Is
> > > >>>>>>>> this
> > > >>>>>>>>>>>>>> correct ?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset
> > > >>>> inside
> > > >>>>>> the
> > > >>>>>>>>>>>>>> store's value can tell us whether the record has been
> > > >>>>> already
> > > >>>>>>>> seen or
> > > >>>>>>>>>>>> not.
> > > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in
> > > >>>> the
> > > >>>>>>> store
> > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means
> > > >>>> the
> > > >>>>>>>> record
> > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it.
> If
> > > >>>>> the
> > > >>>>>>>> offset
> > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we
> > > >>>> ignore
> > > >>>>>> it.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> As you said, we don't have any guarantee whether the
> > > >>>> initial
> > > >>>>>>>> record
> > > >>>>>>>>>> was
> > > >>>>>>>>>>>>>> forwarded or not in case of a crash before commit. In
> this
> > > >>>>>>>> solution
> > > >>>>>>>>>>>>>> we would forward the record twice, which is against
> > > >>>>>>> deduplication.
> > > >>>>>>>>>>>>>> But, this is still an ALOS application, so it has the
> same
> > > >>>>>>>> semantics
> > > >>>>>>>>>>>>>> as any other such application. With this, I am not sure
> we
> > > >>>>> can
> > > >>>>>>>>>>>>>> have "strict" deduplication for ALOS applications.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 105.
> > > >>>>>>>>>>>>>> For me, if there are two duplicate records, it means
> they
> > > >>>>> are
> > > >>>>>>>>>>>>>> the same in the application's point of view, so it can
> > > >>>>> choose
> > > >>>>>>>>>>>>>> either one. Thus, I would go with forwarding the record
> > > >>>> with
> > > >>>>>>>>>>>>>> the least offset.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Would it not be desired to drop all duplicates
> > > >>>> independent
> > > >>>>>>>>>>>>>>> of their ts, as long as we find a record in the store?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> This is actually related to the (suggested) windowed
> > > >>>> nature
> > > >>>>>>>>>>>>>> of deduplication. As in 102. we don't want to do a
> > > >>>> "forever"
> > > >>>>>>>>>>>>>> deduplication, which may be impossible for huge
> workloads
> > > >>>>>>>>>>>>>> where all records should be kept in the store. Hence,
> the
> > > >>>>>> fetch
> > > >>>>>>>>>>>>>> of timestamp between [ts-deduplicationInterval,
> > > >>>>>>>>>>>> ts+deduplicationInterval]
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> About late records, this is again due to the windowed
> > > >>>>> nature.
> > > >>>>>>>>>>>>>> Because the store won't save those late (i.e. expired)
> > > >>>>>> records,
> > > >>>>>>>>>>>>>> we have two options. Either, we do not apply
> deduplication
> > > >>>>>>>>>>>>>> on them, thus the deduplication doesn't work on late
> > > >>>>> records,
> > > >>>>>>>>>>>>>> or we discard them (which is the option I suggest).
> > > >>>>>>>>>>>>>> In the second case, It would be up to the user to choose
> > > >>>>>>>>>>>>>> any deduplicationInterval that may sufficiently cover
> all
> > > >>>>> his
> > > >>>>>>> late
> > > >>>>>>>>>> data.
> > > >>>>>>>>>>>>>> What do you think ?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>> Ayoub
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Le mar. 4 juin 2024 à 23:58, Matthias J. Sax <
> > > >>>>>> mj...@apache.org>
> > > >>>>>>> a
> > > >>>>>>>>>>>> écrit :
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Ayoub,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> thanks for resurrecting this KIP. I think a built-in
> > > >>>>>>>> de-duplication
> > > >>>>>>>>>>>>>>> operator will be very useful.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Couple of questions:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 100: `deduplicationKeySelector`
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Is this the best name? It might indicate that we
> select a
> > > >>>>>> "key"
> > > >>>>>>>> what
> > > >>>>>>>>>> is
> > > >>>>>>>>>>>>>>> an overloaded term... Maybe we could use `Field` or
> `Id`
> > > >>>> or
> > > >>>>>>>>>> `Attribute`
> > > >>>>>>>>>>>>>>> instead of `Key` in the name? Just brainstorming. If we
> > > >>>>> think
> > > >>>>>>>> `Key`
> > > >>>>>>>>>> is
> > > >>>>>>>>>>>>>>> the best word, I am also ok with it.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 101: `Deduplicated` class
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> You propose to add static methods `keySerde()` and
> > > >>>>>>>> `valueSerde()` --
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>>>> other config classes, we use only `with(keySerde,
> > > >>>>>> valueSerde)`
> > > >>>>>>>> as we
> > > >>>>>>>>>>>> try
> > > >>>>>>>>>>>>>>> to use the "builder" pattern, and avoid too many
> > > >>>>> overloads. I
> > > >>>>>>>> would
> > > >>>>>>>>>>>>>>> prefer to omit both methods you suggest and just use a
> > > >>>>> single
> > > >>>>>>>> `with`
> > > >>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>> both serdes.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Similarly, I thing we don't want to add `with(...)`
> which
> > > >>>>>> takes
> > > >>>>>>>> all
> > > >>>>>>>>>>>>>>> parameters at once (which should only be 3 parameters,
> > > >>>> not
> > > >>>>> 4
> > > >>>>>> as
> > > >>>>>>>> it's
> > > >>>>>>>>>>>>>>> currently in the KIP)?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 102: Usage of `WindowedStore`:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Would this be efficient? The physical byte layout it
> > > >>>>>>>> "<dataKey,ts>"
> > > >>>>>>>>>> for
> > > >>>>>>>>>>>>>>> the store key, so it would be difficult to do an
> > > >>>> efficient
> > > >>>>>>> lookup
> > > >>>>>>>>>> for a
> > > >>>>>>>>>>>>>>> given "de-duplication key" to discard duplicates, as we
> > > >>>>> don't
> > > >>>>>>>> know
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>> timestamp of the first record with the same
> > > >>>> "de-duplication
> > > >>>>>>> key".
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> This boils down to the actual de-duplication logic
> (some
> > > >>>>> more
> > > >>>>>>>>>> comments
> > > >>>>>>>>>>>>>>> below), but what you propose seems to require expensive
> > > >>>>>>>> range-scans
> > > >>>>>>>>>>>> what
> > > >>>>>>>>>>>>>>> could be cost prohibitive in practice. I think we need
> to
> > > >>>>>> find
> > > >>>>>>> a
> > > >>>>>>>> way
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>>>> use efficient key-point-lookups to make this work.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 103: "Processing logic":
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Might need some updates (Cf 102 comment). I am not sure
> > > >>>> if
> > > >>>>> I
> > > >>>>>>>> fully
> > > >>>>>>>>>>>>>>> understand the logic: cf 105 below.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 104:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> If no entries found → forward the record + save the
> > > >>>> record
> > > >>>>>> in
> > > >>>>>>>> the
> > > >>>>>>>>>>>> store
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> This part is critical, and we should discuss in detail.
> > > >>>> In
> > > >>>>>> the
> > > >>>>>>>> end,
> > > >>>>>>>>>>>>>>> de-duplication does only make sense when EOS is used,
> and
> > > >>>>> we
> > > >>>>>>>> might
> > > >>>>>>>>>> want
> > > >>>>>>>>>>>>>>> to call this out (eg, on the JavaDocs)? But if used
> with
> > > >>>>>> ALOS,
> > > >>>>>>>> it's
> > > >>>>>>>>>>>> very
> > > >>>>>>>>>>>>>>> difficult to ensure that we never lose data... Your
> > > >>>>> proposal
> > > >>>>>> to
> > > >>>>>>>>>>>>>>> first-forward goes into the right direction, but does
> not
> > > >>>>>>> really
> > > >>>>>>>>>> solve
> > > >>>>>>>>>>>>>>> the problem entirely:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Even if we forward the message first, all downstream
> > > >>>>>> processing
> > > >>>>>>>>>>>> happens,
> > > >>>>>>>>>>>>>>> `context.forward()` returns and we update the state
> > > >>>> store,
> > > >>>>> we
> > > >>>>>>>> could
> > > >>>>>>>>>> now
> > > >>>>>>>>>>>>>>> crash w/o committing offsets. For this case, we have no
> > > >>>>>>> guarantee
> > > >>>>>>>>>> that
> > > >>>>>>>>>>>>>>> the result records where published (as we did not flush
> > > >>>> the
> > > >>>>>>>> producer
> > > >>>>>>>>>>>>>>> yet), but when re-reading from the input topic, we
> would
> > > >>>>> find
> > > >>>>>>> the
> > > >>>>>>>>>>>> record
> > > >>>>>>>>>>>>>>> in the store and incorrectly drop as duplicate...
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I think the only solution to make this work would be to
> > > >>>> use
> > > >>>>>>>> TX-state
> > > >>>>>>>>>>>>>>> stores in combination with ALOS as proposed via
> KIP-892?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Using an in-memory store won't help much either? The
> > > >>>>> producer
> > > >>>>>>>> could
> > > >>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>> send the write into the changelog topic, but not into
> the
> > > >>>>>>> result
> > > >>>>>>>>>> topic,
> > > >>>>>>>>>>>>>>> and thus we could still not guarantee ALOS...?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> How do we want to go about this? We could also say,
> this
> > > >>>>> new
> > > >>>>>>>> operator
> > > >>>>>>>>>>>>>>> only works with EOS. Would this be too restrictive? --
> At
> > > >>>>>> lest
> > > >>>>>>>> for
> > > >>>>>>>>>>>> know,
> > > >>>>>>>>>>>>>>> until KIP-892 lands, and we could relax it?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 105: "How to detect late records"
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> In the end, it seems to boil down to determine which of
> > > >>>> the
> > > >>>>>>>> records
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>>>>> forward and which record to drop, for (1) the regular
> > > >>>> case
> > > >>>>>> and
> > > >>>>>>>> (2)
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>> out-of-order data case.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Regular case (no out-of-order data): For this case,
> > > >>>> offset
> > > >>>>>> and
> > > >>>>>>> ts
> > > >>>>>>>>>> order
> > > >>>>>>>>>>>>>>> is the same, and we can forward the first record we
> get.
> > > >>>>> All
> > > >>>>>>>> later
> > > >>>>>>>>>>>>>>> record within "de-duplication period" with the same
> > > >>>>>>>> "de-duplication
> > > >>>>>>>>>>>> key"
> > > >>>>>>>>>>>>>>> would be dropped. If a record with the same
> > > >>>> "de-duplication
> > > >>>>>>> key"
> > > >>>>>>>>>>>> arrives
> > > >>>>>>>>>>>>>>> after "de-duplication period" passed, we cannot drop it
> > > >>>> any
> > > >>>>>>>> longer,
> > > >>>>>>>>>> but
> > > >>>>>>>>>>>>>>> would still forward it, as by the contract of the
> > > >>>> operator
> > > >>>>> /
> > > >>>>>>>>>>>>>>> de-duplication period.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For the out-of-order case: The first question we need
> to
> > > >>>>>> answer
> > > >>>>>>>> is,
> > > >>>>>>>>>> do
> > > >>>>>>>>>>>>>>> we want to forward the record with the smallest offset
> or
> > > >>>>> the
> > > >>>>>>>> record
> > > >>>>>>>>>>>>>>> with the smallest ts? Logically, forwarding with the
> > > >>>>> smallest
> > > >>>>>>> ts
> > > >>>>>>>>>> might
> > > >>>>>>>>>>>>>>> be "more correct", however, it implies we could only
> > > >>>>> forward
> > > >>>>>> it
> > > >>>>>>>> after
> > > >>>>>>>>>>>>>>> "de-duplication period" passed, what might be undesired
> > > >>>>>>> latency?
> > > >>>>>>>>>> Would
> > > >>>>>>>>>>>>>>> this be desired/acceptable?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> In contrast, if we forward record with the smallest
> > > >>>> offset
> > > >>>>>>> (this
> > > >>>>>>>> is
> > > >>>>>>>>>>>> what
> > > >>>>>>>>>>>>>>> you seem to propose) we don't have a latency issue, but
> > > >>>> of
> > > >>>>>>>> course the
> > > >>>>>>>>>>>>>>> question what records to drop is more tricky to answer:
> > > >>>> it
> > > >>>>>>> seems
> > > >>>>>>>> you
> > > >>>>>>>>>>>>>>> propose to compare the time difference of the stored
> > > >>>> record
> > > >>>>>> to
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>> current record, but I am wondering why? Would it not be
> > > >>>>>> desired
> > > >>>>>>>> to
> > > >>>>>>>>>> drop
> > > >>>>>>>>>>>>>>> all duplicates independent of their ts, as long as we
> > > >>>> find
> > > >>>>> a
> > > >>>>>>>> record
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>>>> the store? Would be good to get some more motivation
> and
> > > >>>>>>>> tradeoffs
> > > >>>>>>>>>>>>>>> discussed about the different strategies we could use.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> You also propose to drop _any_ late record: I am also
> not
> > > >>>>>> sure
> > > >>>>>>> if
> > > >>>>>>>>>>>> that's
> > > >>>>>>>>>>>>>>> desired? Could this not lead to data loss? Assume we
> get
> > > >>>> a
> > > >>>>>> late
> > > >>>>>>>>>> record,
> > > >>>>>>>>>>>>>>> but in fact there was never a duplicate? Why would we
> > > >>>> want
> > > >>>>> to
> > > >>>>>>>> drop
> > > >>>>>>>>>> it?
> > > >>>>>>>>>>>>>>> If there is a late record which is indeed a duplicate,
> > > >>>> but
> > > >>>>> we
> > > >>>>>>>> purged
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> original record from the store already, it seems to be
> > > >>>> the
> > > >>>>>> same
> > > >>>>>>>> case
> > > >>>>>>>>>> as
> > > >>>>>>>>>>>>>>> for the "no out-of-order case": after we purged we
> cannot
> > > >>>>>>>>>> de-duplicate
> > > >>>>>>>>>>>>>>> and thus it's a regular case we can just accept?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > >>>>>>>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I've just made a (small) change to this KIP about an
> > > >>>>>>>> implementation
> > > >>>>>>>>>>>>>>> detail.
> > > >>>>>>>>>>>>>>>> Please let me know your thoughts.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>>>> Ayoub
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Le lun. 20 mai 2024 à 21:13, Ayoub <
> > > >>>>> ayoubomar...@gmail.com>
> > > >>>>>> a
> > > >>>>>>>>>> écrit :
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hello,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Following a discussion on community slack channel, I
> > > >>>>> would
> > > >>>>>>>> like to
> > > >>>>>>>>>>>>>>> revive
> > > >>>>>>>>>>>>>>>>> the discussion on the KIP-655, which is about adding
> a
> > > >>>>>>>>>> deduplication
> > > >>>>>>>>>>>>>>>>> processor in kafka-streams.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>
>

Reply via email to