Hi Ayoub, I see that there was already a very long discussion thread, so I won't start nitpicking names here. I think we are ready to move on to a vote. The agreed-upon solutions sound very reasonable to me; I especially like how you handled the ALOS case. Not deduplicating late records is a defensive choice - it's the lesser evil, even if it can lead to surprises for the user (data duplication surprises are certainly better than data loss surprises). Let's just make sure that we write a long description of all those subtleties in the documentation, and not ask people to find it in the KIP.

My main point is that I'd indeed vote to remove `deduplicate`. As stated before, the `deduplicate` operator does many things at once and it's easy for users to accidentally do too much (repartitioning) by using it. It's probably a not-so-common use case, and one that is not so hard to replicate using `deduplicateByKey`, as the sketch below shows.
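A rough rendering of that replication, against the proposed KIP-655 operators (these do not exist in released Kafka Streams, and the `Transaction` type and id selector are made up for illustration):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

public class ReplicateDeduplicateSketch {

    // Hypothetical payload carrying the deduplication id.
    record Transaction(String id, long amount) {}

    static KStream<String, Transaction> deduplicateById(KStream<String, Transaction> stream) {
        return stream
                // key by the dedup id, remembering the original key in the value
                // (this map is what triggers the repartitioning)
                .map((k, v) -> KeyValue.pair(v.id(), KeyValue.pair(k, v)))
                // proposed KIP-655 operator: keep one record per key
                .deduplicateByKey()
                // restore the original key; downstream stateful operators
                // would then trigger a second repartitioning
                .map((id, kv) -> kv);
    }
}
```

The first `map()` makes the repartitioning explicit, which is exactly what `deduplicate` would otherwise hide from the user.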
A side note on expiring records using punctuation - I see that this is, performance-wise, the best option we have here, so I agree with going with this approach in the KIP (a rough sketch follows below). Just want to throw out there that the C++ RocksDB API seems to have a compaction filter, which should give us the ability to expire records during compaction (and otherwise ignore expired records during processing) - this may be a nice long-term improvement for these kinds of things, but it would require extending the RocksDB JNI interface to allow implementing custom CompactionFilters, IIUC. So it doesn't directly affect your KIP.
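For concreteness, a minimal sketch of that punctuation-based expiry, under simplifying assumptions (the store value is just the first-seen timestamp, the store name is made up, and the KIP's actual processor additionally tracks offsets and may use a time index instead of a full scan):

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Simplified: the store maps dedup id -> timestamp of the first-seen record.
public class DedupExpirySketch implements Processor<String, String, String, String> {

    private final long intervalMs;
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> seen;

    public DedupExpirySketch(final long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.seen = context.getStateStore("dedup-store"); // hypothetical store name
        // Periodically purge entries older than the deduplication interval,
        // driven by stream time as discussed in the thread.
        context.schedule(Duration.ofMillis(intervalMs), PunctuationType.STREAM_TIME, streamTime -> {
            try (final KeyValueIterator<String, Long> it = seen.all()) {
                while (it.hasNext()) {
                    final var entry = it.next();
                    if (entry.value < streamTime - intervalMs) {
                        seen.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        if (seen.get(record.key()) == null) { // first occurrence of this id
            context.forward(record);
            seen.put(record.key(), record.timestamp());
        } // else: duplicate within the interval -> drop
    }
}
```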
Thanks for the hard work and for not giving up on this KIP! Cheers, Lucas

On Tue, Nov 5, 2024 at 11:45 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> Hi Matthias,
> Thanks for your review, and sorry for this late reply.
> (102) Ok, I keep it open here.
> (104)
> > Well, but if we say punctuation records are "inherently distinct", there is nothing to be de-duplicated?
> We still need to deduplicate punctuation records even if they are distinct. A functional deduplication acts essentially on distinct records having the same functional id. It's the same as for input records. The records are distinct, but we keep only one from those having the same deduplication id.
> (105)
> > even if they never use a smaller interval, because in the end, the question is always which ms is included and which one is excluded.
> Agreed, the KIP now specifies that "Deduplication interval ends are inclusive."
> > Why do you point out that `This is a duplicate of a2` -- why does it matter? We did drop a2 on the floor already?
> > The reason why we forward a3, is because it is outside of a1's deduplicationInterval, right? The fact that we did drop a2 on the floor, seems irrelevant?
> Right, fixed in the KIP.
> Thanks for your opinions on the semantics! If no points are raised in the next few days, I will start a voting thread. We can always continue discussion in this thread after that :)
> Best,
> Ayoub
> On Fri, Oct 4, 2024 at 02:37, Matthias J. Sax <mj...@apache.org> wrote:
> > Thanks Ayoub!
> > (102) let's hear what others think.
> > (103) Keeping it internal is fine with me. Was just curious.
> > (104)
> > > The offset checking was specifically added for the case of reprocessing the same input record.
> > Yes.
> > > Punctuation records are generated by the application. And I assume we can consider that any two punctuated records are inherently distinct ?
> > I agree. And if they are "inherently distinct" we should never de-duplicate them, right?
> > > Also, for me, we should apply deduplication on punctuated records, because if the user defines a punctuator before calling deduplication, it means they want to deduplicate them as well, otherwise they could define it later in the pipeline.
> > Well, but if we say punctuation records are "inherently distinct", there is nothing to be de-duplicated?
> > (105)
> > > However, I don't see practical use cases of a tiny deduplication interval.
> > I agree, but that's not the point. We need to clearly define it, to avoid "off by one" errors and to be able to implement it (and write bound tests), and give users a chance to reason about it, even if they never use a smaller interval, because in the end, the question is always which ms is included and which one is excluded.
> > Assume one has a window of 1 hour, and get a record with Y ms first, and later Y + 1 hour -- whether we forward or drop the second record is a question we need to answer. We cannot leave this question open :)
> > For the "how to handle later records", your example makes sense to me, and I think it's reasonable semantics. Again: to me it's just important that the KIP is very explicit with such edge cases and that it does not leave any room for interpretation.
> > From the KIP:
> > > event a3 at t+11s → This is a duplicate of a2 (i.e. within the deduplicationInterval) which has not been forwarded, we will forward it as if there was never an event a2
> > Why do you point out that `This is a duplicate of a2` -- why does it matter? We did drop a2 on the floor already?
> > The reason why we forward a3, is because it is outside of a1's deduplicationInterval, right? The fact that we did drop a2 on the floor, seems irrelevant?
> > From the KIP:
> > > The same thing applies for deduplicating out-of-order records:
> > > event a1 @t=15s → Forwarded
> > > event a2 @t=5s → Dropped
> > > event a3 @t=4s → Forwarded
> > This is interesting. So you propose to also apply the "deduplicationInterval" into the past. I am ok with this. Just want to point it out, as it's an important semantic piece.
> > Same for the examples with stream-time and different keys and state content. Love these examples. Makes it very clear what's going on. I am personally happy with these semantics (I guess I would also be happy if we would choose different semantics though).
> > -Matthias
> > On 9/4/24 7:49 AM, Ayoub Omari wrote:
> > > Hi Matthias,
> > > Thanks for your comments !
> > > (100)
> > > Yes, there will be repartitioning if a key-changing operation was used before. Updated java docs.
> > > (101)
> > > Yes, KIP updated.
> > > (102)
> > > My idea was that a user may think of `.deduplicate()` as a black-box which only rejects records, without changing the stream's structure.
> > > I am also concerned by the auto-back-repartitioning. If it is fine not to provide the two apis (like in groupByKey vs groupBy), I agree to remove it.
> > > (103)
> > > I think the store should be kept internal as it will have some internal information. For example, for the eviction algorithm we could have a time-index which is part of the same state store (like in DualSchema stores where the first bit of each entry determines if the entry belongs to the base or to the index store).
> > > Also, it should hold the offset and timestamp of each record which are technical information that an application may not need to know. WDYT ?
> > > (104)
> > > > The offset checking was specifically added for the case of reprocessing the same input record.
> > > Punctuation records are generated by the application. And I assume we can consider that any two punctuated records are inherently distinct ?
> > > Also, for me, we should apply deduplication on punctuated records, because if the user defines a punctuator before calling deduplication, it means they want to deduplicate them as well, otherwise they could define it later in the pipeline.
> > > (105)
> > > Thanks for these points ! I added more details and examples to the KIP.
> > > Some of the examples you mentioned were buried in the text, I added some highlights and examples to make the semantics clearer.
> > > About the lower bound of deduplicationInterval, I think it's more intuitive to make the interval ends inclusive, which gives a lower bound of 0.
> > > However, I don't see practical use cases of a tiny deduplication interval. Ideally, the id chosen by the user to be a deduplication id should be unique (at least for an extended period of time). If the deduplication interval is chosen to be a few milliseconds, then I suppose it's more rate limiting than deduplication ?
> > > The current semantics are more adapted to a longer dedup-interval. Small dedup-intervals make it more likely that an out-of-order record is a late record, in which case records are most likely to be forwarded without dedup.
> > > As an example, if we consider a case of de-dup interval=0, and the following events:
> > > K1 @ t=10ms
> > > K2 @ t=11ms
> > > K1 @ t=10ms
> > > Then the current semantics would detect the second event K1 as late, and would forward it.
> > > (Note: the first event here may still exist in the store if eviction is not yet triggered, however it is an expired entry. In order to have a consistent general behavior, we treat all entries as expired once the dedup-interval has elapsed: if we find an entry in the store which is already expired, we ignore it.)
> > > I am open to further discussion on this.
> > > If we agree on these semantics, I will add a description of the processor's behavior on late records in java docs, which would make it clearer that it's more suitable to have longer dedup-intervals.
> > > Best,
> > > Ayoub
> > > On Sat, Aug 31, 2024 at 03:50, Matthias J. Sax <mj...@apache.org> wrote:
> > >> Thanks for updating the KIP.
> > >> A few comments/questions:
> > >> (100): For `deduplicateByKey(...)` the input stream might still be repartitioned if the key was changed upstream, right (similar to how `groupByKey()` and `join()` work)?
> > >> builder.stream(...).deduplicateByKey(); // no repartitioning
> > >> builder.stream(...).map(...).deduplicateByKey(); // repartitioning
> > >> If my understanding is correct, it should be reflected in the JavaDocs.
> > >> (101): I guess the same as (100) applies to `deduplicateByKeyValue(...)`?
> > >> (102): I am not 100% sure if `deduplicate()` is the best API, and if we even need it?
> > >> It seems we could express `deduplicate()` also as:
> > >> stream.map(...).deduplicateByKey(...).map(...).repartition();
> > >> I am also not sure, how common it is to de-duplicate on a value field, _and_ expect/want the result stream to be still partitioned on the original key? I could also imagine that the semantics of `deduplicate()` would actually be the same as (and this might be more intuitive for users?):
> > >> stream.map(...).deduplicateByKey(...)
> > >> Ie, there won't be any automatic "back partitioning" on the original key. Is there really demand for auto-back-partitioning?
> > >> Atm, I tend to think that `deduplicate()` might not be required to get added at all, as it's only syntactic sugar anyway (no matter which semantics, w/ or w/o auto-back-partitioning, we pick). [And if there is user demand down the road, it seems simple enough to add it later.]
> > >> (103) `Deduplicated`: Right now it only takes a name for the state store, which means it's not possible to plug in a different store. Is this intentional, or should we allow passing in `DslStoreSuppliers` and/or `KeyValueBytesStoreSupplier`? We could also defer this for a follow up KIP.
> > >> (104) Punctuation:
> > >>> For punctuated records - whose offset value is null - deduplication is always performed.
> > >> Why are we doing it this way? We could also say, we never de-duplicate if there is no offset (similar to null-key and/or null-id). (Or maybe even make it configurable -- of course, we can defer a config also for a follow up KIP in case there is demand.)
> > >> In general, keeping stuff and letting users add additional filtering is the better default. If we filter, users have no way to "resurrect" the data, but if we keep it, adding an additional filter is possible. (Similar to what is proposed for late records -- we forward, not drop).
> > >> (105) Deduplication logic:
> > >> I am not 100% sure if I understand the proposed logic with regard to time-semantics and out-of-order data.
> > >> Assume there is no out-of-order data (de-dup interval 10 sec):
> > >> Example A:
> > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > >> k1@25 -> dropped as duplicate, stream-time=25
> > >> k1@35 -> forwarded and inserted into store -> stream-time=35 (we did discard k1@20 already and thus k1@35 is not considered a duplicate)
> > >> This example is clear, however, we still need to define the exact cut-off point, ie, what is the first record which is not dropped any longer? k1@29 is clearly dropped, and k1@31 would clearly be forwarded; I am not sure about k1@30 -> would it purge k1@20 already and thus be forwarded, or would it be dropped?
> > >> The question boils down to whether a de-dup-interval of 0 is allowed (ie, whether we define the upper bound of the de-duplication interval as inclusive or exclusive)?
> > >> If we allow an interval size of zero (meaning the upper bound is inclusive), it means that for interval 0 we only de-duplicate records with the same timestamp:
> > >> Example B:
> > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > >> k1@20 -> dropped as duplicate, stream-time=20
> > >> k1@21 -> forwarded and inserted into store -> stream-time=21 (we did discard k1@20 already and thus k1@21 is not considered a duplicate)
> > >> For this case, k1@30 in Example A above would be dropped.
> > >> However, if we think interval zero should not be a valid interval, and 1 is the allowed minimum (and thus, the upper bound is exclusive), interval 1 means to de-duplicate records with the same timestamp:
> > >> Example C (same as Example B, even if interval is 1 now, due to different interval bound semantics):
> > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > >> k1@20 -> dropped as duplicate, stream-time=20
> > >> k1@21 -> forwarded and inserted into store -> stream-time=21 (we did discard k1@20 already and thus k1@21 is not considered a duplicate)
> > >> For this case, k1@30 in Example A above would be forwarded.
> > >> What about out-of-order (same key):
> > >> k1@20 -> forwarded and inserted into store -> stream-time=20
> > >> k1@9 -> ? I think we should forward; time diff is larger than de-dup interval. Also: record is "late" with regard to stream-time, so we should forward as "late"
> > >> k1@11 -> ? we could de-dup; we have k1@20 in the store; ts is within de-dup-interval; ts is not older than stream-time minus de-dup interval either (ie, not "late")
> > >> k1@25 -> dropped as duplicate, stream-time=25
> > >> k1@23 -> dropped as duplicate, stream-time=25
> > >> k1@11 -> ? we could de-dup; we have k1@20 in the store; ts is within de-dup-interval; ts IS OLDER than stream-time minus de-dup interval though, we could also forward as "late"
> > >> What about different keys? What about different keys in combination with "late" records? Overall, I could not implement the logic based on the KIP as there are too many unspecified cases.
> > >> -Matthias
> > >> On 8/30/24 2:08 AM, Ayoub Omari wrote:
> > >>> Hi Bill,
> > >>> Thanks for your answer.
> > >>>> Can you update the KIP to state that deduplication by value will result in auto-repartitioning by Kafka Streams
> > >>> Updated the KIP to the latest proposal.
> > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase we stumble across a better name we can update the KIP then too).
> > >>> Yes, better than the first suggested name. Reflected in the KIP.
> > >>> I also added a note about records with null deduplication key.
> > >>> If no new points are raised in the next few days, I will start a VOTE thread.
> > >>> Thanks,
> > >>> Ayoub
> > >>> On Tue, Aug 27, 2024 at 17:47, Bill Bejeck <bbej...@gmail.com> wrote:
> > >>>> Hi Ayoub,
> > >>>> Thanks for the update.
> > >>>> Your latest proposal looks good to me, but I have a couple of comments.
> > >>>>> in (2) we repartition by the id field
> > >>>> I agree with this approach, but the KIP still states "No repartitioning is done by this processor. In case a user wants to deduplicate an id over multiple partitions, they should first repartition the topic on this id." Can you update the KIP to state that deduplication by value will result in auto-repartitioning by Kafka Streams, unless the developer adds a manual repartitioning operator themselves.
> > >>>> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase we stumble across a better name we can update the KIP then too).
> > >>>> This latest update has been open for a while now, I'd say it's good to start a VOTE thread in 1-2 days.
> > >>>> Thanks,
> > >>>> Bill
> > >>>> On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> > >>>>> Hi Bill,
> > >>>>> I think all the points raised are now handled in the KIP. We just still have 106), on which we should have an agreement. I gave a proposition in my last mail, and I am waiting for others' opinions.
> > >>>>> My proposition is to have 3 methods of deduplication:
> > >>>>> 1) `deduplicateByKey()`
> > >>>>> 2) `deduplicate((k, v) -> v.id)`
> > >>>>> 3) `deduplicateByKeyAndId((k, v) -> v.id)` (Name to be chosen)
> > >>>>> The point of having both (2) and (3) is that in (2) we repartition by the id field, whereas in (3) we are not repartitioning.
> > >>>>> (3) is needed in case the current partitioning ensures that the same id will always land on the same task. Here is an example I gave above:
> > >>>>>> For example, we have a topic with keyValue (`userId`, `transaction`) and deduplication is done on `transaction`.`id` . Here, the application wants to deduplicate transactions. It knows that a transaction id maps to a single userId. Any duplicate of that record would be received by the task which processes this userId.
> > >>>>> Let me know what you think.
> > >>>>> Thanks,
> > >>>>> Ayoub
> > >>>>> On Thu, Aug 22, 2024 at 21:37, Bill Bejeck <bbej...@apache.org> wrote:
> > >>>>>> Hi Ayoub,
> > >>>>>> What's the status of this KIP?
> > >>>>>> Regards,
> > >>>>>> Bill
> > >>>>>> On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> > >>>>>>> Hi Bill,
> > >>>>>>> Thanks for your comments !
> > >>>>>>> I am prefixing your points with B.
> > >>>>>>> B1.
> > >>>>>>> That's a good catch. I think punctuated records aren't concerned by this problem of reprocessing because they are not issued twice. We will still need to make a difference between a punctuated and a source record. The logic becomes:
> > >>>>>>>> if record.offset is not null && record.offset = offset_stored:
> > >>>>>>>>     forward(record)
> > >>>>>>>> else:
> > >>>>>>>>     // do_not_forward
> > >>>>>>> I will update the KIP on this point.
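Rendered as Java, that reprocessing guard might look like the following sketch (illustrative names only, not the KIP's code; it assumes the store keeps the forwarded record's offset per dedup id):

```java
public class OffsetCheckSketch {
    /**
     * recordOffset is null for punctuated records; storedOffset is the offset
     * saved with the store entry for this dedup id, or null if no entry exists.
     */
    static boolean shouldForward(Long recordOffset, Long storedOffset) {
        if (storedOffset == null) {
            return true; // first occurrence: forward and store it
        }
        // Same offset: we are re-processing the same input record after a
        // crash under ALOS, so forward it again. A different offset (or a
        // punctuated record): a true duplicate, so drop it.
        return recordOffset != null && recordOffset.equals(storedOffset);
    }
}
```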
> > >>>>>>> B2. (i.e. 106)
> > >>>>>>> That's the point discussed in 106. The main idea is to avoid repartitioning when we don't have to. But let's say we want to repartition when deduplication is on an id that is not the key.
> > >>>>>>> The user code may look something like this:
> > >>>>>>> ```kstream
> > >>>>>>> .deduplicate((k, v) -> v.id)
> > >>>>>>> .map((k, v) -> ...)
> > >>>>>>> ```
> > >>>>>>> The user expects that the key in the map processor which follows the deduplicate is the same as the initial key.
> > >>>>>>> Internally, if we decide to repartition, the deduplication processor may do the following:
> > >>>>>>> ```kstream
> > >>>>>>> .map((k, v) -> (v.id, ...))         // For repartitioning on id
> > >>>>>>> .deduplicateByKey()
> > >>>>>>> .map((id, v) -> (initialKey, ...))  // To recover the initial key
> > >>>>>>> ```
> > >>>>>>> In this case, there is some internal complexity in this processor:
> > >>>>>>> I) We should repartition twice, before and after deduplication. The second map triggers repartitioning if there are following stateful processors in the pipeline.
> > >>>>>>> II) The initial key should be kept somewhere. One way is to wrap the value to add the key, and then unwrap it after the `deduplicateByKey`.
> > >>>>>>> As mentioned in one use case above (deduplicating transactions - written at least once - by id), repartitioning is not needed for that case. This added complexity won't be useful.
> > >>>>>>> *TLDR,* I think one solution is to have a separate api to cover such a use case. In total, we can have 3 apis of deduplication (see the sketch below):
> > >>>>>>> 1) `deduplicateByKey()` *// No repartitioning*
> > >>>>>>> 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)` *// No repartitioning*
> > >>>>>>> 3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
> > >>>>>>> The second api is equivalent to deduplicating by the pair (key, id). No repartitioning is required because the records with the same key are already together in the same partition. cc @Matthias @Sebastien.
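As a usage sketch of the three proposed variants (again: this is the proposed KIP-655 API, not released Kafka Streams, and `Transaction` is a made-up payload type):

```java
import org.apache.kafka.streams.kstream.KStream;

public class ThreeVariantsSketch {

    record Transaction(String id, long amount) {}

    static void example(KStream<String, Transaction> transactions) {
        // 1) Deduplicate on the existing key -- no repartitioning.
        KStream<String, Transaction> byKey = transactions.deduplicateByKey();

        // 2) Deduplicate on (key, id) -- no repartitioning, since records
        //    with the same key already live in the same partition.
        KStream<String, Transaction> byKeyAndId =
                transactions.deduplicateByKeyAndId((k, v) -> v.id());

        // 3) Deduplicate on the id alone -- repartitions so that all records
        //    with the same id land on the same task.
        KStream<String, Transaction> byId =
                transactions.deduplicate((k, v) -> v.id());
    }
}
```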
> > >>>>>>> B3.
> > >>>>>>> I think here we can do what some existing processors do for periodic work: we can track the last cleanup time and the current stream time, and if the delta exceeds the deduplication period, we trigger the cleanup.
> > >>>>>>> Best,
> > >>>>>>> Ayoub
> > >>>>>>> On Thu, Jul 11, 2024 at 01:17, Bill Bejeck <bbej...@apache.org> wrote:
> > >>>>>>>> Hi All,
> > >>>>>>>> Thanks Ayoub for getting this KIP going again, I think a deduplication operator will be very useful. My apologies for being late to the discussion.
> > >>>>>>>> Overall - I agree with the direction of the KIP but I have a couple of questions.
> > >>>>>>>> 1. Regarding using offsets for tracking the first arrival. I think there could be a case when offsets would not be available, when records are forwarded by punctuation for example.
> > >>>>>>>> 2. I'm not sure about using something other than the key for identifying duplicate records. By doing so, one could end up missing a de-duplication due to records with the same id characteristic in the value but having different keys, so the records land on different partitions. I guess we could enforce a repartition if one chooses to use a de-duplication id other than the key.
> > >>>>>>>> 3. I think a punctuation scheduled to run at the de-duplication period to clean out older records would be a clean approach for purging them. I'm not taking a hard stance on this approach and I'm willing to discuss different methods.
> > >>>>>>>> Thanks,
> > >>>>>>>> Bill
> > >>>>>>>> On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > >>>>>>>>> Hi Matthias,
> > >>>>>>>>> Here are my updates on your points.
> > >>>>>>>>> 101.
> > >>>>>>>>>> You propose to add static methods `keySerde()` and `valueSerde()` -- in other config classes, we use only `with(keySerde, valueSerde)` as we try to use the "builder" pattern, and avoid too many overloads. I would prefer to omit both methods you suggest and just use a single `with` for both serdes.
> > >>>>>>>>> I was actually inspired by the other config classes, for example `Joined` and `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
> > >>>>>>>>>> I think we don't want to add `with(...)` which takes all parameters at once
> > >>>>>>>>> Done.
> > >>>>>>>>> 102.
> > >>>>>>>>> Thanks, your suggestion sounds good to me. The trade-off of having an index that allows efficiently purging expired records beside the keyValue store makes sense. I've been looking into the code, and I think a similar idea was implemented for other processors (for example with DualSchemaRocksDBSegmentedBytesStore). As you said, I think we would benefit from some existing code here. KIP updated !
> > >>>>>>>>> 104.
> > >>>>>>>>> Updated the KIP to consider records' offsets.
> > >>>>>>>>> 105
> > >>>>>>>>>> picking the first offset with smallest ts sounds good to me. The KIP should be explicit about it
> > >>>>>>>>> Done.
> > >>>>>>>>>> But as discussed above, it might be simplest to not really have a window lookup, but just a plain key-lookup and drop if the key exists in the store?
> > >>>>>>>>> > > >>>>>>>>> KIP updated, we will be `.get()`ing from a keyValueStore instead > > >>>> of > > >>>>>>>>> `.fetch()`ing > > >>>>>>>>> from a WindowStore. > > >>>>>>>>> > > >>>>>>>>>> Another line of thinking, that did serve us well in the past: > > >>>> in > > >>>>>>> doubt > > >>>>>>>>>> keep a record -- users can add operators to drop record (in > > >>>> case > > >>>>>> they > > >>>>>>>>>> don't want to keep it), but if we drop a record, users have no > > >>>>> way > > >>>>>> to > > >>>>>>>>>> resurrect it (thus, building a workaround to change semantica > > >>>> is > > >>>>>>>>>> possible for users if we default to keep records, but not the > > >>>>> other > > >>>>>>> way > > >>>>>>>>>> around). > > >>>>>>>>> > > >>>>>>>>> Makes total sense ! I updated the KIP to forward late records > > >>>>> instead > > >>>>>>> of > > >>>>>>>>> dropping them. > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> 106. > > >>>>>>>>> For the moment, I highlighted in Javadocs that we are > > >>>> deduplicating > > >>>>>> by > > >>>>>>>>> partition. If there is a better name to have this information in > > >>>>> the > > >>>>>>> name > > >>>>>>>>> of the api itself it would be good. > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> Best, > > >>>>>>>>> Ayoub > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> Le jeu. 13 juin 2024 à 09:03, Sebastien Viale < > > >>>>>>>> sebastien.vi...@michelin.com> > > >>>>>>>>> a écrit : > > >>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Hi, > > >>>>>>>>>> > > >>>>>>>>>> 106 : > > >>>>>>>>>> > > >>>>>>>>>> Thanks for the clarification. Actually, this is not what I > > >>>>>> expected, > > >>>>>>>> but I > > >>>>>>>>>> better understand the performance issues regarding the state > > >>>>> store > > >>>>>>>>>> iteration. > > >>>>>>>>>> If this is how it should be designed, it is fine for me as long > > >>>>> as > > >>>>>> it > > >>>>>>>> is > > >>>>>>>>>> clear that the repartition must be done before the > > >>>> deduplication. > > >>>>>>>>>> Sébastien > > >>>>>>>>>> > > >>>>>>>>>> ________________________________ > > >>>>>>>>>> De : Matthias J. Sax <mj...@apache.org> > > >>>>>>>>>> Envoyé : jeudi 13 juin 2024 02:51 > > >>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> > > >>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication > > >>>> processor > > >>>>> in > > >>>>>>>>>> kafka-streams > > >>>>>>>>>> > > >>>>>>>>>> Warning External sender Do not click on any links or open any > > >>>>>>>> attachments > > >>>>>>>>>> unless you trust the sender and know the content is safe. > > >>>>>>>>>> > > >>>>>>>>>> 106: > > >>>>>>>>>> > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written" > > >>>>>> stream, > > >>>>>>>>>>> we are sure that the duplicate record has the same key as the > > >>>>>>>>>>> original one, and will land on the same task. Here, a user > > >>>>> would > > >>>>>>>>>>> want to specify a deduplication key different from the > > >>>> topic's > > >>>>>> key > > >>>>>>>>>>> in case the topic's key is not a unique identifier > > >>>>>>>>>>> > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`, > > >>>>>>> `transaction`) > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the > > >>>>>>>> application > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a > > >>>> transaction > > >>>>> id > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would > > >>>> be > > >>>>>>>> received > > >>>>>>>>>>> by the task which processes this userId. > > >>>>>>>>>> > > >>>>>>>>>> This is an interesting point. 
> > >>>>>>>>>> > > >>>>>>>>>> My concern is to some extend, that it seems (on the surface) to > > >>>>> not > > >>>>>>>>>> follow the established pattern of auto-repartitioning in the > > >>>> DSL. > > >>>>>> Of > > >>>>>>>>>> course, given that the current proposal says we use an "id > > >>>>>> extractor" > > >>>>>>>>>> and not a "key extractor" it might be ok (but it might be > > >>>>> somewhat > > >>>>>>>>>> subtle). Of course, JavaDocs always help to explain in detail. > > >>>>>> Would > > >>>>>>>>>> this be enough? > > >>>>>>>>>> > > >>>>>>>>>> Would be good to hear from others about this point. I am > > >>>>> personally > > >>>>>>> not > > >>>>>>>>>> sure which approach I would prefer personally at this point. > > >>>>>>>>>> > > >>>>>>>>>> The problem reminds me on > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844< > > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10844> which is > > >>>> not > > >>>>>>>> resolve > > >>>>>>>>>> directly either. We do have KIP-759 > > >>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: > > >>>>>> Unneeded > > >>>>>>>>>> repartition canceling< > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: > > >>>>>> Unneeded > > >>>>>>>>>> repartition canceling>) > > >>>>>>>>>> which is WIP and helps with KAFKA-10844, but not sure if it > > >>>> would > > >>>>>> be > > >>>>>>> a > > >>>>>>>>>> viable solution for the de-duplication case? > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> -Matthias > > >>>>>>>>>> > > >>>>>>>>>> This email was screened for spam and malicious content but > > >>>>> exercise > > >>>>>>>>>> caution anyway. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On 6/11/24 2:31 PM, Ayoub Omari wrote: > > >>>>>>>>>>> Hi Sebastien & Matthias, > > >>>>>>>>>>> > > >>>>>>>>>>> For 106. > > >>>>>>>>>>> My idea was to deduplicate on a per-task basis. If the user > > >>>>> wants > > >>>>>>>>>>> to do a global deduplication over all partitions, I think > > >>>> it's > > >>>>>>>> better to > > >>>>>>>>>>> have him explicitly repartition and then call the > > >>>> deduplication > > >>>>>>>>>> processor. > > >>>>>>>>>>> > > >>>>>>>>>>> For the use-case of deduplicating a "at least once written" > > >>>>>> stream, > > >>>>>>>>>>> we are sure that the duplicate record has the same key as the > > >>>>>>>>>>> original one, and will land on the same task. Here, a user > > >>>>> would > > >>>>>>>>>>> want to specify a deduplication key different from the > > >>>> topic's > > >>>>>> key > > >>>>>>>>>>> in case the topic's key is not a unique identifier. > > >>>>>>>>>>> > > >>>>>>>>>>> For example, we have a topic with keyValue (`userId`, > > >>>>>>> `transaction`) > > >>>>>>>>>>> and deduplication is done on `transaction`.`id` . Here, the > > >>>>>>>> application > > >>>>>>>>>>> wants to deduplicate transactions. It knows that a > > >>>> transaction > > >>>>> id > > >>>>>>>>>>> maps to a single userId. Any duplicate of that record would > > >>>> be > > >>>>>>>> received > > >>>>>>>>>>> by the task which processes this userId. > > >>>>>>>>>>> > > >>>>>>>>>>> One other thought I have when writing the KIP about global > > >>>>>>>> deduplication, > > >>>>>>>>>>> is that it will require to map twice the key of the stream > > >>>>> (first > > >>>>>>>> map to > > >>>>>>>>>>> change the key to deduplication key, and second map to get > > >>>>>>>>>>> back the initial key). Second map may imply a second > > >>>>>>> repartitioning. 
> > >>>>>>>>>>> > > >>>>>>>>>>> However, if we do a per-task deduplication, the user may > > >>>> adapt > > >>>>> to > > >>>>>>> his > > >>>>>>>>>>> specific use-case. > > >>>>>>>>>>> > > >>>>>>>>>>> Let me know what you think > > >>>>>>>>>>> Ayoub > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> Le mar. 11 juin 2024 à 20:21, Matthias J. Sax < > > >>>>> mj...@apache.org> > > >>>>>> a > > >>>>>>>>>> écrit : > > >>>>>>>>>>> > > >>>>>>>>>>>> Thanks Sebastien, > > >>>>>>>>>>>> > > >>>>>>>>>>>> that's a good point. Thanks for raising it. -- I like your > > >>>>>>> proposal. > > >>>>>>>>>>>> > > >>>>>>>>>>>> An alternative would be to have two overloads of > > >>>>> `deduplicate()` > > >>>>>>>> one w/ > > >>>>>>>>>>>> and one w/o the "id extractor" parameter. This would be less > > >>>>>>>> explicit > > >>>>>>>>>>>> though. > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> -Matthias > > >>>>>>>>>>>> > > >>>>>>>>>>>> On 6/11/24 2:30 AM, Sebastien Viale wrote: > > >>>>>>>>>>>>> Hi, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I am really interested in this KIP. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 106: > > >>>>>>>>>>>>> I hope I am not talking nonsense, but if you do not > > >>>>> deduplicate > > >>>>>>>> based > > >>>>>>>>>> on > > >>>>>>>>>>>> the key, the input stream has to be repartitioned. > > >>>>>>>>>>>>> Otherwise, different stream tasks may handle records that > > >>>>> need > > >>>>>> to > > >>>>>>>> be > > >>>>>>>>>>>> deduplicated, and thus duplicates will not be detected. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> This is why I would have created two different methods, as > > >>>> is > > >>>>>>> done > > >>>>>>>> for > > >>>>>>>>>>>> GroupBy: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> deduplicateByKey(...) > > >>>>>>>>>>>>> deduplicate(...) > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> If deduplicateByKey is used, the input stream does not need > > >>>>> to > > >>>>>> be > > >>>>>>>>>>>> repartitioned. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> thanks > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Sébastien > > >>>>>>>>>>>>> ________________________________ > > >>>>>>>>>>>>> De : Matthias J. Sax <mj...@apache.org> > > >>>>>>>>>>>>> Envoyé : mardi 11 juin 2024 01:54 > > >>>>>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> > > >>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-655: Add deduplication > > >>>>>> processor > > >>>>>>> in > > >>>>>>>>>>>> kafka-streams > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Warning External sender Do not click on any links or open > > >>>> any > > >>>>>>>>>>>> attachments unless you trust the sender and know the content > > >>>>> is > > >>>>>>>> safe. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Thanks for the update Ayoub. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 101: you say: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> But I am not sure if we don't want to have them for this > > >>>>>>>> processor ? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> What is your reasoning to move off the established pattern? > > >>>>>> Would > > >>>>>>>> be > > >>>>>>>>>>>>> good to understand, why `Deduplicated` class needs a > > >>>>> different > > >>>>>>>>>>>>> "structure" compared to existing classes. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 102: Creating iterators is very expensive. For other work, > > >>>> we > > >>>>>>>> actually > > >>>>>>>>>>>>> hit 100x (?) 
throughput degradation by creating an (for > > >>>> most > > >>>>>>> cases > > >>>>>>>>>>>>> empty) iterator for every input record, and needed to find > > >>>>>> other > > >>>>>>>> ways > > >>>>>>>>>> to > > >>>>>>>>>>>>> avoid creating an iterator per record. It really kills > > >>>>>>> performance. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I see the point about data expiration. We could experiment > > >>>>> with > > >>>>>>>>>>>>> punctuation to expire old data, or add a second > > >>>> "time-ordered > > >>>>>>>> store" > > >>>>>>>>>>>>> (which we already have at hand) which acts as an index into > > >>>>> the > > >>>>>>>> main > > >>>>>>>>>>>>> store. -- Another possibility would be to add a new version > > >>>>> of > > >>>>>>>>>> segmented > > >>>>>>>>>>>>> store with a different key-layout (ie, just store the plain > > >>>>>>> key). I > > >>>>>>>>>>>>> think with some refactoring, we might be able to re-use a > > >>>> lot > > >>>>>> of > > >>>>>>>>>>>>> existing code. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 104: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of stateful > > >>>>>>>> processors > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with > > >>>>>>> `on_window_close` > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we > > >>>>> receive a > > >>>>>>>> record, > > >>>>>>>>>>>>>> we update its aggregation result in the store, then crash > > >>>>>> before > > >>>>>>>>>>>> committing, > > >>>>>>>>>>>>>> then the record will be again reconsidered for > > >>>> aggregation). > > >>>>>> Is > > >>>>>>>> this > > >>>>>>>>>>>>>> correct ? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Yes, this is correct, but it does not violate ALOS, because > > >>>>> we > > >>>>>>> did > > >>>>>>>> not > > >>>>>>>>>>>>> lose the input record -- of course, the aggregation would > > >>>>>> contain > > >>>>>>>> the > > >>>>>>>>>>>>> input record twice (eg, over count), but this is ok under > > >>>>> ALOS. > > >>>>>>>>>>>>> Unfortunately, for de-duplication this pattern breaks, > > >>>>> because > > >>>>>>>>>>>>> de-duplication operator does a different "aggregation > > >>>> logic" > > >>>>>>>> depending > > >>>>>>>>>>>>> on its state (emit if no key found, but not emit if key > > >>>>> found). > > >>>>>>> For > > >>>>>>>>>>>>> counting as an example, we increment the count and emit > > >>>>>>>> unconditionally > > >>>>>>>>>>>>> though. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset > > >>>> inside > > >>>>>> the > > >>>>>>>>>>>>>> store's value can tell us whether the record has been > > >>>>> already > > >>>>>>>> seen or > > >>>>>>>>>>>> not. > > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in > > >>>> the > > >>>>>>> store > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means > > >>>> the > > >>>>>>>> record > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it. If > > >>>>> the > > >>>>>>>> offset > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we > > >>>> ignore > > >>>>>> it. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Great idea. This might work... If we store the input record > > >>>>>>>> offset, we > > >>>>>>>>>>>>> can actually avoid that the "aggregation logic" changes for > > >>>>> the > > >>>>>>>> same > > >>>>>>>>>>>>> input record. 
-- And yes, with ALOS potentially emitting a duplicate is the-name-of-the-game, so no concerns on this part from my side.
> > >>>>>>>>>>>>> 105: picking the first offset with smallest ts sounds good to me. The KIP should be explicit about it. But as discussed above, it might be simplest to not really have a window lookup, but just a plain key-lookup and drop if the key exists in the store? -- For late records, it might imply that they are not de-duplicated, but this is also the case for in-order records if they are further apart than the de-duplication window size, right? Thus I would believe this is "more natural" compared to discarding late records pro-actively, which would lead to missing result records?
> > >>>>>>>>>>>>> We could also make this configurable if we are not sure what users really need -- or add such a configuration later in case the semantics we pick don't work for some users.
> > >>>>>>>>>>>>> Another line of thinking, that did serve us well in the past: in doubt keep a record -- users can add operators to drop records (in case they don't want to keep them), but if we drop a record, users have no way to resurrect it (thus, building a workaround to change semantics is possible for users if we default to keep records, but not the other way around).
> > >>>>>>>>>>>>> Would be good to get input from the broader community on this question though. In the end, it must be a use-case driven decision?
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > >>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>> Thank you for your review !
> > >>>>>>>>>>>>>> 100.
> > >>>>>>>>>>>>>> I agree. I changed the name of the parameter to "idSelector". Because this id may be computed, it is better to call it "id" rather than field or attribute.
> > >>>>>>>>>>>>>> 101.
> > >>>>>>>>>>>>>> The reason I added the methods `keySerde()` and `valueSerde()` was to have the same capabilities as other serde classes (such as Grouped or Joined). As a Kafka-streams user, I usually use `with(keySerde, valueSerde)` as you suggested.
But I am not sure if we don't want to > > >>>> have > > >>>>>>> them > > >>>>>>>> for > > >>>>>>>>>>>> this > > >>>>>>>>>>>>>> processor ? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 102. > > >>>>>>>>>>>>>> That's a good point ! Because we know that the window > > >>>> store > > >>>>>> will > > >>>>>>>>>> contain > > >>>>>>>>>>>>>> at most one instance of a given key, I am not sure how the > > >>>>>> range > > >>>>>>>> fetch > > >>>>>>>>>>>>>> on WindowStore compares to a KeyValueStore `get()` in this > > >>>>>> case. > > >>>>>>>>>>>>>> Wondering if the fact that the record's key is the prefix > > >>>> of > > >>>>>> the > > >>>>>>>>>>>> underlying > > >>>>>>>>>>>>>> keyValueStore's key ("<dataKey,ts>") may provide > > >>>> comparable > > >>>>>>>>>> performance > > >>>>>>>>>>>>>> to the random access of KeyValueStore ? Of course, the > > >>>>>>> WindowStore > > >>>>>>>>>>>> fetch() > > >>>>>>>>>>>>>> would be less efficient because it may fetch from more > > >>>> than > > >>>>> 1 > > >>>>>>>> segment, > > >>>>>>>>>>>> and > > >>>>>>>>>>>>>> because of some iterator overhead. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> The purpose of using a WindowStore is to automatically > > >>>> purge > > >>>>>> old > > >>>>>>>> data. > > >>>>>>>>>>>>>> For example, deduplicating a topic written at least once > > >>>>>>> wouldn't > > >>>>>>>>>>>> require > > >>>>>>>>>>>>>> keeping a large history. This is not the case of using a > > >>>>>>>> KeyValueStore > > >>>>>>>>>>>>>> which would require scanning regularly to remove expired > > >>>>>>> records. > > >>>>>>>>>>>>>> That might cause a sudden increase of latency whenever the > > >>>>>>> cleanup > > >>>>>>>>>>>>>> is triggered. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> It would be good to hear from anyone who has done some > > >>>>>> analysis > > >>>>>>>>>>>>>> on RocksDB's range fetch. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 103. > > >>>>>>>>>>>>>> Sure, I can update it once we agree on underlying > > >>>> semantics. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 104. > > >>>>>>>>>>>>>> Another good point ! > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> In the end, de-duplication does only make sense when EOS > > >>>> is > > >>>>>>> used > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> I agree with that. And for me, the use case of > > >>>>> deduplicating a > > >>>>>>>> topic > > >>>>>>>>>>>>>> written ALOS inside an EOS application might be the top 1 > > >>>>> use > > >>>>>>> case > > >>>>>>>>>>>>>> of deduplication. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> all downstream processing happens, `context.forward()` > > >>>>>> returns > > >>>>>>>>>>>>>>> and we update the state store, we could now crash w/o > > >>>>>>> committing > > >>>>>>>>>>>> offsets > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> This gets me wondering if this is a limitation of stateful > > >>>>>>>> processors > > >>>>>>>>>>>>>> in ALOS. For example a windowed aggregation with > > >>>>>>> `on_window_close` > > >>>>>>>>>>>>>> emit strategy may have the same limitation today (we > > >>>>> receive a > > >>>>>>>> record, > > >>>>>>>>>>>>>> we update its aggregation result in the store, then crash > > >>>>>> before > > >>>>>>>>>>>> committing, > > >>>>>>>>>>>>>> then the record will be again reconsidered for > > >>>> aggregation). > > >>>>>> Is > > >>>>>>>> this > > >>>>>>>>>>>>>> correct ? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> As a workaround, I think storing the record's offset > > >>>> inside > > >>>>>> the > > >>>>>>>>>>>>>> store's value can tell us whether the record has been > > >>>>> already > > >>>>>>>> seen or > > >>>>>>>>>>>> not. 
> > >>>>>>>>>>>>>> If we receive a record whose deduplication id exists in > > >>>> the > > >>>>>>> store > > >>>>>>>>>>>>>> and the entry in the store has the same offset, it means > > >>>> the > > >>>>>>>> record > > >>>>>>>>>>>>>> is processed twice and we can go ahead and forward it. If > > >>>>> the > > >>>>>>>> offset > > >>>>>>>>>>>>>> is different, it means it's a duplicate record, so we > > >>>> ignore > > >>>>>> it. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> As you said, we don't have any guarantee whether the > > >>>> initial > > >>>>>>>> record > > >>>>>>>>>> was > > >>>>>>>>>>>>>> forwarded or not in case of a crash before commit. In this > > >>>>>>>> solution > > >>>>>>>>>>>>>> we would forward the record twice, which is against > > >>>>>>> deduplication. > > >>>>>>>>>>>>>> But, this is still an ALOS application, so it has the same > > >>>>>>>> semantics > > >>>>>>>>>>>>>> as any other such application. With this, I am not sure we > > >>>>> can > > >>>>>>>>>>>>>> have "strict" deduplication for ALOS applications. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> 105. > > >>>>>>>>>>>>>> For me, if there are two duplicate records, it means they > > >>>>> are > > >>>>>>>>>>>>>> the same in the application's point of view, so it can > > >>>>> choose > > >>>>>>>>>>>>>> either one. Thus, I would go with forwarding the record > > >>>> with > > >>>>>>>>>>>>>> the least offset. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Would it not be desired to drop all duplicates > > >>>> independent > > >>>>>>>>>>>>>>> of their ts, as long as we find a record in the store? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> This is actually related to the (suggested) windowed > > >>>> nature > > >>>>>>>>>>>>>> of deduplication. As in 102. we don't want to do a > > >>>> "forever" > > >>>>>>>>>>>>>> deduplication, which may be impossible for huge workloads > > >>>>>>>>>>>>>> where all records should be kept in the store. Hence, the > > >>>>>> fetch > > >>>>>>>>>>>>>> of timestamp between [ts-deduplicationInterval, > > >>>>>>>>>>>> ts+deduplicationInterval] > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> About late records, this is again due to the windowed > > >>>>> nature. > > >>>>>>>>>>>>>> Because the store won't save those late (i.e. expired) > > >>>>>> records, > > >>>>>>>>>>>>>> we have two options. Either, we do not apply deduplication > > >>>>>>>>>>>>>> on them, thus the deduplication doesn't work on late > > >>>>> records, > > >>>>>>>>>>>>>> or we discard them (which is the option I suggest). > > >>>>>>>>>>>>>> In the second case, It would be up to the user to choose > > >>>>>>>>>>>>>> any deduplicationInterval that may sufficiently cover all > > >>>>> his > > >>>>>>> late > > >>>>>>>>>> data. > > >>>>>>>>>>>>>> What do you think ? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>> Ayoub > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Le mar. 4 juin 2024 à 23:58, Matthias J. Sax < > > >>>>>> mj...@apache.org> > > >>>>>>> a > > >>>>>>>>>>>> écrit : > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Ayoub, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> thanks for resurrecting this KIP. I think a built-in > > >>>>>>>> de-duplication > > >>>>>>>>>>>>>>> operator will be very useful. 
> > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Couple of questions: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 100: `deduplicationKeySelector` > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Is this the best name? It might indicate that we select a > > >>>>>> "key" > > >>>>>>>> what > > >>>>>>>>>> is > > >>>>>>>>>>>>>>> an overloaded term... Maybe we could use `Field` or `Id` > > >>>> or > > >>>>>>>>>> `Attribute` > > >>>>>>>>>>>>>>> instead of `Key` in the name? Just brainstorming. If we > > >>>>> think > > >>>>>>>> `Key` > > >>>>>>>>>> is > > >>>>>>>>>>>>>>> the best word, I am also ok with it. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 101: `Deduplicated` class > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> You propose to add static methods `keySerde()` and > > >>>>>>>> `valueSerde()` -- > > >>>>>>>>>> in > > >>>>>>>>>>>>>>> other config classes, we use only `with(keySerde, > > >>>>>> valueSerde)` > > >>>>>>>> as we > > >>>>>>>>>>>> try > > >>>>>>>>>>>>>>> to use the "builder" pattern, and avoid too many > > >>>>> overloads. I > > >>>>>>>> would > > >>>>>>>>>>>>>>> prefer to omit both methods you suggest and just use a > > >>>>> single > > >>>>>>>> `with` > > >>>>>>>>>>>> for > > >>>>>>>>>>>>>>> both serdes. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Similarly, I thing we don't want to add `with(...)` which > > >>>>>> takes > > >>>>>>>> all > > >>>>>>>>>>>>>>> parameters at once (which should only be 3 parameters, > > >>>> not > > >>>>> 4 > > >>>>>> as > > >>>>>>>> it's > > >>>>>>>>>>>>>>> currently in the KIP)? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 102: Usage of `WindowedStore`: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Would this be efficient? The physical byte layout it > > >>>>>>>> "<dataKey,ts>" > > >>>>>>>>>> for > > >>>>>>>>>>>>>>> the store key, so it would be difficult to do an > > >>>> efficient > > >>>>>>> lookup > > >>>>>>>>>> for a > > >>>>>>>>>>>>>>> given "de-duplication key" to discard duplicates, as we > > >>>>> don't > > >>>>>>>> know > > >>>>>>>>>> the > > >>>>>>>>>>>>>>> timestamp of the first record with the same > > >>>> "de-duplication > > >>>>>>> key". > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> This boils down to the actual de-duplication logic (some > > >>>>> more > > >>>>>>>>>> comments > > >>>>>>>>>>>>>>> below), but what you propose seems to require expensive > > >>>>>>>> range-scans > > >>>>>>>>>>>> what > > >>>>>>>>>>>>>>> could be cost prohibitive in practice. I think we need to > > >>>>>> find > > >>>>>>> a > > >>>>>>>> way > > >>>>>>>>>> to > > >>>>>>>>>>>>>>> use efficient key-point-lookups to make this work. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 103: "Processing logic": > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Might need some updates (Cf 102 comment). I am not sure > > >>>> if > > >>>>> I > > >>>>>>>> fully > > >>>>>>>>>>>>>>> understand the logic: cf 105 below. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 104: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> If no entries found → forward the record + save the > > >>>> record > > >>>>>> in > > >>>>>>>> the > > >>>>>>>>>>>> store > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> This part is critical, and we should discuss in detail. 
> > >>>> In > > >>>>>> the > > >>>>>>>> end, > > >>>>>>>>>>>>>>> de-duplication does only make sense when EOS is used, and > > >>>>> we > > >>>>>>>> might > > >>>>>>>>>> want > > >>>>>>>>>>>>>>> to call this out (eg, on the JavaDocs)? But if used with > > >>>>>> ALOS, > > >>>>>>>> it's > > >>>>>>>>>>>> very > > >>>>>>>>>>>>>>> difficult to ensure that we never lose data... Your > > >>>>> proposal > > >>>>>> to > > >>>>>>>>>>>>>>> first-forward goes into the right direction, but does not > > >>>>>>> really > > >>>>>>>>>> solve > > >>>>>>>>>>>>>>> the problem entirely: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Even if we forward the message first, all downstream > > >>>>>> processing > > >>>>>>>>>>>> happens, > > >>>>>>>>>>>>>>> `context.forward()` returns and we update the state > > >>>> store, > > >>>>> we > > >>>>>>>> could > > >>>>>>>>>> now > > >>>>>>>>>>>>>>> crash w/o committing offsets. For this case, we have no > > >>>>>>> guarantee > > >>>>>>>>>> that > > >>>>>>>>>>>>>>> the result records where published (as we did not flush > > >>>> the > > >>>>>>>> producer > > >>>>>>>>>>>>>>> yet), but when re-reading from the input topic, we would > > >>>>> find > > >>>>>>> the > > >>>>>>>>>>>> record > > >>>>>>>>>>>>>>> in the store and incorrectly drop as duplicate... > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I think the only solution to make this work would be to > > >>>> use > > >>>>>>>> TX-state > > >>>>>>>>>>>>>>> stores in combination with ALOS as proposed via KIP-892? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Using an in-memory store won't help much either? The > > >>>>> producer > > >>>>>>>> could > > >>>>>>>>>>>> have > > >>>>>>>>>>>>>>> send the write into the changelog topic, but not into the > > >>>>>>> result > > >>>>>>>>>> topic, > > >>>>>>>>>>>>>>> and thus we could still not guarantee ALOS...? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> How do we want to go about this? We could also say, this > > >>>>> new > > >>>>>>>> operator > > >>>>>>>>>>>>>>> only works with EOS. Would this be too restrictive? -- At > > >>>>>> lest > > >>>>>>>> for > > >>>>>>>>>>>> know, > > >>>>>>>>>>>>>>> until KIP-892 lands, and we could relax it? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 105: "How to detect late records" > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> In the end, it seems to boil down to determine which of > > >>>> the > > >>>>>>>> records > > >>>>>>>>>> to > > >>>>>>>>>>>>>>> forward and which record to drop, for (1) the regular > > >>>> case > > >>>>>> and > > >>>>>>>> (2) > > >>>>>>>>>> the > > >>>>>>>>>>>>>>> out-of-order data case. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Regular case (no out-of-order data): For this case, > > >>>> offset > > >>>>>> and > > >>>>>>> ts > > >>>>>>>>>> order > > >>>>>>>>>>>>>>> is the same, and we can forward the first record we get. > > >>>>> All > > >>>>>>>> later > > >>>>>>>>>>>>>>> record within "de-duplication period" with the same > > >>>>>>>> "de-duplication > > >>>>>>>>>>>> key" > > >>>>>>>>>>>>>>> would be dropped. If a record with the same > > >>>> "de-duplication > > >>>>>>> key" > > >>>>>>>>>>>> arrives > > >>>>>>>>>>>>>>> after "de-duplication period" passed, we cannot drop it > > >>>> any > > >>>>>>>> longer, > > >>>>>>>>>> but > > >>>>>>>>>>>>>>> would still forward it, as by the contract of the > > >>>> operator > > >>>>> / > > >>>>>>>>>>>>>>> de-duplication period. 
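A toy version of that regular-case decision, assuming the inclusive, symmetric interval bounds that the KIP eventually settled on (illustrative only, not the KIP's code):

```java
public class DedupDecisionSketch {
    /**
     * firstSeenTs is the timestamp stored for this dedup id, or null if no
     * entry exists (or the entry is already expired). With inclusive bounds,
     * k1@30 in Example A above is dropped.
     */
    static boolean forward(Long firstSeenTs, long recordTs, long dedupIntervalMs) {
        if (firstSeenTs == null) {
            return true; // nothing to de-duplicate against
        }
        // duplicate iff within the interval, applied in both time directions
        return Math.abs(recordTs - firstSeenTs) > dedupIntervalMs;
    }
}
```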
> > >>>>>>>>>>>>>>> For the out-of-order case: The first question we need to answer is, do we want to forward the record with the smallest offset or the record with the smallest ts? Logically, forwarding with the smallest ts might be "more correct", however, it implies we could only forward it after the "de-duplication period" passed, which might be undesired latency? Would this be desired/acceptable?
> > >>>>>>>>>>>>>>> In contrast, if we forward the record with the smallest offset (this is what you seem to propose) we don't have a latency issue, but of course the question of which records to drop is more tricky to answer: it seems you propose to compare the time difference of the stored record to the current record, but I am wondering why? Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store? Would be good to get some more motivation and tradeoffs discussed about the different strategies we could use.
> > >>>>>>>>>>>>>>> You also propose to drop _any_ late record: I am also not sure if that's desired? Could this not lead to data loss? Assume we get a late record, but in fact there was never a duplicate? Why would we want to drop it? If there is a late record which is indeed a duplicate, but we purged the original record from the store already, it seems to be the same case as for the "no out-of-order case": after we purged we cannot de-duplicate and thus it's a regular case we can just accept?
> > >>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > >>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>> I've just made a (small) change to this KIP about an implementation detail. Please let me know your thoughts.
> > >>>>>>>>>>>>>>>> Thank you,
> > >>>>>>>>>>>>>>>> Ayoub
> > >>>>>>>>>>>>>>>> On Mon,
May 20, 2024 at 21:13, Ayoub <ayoubomar...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>> Hello,
> > >>>>>>>>>>>>>>>>> Following a discussion on the community Slack channel, I would like to revive the discussion on KIP-655, which is about adding a deduplication processor in kafka-streams.