Hi Bill,

I think all the points raised are now handled in the KIP. The only open point is 106), on which we still need to reach an agreement. I made a proposal in my last mail, and I am waiting for others' opinions.
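To make the open point 106) concrete alongside the proposal below, here is a minimal plain-Java sketch of the difference between deduplicating by id inside the task chosen by the record key, and repartitioning by id first. It does not use Kafka Streams; `Rec`, `partitionOf` and `dedup` are hypothetical names, and the hash-of-key partitioner with two partitions is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: a plain-Java model, not the Kafka Streams API.
final class PerTaskDedupSketch {

    // A record with a key (e.g. userId) and an id carried in the value (e.g. transaction id).
    static final class Rec {
        final String key;
        final String id;
        Rec(String key, String id) { this.key = key; this.id = id; }
    }

    // Assumed stand-in for a partitioner: the partition depends only on the routing key.
    static int partitionOf(String routingKey, int numPartitions) {
        return Math.floorMod(routingKey.hashCode(), numPartitions);
    }

    // Each partition ("task") keeps its own set of seen ids.
    // repartitionById = false: records stay on the partition of their key.
    // repartitionById = true: records are first routed by id.
    static List<Rec> dedup(List<Rec> input, int numPartitions, boolean repartitionById) {
        List<Set<String>> seenPerTask = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            seenPerTask.add(new HashSet<>());
        }
        List<Rec> forwarded = new ArrayList<>();
        for (Rec r : input) {
            String routingKey = repartitionById ? r.id : r.key;
            Set<String> seen = seenPerTask.get(partitionOf(routingKey, numPartitions));
            if (seen.add(r.id)) { // add() returns false if this task already saw the id
                forwarded.add(r);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Same id under the same key: the duplicate reaches the same task either way.
        List<Rec> sameKey = List.of(new Rec("a", "t1"), new Rec("a", "t1"));
        System.out.println(dedup(sameKey, 2, false).size());
        // Same id under two keys that hash to different partitions.
        List<Rec> twoKeys = List.of(new Rec("a", "t1"), new Rec("b", "t1"));
        System.out.println(dedup(twoKeys, 2, false).size());
        System.out.println(dedup(twoKeys, 2, true).size());
    }
}
```

When an id maps to a single key, the duplicate is caught without any repartitioning. When the same id shows up under keys on different partitions, only the repartitioning mode catches it, which is the trade-off between variants (2) and (3).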
My proposal is to have 3 methods of deduplication:

1) `deduplicateByKey()`
2) `deduplicate((k, v) -> v.id)`
3) `deduplicateByKeyAndId((k, v) -> v.id)` (name to be chosen)

The point of having both (2) and (3) is that in (2) we repartition by the id field, whereas in (3) we do not repartition. (3) is needed when the current partitioning already ensures that a given id always lands on the same task. Here is an example I gave earlier in the thread:

> For example, we have a topic with keyValue (`userId`, `transaction`)
> and deduplication is done on `transaction`.`id`. Here, the application
> wants to deduplicate transactions. It knows that a transaction id
> maps to a single userId. Any duplicate of that record would be received
> by the task which processes this userId.

Let me know what you think.

Thanks,
Ayoub

On Thu, Aug 22, 2024 at 21:37, Bill Bejeck <bbej...@apache.org> wrote:

> Hi Ayoub,
>
> What's the status of this KIP?
>
> Regards,
> Bill
>
>
> On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com>
> wrote:
>
> > Hi Bill,
> >
> > Thanks for your comments!
> >
> > I am prefixing your points with B.
> >
> > B1.
> > That's a good catch. I think punctuated records aren't concerned by
> > this problem of reprocessing because they are not issued twice.
> > We will still need to distinguish between a punctuated
> > and a source record. The logic becomes:
> >
> > > if record.offset is not null && record.offset == offset_stored:
> > >     forward(record)
> > > else:
> > >     // do_not_forward
> >
> > I will update the KIP on this point.
> >
> >
> > B2. (i.e. 106)
> > That's the point discussed in 106. The main idea is to avoid repartitioning
> > when we don't have to. But let's say we want to repartition when
> > deduplication is on an id that is not the key.
> >
> > The user code may look something like this:
> > ```kstream
> >     .deduplicate((k, v) -> v.id)
> >     .map((k, v) -> ...)
> > ```
> >
> > The user expects that the key in the map processor which follows the
> > deduplicate is the same as the initial key.
> >
> > Internally, if we decide to repartition, the deduplication processor may do
> > the following:
> > ```kstream
> >     .map((k, v) -> (v.id, ...)) // For repartitioning on id
> >     .deduplicateByKey()
> >     .map((id, v) -> (initialKey, ...)) // To recover the initial key
> > ```
> >
> > In this case, there is some internal complexity of this processor:
> > I) We should repartition twice, before and after deduplication. The second
> > map triggers repartitioning if there are following stateful processors
> > in the pipeline.
> >
> > II) The initial key should be kept somewhere. One way is to wrap the value
> > to add the key, and then unwrap it after the `deduplicateByKey`.
> >
> > As mentioned in one use case above (deduplicating transactions - written at
> > least once - by id), repartition is not needed for that case. This added
> > complexity won't be useful.
> >
> > *TLDR,* I think one solution is to have a separate api to cover such a use
> > case. In total, we can have 3 apis of deduplication:
> >
> > 1) `deduplicateByKey()` *// No repartitioning*
> > 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)` *// No repartitioning*
> > 3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
> >
> > The second api is equivalent to deduplicating by the couple (key, id).
> > No repartitioning is required because the records with the same key are
> > already together in the same partition. cc @Matthias @Sebastien.
> >
> >
> > B3.
> > I think here we can do what some existing processors do for periodic work:
> > we can track the last cleanup time and the current stream time, and
> > if the delta exceeds the deduplication period, we trigger the cleanup.
> >
> >
> > Best,
> > Ayoub
> >
> > On Thu, Jul 11, 2024 at 01:17, Bill Bejeck <bbej...@apache.org> wrote:
> >
> > > Hi All,
> > >
> > > Thanks Ayoub for getting this KIP going again, I think a deduplication
> > > operator will be very useful. My apologies for being late to the
> > > discussion.
> > >
> > > Overall - I agree with the direction of the KIP but I have a couple of
> > > questions.
> > >
> > > 1. Regarding using offsets for tracking the first arrival. I think
> > > there could be a case when offsets would not be available, when records are
> > > forwarded by punctuation for example.
> > >
> > > 2. I'm not sure about using something other than the key for identifying
> > > duplicate records. By doing so, one could end up missing a de-duplication
> > > due to records with the same id characteristic in the value but having
> > > different keys so the records land on different partitions. I guess we
> > > could enforce a repartition if one chooses to use a de-duplication id other
> > > than the key.
> > >
> > > 3. I think a punctuation scheduled to run at the de-duplication period to
> > > clean out older records would be a clean approach for purging older
> > > records. I'm not taking a hard stance on this approach and I'm willing to
> > > discuss different methods.
> > >
> > > Thanks,
> > >
> > > Bill
> > >
> > >
> > > On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > > Hi Matthias,
> > > >
> > > > Here are my updates on your points.
> > > >
> > > > 101.
> > > > > You propose to add static methods `keySerde()` and `valueSerde()` --
> > > > > in other config classes, we use only `with(keySerde, valueSerde)` as we try
> > > > > to use the "builder" pattern, and avoid too many overloads. I would
> > > > > prefer to omit both methods you suggest and just use a single `with` for
> > > > > both serdes.
> > > >
> > > > I was actually inspired by the other config classes, for example `Joined`
> > > > and `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
> > > >
> > > > > I think we don't want to add `with(...)` which takes all
> > > > > parameters at once
> > > >
> > > > Done.
> > > >
> > > >
> > > > 102.
> > > > Thanks, your suggestion sounds good to me. The trade-off of having an index
> > > > that allows expired records to be purged efficiently, alongside the keyValue
> > > > store, makes sense. I've been looking into the code, and I think a similar idea
> > > > was implemented for other processors (for example with
> > > > DualSchemaRocksDBSegmentedBytesStore).
> > > > As you said, I think we would benefit from some existing code here.
> > > > KIP updated!
> > > >
> > > >
> > > > 104.
> > > > Updated the KIP to consider records' offsets.
> > > >
> > > >
> > > > 105.
> > > > > picking the first offset with smallest ts sounds good to me. The KIP
> > > > > should be explicit about it
> > > >
> > > > Done.
> > > >
> > > > > But as discussed above, it might be
> > > > > simplest to not really have a window lookup, but just a plain key-lookup
> > > > > and drop if the key exists in the store?
> > > >
> > > > KIP updated, we will be `.get()`ing from a keyValueStore instead of
> > > > `.fetch()`ing from a WindowStore.
> > > >
> > > > > Another line of thinking, that did serve us well in the past: in doubt
> > > > > keep a record -- users can add operators to drop record (in case they
> > > > > don't want to keep it), but if we drop a record, users have no way to
> > > > > resurrect it (thus, building a workaround to change semantics is
> > > > > possible for users if we default to keep records, but not the other way
> > > > > around).
> > > >
> > > > Makes total sense! I updated the KIP to forward late records instead of
> > > > dropping them.
> > > >
> > > >
> > > > 106.
> > > > For the moment, I highlighted in Javadocs that we are deduplicating by
> > > > partition. If there is a better name that carries this information in the
> > > > name of the api itself, it would be good.
> > > >
> > > >
> > > > Best,
> > > > Ayoub
> > > >
> > > >
> > > > On Thu, Jun 13, 2024 at 09:03, Sebastien Viale <sebastien.vi...@michelin.com>
> > > > wrote:
> > > >
> > > > >
> > > > > Hi,
> > > > >
> > > > > 106:
> > > > >
> > > > > Thanks for the clarification. Actually, this is not what I expected, but I
> > > > > better understand the performance issues regarding the state store
> > > > > iteration.
> > > > > If this is how it should be designed, it is fine for me as long as it is
> > > > > clear that the repartition must be done before the deduplication.
> > > > > Sébastien
> > > > >
> > > > > ________________________________
> > > > > From: Matthias J. Sax <mj...@apache.org>
> > > > > Sent: Thursday, June 13, 2024 02:51
> > > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> > > > > kafka-streams
> > > > >
> > > > > 106:
> > > > >
> > > > > > For the use-case of deduplicating a "at least once written" stream,
> > > > > > we are sure that the duplicate record has the same key as the
> > > > > > original one, and will land on the same task. Here, a user would
> > > > > > want to specify a deduplication key different from the topic's key
> > > > > > in case the topic's key is not a unique identifier
> > > > > >
> > > > > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > > > > and deduplication is done on `transaction`.`id`. Here, the application
> > > > > > wants to deduplicate transactions.
> > > > > > It knows that a transaction id
> > > > > > maps to a single userId. Any duplicate of that record would be received
> > > > > > by the task which processes this userId.
> > > > >
> > > > > This is an interesting point.
> > > > >
> > > > > My concern is, to some extent, that it seems (on the surface) to not
> > > > > follow the established pattern of auto-repartitioning in the DSL. Of
> > > > > course, given that the current proposal says we use an "id extractor"
> > > > > and not a "key extractor" it might be ok (but it might be somewhat
> > > > > subtle). Of course, JavaDocs always help to explain in detail. Would
> > > > > this be enough?
> > > > >
> > > > > Would be good to hear from others about this point. I am personally not
> > > > > sure which approach I would prefer at this point.
> > > > >
> > > > > The problem reminds me of
> > > > > https://issues.apache.org/jira/browse/KAFKA-10844 which is not resolved
> > > > > directly either. We do have KIP-759
> > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded
> > > > > repartition canceling)
> > > > > which is WIP and helps with KAFKA-10844, but not sure if it would be a
> > > > > viable solution for the de-duplication case?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > > > Hi Sebastien & Matthias,
> > > > > >
> > > > > > For 106.
> > > > > > My idea was to deduplicate on a per-task basis.
> > > > > > If the user wants
> > > > > > to do a global deduplication over all partitions, I think it's better to
> > > > > > have them explicitly repartition and then call the deduplication
> > > > > > processor.
> > > > > >
> > > > > > For the use-case of deduplicating a "at least once written" stream,
> > > > > > we are sure that the duplicate record has the same key as the
> > > > > > original one, and will land on the same task. Here, a user would
> > > > > > want to specify a deduplication key different from the topic's key
> > > > > > in case the topic's key is not a unique identifier.
> > > > > >
> > > > > > For example, we have a topic with keyValue (`userId`, `transaction`)
> > > > > > and deduplication is done on `transaction`.`id`. Here, the application
> > > > > > wants to deduplicate transactions. It knows that a transaction id
> > > > > > maps to a single userId. Any duplicate of that record would be received
> > > > > > by the task which processes this userId.
> > > > > >
> > > > > > One other thought I had when writing the KIP about global deduplication
> > > > > > is that it would require mapping the key of the stream twice (a first map
> > > > > > to change the key to the deduplication key, and a second map to get
> > > > > > back the initial key). The second map may imply a second repartitioning.
> > > > > >
> > > > > > However, if we do a per-task deduplication, the user may adapt it to
> > > > > > their specific use-case.
> > > > > >
> > > > > > Let me know what you think
> > > > > > Ayoub
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 11, 2024 at 20:21, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > >
> > > > > >> Thanks Sebastien,
> > > > > >>
> > > > > >> that's a good point. Thanks for raising it. -- I like your proposal.
> > > > > >>
> > > > > >> An alternative would be to have two overloads of `deduplicate()`, one w/
> > > > > >> and one w/o the "id extractor" parameter.
> > > > > >> This would be less explicit
> > > > > >> though.
> > > > > >>
> > > > > >>
> > > > > >> -Matthias
> > > > > >>
> > > > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> I am really interested in this KIP.
> > > > > >>>
> > > > > >>> 106:
> > > > > >>> I hope I am not talking nonsense, but if you do not deduplicate based on
> > > > > >>> the key, the input stream has to be repartitioned.
> > > > > >>> Otherwise, different stream tasks may handle records that need to be
> > > > > >>> deduplicated, and thus duplicates will not be detected.
> > > > > >>>
> > > > > >>> This is why I would have created two different methods, as is done for
> > > > > >>> GroupBy:
> > > > > >>>
> > > > > >>> deduplicateByKey(...)
> > > > > >>> deduplicate(...)
> > > > > >>>
> > > > > >>> If deduplicateByKey is used, the input stream does not need to be
> > > > > >>> repartitioned.
> > > > > >>>
> > > > > >>> thanks
> > > > > >>>
> > > > > >>> Sébastien
> > > > > >>> ________________________________
> > > > > >>> From: Matthias J. Sax <mj...@apache.org>
> > > > > >>> Sent: Tuesday, June 11, 2024 01:54
> > > > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in
> > > > > >>> kafka-streams
> > > > > >>>
> > > > > >>> Thanks for the update Ayoub.
> > > > > >>>
> > > > > >>>
> > > > > >>> 101: you say:
> > > > > >>>
> > > > > >>>> But I am not sure if we don't want to have them for this processor ?
> > > > > >>>
> > > > > >>> What is your reasoning to move off the established pattern? Would be
> > > > > >>> good to understand why the `Deduplicated` class needs a different
> > > > > >>> "structure" compared to existing classes.
> > > > > >>>
> > > > > >>>
> > > > > >>> 102: Creating iterators is very expensive. For other work, we actually
> > > > > >>> hit 100x (?) throughput degradation by creating an (for most cases
> > > > > >>> empty) iterator for every input record, and needed to find other ways to
> > > > > >>> avoid creating an iterator per record. It really kills performance.
> > > > > >>>
> > > > > >>> I see the point about data expiration. We could experiment with
> > > > > >>> punctuation to expire old data, or add a second "time-ordered store"
> > > > > >>> (which we already have at hand) which acts as an index into the main
> > > > > >>> store. -- Another possibility would be to add a new version of segmented
> > > > > >>> store with a different key-layout (ie, just store the plain key). I
> > > > > >>> think with some refactoring, we might be able to re-use a lot of
> > > > > >>> existing code.
> > > > > >>>
> > > > > >>>
> > > > > >>> 104:
> > > > > >>>
> > > > > >>>> This gets me wondering if this is a limitation of stateful processors
> > > > > >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> > > > > >>>> emit strategy may have the same limitation today (we receive a record,
> > > > > >>>> we update its aggregation result in the store, then crash before committing,
> > > > > >>>> then the record will be again reconsidered for aggregation). Is this
> > > > > >>>> correct ?
> > > > > >>>
> > > > > >>> Yes, this is correct, but it does not violate ALOS, because we did not
> > > > > >>> lose the input record -- of course, the aggregation would contain the
> > > > > >>> input record twice (eg, over count), but this is ok under ALOS.
> > > > > >>> Unfortunately, for de-duplication this pattern breaks, because the
> > > > > >>> de-duplication operator does a different "aggregation logic" depending
> > > > > >>> on its state (emit if no key found, but not emit if key found). For
> > > > > >>> counting as an example, we increment the count and emit unconditionally
> > > > > >>> though.
> > > > > >>>
> > > > > >>>
> > > > > >>>> As a workaround, I think storing the record's offset inside the
> > > > > >>>> store's value can tell us whether the record has been already seen or not.
> > > > > >>>> If we receive a record whose deduplication id exists in the store
> > > > > >>>> and the entry in the store has the same offset, it means the record
> > > > > >>>> is processed twice and we can go ahead and forward it. If the offset
> > > > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > > > >>>
> > > > > >>> Great idea. This might work... If we store the input record offset, we
> > > > > >>> can actually avoid that the "aggregation logic" changes for the same
> > > > > >>> input record. -- And yes, with ALOS potentially emitting a duplicate is
> > > > > >>> the-name-of-the-game, so no concerns on this part from my side.
> > > > > >>>
> > > > > >>>
> > > > > >>> 105: picking the first offset with smallest ts sounds good to me. The KIP
> > > > > >>> should be explicit about it. But as discussed above, it might be
> > > > > >>> simplest to not really have a window lookup, but just a plain key-lookup
> > > > > >>> and drop if the key exists in the store? -- For late records, it might
> > > > > >>> imply that they are not de-duplicated, but this is also the case for
> > > > > >>> in-order records if they are further apart than the de-duplication
> > > > > >>> window size, right?
> > > > > >>> Thus I would believe this is "more natural" compared
> > > > > >>> to discarding late records pro-actively, which would lead to missing
> > > > > >>> result records?
> > > > > >>>
> > > > > >>> We could also make this configurable if we are not sure what users
> > > > > >>> really need -- or add such a configuration later in case the semantics
> > > > > >>> we pick don't work for some users.
> > > > > >>>
> > > > > >>> Another line of thinking, that did serve us well in the past: in doubt
> > > > > >>> keep a record -- users can add operators to drop record (in case they
> > > > > >>> don't want to keep it), but if we drop a record, users have no way to
> > > > > >>> resurrect it (thus, building a workaround to change semantics is
> > > > > >>> possible for users if we default to keep records, but not the other way
> > > > > >>> around).
> > > > > >>>
> > > > > >>> Would be good to get input from the broader community on this question
> > > > > >>> though. In the end, it must be a use-case driven decision?
> > > > > >>>
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>>
> > > > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > > > >>>> Hi Matthias,
> > > > > >>>>
> > > > > >>>> Thank you for your review!
> > > > > >>>>
> > > > > >>>> 100.
> > > > > >>>> I agree. I changed the name of the parameter to "idSelector".
> > > > > >>>> Because this id may be computed, it is better to call it "id" rather than
> > > > > >>>> field or attribute.
> > > > > >>>>
> > > > > >>>> 101.
> > > > > >>>> The reason I added the methods `keySerde()` and `valueSerde()` was to
> > > > > >>>> have the same capabilities as other serde classes (such as Grouped
> > > > > >>>> or Joined).
> > > > > >>>> As a Kafka-streams user, I usually use `with(keySerde, valueSerde)`
> > > > > >>>> as you suggested. But I am not sure if we don't want to have them for this
> > > > > >>>> processor ?
> > > > > >>>>
> > > > > >>>> 102.
> > > > > >>>> That's a good point! Because we know that the window store will contain
> > > > > >>>> at most one instance of a given key, I am not sure how the range fetch
> > > > > >>>> on WindowStore compares to a KeyValueStore `get()` in this case.
> > > > > >>>> Wondering if the fact that the record's key is the prefix of the underlying
> > > > > >>>> keyValueStore's key ("<dataKey,ts>") may provide comparable performance
> > > > > >>>> to the random access of KeyValueStore ? Of course, the WindowStore fetch()
> > > > > >>>> would be less efficient because it may fetch from more than 1 segment, and
> > > > > >>>> because of some iterator overhead.
> > > > > >>>>
> > > > > >>>> The purpose of using a WindowStore is to automatically purge old data.
> > > > > >>>> For example, deduplicating a topic written at least once wouldn't require
> > > > > >>>> keeping a large history. This is not the case of using a KeyValueStore
> > > > > >>>> which would require scanning regularly to remove expired records.
> > > > > >>>> That might cause a sudden increase of latency whenever the cleanup
> > > > > >>>> is triggered.
> > > > > >>>>
> > > > > >>>> It would be good to hear from anyone who has done some analysis
> > > > > >>>> on RocksDB's range fetch.
> > > > > >>>>
> > > > > >>>> 103.
> > > > > >>>> Sure, I can update it once we agree on underlying semantics.
> > > > > >>>>
> > > > > >>>> 104.
> > > > > >>>> Another good point!
> > > > > >>>>
> > > > > >>>>> In the end, de-duplication does only make sense when EOS is used
> > > > > >>>>
> > > > > >>>> I agree with that.
> > > > > >>>> And for me, the use case of deduplicating a topic
> > > > > >>>> written ALOS inside an EOS application might be the top 1 use case
> > > > > >>>> of deduplication.
> > > > > >>>>
> > > > > >>>>> all downstream processing happens, `context.forward()` returns
> > > > > >>>>> and we update the state store, we could now crash w/o committing offsets
> > > > > >>>>
> > > > > >>>> This gets me wondering if this is a limitation of stateful processors
> > > > > >>>> in ALOS. For example a windowed aggregation with `on_window_close`
> > > > > >>>> emit strategy may have the same limitation today (we receive a record,
> > > > > >>>> we update its aggregation result in the store, then crash before committing,
> > > > > >>>> then the record will be again reconsidered for aggregation). Is this
> > > > > >>>> correct ?
> > > > > >>>>
> > > > > >>>> As a workaround, I think storing the record's offset inside the
> > > > > >>>> store's value can tell us whether the record has been already seen or not.
> > > > > >>>> If we receive a record whose deduplication id exists in the store
> > > > > >>>> and the entry in the store has the same offset, it means the record
> > > > > >>>> is processed twice and we can go ahead and forward it. If the offset
> > > > > >>>> is different, it means it's a duplicate record, so we ignore it.
> > > > > >>>>
> > > > > >>>> As you said, we don't have any guarantee whether the initial record was
> > > > > >>>> forwarded or not in case of a crash before commit. In this solution
> > > > > >>>> we would forward the record twice, which is against deduplication.
> > > > > >>>> But, this is still an ALOS application, so it has the same semantics
> > > > > >>>> as any other such application. With this, I am not sure we can
> > > > > >>>> have "strict" deduplication for ALOS applications.
> > > > > >>>>
> > > > > >>>> 105.
> > > > > >>>> For me, if there are two duplicate records, it means they are
> > > > > >>>> the same from the application's point of view, so it can choose
> > > > > >>>> either one. Thus, I would go with forwarding the record with
> > > > > >>>> the least offset.
> > > > > >>>>
> > > > > >>>>> Would it not be desired to drop all duplicates independent
> > > > > >>>>> of their ts, as long as we find a record in the store?
> > > > > >>>>
> > > > > >>>> This is actually related to the (suggested) windowed nature
> > > > > >>>> of deduplication. As in 102. we don't want to do a "forever"
> > > > > >>>> deduplication, which may be impossible for huge workloads
> > > > > >>>> where all records should be kept in the store. Hence, the fetch
> > > > > >>>> of timestamp between [ts-deduplicationInterval, ts+deduplicationInterval]
> > > > > >>>>
> > > > > >>>> About late records, this is again due to the windowed nature.
> > > > > >>>> Because the store won't save those late (i.e. expired) records,
> > > > > >>>> we have two options. Either we do not apply deduplication
> > > > > >>>> on them, thus the deduplication doesn't work on late records,
> > > > > >>>> or we discard them (which is the option I suggest).
> > > > > >>>> In the second case, it would be up to the user to choose
> > > > > >>>> a deduplicationInterval that sufficiently covers all their late data.
> > > > > >>>> What do you think ?
> > > > > >>>>
> > > > > >>>> Thanks,
> > > > > >>>> Ayoub
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Tue, Jun 4, 2024 at 23:58, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > >>>>
> > > > > >>>>> Ayoub,
> > > > > >>>>>
> > > > > >>>>> thanks for resurrecting this KIP. I think a built-in de-duplication
> > > > > >>>>> operator will be very useful.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Couple of questions:
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 100: `deduplicationKeySelector`
> > > > > >>>>>
> > > > > >>>>> Is this the best name? It might indicate that we select a "key", which is
> > > > > >>>>> an overloaded term... Maybe we could use `Field` or `Id` or `Attribute`
> > > > > >>>>> instead of `Key` in the name? Just brainstorming. If we think `Key` is
> > > > > >>>>> the best word, I am also ok with it.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 101: `Deduplicated` class
> > > > > >>>>>
> > > > > >>>>> You propose to add static methods `keySerde()` and `valueSerde()` -- in
> > > > > >>>>> other config classes, we use only `with(keySerde, valueSerde)` as we try
> > > > > >>>>> to use the "builder" pattern, and avoid too many overloads. I would
> > > > > >>>>> prefer to omit both methods you suggest and just use a single `with` for
> > > > > >>>>> both serdes.
> > > > > >>>>>
> > > > > >>>>> Similarly, I think we don't want to add `with(...)` which takes all
> > > > > >>>>> parameters at once (which should only be 3 parameters, not 4 as it's
> > > > > >>>>> currently in the KIP)?
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 102: Usage of `WindowedStore`:
> > > > > >>>>>
> > > > > >>>>> Would this be efficient? The physical byte layout is "<dataKey,ts>" for
> > > > > >>>>> the store key, so it would be difficult to do an efficient lookup for a
> > > > > >>>>> given "de-duplication key" to discard duplicates, as we don't know the
> > > > > >>>>> timestamp of the first record with the same "de-duplication key".
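
A rough way to picture the lookup problem raised in 102, as a plain-Java sketch rather than the actual store implementation: with a `<dataKey,ts>` composite key, a presence check for a de-duplication key needs a range scan over a time interval, whereas a store keyed only by the de-duplication key needs a single point lookup. The `TreeMap`/`HashMap` stand-ins, the `|` separator (assumed not to occur in keys) and all method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustration only: sorted map as a stand-in for a "<dataKey,ts>" keyed store.
final class LookupSketch {

    // Encode "<dataKey,ts>" so that entries sort by dataKey first, then by timestamp.
    // Zero-padding keeps lexicographic order equal to numeric order for ts >= 0.
    static String compositeKey(String dataKey, long ts) {
        return dataKey + '|' + String.format("%019d", ts);
    }

    // Window-style layout: the first-arrival timestamp is unknown,
    // so the check has to scan every entry of dataKey within [fromTs, toTs].
    static boolean seenViaRangeScan(NavigableMap<String, Long> store,
                                    String dataKey, long fromTs, long toTs) {
        return !store.subMap(compositeKey(dataKey, fromTs), true,
                             compositeKey(dataKey, toTs), true).isEmpty();
    }

    // Plain key-value layout: one point lookup, no iterator involved.
    static boolean seenViaPointLookup(Map<String, Long> store, String dataKey) {
        return store.containsKey(dataKey);
    }

    public static void main(String[] args) {
        NavigableMap<String, Long> windowLike = new TreeMap<>();
        windowLike.put(compositeKey("tx-1", 100L), 0L);
        System.out.println(seenViaRangeScan(windowLike, "tx-1", 50L, 150L));

        Map<String, Long> keyValue = new HashMap<>();
        keyValue.put("tx-1", 100L);
        System.out.println(seenViaPointLookup(keyValue, "tx-1"));
    }
}
```

The range scan only finds the entry if the scanned interval happens to cover the first arrival, which is why the timestamp being unknown matters.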
> > > > > >>>>>
> > > > > >>>>> This boils down to the actual de-duplication logic (some more comments
> > > > > >>>>> below), but what you propose seems to require expensive range-scans, which
> > > > > >>>>> could be cost prohibitive in practice. I think we need to find a way to
> > > > > >>>>> use efficient key-point-lookups to make this work.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 103: "Processing logic":
> > > > > >>>>>
> > > > > >>>>> Might need some updates (Cf 102 comment). I am not sure if I fully
> > > > > >>>>> understand the logic: cf 105 below.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 104:
> > > > > >>>>>
> > > > > >>>>>> If no entries found → forward the record + save the record in the store
> > > > > >>>>>
> > > > > >>>>> This part is critical, and we should discuss in detail. In the end,
> > > > > >>>>> de-duplication does only make sense when EOS is used, and we might want
> > > > > >>>>> to call this out (eg, on the JavaDocs)? But if used with ALOS, it's very
> > > > > >>>>> difficult to ensure that we never lose data... Your proposal to
> > > > > >>>>> first-forward goes into the right direction, but does not really solve
> > > > > >>>>> the problem entirely:
> > > > > >>>>>
> > > > > >>>>> Even if we forward the message first, all downstream processing happens,
> > > > > >>>>> `context.forward()` returns and we update the state store, we could now
> > > > > >>>>> crash w/o committing offsets. For this case, we have no guarantee that
> > > > > >>>>> the result records were published (as we did not flush the producer
> > > > > >>>>> yet), but when re-reading from the input topic, we would find the record
> > > > > >>>>> in the store and incorrectly drop it as a duplicate...
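
The offset-based workaround proposed elsewhere in this thread for this crash scenario can be sketched in plain Java as follows. This is an illustration under assumptions, not the KIP's implementation: the store is modeled as an in-memory map from de-duplication id to the offset of the first arrival, a `null` offset stands for a record without a source offset (for example one forwarded by a punctuation), and `shouldForward` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: an in-memory stand-in for the de-duplication state store.
final class OffsetAwareDedupSketch {

    // De-duplication id -> input offset of the first arrival (null if it had none).
    private final Map<String, Long> store = new HashMap<>();

    // Returns true if the record should be forwarded downstream.
    boolean shouldForward(String id, Long offset) {
        if (!store.containsKey(id)) {
            store.put(id, offset); // first arrival: remember which input record it was
            return true;
        }
        Long storedOffset = store.get(id);
        // Same offset as the stored one: the same input record is being re-read
        // after a crash, so forward it again. Anything else is a duplicate.
        return offset != null && offset.equals(storedOffset);
    }

    public static void main(String[] args) {
        OffsetAwareDedupSketch dedup = new OffsetAwareDedupSketch();
        System.out.println(dedup.shouldForward("t1", 5L)); // first arrival
        System.out.println(dedup.shouldForward("t1", 5L)); // replay of the same input record
        System.out.println(dedup.shouldForward("t1", 9L)); // different record, same id
    }
}
```

Forwarding the replayed record again can still emit a duplicate downstream, which matches the ALOS semantics discussed in the thread rather than strict de-duplication.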
> > > > > >>>>>
> > > > > >>>>> I think the only solution to make this work would be to use TX-state
> > > > > >>>>> stores in combination with ALOS as proposed via KIP-892?
> > > > > >>>>>
> > > > > >>>>> Using an in-memory store won't help much either? The producer could have
> > > > > >>>>> sent the write into the changelog topic, but not into the result topic,
> > > > > >>>>> and thus we could still not guarantee ALOS...?
> > > > > >>>>>
> > > > > >>>>> How do we want to go about this? We could also say, this new operator
> > > > > >>>>> only works with EOS. Would this be too restrictive? -- At least for now,
> > > > > >>>>> until KIP-892 lands, and we could relax it?
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 105: "How to detect late records"
> > > > > >>>>>
> > > > > >>>>> In the end, it seems to boil down to determine which of the records to
> > > > > >>>>> forward and which record to drop, for (1) the regular case and (2) the
> > > > > >>>>> out-of-order data case.
> > > > > >>>>>
> > > > > >>>>> Regular case (no out-of-order data): For this case, offset and ts order
> > > > > >>>>> is the same, and we can forward the first record we get. All later
> > > > > >>>>> records within "de-duplication period" with the same "de-duplication key"
> > > > > >>>>> would be dropped. If a record with the same "de-duplication key" arrives
> > > > > >>>>> after "de-duplication period" passed, we cannot drop it any longer, but
> > > > > >>>>> would still forward it, as by the contract of the operator /
> > > > > >>>>> de-duplication period.
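
The regular (in-order) case described above can be sketched in plain Java as below. It is a minimal model under assumptions: timestamps arrive in order, the store is an in-memory map, the period boundary is treated as exclusive (`>`), and a record forwarded after the period restarts the period for its key. None of these choices is fixed by the thread, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: in-order de-duplication with a fixed period, no purging logic.
final class DedupPeriodSketch {

    private final long periodMs;
    // De-duplication key -> timestamp of the last record forwarded for that key.
    private final Map<String, Long> firstSeenTs = new HashMap<>();

    DedupPeriodSketch(long periodMs) {
        this.periodMs = periodMs;
    }

    // Returns true if the record should be forwarded, false if it is dropped as a duplicate.
    boolean shouldForward(String dedupKey, long ts) {
        Long first = firstSeenTs.get(dedupKey);
        if (first == null || ts - first > periodMs) {
            // Either never seen, or the de-duplication period has passed:
            // forward and start a new period for this key (assumption).
            firstSeenTs.put(dedupKey, ts);
            return true;
        }
        return false; // same key within the period: duplicate
    }

    public static void main(String[] args) {
        DedupPeriodSketch dedup = new DedupPeriodSketch(10L);
        System.out.println(dedup.shouldForward("a", 0L));  // first record for "a"
        System.out.println(dedup.shouldForward("a", 5L));  // within the period
        System.out.println(dedup.shouldForward("a", 11L)); // after the period
    }
}
```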
> > > > > >>>>>
> > > > > >>>>> For the out-of-order case: The first question we need to answer is,
> > > > > >>>>> do we want to forward the record with the smallest offset or the
> > > > > >>>>> record with the smallest ts? Logically, forwarding the record with
> > > > > >>>>> the smallest ts might be "more correct"; however, it implies we
> > > > > >>>>> could only forward it after the "de-duplication period" has passed,
> > > > > >>>>> which might add undesired latency? Would this be
> > > > > >>>>> desired/acceptable?
> > > > > >>>>>
> > > > > >>>>> In contrast, if we forward the record with the smallest offset
> > > > > >>>>> (this is what you seem to propose), we don't have a latency issue,
> > > > > >>>>> but of course the question of which records to drop is trickier to
> > > > > >>>>> answer: it seems you propose to compare the time difference of the
> > > > > >>>>> stored record to the current record, but I am wondering why? Would
> > > > > >>>>> it not be desired to drop all duplicates independent of their ts,
> > > > > >>>>> as long as we find a record in the store? It would be good to get
> > > > > >>>>> some more motivation and tradeoffs discussed about the different
> > > > > >>>>> strategies we could use.
> > > > > >>>>>
> > > > > >>>>> You also propose to drop _any_ late record: I am also not sure if
> > > > > >>>>> that's desired? Could this not lead to data loss? Assume we get a
> > > > > >>>>> late record, but in fact there was never a duplicate? Why would we
> > > > > >>>>> want to drop it?
> > > > > >>>>> If there is a late record which is indeed a duplicate, but we
> > > > > >>>>> purged the original record from the store already, it seems to be
> > > > > >>>>> the same case as for the "no out-of-order case": after we purged,
> > > > > >>>>> we cannot de-duplicate, and thus it's a regular case we can just
> > > > > >>>>> accept?
> > > > > >>>>>
> > > > > >>>>> -Matthias
> > > > > >>>>>
> > > > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > > > >>>>>> Hi everyone,
> > > > > >>>>>>
> > > > > >>>>>> I've just made a (small) change to this KIP about an
> > > > > >>>>>> implementation detail. Please let me know your thoughts.
> > > > > >>>>>>
> > > > > >>>>>> Thank you,
> > > > > >>>>>> Ayoub
> > > > > >>>>>>
> > > > > >>>>>> On Mon, May 20, 2024 at 21:13, Ayoub <ayoubomar...@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hello,
> > > > > >>>>>>>
> > > > > >>>>>>> Following a discussion on the community Slack channel, I would
> > > > > >>>>>>> like to revive the discussion on KIP-655, which is about adding
> > > > > >>>>>>> a deduplication processor in kafka-streams.
> > > > > >>>>>>>
> > > > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > >>>>>>>
> > > > > >>>>>>> Even though the motivation is not quite the same as the initial
> > > > > >>>>>>> one, I updated the KIP rather than creating a new one, as I
> > > > > >>>>>>> believe the end goal is the same.
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks,
> > > > > >>>>>>> Ayoub