Hi Bill,

Thanks for your answer.
> Can you update the KIP to state that deduplication by value will result in
> auto-repartitioning by Kafka Streams

Updated the KIP to the latest proposal.

> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
> we stumble across a better name we can update the KIP then too).

Yes, better than the first suggested name. Reflected in the KIP.

I also added a note about records with a null deduplication key.

If no new points are raised in the next few days, I will start a VOTE thread.

Thanks,
Ayoub

On Tue, Aug 27, 2024 at 5:47 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Ayoub,
>
> Thanks for the update.
>
> Your latest proposal looks good to me, but I have a couple of comments.
>
> > in (2) we repartition by the id field
>
> I agree with this approach, but the KIP still states "No repartitioning is
> done by this processor. In case a user wants to deduplicate an id over
> multiple partitions, they should first repartition the topic on this id."
> Can you update the KIP to state that deduplication by value will result in
> auto-repartitioning by Kafka Streams, unless the developer adds a manual
> repartitioning operator themselves.
>
> For option 3, what about `deduplicateByKeyValue`? (If during the PR phase
> we stumble across a better name we can update the KIP then too).
>
> This latest update has been open for a while now, I'd say it's good to
> start a VOTE thread in 1-2 days.
>
> Thanks,
> Bill
>
> On Mon, Aug 26, 2024 at 3:15 AM Ayoub Omari <ayoubomar...@gmail.com>
> wrote:
>
> > Hi Bill,
> >
> > I think all the points raised are now handled in the KIP.
> > We still have 106), on which we should have an agreement.
> > I gave a proposition in my last mail, and I am waiting for
> > others' opinions.
> >
> > My proposition is to have 3 methods of deduplication:
> > 1) `deduplicateByKey()`
> > 2) `deduplicate((k, v) -> v.id)`
> > 3) `deduplicateByKeyAndId((k, v) -> v.id)` (name to be chosen)
> >
> > The point of having both (2) and (3) is that in (2) we repartition
> > by the id field, whereas in (3) we are not repartitioning.
> >
> > (3) is needed in case the current partitioning ensures that the same
> > id will always land on the same task. Here is an example I gave above:
> >
> > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > and deduplication is done on `transaction`.`id`. Here, the application
> > > wants to deduplicate transactions. It knows that a transaction id
> > > maps to a single userId. Any duplicate of that record would be received
> > > by the task which processes this userId.
> >
> > Let me know what you think.
> >
> > Thanks,
> > Ayoub
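To illustrate the proposal, here is a rough sketch of how the three methods might look from the caller's side. The method names, the id selector, and the deduplication-interval parameter are taken from the discussion above; none of this exists in Kafka Streams yet, and the final signatures may differ in the KIP.

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

class DeduplicationUsageSketch {

    // Hypothetical value type; the `id` field matches the `v.id` notation above.
    static class Transaction {
        public String id;
    }

    void example(final StreamsBuilder builder) {
        final KStream<String, Transaction> transactions = builder.stream("transactions");

        // 1) Deduplicate on the record key; no repartitioning needed.
        transactions.deduplicateByKey(Duration.ofMinutes(10));

        // 2) Deduplicate on an id extracted from the value; Kafka Streams
        //    would auto-repartition on that id, as with selectKey() + groupByKey().
        transactions.deduplicate((k, v) -> v.id, Duration.ofMinutes(10));

        // 3) Deduplicate on (key, id) without repartitioning, for topics whose
        //    existing partitioning already co-locates duplicates on one task.
        transactions.deduplicateByKeyAndId((k, v) -> v.id, Duration.ofMinutes(10));
    }
}
```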
> > On Thu, Aug 22, 2024 at 9:37 PM Bill Bejeck <bbej...@apache.org> wrote:
> >
> > > Hi Ayoub,
> > >
> > > What's the status of this KIP?
> > >
> > > Regards,
> > > Bill
> > >
> > > On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com>
> > > wrote:
> > >
> > > > Hi Bill,
> > > >
> > > > Thanks for your comments!
> > > >
> > > > I am prefixing your points with B.
> > > >
> > > > B1.
> > > > That's a good catch. I think punctuated records aren't concerned by
> > > > this problem of reprocessing because they are not issued twice.
> > > > We will still need to make a difference between a punctuated
> > > > and a source record. The logic becomes:
> > > >
> > > > > if record.offset is not null && record.offset == offset_stored:
> > > > >     forward(record)
> > > > > else:
> > > > >     // do_not_forward
> > > >
> > > > I will update the KIP on this point.
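A minimal sketch of what that check could look like inside a processor, assuming a plain KeyValueStore whose value holds the offset of the first record seen for a given deduplication id (the store name and layout are illustrative, not the KIP's final design):

```java
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;

class DeduplicationCheckSketch extends ContextualProcessor<String, String, String, String> {

    private KeyValueStore<String, Long> firstSeenOffsets;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        super.init(context);
        firstSeenOffsets = context.getStateStore("dedup-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        // recordMetadata() is empty for punctuation-generated records,
        // which is how a punctuated record is told apart from a source record.
        final Long offset = context().recordMetadata()
            .map(RecordMetadata::offset)
            .orElse(null);

        final Long storedOffset = firstSeenOffsets.get(record.key());
        if (storedOffset == null) {
            // First occurrence of this id: remember its offset and forward.
            if (offset != null) {
                firstSeenOffsets.put(record.key(), offset);
            }
            context().forward(record);
        } else if (offset != null && offset.equals(storedOffset)) {
            // Same offset: the same source record is being reprocessed after
            // a crash under ALOS, so forward it instead of dropping it.
            context().forward(record);
        }
        // Otherwise (different offset, or a punctuated record whose id is
        // already stored): a true duplicate, do not forward.
    }
}
```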
> > > > B2. (i.e. 106)
> > > > That's the point discussed in 106. The main idea is to avoid
> > > > repartitioning when we don't have to. But let's say we want to
> > > > repartition when deduplication is on an id that is not the key.
> > > >
> > > > The user code may look something like this:
> > > > ```
> > > > kstream
> > > >   .deduplicate((k, v) -> v.id)
> > > >   .map((k, v) -> ...)
> > > > ```
> > > > The user expects that the key in the map processor which follows
> > > > the deduplicate is the same as the initial key.
> > > >
> > > > Internally, if we decide to repartition, the deduplication processor
> > > > may do the following:
> > > > ```
> > > > kstream
> > > >   .map((k, v) -> (v.id, ...))          // for repartitioning on id
> > > >   .deduplicateByKey()
> > > >   .map((id, v) -> (initialKey, ...))   // to recover the initial key
> > > > ```
> > > >
> > > > In this case, there is some internal complexity to this processor:
> > > >
> > > > I) We should repartition twice, before and after deduplication. The
> > > > second map triggers repartitioning if there are following stateful
> > > > processors in the pipeline.
> > > >
> > > > II) The initial key should be kept somewhere. One way is to wrap the
> > > > value to add the key, and then unwrap it after the `deduplicateByKey`.
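A sketch of point II, assuming the proposed `deduplicateByKey()` existed: the original key travels inside the value across the repartition and is restored afterwards. All names are illustrative; this is what the operator might do internally, not a public API.

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

class InternalRepartitionSketch {

    // Hypothetical value type with a deduplication id.
    static class Transaction {
        public String id;
    }

    KStream<String, Transaction> deduplicateById(final KStream<String, Transaction> input) {
        return input
            // Wrap the original key into the value and re-key on the id,
            // which triggers the first repartition.
            .map((k, v) -> KeyValue.pair(v.id, KeyValue.pair(k, v)))
            // Proposed API, not yet in Kafka Streams.
            .deduplicateByKey(Duration.ofMinutes(10))
            // Unwrap to restore the original key; a second repartition is
            // triggered if a stateful processor follows downstream.
            .map((id, wrapped) -> KeyValue.pair(wrapped.key, wrapped.value));
    }
}
```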
> > > > As mentioned in one use case above (deduplicating transactions -
> > > > written at least once - by id), repartitioning is not needed for
> > > > that case. This added complexity won't be useful.
> > > >
> > > > *TLDR,* I think one solution is to have a separate api to cover such
> > > > a use case. In total, we can have 3 apis of deduplication:
> > > >
> > > > 1) `deduplicateByKey()` *// No repartitioning*
> > > > 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)` *// No repartitioning*
> > > > 3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
> > > >
> > > > The second api is equivalent to deduplicating by the couple (key, id).
> > > > No repartitioning is required because records with the same key are
> > > > already together in the same partition. cc @Matthias @Sebastien.
> > > >
> > > > B3.
> > > > I think here we can do what some existing processors do for periodic
> > > > work: we can track the last cleanup time and the current stream time,
> > > > and if the delta exceeds the deduplication period, we trigger the
> > > > cleanup.
> > > >
> > > > Best,
> > > > Ayoub
> > > >
> > > > On Thu, Jul 11, 2024 at 1:17 AM Bill Bejeck <bbej...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Thanks Ayoub for getting this KIP going again, I think a
> > > > > deduplication operator will be very useful. My apologies for being
> > > > > late to the discussion.
> > > > >
> > > > > Overall - I agree with the direction of the KIP, but I have a
> > > > > couple of questions.
> > > > >
> > > > > 1. Regarding using offsets for tracking the first arrival. I think
> > > > > there could be a case when offsets would not be available, when
> > > > > records are forwarded by punctuation for example.
> > > > >
> > > > > 2. I'm not sure about using something other than the key for
> > > > > identifying duplicate records. By doing so, one could end up
> > > > > missing a de-duplication due to records with the same id
> > > > > characteristic in the value but having different keys, so the
> > > > > records land on different partitions. I guess we could enforce a
> > > > > repartition if one chooses to use a de-duplication id other than
> > > > > the key.
> > > > >
> > > > > 3. I think a punctuation scheduled to run at the de-duplication
> > > > > period to clean out older records would be a clean approach for
> > > > > purging older records. I'm not taking a hard stance on this
> > > > > approach and I'm willing to discuss different methods.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bill
> > > > >
> > > > > On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > > > > Hi Matthias,
> > > > > >
> > > > > > Here are my updates on your points.
> > > > > >
> > > > > > 101.
> > > > > > > You propose to add static methods `keySerde()` and `valueSerde()` --
> > > > > > > in other config classes, we use only `with(keySerde, valueSerde)`
> > > > > > > as we try to use the "builder" pattern, and avoid too many
> > > > > > > overloads. I would prefer to omit both methods you suggest and
> > > > > > > just use a single `with` for both serdes.
> > > > > >
> > > > > > I was actually inspired by the other config classes, for example
> > > > > > `Joined` and `Grouped` both have the static methods `keySerde()`
> > > > > > and `valueSerde()`.
> > > > > >
> > > > > > > I think we don't want to add `with(...)` which takes all
> > > > > > > parameters at once
> > > > > >
> > > > > > Done.
> > > > > >
> > > > > > 102.
> > > > > > Thanks, your suggestion sounds good to me. The trade-off of having
> > > > > > an index that allows to efficiently purge expired records besides
> > > > > > the keyValue store makes sense. I've been looking into the code,
> > > > > > and I think a similar idea was implemented for other processors
> > > > > > (for example with DualSchemaRocksDBSegmentedBytesStore).
> > > > > > As you said, I think we would benefit from some existing code here.
> > > > > > KIP updated!
> > > > > >
> > > > > > 104.
> > > > > > Updated the KIP to consider records' offsets.
> > > > > >
> > > > > > 105.
> > > > > > > picking the first offset with smallest ts sounds good to me. The
> > > > > > > KIP should be explicit about it
> > > > > >
> > > > > > Done.
> > > > > >
> > > > > > > But as discussed above, it might be simplest to not really have
> > > > > > > a window lookup, but just a plain key-lookup and drop if the key
> > > > > > > exists in the store?
> > > > > >
> > > > > > KIP updated, we will be `.get()`ing from a keyValueStore instead
> > > > > > of `.fetch()`ing from a WindowStore.
> > > > > >
> > > > > > > Another line of thinking, that did serve us well in the past: in
> > > > > > > doubt keep a record -- users can add operators to drop records
> > > > > > > (in case they don't want to keep them), but if we drop a record,
> > > > > > > users have no way to resurrect it (thus, building a workaround
> > > > > > > to change semantics is possible for users if we default to keep
> > > > > > > records, but not the other way around).
> > > > > >
> > > > > > Makes total sense! I updated the KIP to forward late records
> > > > > > instead of dropping them.
> > > > > >
> > > > > > 106.
> > > > > > For the moment, I highlighted in the Javadocs that we are
> > > > > > deduplicating per partition. If there is a better way to have this
> > > > > > information in the name of the api itself, that would be good.
> > > > > >
> > > > > > Best,
> > > > > > Ayoub
> > > > > > On Thu, Jun 13, 2024 at 9:03 AM Sebastien Viale
> > > > > > <sebastien.vi...@michelin.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > 106:
> > > > > > >
> > > > > > > Thanks for the clarification. Actually, this is not what I
> > > > > > > expected, but I better understand the performance issues
> > > > > > > regarding the state store iteration.
> > > > > > > If this is how it should be designed, it is fine for me as long
> > > > > > > as it is clear that the repartition must be done before the
> > > > > > > deduplication.
> > > > > > >
> > > > > > > Sébastien
> > > > > > >
> > > > > > > ________________________________
> > > > > > > From: Matthias J. Sax <mj...@apache.org>
> > > > > > > Sent: Thursday, June 13, 2024 2:51 AM
> > > > > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > > Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication
> > > > > > > processor in kafka-streams
> > > > > > >
> > > > > > > 106:
> > > > > > >
> > > > > > > > For the use-case of deduplicating a "at least once written"
> > > > > > > > stream, we are sure that the duplicate record has the same key
> > > > > > > > as the original one, and will land on the same task. Here, a
> > > > > > > > user would want to specify a deduplication key different from
> > > > > > > > the topic's key in case the topic's key is not a unique
> > > > > > > > identifier.
> > > > > > > >
> > > > > > > > For example, we have a topic with keyValue (`userId`,
> > > > > > > > `transaction`) and deduplication is done on
> > > > > > > > `transaction`.`id`. Here, the application wants to deduplicate
> > > > > > > > transactions. It knows that a transaction id maps to a single
> > > > > > > > userId. Any duplicate of that record would be received by the
> > > > > > > > task which processes this userId.
> > > > > > >
> > > > > > > This is an interesting point.
> > > > > > >
> > > > > > > My concern is to some extent that it seems (on the surface) to
> > > > > > > not follow the established pattern of auto-repartitioning in the
> > > > > > > DSL. Of course, given that the current proposal says we use an
> > > > > > > "id extractor" and not a "key extractor", it might be ok (but it
> > > > > > > might be somewhat subtle). Of course, JavaDocs always help to
> > > > > > > explain in detail. Would this be enough?
> > > > > > >
> > > > > > > Would be good to hear from others about this point. I am
> > > > > > > personally not sure which approach I would prefer at this point.
> > > > > > >
> > > > > > > The problem reminds me of
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10844 which is not
> > > > > > > resolved directly either. We do have KIP-759
> > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling)
> > > > > > > which is WIP and helps with KAFKA-10844, but I am not sure if it
> > > > > > > would be a viable solution for the de-duplication case?
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > > > > > Hi Sebastien & Matthias,
> > > > > > > >
> > > > > > > > For 106.
> > > > > > > > My idea was to deduplicate on a per-task basis. If the user
> > > > > > > > wants to do a global deduplication over all partitions, I
> > > > > > > > think it's better to have him explicitly repartition and then
> > > > > > > > call the deduplication processor.
> > > > > > > >
> > > > > > > > For the use-case of deduplicating a "at least once written"
> > > > > > > > stream, we are sure that the duplicate record has the same key
> > > > > > > > as the original one, and will land on the same task. Here, a
> > > > > > > > user would want to specify a deduplication key different from
> > > > > > > > the topic's key in case the topic's key is not a unique
> > > > > > > > identifier.
> > > > > > > >
> > > > > > > > For example, we have a topic with keyValue (`userId`,
> > > > > > > > `transaction`) and deduplication is done on
> > > > > > > > `transaction`.`id`. Here, the application wants to deduplicate
> > > > > > > > transactions. It knows that a transaction id maps to a single
> > > > > > > > userId. Any duplicate of that record would be received by the
> > > > > > > > task which processes this userId.
> > > > > > > >
> > > > > > > > One other thought I had when writing the KIP about global
> > > > > > > > deduplication is that it will require mapping the key of the
> > > > > > > > stream twice (a first map to change the key to the
> > > > > > > > deduplication key, and a second map to get back the initial
> > > > > > > > key). The second map may imply a second repartitioning.
> > > > > > > >
> > > > > > > > However, if we do a per-task deduplication, the user may adapt
> > > > > > > > it to his specific use-case.
> > > > > > > >
> > > > > > > > Let me know what you think
> > > > > > > > Ayoub
> > > > > > > >
> > > > > > > > On Tue, Jun 11, 2024 at 8:21 PM Matthias J. Sax
> > > > > > > > <mj...@apache.org> wrote:
> > > > > > > >
> > > > > > > >> Thanks Sebastien,
> > > > > > > >>
> > > > > > > >> that's a good point. Thanks for raising it. -- I like your
> > > > > > > >> proposal.
> > > > > > > >>
> > > > > > > >> An alternative would be to have two overloads of
> > > > > > > >> `deduplicate()`, one w/ and one w/o the "id extractor"
> > > > > > > >> parameter. This would be less explicit though.
> > > > > > > >>
> > > > > > > >> -Matthias
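Sketched as signatures, the two-overload alternative could look like this; the parameter types are borrowed from existing DSL classes and are only an assumption, not the KIP's API.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;

// One overload without an id extractor (deduplicate on the key, no
// repartitioning) and one with it (may trigger repartitioning).
interface DeduplicationOverloadsSketch<K, V> {

    KStream<K, V> deduplicate();

    <ID> KStream<K, V> deduplicate(KeyValueMapper<? super K, ? super V, ? extends ID> idSelector);
}
```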
> > > > > > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > > > > > >>> Hi,
> > > > > > > >>>
> > > > > > > >>> I am really interested in this KIP.
> > > > > > > >>>
> > > > > > > >>> 106:
> > > > > > > >>> I hope I am not talking nonsense, but if you do not
> > > > > > > >>> deduplicate based on the key, the input stream has to be
> > > > > > > >>> repartitioned. Otherwise, different stream tasks may handle
> > > > > > > >>> records that need to be deduplicated, and thus duplicates
> > > > > > > >>> will not be detected.
> > > > > > > >>>
> > > > > > > >>> This is why I would have created two different methods, as
> > > > > > > >>> is done for GroupBy:
> > > > > > > >>>
> > > > > > > >>> deduplicateByKey(...)
> > > > > > > >>> deduplicate(...)
> > > > > > > >>>
> > > > > > > >>> If deduplicateByKey is used, the input stream does not need
> > > > > > > >>> to be repartitioned.
> > > > > > > >>>
> > > > > > > >>> thanks
> > > > > > > >>>
> > > > > > > >>> Sébastien
> > > > > > > >>> ________________________________
> > > > > > > >>> From: Matthias J. Sax <mj...@apache.org>
> > > > > > > >>> Sent: Tuesday, June 11, 2024 1:54 AM
> > > > > > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > > >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication
> > > > > > > >>> processor in kafka-streams
> > > > > > > >>>
> > > > > > > >>> Thanks for the update Ayoub.
> > > > > > > >>>
> > > > > > > >>> 101: you say:
> > > > > > > >>>
> > > > > > > >>>> But I am not sure if we don't want to have them for this
> > > > > > > >>>> processor?
> > > > > > > >>>
> > > > > > > >>> What is your reasoning to move off the established pattern?
> > > > > > > >>> Would be good to understand why the `Deduplicated` class
> > > > > > > >>> needs a different "structure" compared to existing classes.
> > > > > > > >>>
> > > > > > > >>> 102: Creating iterators is very expensive. For other work,
> > > > > > > >>> we actually hit 100x (?) throughput degradation by creating
> > > > > > > >>> a (for most cases empty) iterator for every input record,
> > > > > > > >>> and needed to find other ways to avoid creating an iterator
> > > > > > > >>> per record. It really kills performance.
> > > > > > > >>>
> > > > > > > >>> I see the point about data expiration. We could experiment
> > > > > > > >>> with punctuation to expire old data, or add a second
> > > > > > > >>> "time-ordered store" (which we already have at hand) which
> > > > > > > >>> acts as an index into the main store. -- Another possibility
> > > > > > > >>> would be to add a new version of segmented store with a
> > > > > > > >>> different key-layout (ie, just store the plain key). I think
> > > > > > > >>> with some refactoring, we might be able to re-use a lot of
> > > > > > > >>> existing code.
> > > > > > > >>>
> > > > > > > >>> 104:
> > > > > > > >>>
> > > > > > > >>>> This gets me wondering if this is a limitation of stateful
> > > > > > > >>>> processors in ALOS. For example a windowed aggregation with
> > > > > > > >>>> `on_window_close` emit strategy may have the same
> > > > > > > >>>> limitation today (we receive a record, we update its
> > > > > > > >>>> aggregation result in the store, then crash before
> > > > > > > >>>> committing, then the record will be again reconsidered for
> > > > > > > >>>> aggregation). Is this correct?
> > > > > > > >>> Yes, this is correct, but it does not violate ALOS, because
> > > > > > > >>> we did not lose the input record -- of course, the
> > > > > > > >>> aggregation would contain the input record twice (eg, over
> > > > > > > >>> count), but this is ok under ALOS. Unfortunately, for
> > > > > > > >>> de-duplication this pattern breaks, because the
> > > > > > > >>> de-duplication operator does a different "aggregation logic"
> > > > > > > >>> depending on its state (emit if no key found, but not emit
> > > > > > > >>> if key found). For counting as an example, we increment the
> > > > > > > >>> count and emit unconditionally though.
> > > > > > > >>>
> > > > > > > >>>> As a workaround, I think storing the record's offset inside
> > > > > > > >>>> the store's value can tell us whether the record has been
> > > > > > > >>>> already seen or not. If we receive a record whose
> > > > > > > >>>> deduplication id exists in the store and the entry in the
> > > > > > > >>>> store has the same offset, it means the record is processed
> > > > > > > >>>> twice and we can go ahead and forward it. If the offset is
> > > > > > > >>>> different, it means it's a duplicate record, so we ignore it.
> > > > > > > >>>
> > > > > > > >>> Great idea. This might work... If we store the input record
> > > > > > > >>> offset, we can actually avoid that the "aggregation logic"
> > > > > > > >>> changes for the same input record. -- And yes, with ALOS
> > > > > > > >>> potentially emitting a duplicate is the-name-of-the-game, so
> > > > > > > >>> no concerns on this part from my side.
> > > > > > > >>>
> > > > > > > >>> 105: picking the first offset with smallest ts sounds good
> > > > > > > >>> to me. The KIP should be explicit about it. But as discussed
> > > > > > > >>> above, it might be simplest to not really have a window
> > > > > > > >>> lookup, but just a plain key-lookup and drop if the key
> > > > > > > >>> exists in the store? -- For late records, it might imply
> > > > > > > >>> that they are not de-duplicated, but this is also the case
> > > > > > > >>> for in-order records if they are further apart than the
> > > > > > > >>> de-duplication window size, right? Thus I would believe this
> > > > > > > >>> is "more natural" compared to discarding late records
> > > > > > > >>> pro-actively, which would lead to missing result records?
> > > > > > > >>>
> > > > > > > >>> We could also make this configurable if we are not sure what
> > > > > > > >>> users really need -- or add such a configuration later in
> > > > > > > >>> case the semantics we pick don't work for some users.
> > > > > > > >>> Another line of thinking, that did serve us well in the
> > > > > > > >>> past: in doubt keep a record -- users can add operators to
> > > > > > > >>> drop records (in case they don't want to keep them), but if
> > > > > > > >>> we drop a record, users have no way to resurrect it (thus,
> > > > > > > >>> building a workaround to change semantics is possible for
> > > > > > > >>> users if we default to keep records, but not the other way
> > > > > > > >>> around).
> > > > > > > >>>
> > > > > > > >>> Would be good to get input from the broader community on
> > > > > > > >>> this question though. In the end, it must be a use-case
> > > > > > > >>> driven decision?
> > > > > > > >>>
> > > > > > > >>> -Matthias
> > > > > > > >>>
> > > > > > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > > > > > >>>> Hi Matthias,
> > > > > > > >>>>
> > > > > > > >>>> Thank you for your review!
> > > > > > > >>>>
> > > > > > > >>>> 100.
> > > > > > > >>>> I agree. I changed the name of the parameter to
> > > > > > > >>>> "idSelector". Because this id may be computed, it is better
> > > > > > > >>>> to call it "id" rather than field or attribute.
> > > > > > > >>>>
> > > > > > > >>>> 101.
> > > > > > > >>>> The reason I added the methods `keySerde()` and
> > > > > > > >>>> `valueSerde()` was to have the same capabilities as other
> > > > > > > >>>> serde classes (such as Grouped or Joined). As a
> > > > > > > >>>> Kafka-streams user, I usually use `with(keySerde,
> > > > > > > >>>> valueSerde)` as you suggested. But I am not sure if we
> > > > > > > >>>> don't want to have them for this processor?
> > > > > > > >>>>
> > > > > > > >>>> 102.
> > > > > > > >>>> That's a good point! Because we know that the window store
> > > > > > > >>>> will contain at most one instance of a given key, I am not
> > > > > > > >>>> sure how the range fetch on WindowStore compares to a
> > > > > > > >>>> KeyValueStore `get()` in this case. Wondering if the fact
> > > > > > > >>>> that the record's key is the prefix of the underlying
> > > > > > > >>>> keyValueStore's key ("<dataKey,ts>") may provide comparable
> > > > > > > >>>> performance to the random access of KeyValueStore? Of
> > > > > > > >>>> course, the WindowStore fetch() would be less efficient
> > > > > > > >>>> because it may fetch from more than 1 segment, and because
> > > > > > > >>>> of some iterator overhead.
> > > > > > > >>>>
> > > > > > > >>>> The purpose of using a WindowStore is to automatically
> > > > > > > >>>> purge old data. For example, deduplicating a topic written
> > > > > > > >>>> at least once wouldn't require keeping a large history.
> > > > > > > >>>> This is not the case with a KeyValueStore, which would
> > > > > > > >>>> require scanning regularly to remove expired records. That
> > > > > > > >>>> might cause a sudden increase of latency whenever the
> > > > > > > >>>> cleanup is triggered.
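For reference, the "scanning regularly" alternative described above would look roughly like this: a stream-time punctuator that sweeps a KeyValueStore holding the first-seen timestamp per id. The store layout, names, and sweep frequency are assumptions for illustration only.

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class PurgeSketch {

    void schedulePurge(final ProcessorContext<Void, Void> context,
                       final KeyValueStore<String, Long> firstSeenTimestamps,
                       final Duration deduplicationInterval) {
        // Full scan on every punctuation: this is the periodic latency spike
        // the WindowStore/segmented-store approach is meant to avoid.
        context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME, streamTime -> {
            try (final KeyValueIterator<String, Long> iter = firstSeenTimestamps.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    if (streamTime - entry.value > deduplicationInterval.toMillis()) {
                        firstSeenTimestamps.delete(entry.key);
                    }
                }
            }
        });
    }
}
```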
> > > > > > > >>>> It would be good to hear from anyone who has done some
> > > > > > > >>>> analysis on RocksDB's range fetch.
> > > > > > > >>>>
> > > > > > > >>>> 103.
> > > > > > > >>>> Sure, I can update it once we agree on the underlying
> > > > > > > >>>> semantics.
> > > > > > > >>>>
> > > > > > > >>>> 104.
> > > > > > > >>>> Another good point!
> > > > > > > >>>>
> > > > > > > >>>>> In the end, de-duplication does only make sense when EOS
> > > > > > > >>>>> is used
> > > > > > > >>>>
> > > > > > > >>>> I agree with that. And for me, the use case of
> > > > > > > >>>> deduplicating a topic written ALOS inside an EOS
> > > > > > > >>>> application might be the top 1 use case of deduplication.
> > > > > > > >>>>
> > > > > > > >>>>> all downstream processing happens, `context.forward()`
> > > > > > > >>>>> returns and we update the state store, we could now crash
> > > > > > > >>>>> w/o committing offsets
> > > > > > > >>>>
> > > > > > > >>>> This gets me wondering if this is a limitation of stateful
> > > > > > > >>>> processors in ALOS. For example a windowed aggregation with
> > > > > > > >>>> `on_window_close` emit strategy may have the same
> > > > > > > >>>> limitation today (we receive a record, we update its
> > > > > > > >>>> aggregation result in the store, then crash before
> > > > > > > >>>> committing, then the record will be again reconsidered for
> > > > > > > >>>> aggregation). Is this correct?
> > > > > > > >>>>
> > > > > > > >>>> As a workaround, I think storing the record's offset inside
> > > > > > > >>>> the store's value can tell us whether the record has been
> > > > > > > >>>> already seen or not. If we receive a record whose
> > > > > > > >>>> deduplication id exists in the store and the entry in the
> > > > > > > >>>> store has the same offset, it means the record is processed
> > > > > > > >>>> twice and we can go ahead and forward it. If the offset is
> > > > > > > >>>> different, it means it's a duplicate record, so we ignore it.
> > > > > > > >>>>
> > > > > > > >>>> As you said, we don't have any guarantee whether the
> > > > > > > >>>> initial record was forwarded or not in case of a crash
> > > > > > > >>>> before commit. In this solution we would forward the record
> > > > > > > >>>> twice, which is against deduplication. But this is still an
> > > > > > > >>>> ALOS application, so it has the same semantics as any other
> > > > > > > >>>> such application. With this, I am not sure we can have
> > > > > > > >>>> "strict" deduplication for ALOS applications.
> > > > > > > >>>>
> > > > > > > >>>> 105.
> > > > > > > >>>> For me, if there are two duplicate records, it means they
> > > > > > > >>>> are the same from the application's point of view, so it
> > > > > > > >>>> can choose either one. Thus, I would go with forwarding the
> > > > > > > >>>> record with the least offset.
> > > > > > > >>>>
> > > > > > > >>>>> Would it not be desired to drop all duplicates independent
> > > > > > > >>>>> of their ts, as long as we find a record in the store?
> > > > > > > >>>>
> > > > > > > >>>> This is actually related to the (suggested) windowed nature
> > > > > > > >>>> of deduplication. As in 102., we don't want to do a
> > > > > > > >>>> "forever" deduplication, which may be impossible for huge
> > > > > > > >>>> workloads where all records should be kept in the store.
> > > > > > > >>>> Hence the fetch of timestamps between
> > > > > > > >>>> [ts-deduplicationInterval, ts+deduplicationInterval].
> > > > > > > >>>>
> > > > > > > >>>> About late records, this is again due to the windowed
> > > > > > > >>>> nature. Because the store won't save those late (i.e.
> > > > > > > >>>> expired) records, we have two options. Either we do not
> > > > > > > >>>> apply deduplication on them, and thus deduplication doesn't
> > > > > > > >>>> work on late records, or we discard them (which is the
> > > > > > > >>>> option I suggest). In the second case, it would be up to
> > > > > > > >>>> the user to choose a deduplicationInterval that
> > > > > > > >>>> sufficiently covers all his late data.
> > > > > > > >>>> What do you think?
> > > > > > > >>>>
> > > > > > > >>>> Thanks,
> > > > > > > >>>> Ayoub
> > > > > > > >>>> On Tue, Jun 4, 2024 at 11:58 PM Matthias J. Sax
> > > > > > > >>>> <mj...@apache.org> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> Ayoub,
> > > > > > > >>>>>
> > > > > > > >>>>> thanks for resurrecting this KIP. I think a built-in
> > > > > > > >>>>> de-duplication operator will be very useful.
> > > > > > > >>>>>
> > > > > > > >>>>> Couple of questions:
> > > > > > > >>>>>
> > > > > > > >>>>> 100: `deduplicationKeySelector`
> > > > > > > >>>>>
> > > > > > > >>>>> Is this the best name? It might indicate that we select a
> > > > > > > >>>>> "key", which is an overloaded term... Maybe we could use
> > > > > > > >>>>> `Field` or `Id` or `Attribute` instead of `Key` in the
> > > > > > > >>>>> name? Just brainstorming. If we think `Key` is the best
> > > > > > > >>>>> word, I am also ok with it.
> > > > > > > >>>>>
> > > > > > > >>>>> 101: `Deduplicated` class
> > > > > > > >>>>>
> > > > > > > >>>>> You propose to add static methods `keySerde()` and
> > > > > > > >>>>> `valueSerde()` -- in other config classes, we use only
> > > > > > > >>>>> `with(keySerde, valueSerde)` as we try to use the
> > > > > > > >>>>> "builder" pattern, and avoid too many overloads. I would
> > > > > > > >>>>> prefer to omit both methods you suggest and just use a
> > > > > > > >>>>> single `with` for both serdes.
> > > > > > > >>>>>
> > > > > > > >>>>> Similarly, I think we don't want to add `with(...)` which
> > > > > > > >>>>> takes all parameters at once (which should only be 3
> > > > > > > >>>>> parameters, not 4 as it's currently in the KIP)?
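Under the builder pattern described here, the config object would presumably be shaped like its siblings `Grouped` and `Joined`, along these lines; this is a sketch of the shape only, not the KIP's final class.

```java
import org.apache.kafka.common.serialization.Serde;

public class Deduplicated<K, V> {

    protected Serde<K> keySerde;
    protected Serde<V> valueSerde;

    private Deduplicated(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Single static entry point for both serdes, as suggested above,
    // instead of separate static keySerde()/valueSerde() methods.
    public static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde,
                                                 final Serde<V> valueSerde) {
        return new Deduplicated<>(keySerde, valueSerde);
    }

    public Deduplicated<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    public Deduplicated<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }
}
```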
> > > > > > > >>>>> 102: Usage of `WindowedStore`:
> > > > > > > >>>>>
> > > > > > > >>>>> Would this be efficient? The physical byte layout is
> > > > > > > >>>>> "<dataKey,ts>" for the store key, so it would be difficult
> > > > > > > >>>>> to do an efficient lookup for a given "de-duplication key"
> > > > > > > >>>>> to discard duplicates, as we don't know the timestamp of
> > > > > > > >>>>> the first record with the same "de-duplication key".
> > > > > > > >>>>>
> > > > > > > >>>>> This boils down to the actual de-duplication logic (some
> > > > > > > >>>>> more comments below), but what you propose seems to
> > > > > > > >>>>> require expensive range-scans, which could be cost
> > > > > > > >>>>> prohibitive in practice. I think we need to find a way to
> > > > > > > >>>>> use efficient key-point-lookups to make this work.
> > > > > > > >>>>>
> > > > > > > >>>>> 103: "Processing logic":
> > > > > > > >>>>>
> > > > > > > >>>>> Might need some updates (cf. the 102 comment). I am not
> > > > > > > >>>>> sure if I fully understand the logic: cf. 105 below.
> > > > > > > >>>>>
> > > > > > > >>>>> 104:
> > > > > > > >>>>>
> > > > > > > >>>>>> If no entries found → forward the record + save the
> > > > > > > >>>>>> record in the store
> > > > > > > >>>>>
> > > > > > > >>>>> This part is critical, and we should discuss it in detail.
> > > > > > > >>>>> In the end, de-duplication does only make sense when EOS
> > > > > > > >>>>> is used, and we might want to call this out (eg, in the
> > > > > > > >>>>> JavaDocs)? But if used with ALOS, it's very difficult to
> > > > > > > >>>>> ensure that we never lose data... Your proposal to forward
> > > > > > > >>>>> first goes in the right direction, but does not really
> > > > > > > >>>>> solve the problem entirely:
> > > > > > > >>>>>
> > > > > > > >>>>> Even if we forward the message first, all downstream
> > > > > > > >>>>> processing happens, `context.forward()` returns and we
> > > > > > > >>>>> update the state store, we could now crash w/o committing
> > > > > > > >>>>> offsets. For this case, we have no guarantee that the
> > > > > > > >>>>> result records were published (as we did not flush the
> > > > > > > >>>>> producer yet), but when re-reading from the input topic,
> > > > > > > >>>>> we would find the record in the store and incorrectly drop
> > > > > > > >>>>> it as a duplicate...
> > > > > > > >>>>>
> > > > > > > >>>>> I think the only solution to make this work would be to
> > > > > > > >>>>> use TX-state stores in combination with ALOS as proposed
> > > > > > > >>>>> via KIP-892?
> > > > > > > >>>>>
> > > > > > > >>>>> Using an in-memory store won't help much either? The
> > > > > > > >>>>> producer could have sent the write into the changelog
> > > > > > > >>>>> topic, but not into the result topic, and thus we could
> > > > > > > >>>>> still not guarantee ALOS...?
> > > > > > > >>>>>
> > > > > > > >>>>> How do we want to go about this? We could also say this
> > > > > > > >>>>> new operator only works with EOS. Would this be too
> > > > > > > >>>>> restrictive? -- At least for now, until KIP-892 lands, and
> > > > > > > >>>>> we could relax it?
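For context, restricting the operator to EOS would simply mean requiring the existing processing-guarantee config that applications already set today; the application id and broker address below are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class EosConfigSketch {

    Properties eosProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-app");         // example id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker
        // Exactly-once processing: per the discussion above, only this mode
        // would give the deduplication operator its full guarantee.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```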
> > > > > > > >>>>> 105: "How to detect late records"
> > > > > > > >>>>>
> > > > > > > >>>>> In the end, it seems to boil down to determining which of
> > > > > > > >>>>> the records to forward and which record to drop, for (1)
> > > > > > > >>>>> the regular case and (2) the out-of-order data case.
> > > > > > > >>>>>
> > > > > > > >>>>> Regular case (no out-of-order data): For this case, offset
> > > > > > > >>>>> and ts order is the same, and we can forward the first
> > > > > > > >>>>> record we get. All later records within the
> > > > > > > >>>>> "de-duplication period" with the same "de-duplication key"
> > > > > > > >>>>> would be dropped. If a record with the same
> > > > > > > >>>>> "de-duplication key" arrives after the "de-duplication
> > > > > > > >>>>> period" passed, we cannot drop it any longer, but would
> > > > > > > >>>>> still forward it, as per the contract of the operator /
> > > > > > > >>>>> de-duplication period.
> > > > > > > >>>>>
> > > > > > > >>>>> For the out-of-order case: The first question we need to
> > > > > > > >>>>> answer is, do we want to forward the record with the
> > > > > > > >>>>> smallest offset or the record with the smallest ts?
> > > > > > > >>>>> Logically, forwarding with the smallest ts might be "more
> > > > > > > >>>>> correct"; however, it implies we could only forward it
> > > > > > > >>>>> after the "de-duplication period" passed, which might be
> > > > > > > >>>>> undesired latency? Would this be desired/acceptable?
> > > > > > > >>>>>
> > > > > > > >>>>> In contrast, if we forward the record with the smallest
> > > > > > > >>>>> offset (this is what you seem to propose), we don't have a
> > > > > > > >>>>> latency issue, but of course the question what records to
> > > > > > > >>>>> drop is more tricky to answer: it seems you propose to
> > > > > > > >>>>> compare the time difference of the stored record to the
> > > > > > > >>>>> current record, but I am wondering why? Would it not be
> > > > > > > >>>>> desired to drop all duplicates independent of their ts, as
> > > > > > > >>>>> long as we find a record in the store? Would be good to
> > > > > > > >>>>> get some more motivation and tradeoffs discussed about the
> > > > > > > >>>>> different strategies we could use.
> > > > > > > >>>>>
> > > > > > > >>>>> You also propose to drop _any_ late record: I am also not
> > > > > > > >>>>> sure if that's desired? Could this not lead to data loss?
> > > > > > > >>>>> Assume we get a late record, but in fact there was never a
> > > > > > > >>>>> duplicate? Why would we want to drop it? If there is a
> > > > > > > >>>>> late record which is indeed a duplicate, but we purged the
> > > > > > > >>>>> original record from the store already, it seems to be the
> > > > > > > >>>>> same case as for the "no out-of-order case": after we
> > > > > > > >>>>> purged, we cannot de-duplicate, and thus it's a regular
> > > > > > > >>>>> case we can just accept?
> > > > > > > >>>>>
> > > > > > > >>>>> -Matthias
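The two drop strategies compared above, condensed as a sketch; it assumes a store keeping the timestamp of the first forwarded record per deduplication id, and all names are illustrative.

```java
import java.time.Duration;
import org.apache.kafka.streams.state.KeyValueStore;

class DropStrategiesSketch {

    // Strategy A ("windowed", what the KIP text describes): only drop if the
    // stored record is within the de-duplication interval of the incoming
    // record's timestamp.
    boolean dropWindowed(final KeyValueStore<String, Long> firstSeenTimestamps,
                         final String id,
                         final long recordTimestamp,
                         final Duration deduplicationInterval) {
        final Long storedTimestamp = firstSeenTimestamps.get(id);
        return storedTimestamp != null
            && Math.abs(recordTimestamp - storedTimestamp) <= deduplicationInterval.toMillis();
    }

    // Strategy B (the alternative raised above): drop whenever the id is
    // still in the store, regardless of timestamp distance; store expiry
    // alone bounds the de-duplication window.
    boolean dropWhilePresent(final KeyValueStore<String, Long> firstSeenTimestamps,
                             final String id) {
        return firstSeenTimestamps.get(id) != null;
    }
}
```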
> > > > > > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > > > > > >>>>>> Hi everyone,
> > > > > > > >>>>>>
> > > > > > > >>>>>> I've just made a (small) change to this KIP about an
> > > > > > > >>>>>> implementation detail. Please let me know your thoughts.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thank you,
> > > > > > > >>>>>> Ayoub
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Mon, May 20, 2024 at 9:13 PM Ayoub
> > > > > > > >>>>>> <ayoubomar...@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hello,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Following a discussion on the community slack channel, I
> > > > > > > >>>>>>> would like to revive the discussion on KIP-655, which is
> > > > > > > >>>>>>> about adding a deduplication processor in kafka-streams.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Even though the motivation is not quite the same as the
> > > > > > > >>>>>>> initial one, I updated the KIP rather than creating a
> > > > > > > >>>>>>> new one, as I believe the end goal is the same.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks,
> > > > > > > >>>>>>> Ayoub