Lucas,

Thanks for your remarks. It seems we agree on removing the `.deduplicate()` API that does the repartitioning, so I removed it from the KIP.
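For reference, this is roughly how the two remaining operators would be used (a sketch only: `Transaction`, `transactionSerde`, and `v.id()` are illustrative, and I am assuming the deduplication interval is passed as a `Duration`; the exact signatures are the ones defined in the KIP):

```java
// Deduplicate by the record key; no repartitioning is triggered as long
// as the key was not changed upstream.
KStream<String, Transaction> byKey =
    transactions.deduplicateByKey(
        Duration.ofMinutes(10),
        Deduplicated.with(Serdes.String(), transactionSerde));

// Deduplicate by (key, id); the id selector extracts a deduplication id
// from the value, and no repartitioning is needed because records with
// the same key already sit on the same partition.
KStream<String, Transaction> byKeyAndId =
    transactions.deduplicateByKeyValue(
        (k, v) -> v.id(),
        Duration.ofMinutes(10),
        Deduplicated.with(Serdes.String(), transactionSerde));
```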
I also added more details on the semantics in the Javadocs; please point out anything that should be added or removed. I will open a vote thread for this KIP.

Best,
Ayoub

On Wed, Nov 6, 2024 at 5:29 PM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

> Hi Ayoub,
>
> I see that there was already a very long discussion thread, I won't start nitpicking names here. I think we are ready to move on to a vote. The agreed-upon solutions sound very reasonable to me, I especially like how you handled the ALOS case. Not deduplicating late records is a defensive choice - it's the lesser evil, even if it can lead to surprises for the user (data duplication surprises are certainly better than data loss surprises). Let's just make sure that we write a long description of all those subtleties in the documentation, and not ask people to find it in the KIP.
>
> My main point is that I'd indeed vote to remove `deduplicate`. As stated before, the `deduplicate` operator does many things at once and it's easy for users to accidentally do too much (repartitioning) by using it. It's probably a not-so-common use case which is not so hard to replicate using `deduplicateByKey`.
>
> Side note on expiring records using punctuation - I see that this is, performance-wise, the best option we have here, so I agree with going with this approach in the KIP. Just want to throw out there that the C++ RocksDB API seems to have a compaction filter which should give us the ability to expire records during compaction (and otherwise ignore expired records during processing) - this may be a nice long-term improvement for these kinds of things, but it would require extending the RocksDB JNI interface to allow implementing custom CompactionFilters, IIUC. So it doesn't directly affect your KIP.
>
> Thanks for the hard work and for not giving up on this KIP!
>
> Cheers,
> Lucas
>
> On Tue, Nov 5, 2024 at 11:45 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > Thanks for your review, and sorry for this late reply.
> >
> > (102) Ok, I keep it open here.
> >
> > (104)
> > > Well, but if we say punctuation records are "inherently distinct", there is nothing to be de-duplicated?
> >
> > We still need to deduplicate punctuation records even if they are distinct. A functional deduplication acts essentially on distinct records having the same functional id. It's the same as for input records. The records are distinct, but we keep only one from those having the same deduplication id.
> >
> > (105)
> > > even if they never use a smaller interval, because in the end, the question is always which ms is included and which one is excluded.
> >
> > Agreed, the KIP now specifies that "Deduplication interval ends are inclusive."
> >
> > > Why do you point out that `This is a duplicate of a2` -- why does it matter? We did drop a2 on the floor already? The reason why we forward a3, is because it is outside of a1's deduplicationInterval, right? The fact that we did drop a2 on the floor, seems irrelevant?
> >
> > Right, fixed in the KIP.
> >
> > Thanks for your opinions on the semantics! If no points are raised in the next few days, I will start a voting thread. We can always continue discussion in this thread after that :)
> >
> > Best,
> > Ayoub
> >
> > On Fri, Oct 4, 2024 at 2:37 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > Thanks Ayoub!
> > >
> > > (102) let's hear what others think.
> > >
> > > (103) Keeping it internal is fine with me. Was just curious.
> > >
> > > (104)
> > >
> > > > The offset checking was specifically added for the case of reprocessing the same input record.
> > >
> > > Yes.
> > >
> > > > Punctuation records are generated by the application. And I assume we can consider that any two punctuated records are inherently distinct?
> > >
> > > I agree. And if they are "inherently distinct" we should never de-duplicate them, right?
> > >
> > > > Also, for me, we should apply deduplication on punctuated records, because if the user defines a punctuator before calling deduplication, it means they want to deduplicate them as well, otherwise they could define it later in the pipeline.
> > >
> > > Well, but if we say punctuation records are "inherently distinct", there is nothing to be de-duplicated?
> > >
> > > (105)
> > >
> > > > However, I don't see practical use cases of a tiny deduplication interval.
> > >
> > > I agree, but that's not the point. We need to clearly define it, to avoid "off by one" errors and to be able to implement it (and write bound tests), and give users a chance to reason about it, even if they never use a smaller interval, because in the end, the question is always which ms is included and which one is excluded.
> > >
> > > Assume one has a window of 1 hour, and gets a record with Y ms first, and later Y + 1 hour -- whether we forward or drop the second record is a question we need to answer. We cannot leave this question open :)
> > >
> > > For the "how to handle later records", your example makes sense to me, and I think it's reasonable semantics. Again: to me it's just important that the KIP is very explicit with such edge cases and that it does not leave any room for interpretation.
> > >
> > > From the KIP:
> > >
> > > > event a3 at t+11s → This is a duplicate of a2 (i.e. within the deduplicationInterval) which has not been forwarded, we will forward it as if there was never an event a2
> > >
> > > Why do you point out that `This is a duplicate of a2` -- why does it matter? We did drop a2 on the floor already? The reason why we forward a3, is because it is outside of a1's deduplicationInterval, right? The fact that we did drop a2 on the floor, seems irrelevant?
> > >
> > > From the KIP:
> > >
> > > > The same thing applies for deduplicating out-of-order records:
> > > >
> > > > event a1 @t=15s → Forwarded
> > > > event a2 @t=5s → Dropped
> > > > event a3 @t=4s → Forwarded
> > >
> > > This is interesting. So you propose to also apply the "deduplicationInterval" into the past. I am ok with this. Just want to point it out, as it's an important semantic piece.
> > >
> > > Same for the examples with stream-time and different keys and state content. Love these examples. Makes it very clear what's going on. I am personally happy with these semantics (I guess I would also be happy if we would choose different semantics though).
> > >
> > > -Matthias
> > >
> > > On 9/4/24 7:49 AM, Ayoub Omari wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for your comments!
> > > >
> > > > (100)
> > > > Yes, there will be repartitioning if a key-changing operation was used before. Updated Javadocs.
> > > >
> > > > (101)
> > > > Yes, KIP updated.
> > > >
> > > > (102)
> > > > My idea was that a user may think of `.deduplicate()` as a black-box which only rejects records, without changing the stream's structure.
> > > >
> > > > I am also concerned by the auto-back-repartitioning. If it is fine not to provide the two APIs (like in groupByKey vs groupBy), I agree to remove it.
> > > >
> > > > (103)
> > > > I think the store should be kept internal as it will have some internal information. For example, for the eviction algorithm we could have a time-index which is part of the same state store (like in DualSchema stores where the first bit of each entry determines if the entry belongs to the base or to the index store).
> > > >
> > > > Also, it should hold the offset and timestamp of each record, which are technical information that an application may not need to know. WDYT?
> > > >
> > > > (104)
> > > > The offset checking was specifically added for the case of reprocessing the same input record.
> > > >
> > > > Punctuation records are generated by the application. And I assume we can consider that any two punctuated records are inherently distinct?
> > > >
> > > > Also, for me, we should apply deduplication on punctuated records, because if the user defines a punctuator before calling deduplication, it means they want to deduplicate them as well, otherwise they could define it later in the pipeline.
> > > >
> > > > (105)
> > > > Thanks for these points! I added more details and examples to the KIP.
> > > >
> > > > Some of the examples you mentioned were buried in the text, I added some highlights and examples to make the semantics clearer.
> > > >
> > > > About the lower bound of deduplicationInterval, I think it's more intuitive to make the interval ends inclusive, which gives a lower bound of 0. However, I don't see practical use cases of a tiny deduplication interval. Ideally, the id chosen by the user to be a deduplication id should be unique (at least for an extended period of time). If the deduplication interval is chosen to be some milliseconds, then I suppose it's more rate limiting than deduplication?
> > > >
> > > > The current semantics are better suited to a longer dedup-interval. Small dedup-intervals make it more likely that an out-of-order record is a late record, in which case records are most likely to be forwarded without dedup.
> > > >
> > > > As an example, if we consider a case of de-dup interval=0, and the following events:
> > > >
> > > > K1 @ t=10ms
> > > > K2 @ t=11ms
> > > > K1 @ t=10ms
> > > >
> > > > Then the current semantics would detect the second event K1 as late, and would forward it. (Note: the first event here may still exist in the store if eviction is not yet triggered, however it is an expired entry. In order to have a consistent general behavior we would simulate that all entries should be expired once the dedup-interval has elapsed; if we find an entry in the store which is already expired, we ignore it.)
> > > >
> > > > I am open to further discussion on this.
> > > >
> > > > If we agree on these semantics, I will add a description of the processor's behavior on late records in the Javadocs, which would make it clearer that it's more suitable to have longer dedup-intervals.
> > > >
> > > > Best,
> > > > Ayoub
> > > >
> > > > On Sat, Aug 31, 2024 at 3:50 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > >
> > > >> Thanks for updating the KIP.
> > > >>
> > > >> A few comments/questions:
> > > >>
> > > >> (100): For `deduplicateByKey(...)` the input stream might still be repartitioned if the key was changed upstream, right (similar to how `groupByKey()` and `join()` work)?
> > > >>
> > > >>     builder.stream(...).deduplicateByKey(); // no repartitioning
> > > >>     builder.stream(...).map(...).deduplicateByKey(); // repartitioning
> > > >>
> > > >> If my understanding is correct, it should be reflected in the JavaDocs.
> > > >>
> > > >> (101): I guess the same as (100) applies to `deduplicateByKeyValue(...)`?
> > > >>
> > > >> (102): I am not 100% sure if `deduplicate()` is the best API, and if we even need it? It seems we could express `deduplicate()` also as:
> > > >>
> > > >>     stream.map(...).deduplicateByKey(...).map(...).repartition();
> > > >>
> > > >> I am also not sure how common it is to de-duplicate on a value field, _and_ expect/want the result stream to be still partitioned on the original key? I could also imagine that the semantics of `deduplicate()` would actually be the same as (and this might be more intuitive for users?):
> > > >>
> > > >>     stream.map(...).deduplicateByKey(...)
> > > >>
> > > >> I.e., there would be no automatic "back partitioning" on the original key. Is there really demand for auto-back-partitioning?
> > > >>
> > > >> Atm, I tend to think that `deduplicate()` might not be required to get added at all, as it's only syntactic sugar anyway (no matter which semantics, w/ or w/o auto-back-partitioning, we pick). [And if there is user demand down the road, it seems simple enough to add it later.]
> > > >>
> > > >> (103) `Deduplicated`: Right now it only takes a name for the state store, which means it's not possible to plug in a different store. Is this intentional, or should we allow passing in `DslStoreSuppliers` and/or `KeyValueBytesStoreSupplier`? We could also defer this for a follow-up KIP.
> > > >>
> > > >> (104) Punctuation:
> > > >>
> > > >>> For punctuated records - whose offset value is null - deduplication is always performed.
> > > >>
> > > >> Why are we doing it this way? We could also say we never de-duplicate if there is no offset (similar to null-key and/or null-id). (Or maybe even make it configurable -- of course, we can defer a config also for a follow-up KIP in case there is demand.)
> > > >>
> > > >> In general, keeping stuff and letting users add additional filtering is the better default. If we filter, users have no way to "resurrect" the data, but if we keep it, adding an additional filter is possible. (Similar to what is proposed for late records -- we forward, not drop.)
> > > >>
> > > >> (105) Deduplication logic:
> > > >>
> > > >> I am not 100% sure if I understand the proposed logic with regard to time-semantics and out-of-order data.
> > > >>
> > > >> Assume there is no out-of-order data (de-dup interval 10 sec):
> > > >>
> > > >> Example A:
> > > >>
> > > >>     k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >>     k1@25 -> dropped as duplicate, stream-time=25
> > > >>     k1@35 -> forwarded and inserted into store -> stream-time=35 (we did discard k1@20 already and thus k1@35 is not considered a duplicate)
> > > >>
> > > >> This example is clear; however, we still need to define the exact cut-off point, i.e., what is the first record which is not dropped any longer? k1@29 is clearly dropped, and k1@31 would clearly be forwarded; I am not sure about k1@30 -> would it purge k1@20 already and thus be forwarded, or would it be dropped?
> > > >>
> > > >> The question boils down to whether a de-dup interval of 0 is allowed (i.e., whether we define the upper bound of the de-duplication interval as inclusive or exclusive). If we allow an interval size of zero (meaning the upper bound is inclusive), it means for interval 0 we only de-duplicate records with the same timestamp:
> > > >>
> > > >> Example B:
> > > >>
> > > >>     k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >>     k1@20 -> dropped as duplicate, stream-time=20
> > > >>     k1@21 -> forwarded and inserted into store -> stream-time=21 (we did discard k1@20 already and thus k1@21 is not considered a duplicate)
> > > >>
> > > >> For this case, k1@30 in Example A above would be dropped.
> > > >>
> > > >> However, if we think interval zero should not be a valid interval, and 1 is the allowed minimum (and thus the upper bound is exclusive), interval 1 means to de-duplicate records with the same timestamp:
> > > >>
> > > >> Example C (same as Example B, even if the interval is 1 now, due to different interval bound semantics):
> > > >>
> > > >>     k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >>     k1@20 -> dropped as duplicate, stream-time=20
> > > >>     k1@21 -> forwarded and inserted into store -> stream-time=21 (we did discard k1@20 already and thus k1@21 is not considered a duplicate)
> > > >>
> > > >> For this case, k1@30 in Example A above would be forwarded.
> > > >>
> > > >> What about out-of-order data (same key)?
> > > >>
> > > >>     k1@20 -> forwarded and inserted into store -> stream-time=20
> > > >>     k1@9 -> ?
> > > >>       I think we should forward; the time diff is larger than the de-dup interval.
> > > >>       Also: the record is "late" with regard to stream-time, so we should forward it as "late".
> > > >>
> > > >>     k1@11 -> ?
> > > >>       we could de-dup; we have k1@20 in the store
> > > >>       ts is within the de-dup interval
> > > >>       ts is not older than stream-time minus de-dup interval either (i.e., not "late")
> > > >>
> > > >>     k1@25 -> dropped as duplicate, stream-time=25
> > > >>     k1@23 -> dropped as duplicate, stream-time=25
> > > >>
> > > >>     k1@11 -> ?
> > > >>       we could de-dup; we have k1@20 in the store
> > > >>       ts is within the de-dup interval
> > > >>       ts IS OLDER than stream-time minus de-dup interval though, so we could also forward it as "late"
> > > >>
> > > >> What about different keys? What about different keys in combination with "late" records? Overall, I could not implement the logic based on the KIP as there are too many unspecified cases.
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 8/30/24 2:08 AM, Ayoub Omari wrote:
> > > >>> Hi Bill,
> > > >>>
> > > >>> Thanks for your answer.
> > > >>>
> > > >>>> Can you update the KIP to state that deduplication by value will result in auto-repartitioning by Kafka Streams
> > > >>>
> > > >>> Updated the KIP to the latest proposal.
> > > >>>
> > > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase we stumble across a better name we can update the KIP then too).
> > > >>>
> > > >>> Yes, better than the first suggested name. Reflected in the KIP.
> > > >>>
> > > >>> I also added a note about records with a null deduplication key.
> > > >>>
> > > >>> If no new points are raised in the next few days, I will start a VOTE thread.
> > > >>>
> > > >>> Thanks,
> > > >>> Ayoub
> > > >>>
> > > >>> On Tue, Aug 27, 2024 at 5:47 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Ayoub,
> > > >>>>
> > > >>>> Thanks for the update.
> > > >>>>
> > > >>>> Your latest proposal looks good to me, but I have a couple of comments.
> > > >>>>
> > > >>>>> in (2) we repartition by the id field
> > > >>>>
> > > >>>> I agree with this approach, but the KIP still states "No repartitioning is done by this processor. In case a user wants to deduplicate an id over multiple partitions, they should first repartition the topic on this id." Can you update the KIP to state that deduplication by value will result in auto-repartitioning by Kafka Streams, unless the developer adds a manual repartitioning operator themselves.
> > > >>>>
> > > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase we stumble across a better name we can update the KIP then too).
> > > >>>>
> > > >>>> This latest update has been open for a while now, I'd say it's good to start a VOTE thread in 1-2 days.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>> On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi Bill,
> > > >>>>>
> > > >>>>> I think all the points raised are now handled in the KIP. We still have only 106) on which we should have an agreement. I gave a proposition in my last mail, and I am waiting for others' opinions.
> > > >>>>>
> > > >>>>> My proposition is to have 3 methods of deduplication:
> > > >>>>> 1) `deduplicateByKey()`
> > > >>>>> 2) `deduplicate((k, v) -> v.id)`
> > > >>>>> 3) `deduplicateByKeyAndId((k, v) -> v.id)` (Name to be chosen)
> > > >>>>>
> > > >>>>> The point of having both (2) and (3) is that in (2) we repartition by the id field, whereas in (3) we are not repartitioning.
> > > >>>>>
> > > >>>>> (3) is needed in case the current partitioning ensures that the same id will always land on the same task. Here is an example I gave above:
> > > >>>>>
> > > >>>>>> For example, we have a topic with keyValue (`userId`, `transaction`) and deduplication is done on `transaction`.`id`. Here, the application wants to deduplicate transactions. It knows that a transaction id maps to a single userId. Any duplicate of that record would be received by the task which processes this userId.
> > > >>>>>
> > > >>>>> Let me know what you think.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Ayoub
> > > >>>>>
> > > >>>>> On Thu, Aug 22, 2024 at 9:37 PM Bill Bejeck <bbej...@apache.org> wrote:
> > > >>>>>
> > > >>>>>> Hi Ayoub,
> > > >>>>>>
> > > >>>>>> What's the status of this KIP?
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Bill
> > > >>>>>>
> > > >>>>>> On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Bill,
> > > >>>>>>>
> > > >>>>>>> Thanks for your comments!
> > > >>>>>>>
> > > >>>>>>> I am prefixing your points with B.
> > > >>>>>>>
> > > >>>>>>> B1.
> > > >>>>>>> That's a good catch. I think punctuated records aren't concerned by this problem of reprocessing because they are not issued twice. We will still need to make a difference between a punctuated and a source record. The logic becomes:
> > > >>>>>>>
> > > >>>>>>>> if record.offset is not null && record.offset == offset_stored:
> > > >>>>>>>>     forward(record)
> > > >>>>>>>> else:
> > > >>>>>>>>     // do_not_forward
> > > >>>>>>>
> > > >>>>>>> I will update the KIP on this point.
> > > >>>>>>>
> > > >>>>>>> B2. (i.e. 106)
> > > >>>>>>> That's the point discussed in 106. The main idea is to avoid repartitioning when we don't have to. But let's say we want to repartition when deduplication is on an id that is not the key.
> > > >>>>>>>
> > > >>>>>>> The user code may look something like this:
> > > >>>>>>> ```kstream
> > > >>>>>>>     .deduplicate((k, v) -> v.id)
> > > >>>>>>>     .map((k, v) -> ...)
> > > >>>>>>> ```
> > > >>>>>>> The user expects that the key in the map processor which follows the deduplicate is the same as the initial key.
> > > >>>>>>>
> > > >>>>>>> Internally, if we decide to repartition, the deduplication processor may do the following:
> > > >>>>>>> ```kstream
> > > >>>>>>>     .map((k, v) -> (v.id, ...))  // For repartitioning on id
> > > >>>>>>>     .deduplicateByKey()
> > > >>>>>>>     .map((id, v) -> (initialKey, ...))  // To recover the initial key
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> In this case, there is some internal complexity to this processor:
> > > >>>>>>> I) We should repartition twice, before and after deduplication. The second map triggers repartitioning if there are following stateful processors in the pipeline.
> > > >>>>>>> II) The initial key should be kept somewhere. One way is to wrap the value to add the key, and then unwrap it after the `deduplicateByKey`.
> > > >>>>>>>
> > > >>>>>>> As mentioned in one use case above (deduplicating transactions - written at least once - by id), repartitioning is not needed for that case. This added complexity won't be useful.
> > > >>>>>>>
> > > >>>>>>> *TLDR,* I think one solution is to have a separate API to cover such a use case. In total, we can have 3 APIs for deduplication:
> > > >>>>>>>
> > > >>>>>>> 1) `deduplicateByKey()` *// No repartitioning*
> > > >>>>>>> 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)` *// No repartitioning*
> > > >>>>>>> 3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
> > > >>>>>>>
> > > >>>>>>> The second API is equivalent to deduplicating by the pair (key, id). No repartitioning is required because the records with the same key are already together in the same partition. cc @Matthias @Sebastien.
> > > >>>>>>>
> > > >>>>>>> B3.
> > > >>>>>>> I think here we can do what some existing processors do for periodic work: we can track the last cleanup time and the current stream time, and if the delta exceeds the deduplication period, we trigger the cleanup.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Ayoub
> > > >>>>>>>
> > > >>>>>>> On Thu, Jul 11, 2024 at 1:17 AM Bill Bejeck <bbej...@apache.org> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi All,
> > > >>>>>>>>
> > > >>>>>>>> Thanks Ayoub for getting this KIP going again, I think a deduplication operator will be very useful. My apologies for being late to the discussion.
> > > >>>>>>>>
> > > >>>>>>>> Overall - I agree with the direction of the KIP but I have a couple of questions.
> > > >>>>>>>>
> > > >>>>>>>> 1. Regarding using offsets for tracking the first arrival. I think there could be a case when offsets would not be available, when records are forwarded by punctuation for example.
> > > >>>>>>>>
> > > >>>>>>>> 2. I'm not sure about using something other than the key for identifying duplicate records. By doing so, one could end up missing a de-duplication due to records with the same id characteristic in the value but having different keys, so the records land on different partitions. I guess we could enforce a repartition if one chooses to use a de-duplication id other than the key.
> > > >>>>>>>>
> > > >>>>>>>> 3. I think a punctuation scheduled to run at the de-duplication period to clean out older records would be a clean approach for purging older records. I'm not taking a hard stance on this approach and I'm willing to discuss different methods.
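A minimal sketch of the punctuation-based purge described in point 3 above, using the Processor API (the store layout and variable names are assumptions, not the KIP's final implementation):

```java
// Periodically scan the store and evict entries older than the
// deduplication interval; scheduled on stream-time so that expiry
// advances with the data.
context.schedule(deduplicationInterval, PunctuationType.STREAM_TIME, streamTime -> {
    final long cutoff = streamTime - deduplicationInterval.toMillis();
    try (KeyValueIterator<String, ValueAndTimestamp<Long>> iter = store.all()) {
        while (iter.hasNext()) {
            final KeyValue<String, ValueAndTimestamp<Long>> entry = iter.next();
            if (entry.value.timestamp() < cutoff) {
                store.delete(entry.key); // entry expired: evict it
            }
        }
    }
});
```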
> > > >>>>>>>> > > > >>>>>>>> Thanks, > > > >>>>>>>> > > > >>>>>>>> Bill > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> On 2024/06/25 18:44:06 Ayoub Omari wrote: > > > >>>>>>>>> Hi Matthias, > > > >>>>>>>>> > > > >>>>>>>>> Here are my updates on your points. > > > >>>>>>>>> > > > >>>>>>>>> 101. > > > >>>>>>>>>> You propose to add static methods `keySerde()` and > > > >>>> `valueSerde()` > > > >>>>>> -- > > > >>>>>>>>>> in other config classes, we use only `with(keySerde, > > > >>>> valueSerde)` > > > >>>>>> as > > > >>>>>>> we > > > >>>>>>>>> try > > > >>>>>>>>>> to use the "builder" pattern, and avoid too many overloads. > I > > > >>>>> would > > > >>>>>>>>>> prefer to omit both methods you suggest and just use a > single > > > >>>>>> `with` > > > >>>>>>>> for > > > >>>>>>>>>> both serdes. > > > >>>>>>>>> > > > >>>>>>>>> I was actually inspired by the other config classes, for > example > > > >>>>>>> `Joined` > > > >>>>>>>>> and > > > >>>>>>>>> `Grouped` both have the static methods `keySerde()` and > > > >>>>>> `valueSerde()`. > > > >>>>>>>>> > > > >>>>>>>>>> I think we don't want to add `with(...)` which takes all > > > >>>>>>>>>> parameters at once > > > >>>>>>>>> > > > >>>>>>>>> Done. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> 102. > > > >>>>>>>>> Thanks, your suggestion sounds good to me. The trade-off of > > > >>>> having > > > >>>>> an > > > >>>>>>>> index > > > >>>>>>>>> that allows to efficiently purge expired records besides the > > > >>>>> keyValue > > > >>>>>>>> store > > > >>>>>>>>> makes sense. I've been looking into the code, and I think a > > > >>>> similar > > > >>>>>>> idea > > > >>>>>>>>> was implemented for other processors (for example with > > > >>>>>>>>> DualSchemaRocksDBSegmentedBytesStore). > > > >>>>>>>>> As you said, I think we would benefit from some existing code > > > >>>> here. > > > >>>>>>>>> KIP updated ! > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> 104. > > > >>>>>>>>> Updated the KIP to consider records' offsets. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> 105 > > > >>>>>>>>>> picking the first offset with smallest ts sounds good to me. > > > >>>> The > > > >>>>>> KIP > > > >>>>>>>>>> should be explicit about it > > > >>>>>>>>> > > > >>>>>>>>> Done. > > > >>>>>>>>> > > > >>>>>>>>>> But as discussed above, it might be > > > >>>>>>>>>> simplest to not really have a window lookup, but just a > plain > > > >>>>>>>> key-lookup > > > >>>>>>>>>> and drop if the key exists in the store? > > > >>>>>>>>> > > > >>>>>>>>> KIP updated, we will be `.get()`ing from a keyValueStore > instead > > > >>>> of > > > >>>>>>>>> `.fetch()`ing > > > >>>>>>>>> from a WindowStore. > > > >>>>>>>>> > > > >>>>>>>>>> Another line of thinking, that did serve us well in the > past: > > > >>>> in > > > >>>>>>> doubt > > > >>>>>>>>>> keep a record -- users can add operators to drop record (in > > > >>>> case > > > >>>>>> they > > > >>>>>>>>>> don't want to keep it), but if we drop a record, users have > no > > > >>>>> way > > > >>>>>> to > > > >>>>>>>>>> resurrect it (thus, building a workaround to change > semantica > > > >>>> is > > > >>>>>>>>>> possible for users if we default to keep records, but not > the > > > >>>>> other > > > >>>>>>> way > > > >>>>>>>>>> around). > > > >>>>>>>>> > > > >>>>>>>>> Makes total sense ! I updated the KIP to forward late records > > > >>>>> instead > > > >>>>>>> of > > > >>>>>>>>> dropping them. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> 106. 
> > > >>>>>>>>> For the moment, I highlighted in Javadocs that we are > > > >>>> deduplicating > > > >>>>>> by > > > >>>>>>>>> partition. If there is a better name to have this > information in > > > >>>>> the > > > >>>>>>> name > > > >>>>>>>>> of the api itself it would be good. > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> Best, > > > >>>>>>>>> Ayoub > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> Le jeu. 13 juin 2024 à 09:03, Sebastien Viale < > > > >>>>>>>> sebastien.vi...@michelin.com> > > > >>>>>>>>> a écrit : > > > >>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> Hi, > > > >>>>>>>>>> > > > >>>>>>>>>> 106 : > > > >>>>>>>>>> > > > >>>>>>>>>> Thanks for the clarification. Actually, this is not what I > > > >>>>>> expected, > > > >>>>>>>> but I > > > >>>>>>>>>> better understand the performance issues regarding the state > > > >>>>> store > > > >>>>>>>>>> iteration. > > > >>>>>>>>>> If this is how it should be designed, it is fine for me as > long > > > >>>>> as > > > >>>>>> it > > > >>>>>>>> is > > > >>>>>>>>>> clear that the repartition must be done before the > > > >>>> deduplication. > > > >>>>>>>>>> Sébastien > > > >>>>>>>>>> > > > >>>>>>>>>> ________________________________ > > > >>>>>>>>>> De : Matthias J. Sax <mj...@apache.org> > > > >>>>>>>>>> Envoyé : jeudi 13 juin 2024 02:51 > > > >>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> > > > >>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication > > > >>>> processor > > > >>>>> in > > > >>>>>>>>>> kafka-streams > > > >>>>>>>>>> > > > >>>>>>>>>> Warning External sender Do not click on any links or open > any > > > >>>>>>>> attachments > > > >>>>>>>>>> unless you trust the sender and know the content is safe. > > > >>>>>>>>>> > > > >>>>>>>>>> 106: > > > >>>>>>>>>> > > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written" > > > >>>>>> stream, > > > >>>>>>>>>>> we are sure that the duplicate record has the same key as > the > > > >>>>>>>>>>> original one, and will land on the same task. Here, a user > > > >>>>> would > > > >>>>>>>>>>> want to specify a deduplication key different from the > > > >>>> topic's > > > >>>>>> key > > > >>>>>>>>>>> in case the topic's key is not a unique identifier > > > >>>>>>>>>>> > > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`, > > > >>>>>>> `transaction`) > > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the > > > >>>>>>>> application > > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a > > > >>>> transaction > > > >>>>> id > > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would > > > >>>> be > > > >>>>>>>> received > > > >>>>>>>>>>> by the task which processes this userId. > > > >>>>>>>>>> > > > >>>>>>>>>> This is an interesting point. > > > >>>>>>>>>> > > > >>>>>>>>>> My concern is to some extend, that it seems (on the > surface) to > > > >>>>> not > > > >>>>>>>>>> follow the established pattern of auto-repartitioning in the > > > >>>> DSL. > > > >>>>>> Of > > > >>>>>>>>>> course, given that the current proposal says we use an "id > > > >>>>>> extractor" > > > >>>>>>>>>> and not a "key extractor" it might be ok (but it might be > > > >>>>> somewhat > > > >>>>>>>>>> subtle). Of course, JavaDocs always help to explain in > detail. > > > >>>>>> Would > > > >>>>>>>>>> this be enough? > > > >>>>>>>>>> > > > >>>>>>>>>> Would be good to hear from others about this point. 
I am > > > >>>>> personally > > > >>>>>>> not > > > >>>>>>>>>> sure which approach I would prefer personally at this point. > > > >>>>>>>>>> > > > >>>>>>>>>> The problem reminds me on > > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844< > > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844> which is > > > >>>> not > > > >>>>>>>> resolve > > > >>>>>>>>>> directly either. We do have KIP-759 > > > >>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: > > > >>>>>> Unneeded > > > >>>>>>>>>> repartition canceling< > > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: > > > >>>>>> Unneeded > > > >>>>>>>>>> repartition canceling>) > > > >>>>>>>>>> which is WIP and helps with KAFKA-10844, but not sure if it > > > >>>> would > > > >>>>>> be > > > >>>>>>> a > > > >>>>>>>>>> viable solution for the de-duplication case? > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> -Matthias > > > >>>>>>>>>> > > > >>>>>>>>>> This email was screened for spam and malicious content but > > > >>>>> exercise > > > >>>>>>>>>> caution anyway. > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> On 6/11/24 2:31 PM, Ayoub Omari wrote: > > > >>>>>>>>>>> Hi Sebastien & Matthias, > > > >>>>>>>>>>> > > > >>>>>>>>>>> For 106. > > > >>>>>>>>>>> My idea was to deduplicate on a per-task basis. If the user > > > >>>>> wants > > > >>>>>>>>>>> to do a global deduplication over all partitions, I think > > > >>>> it's > > > >>>>>>>> better to > > > >>>>>>>>>>> have him explicitly repartition and then call the > > > >>>> deduplication > > > >>>>>>>>>> processor. > > > >>>>>>>>>>> > > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written" > > > >>>>>> stream, > > > >>>>>>>>>>> we are sure that the duplicate record has the same key as > the > > > >>>>>>>>>>> original one, and will land on the same task. Here, a user > > > >>>>> would > > > >>>>>>>>>>> want to specify a deduplication key different from the > > > >>>> topic's > > > >>>>>> key > > > >>>>>>>>>>> in case the topic's key is not a unique identifier. > > > >>>>>>>>>>> > > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`, > > > >>>>>>> `transaction`) > > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the > > > >>>>>>>> application > > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a > > > >>>> transaction > > > >>>>> id > > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would > > > >>>> be > > > >>>>>>>> received > > > >>>>>>>>>>> by the task which processes this userId. > > > >>>>>>>>>>> > > > >>>>>>>>>>> One other thought I have when writing the KIP about global > > > >>>>>>>> deduplication, > > > >>>>>>>>>>> is that it will require to map twice the key of the stream > > > >>>>> (first > > > >>>>>>>> map to > > > >>>>>>>>>>> change the key to deduplication key, and second map to get > > > >>>>>>>>>>> back the initial key). Second map may imply a second > > > >>>>>>> repartitioning. > > > >>>>>>>>>>> > > > >>>>>>>>>>> However, if we do a per-task deduplication, the user may > > > >>>> adapt > > > >>>>> to > > > >>>>>>> his > > > >>>>>>>>>>> specific use-case. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Let me know what you think > > > >>>>>>>>>>> Ayoub > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> Le mar. 11 juin 2024 à 20:21, Matthias J. 
Sax < > > > >>>>> mj...@apache.org> > > > >>>>>> a > > > >>>>>>>>>> écrit : > > > >>>>>>>>>>> > > > >>>>>>>>>>>> Thanks Sebastien, > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> that's a good point. Thanks for raising it. -- I like your > > > >>>>>>> proposal. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> An alternative would be to have two overloads of > > > >>>>> `deduplicate()` > > > >>>>>>>> one w/ > > > >>>>>>>>>>>> and one w/o the "id extractor" parameter. This would be > less > > > >>>>>>>> explicit > > > >>>>>>>>>>>> though. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> -Matthias > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On 6/11/24 2:30 AM, Sebastien Viale wrote: > > > >>>>>>>>>>>>> Hi, > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> I am really interested in this KIP. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 106: > > > >>>>>>>>>>>>> I hope I am not talking nonsense, but if you do not > > > >>>>> deduplicate > > > >>>>>>>> based > > > >>>>>>>>>> on > > > >>>>>>>>>>>> the key, the input stream has to be repartitioned. > > > >>>>>>>>>>>>> Otherwise, different stream tasks may handle records that > > > >>>>> need > > > >>>>>> to > > > >>>>>>>> be > > > >>>>>>>>>>>> deduplicated, and thus duplicates will not be detected. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> This is why I would have created two different methods, > as > > > >>>> is > > > >>>>>>> done > > > >>>>>>>> for > > > >>>>>>>>>>>> GroupBy: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> deduplicateByKey(...) > > > >>>>>>>>>>>>> deduplicate(...) > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> If deduplicateByKey is used, the input stream does not > need > > > >>>>> to > > > >>>>>> be > > > >>>>>>>>>>>> repartitioned. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> thanks > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Sébastien > > > >>>>>>>>>>>>> ________________________________ > > > >>>>>>>>>>>>> De : Matthias J. Sax <mj...@apache.org> > > > >>>>>>>>>>>>> Envoyé : mardi 11 juin 2024 01:54 > > > >>>>>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> > > > >>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication > > > >>>>>> processor > > > >>>>>>> in > > > >>>>>>>>>>>> kafka-streams > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Warning External sender Do not click on any links or open > > > >>>> any > > > >>>>>>>>>>>> attachments unless you trust the sender and know the > content > > > >>>>> is > > > >>>>>>>> safe. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks for the update Ayoub. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 101: you say: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> But I am not sure if we don't want to have them for this > > > >>>>>>>> processor ? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> What is your reasoning to move off the established > pattern? > > > >>>>>> Would > > > >>>>>>>> be > > > >>>>>>>>>>>>> good to understand, why `Deduplicated` class needs a > > > >>>>> different > > > >>>>>>>>>>>>> "structure" compared to existing classes. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 102: Creating iterators is very expensive. For other > work, > > > >>>> we > > > >>>>>>>> actually > > > >>>>>>>>>>>>> hit 100x (?) throughput degradation by creating an (for > > > >>>> most > > > >>>>>>> cases > > > >>>>>>>>>>>>> empty) iterator for every input record, and needed to > find > > > >>>>>> other > > > >>>>>>>> ways > > > >>>>>>>>>> to > > > >>>>>>>>>>>>> avoid creating an iterator per record. It really kills > > > >>>>>>> performance. 
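To make the cost argument concrete: the plain key-value lookup agreed on later in the thread needs no iterator at all, while the window-store variant opens one per input record. A rough sketch (store types and variable names are assumptions):

```java
// Plain key-value store: a single point lookup per record.
static boolean isDuplicatePointLookup(
        KeyValueStore<String, ValueAndTimestamp<Long>> store, String dedupId) {
    return store.get(dedupId) != null; // no iterator created
}

// Window store: fetch() creates an iterator per record, even when empty.
static boolean isDuplicateRangeFetch(
        WindowStore<String, Long> store, String dedupId, long ts, long intervalMs) {
    try (WindowStoreIterator<Long> iter =
             store.fetch(dedupId, ts - intervalMs, ts + intervalMs)) {
        return iter.hasNext();
    }
}
```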
> > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> I see the point about data expiration. We could > experiment > > > >>>>> with > > > >>>>>>>>>>>>> punctuation to expire old data, or add a second > > > >>>> "time-ordered > > > >>>>>>>> store" > > > >>>>>>>>>>>>> (which we already have at hand) which acts as an index > into > > > >>>>> the > > > >>>>>>>> main > > > >>>>>>>>>>>>> store. -- Another possibility would be to add a new > version > > > >>>>> of > > > >>>>>>>>>> segmented > > > >>>>>>>>>>>>> store with a different key-layout (ie, just store the > plain > > > >>>>>>> key). I > > > >>>>>>>>>>>>> think with some refactoring, we might be able to re-use a > > > >>>> lot > > > >>>>>> of > > > >>>>>>>>>>>>> existing code. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 104: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of > stateful > > > >>>>>>>> processors > > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with > > > >>>>>>> `on_window_close` > > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we > > > >>>>> receive a > > > >>>>>>>> record, > > > >>>>>>>>>>>>>> we update its aggregation result in the store, then > crash > > > >>>>>> before > > > >>>>>>>>>>>> committing, > > > >>>>>>>>>>>>>> then the record will be again reconsidered for > > > >>>> aggregation). > > > >>>>>> Is > > > >>>>>>>> this > > > >>>>>>>>>>>>>> correct ? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Yes, this is correct, but it does not violate ALOS, > because > > > >>>>> we > > > >>>>>>> did > > > >>>>>>>> not > > > >>>>>>>>>>>>> lose the input record -- of course, the aggregation would > > > >>>>>> contain > > > >>>>>>>> the > > > >>>>>>>>>>>>> input record twice (eg, over count), but this is ok under > > > >>>>> ALOS. > > > >>>>>>>>>>>>> Unfortunately, for de-duplication this pattern breaks, > > > >>>>> because > > > >>>>>>>>>>>>> de-duplication operator does a different "aggregation > > > >>>> logic" > > > >>>>>>>> depending > > > >>>>>>>>>>>>> on its state (emit if no key found, but not emit if key > > > >>>>> found). > > > >>>>>>> For > > > >>>>>>>>>>>>> counting as an example, we increment the count and emit > > > >>>>>>>> unconditionally > > > >>>>>>>>>>>>> though. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset > > > >>>> inside > > > >>>>>> the > > > >>>>>>>>>>>>>> store's value can tell us whether the record has been > > > >>>>> already > > > >>>>>>>> seen or > > > >>>>>>>>>>>> not. > > > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in > > > >>>> the > > > >>>>>>> store > > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means > > > >>>> the > > > >>>>>>>> record > > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it. > If > > > >>>>> the > > > >>>>>>>> offset > > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we > > > >>>> ignore > > > >>>>>> it. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Great idea. This might work... If we store the input > record > > > >>>>>>>> offset, we > > > >>>>>>>>>>>>> can actually avoid that the "aggregation logic" changes > for > > > >>>>> the > > > >>>>>>>> same > > > >>>>>>>>>>>>> input record. -- And yes, with ALOS potentially emitting > a > > > >>>>>>>> duplicate is > > > >>>>>>>>>>>>> the-name-of-the-game, so no concerns on this part from my > > > >>>>> side. 
> > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 105: picking the first offset with smallest ts sound good > > > >>>> to > > > >>>>>> me. > > > >>>>>>>> The > > > >>>>>>>>>> KIP > > > >>>>>>>>>>>>> should be explicit about it. But as discussed above, it > > > >>>> might > > > >>>>>> be > > > >>>>>>>>>>>>> simplest to not really have a window lookup, but just a > > > >>>> plain > > > >>>>>>>>>> key-lookup > > > >>>>>>>>>>>>> and drop if the key exists in the store? -- For late > > > >>>> records, > > > >>>>>> it > > > >>>>>>>> might > > > >>>>>>>>>>>>> imply that they are not de-duplicated, but this is also > the > > > >>>>>> case > > > >>>>>>>> for > > > >>>>>>>>>>>>> in-order records if they are further apart than the > > > >>>>>>> de-duplication > > > >>>>>>>>>>>>> window size, right? Thus I would believe this is "more > > > >>>>> natural" > > > >>>>>>>>>> compared > > > >>>>>>>>>>>>> to discarding late records pro-actively, which would lead > > > >>>> to > > > >>>>>>>> missing > > > >>>>>>>>>>>>> result records? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> We could also make this configurable if we are not sure > > > >>>> what > > > >>>>>>> users > > > >>>>>>>>>>>>> really need -- or add such a configuration later in case > > > >>>> the > > > >>>>>>>> semantics > > > >>>>>>>>>>>>> we pick don't work for some users. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Another line of thinking, that did serve us well in the > > > >>>> past: > > > >>>>>> in > > > >>>>>>>> doubt > > > >>>>>>>>>>>>> keep a record -- users can add operators to drop record > (in > > > >>>>>> case > > > >>>>>>>> they > > > >>>>>>>>>>>>> don't want to keep it), but if we drop a record, users > have > > > >>>>> no > > > >>>>>>> way > > > >>>>>>>> to > > > >>>>>>>>>>>>> resurrect it (thus, building a workaround to change > > > >>>> semantica > > > >>>>>> is > > > >>>>>>>>>>>>> possible for users if we default to keep records, but not > > > >>>> the > > > >>>>>>>> other way > > > >>>>>>>>>>>>> around). > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Would be good to get input from the broader community on > > > >>>> this > > > >>>>>>>> question > > > >>>>>>>>>>>>> thought. In the end, it must be a use-case driven > decision? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> -Matthias > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> This email was screened for spam and malicious content > but > > > >>>>>>> exercise > > > >>>>>>>>>>>> caution anyway. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On 6/6/24 5:02 AM, Ayoub Omari wrote: > > > >>>>>>>>>>>>>> Hi Matthias, > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thank you for your review ! > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 100. > > > >>>>>>>>>>>>>> I agree. I changed the name of the parameter to > > > >>>>> "idSelector". > > > >>>>>>>>>>>>>> Because this id may be computed, It is better to call it > > > >>>>> "id" > > > >>>>>>>> rather > > > >>>>>>>>>>>> than > > > >>>>>>>>>>>>>> field or attribute. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 101. > > > >>>>>>>>>>>>>> The reason I added the methods `keySerde()` and > > > >>>>> `valueSerde()` > > > >>>>>>>> was to > > > >>>>>>>>>>>>>> have the same capabilities as other serde classes (such > as > > > >>>>>>> Grouped > > > >>>>>>>>>>>>>> or Joined). As a Kafka-streams user, I usually use > > > >>>>>>> `with(keySerde, > > > >>>>>>>>>>>>>> valueSerde)` > > > >>>>>>>>>>>>>> as you suggested. 
But I am not sure if we don't want to > > > >>>> have > > > >>>>>>> them > > > >>>>>>>> for > > > >>>>>>>>>>>> this > > > >>>>>>>>>>>>>> processor ? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 102. > > > >>>>>>>>>>>>>> That's a good point ! Because we know that the window > > > >>>> store > > > >>>>>> will > > > >>>>>>>>>> contain > > > >>>>>>>>>>>>>> at most one instance of a given key, I am not sure how > the > > > >>>>>> range > > > >>>>>>>> fetch > > > >>>>>>>>>>>>>> on WindowStore compares to a KeyValueStore `get()` in > this > > > >>>>>> case. > > > >>>>>>>>>>>>>> Wondering if the fact that the record's key is the > prefix > > > >>>> of > > > >>>>>> the > > > >>>>>>>>>>>> underlying > > > >>>>>>>>>>>>>> keyValueStore's key ("<dataKey,ts>") may provide > > > >>>> comparable > > > >>>>>>>>>> performance > > > >>>>>>>>>>>>>> to the random access of KeyValueStore ? Of course, the > > > >>>>>>> WindowStore > > > >>>>>>>>>>>> fetch() > > > >>>>>>>>>>>>>> would be less efficient because it may fetch from more > > > >>>> than > > > >>>>> 1 > > > >>>>>>>> segment, > > > >>>>>>>>>>>> and > > > >>>>>>>>>>>>>> because of some iterator overhead. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> The purpose of using a WindowStore is to automatically > > > >>>> purge > > > >>>>>> old > > > >>>>>>>> data. > > > >>>>>>>>>>>>>> For example, deduplicating a topic written at least once > > > >>>>>>> wouldn't > > > >>>>>>>>>>>> require > > > >>>>>>>>>>>>>> keeping a large history. This is not the case of using a > > > >>>>>>>> KeyValueStore > > > >>>>>>>>>>>>>> which would require scanning regularly to remove expired > > > >>>>>>> records. > > > >>>>>>>>>>>>>> That might cause a sudden increase of latency whenever > the > > > >>>>>>> cleanup > > > >>>>>>>>>>>>>> is triggered. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> It would be good to hear from anyone who has done some > > > >>>>>> analysis > > > >>>>>>>>>>>>>> on RocksDB's range fetch. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 103. > > > >>>>>>>>>>>>>> Sure, I can update it once we agree on underlying > > > >>>> semantics. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 104. > > > >>>>>>>>>>>>>> Another good point ! > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> In the end, de-duplication does only make sense when > EOS > > > >>>> is > > > >>>>>>> used > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> I agree with that. And for me, the use case of > > > >>>>> deduplicating a > > > >>>>>>>> topic > > > >>>>>>>>>>>>>> written ALOS inside an EOS application might be the top > 1 > > > >>>>> use > > > >>>>>>> case > > > >>>>>>>>>>>>>> of deduplication. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> all downstream processing happens, `context.forward()` > > > >>>>>> returns > > > >>>>>>>>>>>>>>> and we update the state store, we could now crash w/o > > > >>>>>>> committing > > > >>>>>>>>>>>> offsets > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of > stateful > > > >>>>>>>> processors > > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with > > > >>>>>>> `on_window_close` > > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we > > > >>>>> receive a > > > >>>>>>>> record, > > > >>>>>>>>>>>>>> we update its aggregation result in the store, then > crash > > > >>>>>> before > > > >>>>>>>>>>>> committing, > > > >>>>>>>>>>>>>> then the record will be again reconsidered for > > > >>>> aggregation). > > > >>>>>> Is > > > >>>>>>>> this > > > >>>>>>>>>>>>>> correct ? 
> > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset > > > >>>> inside > > > >>>>>> the > > > >>>>>>>>>>>>>> store's value can tell us whether the record has been > > > >>>>> already > > > >>>>>>>> seen or > > > >>>>>>>>>>>> not. > > > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in > > > >>>> the > > > >>>>>>> store > > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means > > > >>>> the > > > >>>>>>>> record > > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it. > If > > > >>>>> the > > > >>>>>>>> offset > > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we > > > >>>> ignore > > > >>>>>> it. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> As you said, we don't have any guarantee whether the > > > >>>> initial > > > >>>>>>>> record > > > >>>>>>>>>> was > > > >>>>>>>>>>>>>> forwarded or not in case of a crash before commit. In > this > > > >>>>>>>> solution > > > >>>>>>>>>>>>>> we would forward the record twice, which is against > > > >>>>>>> deduplication. > > > >>>>>>>>>>>>>> But, this is still an ALOS application, so it has the > same > > > >>>>>>>> semantics > > > >>>>>>>>>>>>>> as any other such application. With this, I am not sure > we > > > >>>>> can > > > >>>>>>>>>>>>>> have "strict" deduplication for ALOS applications. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 105. > > > >>>>>>>>>>>>>> For me, if there are two duplicate records, it means > they > > > >>>>> are > > > >>>>>>>>>>>>>> the same in the application's point of view, so it can > > > >>>>> choose > > > >>>>>>>>>>>>>> either one. Thus, I would go with forwarding the record > > > >>>> with > > > >>>>>>>>>>>>>> the least offset. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Would it not be desired to drop all duplicates > > > >>>> independent > > > >>>>>>>>>>>>>>> of their ts, as long as we find a record in the store? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> This is actually related to the (suggested) windowed > > > >>>> nature > > > >>>>>>>>>>>>>> of deduplication. As in 102. we don't want to do a > > > >>>> "forever" > > > >>>>>>>>>>>>>> deduplication, which may be impossible for huge > workloads > > > >>>>>>>>>>>>>> where all records should be kept in the store. Hence, > the > > > >>>>>> fetch > > > >>>>>>>>>>>>>> of timestamp between [ts-deduplicationInterval, > > > >>>>>>>>>>>> ts+deduplicationInterval] > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> About late records, this is again due to the windowed > > > >>>>> nature. > > > >>>>>>>>>>>>>> Because the store won't save those late (i.e. expired) > > > >>>>>> records, > > > >>>>>>>>>>>>>> we have two options. Either, we do not apply > deduplication > > > >>>>>>>>>>>>>> on them, thus the deduplication doesn't work on late > > > >>>>> records, > > > >>>>>>>>>>>>>> or we discard them (which is the option I suggest). > > > >>>>>>>>>>>>>> In the second case, It would be up to the user to choose > > > >>>>>>>>>>>>>> any deduplicationInterval that may sufficiently cover > all > > > >>>>> his > > > >>>>>>> late > > > >>>>>>>>>> data. > > > >>>>>>>>>>>>>> What do you think ? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>> Ayoub > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Le mar. 4 juin 2024 à 23:58, Matthias J. 
Sax < > > > >>>>>> mj...@apache.org> > > > >>>>>>> a > > > >>>>>>>>>>>> écrit : > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Ayoub, > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> thanks for resurrecting this KIP. I think a built-in > > > >>>>>>>> de-duplication > > > >>>>>>>>>>>>>>> operator will be very useful. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Couple of questions: > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 100: `deduplicationKeySelector` > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Is this the best name? It might indicate that we > select a > > > >>>>>> "key" > > > >>>>>>>> what > > > >>>>>>>>>> is > > > >>>>>>>>>>>>>>> an overloaded term... Maybe we could use `Field` or > `Id` > > > >>>> or > > > >>>>>>>>>> `Attribute` > > > >>>>>>>>>>>>>>> instead of `Key` in the name? Just brainstorming. If we > > > >>>>> think > > > >>>>>>>> `Key` > > > >>>>>>>>>> is > > > >>>>>>>>>>>>>>> the best word, I am also ok with it. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 101: `Deduplicated` class > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> You propose to add static methods `keySerde()` and > > > >>>>>>>> `valueSerde()` -- > > > >>>>>>>>>> in > > > >>>>>>>>>>>>>>> other config classes, we use only `with(keySerde, > > > >>>>>> valueSerde)` > > > >>>>>>>> as we > > > >>>>>>>>>>>> try > > > >>>>>>>>>>>>>>> to use the "builder" pattern, and avoid too many > > > >>>>> overloads. I > > > >>>>>>>> would > > > >>>>>>>>>>>>>>> prefer to omit both methods you suggest and just use a > > > >>>>> single > > > >>>>>>>> `with` > > > >>>>>>>>>>>> for > > > >>>>>>>>>>>>>>> both serdes. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Similarly, I thing we don't want to add `with(...)` > which > > > >>>>>> takes > > > >>>>>>>> all > > > >>>>>>>>>>>>>>> parameters at once (which should only be 3 parameters, > > > >>>> not > > > >>>>> 4 > > > >>>>>> as > > > >>>>>>>> it's > > > >>>>>>>>>>>>>>> currently in the KIP)? > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 102: Usage of `WindowedStore`: > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Would this be efficient? The physical byte layout it > > > >>>>>>>> "<dataKey,ts>" > > > >>>>>>>>>> for > > > >>>>>>>>>>>>>>> the store key, so it would be difficult to do an > > > >>>> efficient > > > >>>>>>> lookup > > > >>>>>>>>>> for a > > > >>>>>>>>>>>>>>> given "de-duplication key" to discard duplicates, as we > > > >>>>> don't > > > >>>>>>>> know > > > >>>>>>>>>> the > > > >>>>>>>>>>>>>>> timestamp of the first record with the same > > > >>>> "de-duplication > > > >>>>>>> key". > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> This boils down to the actual de-duplication logic > (some > > > >>>>> more > > > >>>>>>>>>> comments > > > >>>>>>>>>>>>>>> below), but what you propose seems to require expensive > > > >>>>>>>> range-scans > > > >>>>>>>>>>>> what > > > >>>>>>>>>>>>>>> could be cost prohibitive in practice. I think we need > to > > > >>>>>> find > > > >>>>>>> a > > > >>>>>>>> way > > > >>>>>>>>>> to > > > >>>>>>>>>>>>>>> use efficient key-point-lookups to make this work. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 103: "Processing logic": > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Might need some updates (Cf 102 comment). I am not sure > > > >>>> if > > > >>>>> I > > > >>>>>>>> fully > > > >>>>>>>>>>>>>>> understand the logic: cf 105 below. 
104:

> If no entries found → forward the record + save the record in the store

This part is critical, and we should discuss it in detail. In the end, de-duplication only makes sense when EOS is used, and we might want to call this out (eg, in the JavaDocs)? But if used with ALOS, it's very difficult to ensure that we never lose data... Your proposal to forward first goes in the right direction, but does not really solve the problem entirely:

Even if we forward the message first, all downstream processing happens, `context.forward()` returns and we update the state store, we could now crash w/o committing offsets. For this case, we have no guarantee that the result records were published (as we did not flush the producer yet), but when re-reading from the input topic, we would find the record in the store and incorrectly drop it as a duplicate...

I think the only solution to make this work would be to use TX state stores in combination with ALOS, as proposed via KIP-892?

Using an in-memory store won't help much either? The producer could have sent the write into the changelog topic, but not into the result topic, and thus we could still not guarantee ALOS...?

How do we want to go about this? We could also say this new operator only works with EOS. Would this be too restrictive? -- At least for now, until KIP-892 lands, after which we could relax it?
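If the operator were indeed restricted to EOS for now, the check could be a simple guard at init time. A sketch, assuming we fail fast with an exception (the message text and the choice to throw rather than log are illustrative):

    import java.util.Map;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.TopologyException;
    import org.apache.kafka.streams.processor.api.ProcessorContext;

    final class EosGuard {
        private EosGuard() {}

        // Illustrative guard: reject the deduplication processor under ALOS
        // until transactional state stores (KIP-892) make it safe.
        static void requireExactlyOnce(ProcessorContext<?, ?> context) {
            Map<String, Object> configs = context.appConfigs();
            Object guarantee = configs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
            if (StreamsConfig.AT_LEAST_ONCE.equals(guarantee)) {
                throw new TopologyException(
                    "deduplication requires an exactly-once processing.guarantee,"
                        + " but found " + guarantee);
            }
        }
    }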
105: "How to detect late records"

In the end, it seems to boil down to determining which of the records to forward and which to drop, for (1) the regular case and (2) the out-of-order data case.

Regular case (no out-of-order data): for this case, offset and ts order are the same, and we can forward the first record we get. All later records within the "de-duplication period" with the same "de-duplication key" would be dropped. If a record with the same "de-duplication key" arrives after the "de-duplication period" has passed, we cannot drop it any longer, but would still forward it, as per the contract of the operator / de-duplication period.

For the out-of-order case: the first question we need to answer is, do we want to forward the record with the smallest offset or the record with the smallest ts? Logically, forwarding the one with the smallest ts might be "more correct"; however, it implies we could only forward it after the "de-duplication period" has passed, which might be undesired latency? Would this be desired/acceptable?

In contrast, if we forward the record with the smallest offset (this is what you seem to propose), we don't have a latency issue, but of course the question of which records to drop is more tricky to answer: it seems you propose to compare the time difference of the stored record to the current record, but I am wondering why? Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store? It would be good to get some more motivation and tradeoffs discussed about the different strategies we could use.

You also propose to drop _any_ late record: I am also not sure that's desired? Could this not lead to data loss? Assume we get a late record, but in fact there never was a duplicate -- why would we want to drop it? If there is a late record which is indeed a duplicate, but we already purged the original record from the store, it seems to be the same case as the "no out-of-order" case: after we purge, we cannot de-duplicate, and thus it's a regular case we can just accept?
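For reference, detecting "late" here only requires tracking stream time; whether a late record is then dropped or forwarded un-deduplicated is exactly the policy question above. A small sketch of the detection itself (stream time is tracked manually for clarity; a real processor could instead use the context's stream-time accessor):

    import org.apache.kafka.streams.processor.api.Record;

    // A record is "late" once stream time (the max timestamp observed so far)
    // has advanced past the record's timestamp by more than the
    // de-duplication interval, i.e. its entry would already be purged.
    final class LatenessCheck {
        private final long deduplicationIntervalMs;
        private long observedStreamTime = Long.MIN_VALUE;

        LatenessCheck(long deduplicationIntervalMs) {
            this.deduplicationIntervalMs = deduplicationIntervalMs;
        }

        boolean isLate(Record<?, ?> record) {
            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
            return observedStreamTime - record.timestamp() > deduplicationIntervalMs;
        }
    }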
-Matthias

On 5/29/24 4:58 AM, Ayoub Omari wrote:

Hi everyone,

I've just made a (small) change to this KIP about an implementation detail. Please let me know your thoughts.

Thank you,
Ayoub

Le lun. 20 mai 2024 à 21:13, Ayoub <ayoubomar...@gmail.com> a écrit :

Hello,

Following a discussion on the community Slack channel, I would like to revive the discussion on KIP-655, which is about adding a deduplication processor in kafka-streams.