Hi Ayoub,

What's the status of this KIP?
Regards,
Bill

On Sat, Jul 13, 2024 at 5:13 AM Ayoub Omari <ayoubomar...@gmail.com> wrote:
> Hi Bill,
>
> Thanks for your comments!
>
> I am prefixing your points with B.
>
> B1.
> That's a good catch. I think punctuated records aren't concerned by this problem of reprocessing because they are not issued twice. We will still need to distinguish between a punctuated and a source record. The logic becomes:
>
>     if record.offset is not null && record.offset == offset_stored:
>         forward(record)
>     else:
>         // do_not_forward
>
> I will update the KIP on this point.
>
> B2. (i.e. 106)
> That's the point discussed in 106. The main idea is to avoid repartitioning when we don't have to. But let's say we want to repartition when deduplication is on an id that is not the key.
>
> The user code may look something like this:
> ```kstream
> .deduplicate((k, v) -> v.id)
> .map((k, v) -> ...)
> ```
> The user expects that the key in the map processor which follows the deduplicate is the same as the initial key.
>
> Internally, if we decide to repartition, the deduplication processor may do the following:
> ```kstream
> .map((k, v) -> (v.id, ...)) // For repartitioning on id
> .deduplicateByKey()
> .map((id, v) -> (initialKey, ...)) // To recover the initial key
> ```
>
> In this case, there is some internal complexity to this processor:
> I) We should repartition twice, before and after deduplication. The second map triggers repartitioning if there are following stateful processors in the pipeline.
>
> II) The initial key should be kept somewhere. One way is to wrap the value to add the key, and then unwrap it after the `deduplicateByKey`.
>
> As mentioned in one use case above (deduplicating transactions - written at least once - by id), repartitioning is not needed for that case. This added complexity won't be useful.
>
> *TLDR,* I think one solution is to have a separate API to cover such a use case. In total, we can have 3 APIs of deduplication:
>
> 1) `deduplicateByKey()` *// No repartitioning*
> 2) `deduplicateByKey((k, v) -> v.id)` *or* `deduplicateByKeyAndId((k, v) -> v.id)` *// No repartitioning*
> 3) `deduplicate((k, v) -> v.id)` *// causes repartitioning*
>
> The second API is equivalent to deduplicating by the pair (key, id). No repartitioning is required because the records with the same key are already together in the same partition. cc @Matthias @Sebastien.
>
> B3.
> I think here we can do what some existing processors do for periodic work: we can track the last cleanup time and the current stream time, and if the delta exceeds the deduplication period, we trigger the cleanup.
>
> Best,
> Ayoub
>
> On Thu, Jul 11, 2024 at 01:17 Bill Bejeck <bbej...@apache.org> wrote:
> > Hi All,
> >
> > Thanks Ayoub for getting this KIP going again, I think a deduplication operator will be very useful. My apologies for being late to the discussion.
> >
> > Overall - I agree with the direction of the KIP but I have a couple of questions.
> >
> > 1. Regarding using offsets for tracking the first arrival. I think there could be a case when offsets would not be available, when records are forwarded by punctuation for example.
> >
> > 2. I'm not sure about using something other than the key for identifying duplicate records. By doing so, one could end up missing a de-duplication due to records with the same id characteristic in the value but having different keys, so the records land on different partitions. I guess we could enforce a repartition if one chooses to use a de-duplication id other than the key.
> >
> > 3. I think a punctuation scheduled to run at the de-duplication period to clean out older records would be a clean approach for purging older records. I'm not taking a hard stance on this approach and I'm willing to discuss different methods.
> >
> > Thanks,
> >
> > Bill
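For illustration, Ayoub's B1 check combined with Bill's point 1 could look something like the following Processor API sketch. This is a minimal sketch, not the KIP's implementation: the class name, the store name, and the id-selection logic are assumptions for the example, and it assumes offsets identify records within a task's input partition; only the Processor API calls themselves (`recordMetadata()`, `getStateStore()`, `forward()`) are existing Kafka Streams APIs.

```java
import java.util.function.BiFunction;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;

class DedupProcessor<K, V> implements Processor<K, V, K, V> {

    // Sentinel for punctuation-forwarded records, which carry no metadata
    // (a put with a null value would be treated as a delete by the store).
    private static final long NO_OFFSET = -1L;

    private final BiFunction<K, V, String> idSelector; // (key, value) -> dedup id
    private ProcessorContext<K, V> context;
    private KeyValueStore<String, Long> store; // dedup id -> offset of first occurrence

    DedupProcessor(final BiFunction<K, V, String> idSelector) {
        this.idSelector = idSelector;
    }

    @Override
    public void init(final ProcessorContext<K, V> context) {
        this.context = context;
        this.store = context.getStateStore("dedup-store"); // store name is illustrative
    }

    @Override
    public void process(final Record<K, V> record) {
        final String id = idSelector.apply(record.key(), record.value());
        final long offset = context.recordMetadata()
                .map(RecordMetadata::offset)
                .orElse(NO_OFFSET); // Bill's point 1: offsets can be unavailable
        final Long stored = store.get(id);
        if (stored == null) {
            store.put(id, offset);   // first occurrence: remember it and emit
            context.forward(record);
        } else if (offset != NO_OFFSET && offset == stored) {
            context.forward(record); // same input record reprocessed after a crash
        }
        // otherwise: a true duplicate, do not forward
    }
}
```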
> > On 2024/06/25 18:44:06 Ayoub Omari wrote:
> > > Hi Matthias,
> > >
> > > Here are my updates on your points.
> > >
> > > 101.
> > > > You propose to add static methods `keySerde()` and `valueSerde()` -- in other config classes, we use only `with(keySerde, valueSerde)` as we try to use the "builder" pattern, and avoid too many overloads. I would prefer to omit both methods you suggest and just use a single `with` for both serdes.
> > >
> > > I was actually inspired by the other config classes, for example `Joined` and `Grouped` both have the static methods `keySerde()` and `valueSerde()`.
> > >
> > > > I think we don't want to add `with(...)` which takes all parameters at once
> > >
> > > Done.
> > >
> > > 102.
> > > Thanks, your suggestion sounds good to me. The trade-off of having an index that allows efficient purging of expired records alongside the keyValue store makes sense. I've been looking into the code, and I think a similar idea was implemented for other processors (for example with DualSchemaRocksDBSegmentedBytesStore). As you said, I think we would benefit from some existing code here. KIP updated!
> > >
> > > 104.
> > > Updated the KIP to consider records' offsets.
> > >
> > > 105
> > > > picking the first offset with smallest ts sounds good to me. The KIP should be explicit about it
> > >
> > > Done.
> > >
> > > > But as discussed above, it might be simplest to not really have a window lookup, but just a plain key-lookup and drop if the key exists in the store?
> > >
> > > KIP updated, we will be `.get()`ing from a keyValueStore instead of `.fetch()`ing from a WindowStore.
> > >
> > > > Another line of thinking, that did serve us well in the past: when in doubt, keep a record -- users can add operators to drop records (in case they don't want to keep them), but if we drop a record, users have no way to resurrect it (thus, building a workaround to change semantics is possible for users if we default to keep records, but not the other way around).
> > >
> > > Makes total sense! I updated the KIP to forward late records instead of dropping them.
> > >
> > > 106.
> > > For the moment, I highlighted in the Javadocs that we are deduplicating by partition. If there is a better way to convey this in the name of the API itself, that would be good.
> > >
> > > Best,
> > > Ayoub
> > >
> > > On Thu, Jun 13, 2024 at 09:03 Sebastien Viale <sebastien.vi...@michelin.com> wrote:
> > > > Hi,
> > > >
> > > > 106:
> > > >
> > > > Thanks for the clarification. Actually, this is not what I expected, but I better understand the performance issues regarding the state store iteration. If this is how it should be designed, it is fine for me as long as it is clear that the repartition must be done before the deduplication.
> > > >
> > > > Sébastien
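To make Sebastien's point concrete, a user-driven "global" deduplication could look roughly like the sketch below. `selectKey()` and `repartition()` are existing DSL operations; `deduplicateByKey()` is only the operator proposed in this KIP, and `Transaction`/`getId()` are made-up types for the example:

```kstream
// Co-locate duplicates explicitly, then deduplicate per task:
transactions                               // KStream<String, Transaction>
    .selectKey((userId, tx) -> tx.getId()) // the dedup id becomes the key
    .repartition()                         // duplicates now land on the same task
    .deduplicateByKey();                   // proposed KIP-655 operator
```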
> > > > ________________________________
> > > > From: Matthias J. Sax <mj...@apache.org>
> > > > Sent: Thursday, June 13, 2024 02:51
> > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in kafka-streams
> > > >
> > > > 106:
> > > >
> > > > > For the use-case of deduplicating an "at least once written" stream, we are sure that the duplicate record has the same key as the original one, and will land on the same task. Here, a user would want to specify a deduplication key different from the topic's key in case the topic's key is not a unique identifier
> > > > >
> > > > > For example, we have a topic with keyValue (`userId`, `transaction`) and deduplication is done on `transaction`.`id`. Here, the application wants to deduplicate transactions. It knows that a transaction id maps to a single userId. Any duplicate of that record would be received by the task which processes this userId.
> > > >
> > > > This is an interesting point.
> > > >
> > > > My concern is, to some extent, that it seems (on the surface) to not follow the established pattern of auto-repartitioning in the DSL. Of course, given that the current proposal says we use an "id extractor" and not a "key extractor" it might be ok (but it might be somewhat subtle). Of course, JavaDocs always help to explain in detail. Would this be enough?
> > > >
> > > > Would be good to hear from others about this point. I am personally not sure which approach I would prefer at this point.
> > > >
> > > > The problem reminds me of https://issues.apache.org/jira/browse/KAFKA-10844, which is not resolved directly either. We do have KIP-759 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-759: Unneeded repartition canceling), which is WIP and helps with KAFKA-10844, but not sure if it would be a viable solution for the de-duplication case?
> > > >
> > > > -Matthias
> > > >
> > > > On 6/11/24 2:31 PM, Ayoub Omari wrote:
> > > > > Hi Sebastien & Matthias,
> > > > >
> > > > > For 106.
> > > > > My idea was to deduplicate on a per-task basis. If the user wants to do a global deduplication over all partitions, I think it's better to have them explicitly repartition and then call the deduplication processor.
> > > > >
> > > > > For the use-case of deduplicating an "at least once written" stream, we are sure that the duplicate record has the same key as the original one, and will land on the same task. Here, a user would want to specify a deduplication key different from the topic's key in case the topic's key is not a unique identifier.
> > > > >
> > > > > For example, we have a topic with keyValue (`userId`, `transaction`) and deduplication is done on `transaction`.`id`.
> > > > > Here, the application wants to deduplicate transactions. It knows that a transaction id maps to a single userId. Any duplicate of that record would be received by the task which processes this userId.
> > > > >
> > > > > One other thought I had when writing the KIP about global deduplication is that it will require mapping the key of the stream twice (a first map to change the key to the deduplication key, and a second map to get back the initial key). The second map may imply a second repartitioning.
> > > > >
> > > > > However, if we do a per-task deduplication, the user may adapt it to their specific use-case.
> > > > >
> > > > > Let me know what you think
> > > > > Ayoub
> > > > >
> > > > > On Tue, Jun 11, 2024 at 20:21 Matthias J. Sax <mj...@apache.org> wrote:
> > > > >> Thanks Sebastien,
> > > > >>
> > > > >> that's a good point. Thanks for raising it. -- I like your proposal.
> > > > >>
> > > > >> An alternative would be to have two overloads of `deduplicate()`, one w/ and one w/o the "id extractor" parameter. This would be less explicit though.
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >> On 6/11/24 2:30 AM, Sebastien Viale wrote:
> > > > >>> Hi,
> > > > >>>
> > > > >>> I am really interested in this KIP.
> > > > >>>
> > > > >>> 106:
> > > > >>> I hope I am not talking nonsense, but if you do not deduplicate based on the key, the input stream has to be repartitioned. Otherwise, different stream tasks may handle records that need to be deduplicated, and thus duplicates will not be detected.
> > > > >>>
> > > > >>> This is why I would have created two different methods, as is done for GroupBy:
> > > > >>>
> > > > >>> deduplicateByKey(...)
> > > > >>> deduplicate(...)
> > > > >>>
> > > > >>> If deduplicateByKey is used, the input stream does not need to be repartitioned.
> > > > >>>
> > > > >>> thanks
> > > > >>>
> > > > >>> Sébastien
> > > > >>> ________________________________
> > > > >>> From: Matthias J. Sax <mj...@apache.org>
> > > > >>> Sent: Tuesday, June 11, 2024 01:54
> > > > >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > >>> Subject: [EXT] Re: [DISCUSS] KIP-655: Add deduplication processor in kafka-streams
> > > > >>>
> > > > >>> Thanks for the update Ayoub.
> > > > >>>
> > > > >>> 101: you say:
> > > > >>>
> > > > >>>> But I am not sure if we don't want to have them for this processor?
> > > > >>>
> > > > >>> What is your reasoning to move off the established pattern? Would be good to understand why the `Deduplicated` class needs a different "structure" compared to existing classes.
> > > > >>>
> > > > >>> 102: Creating iterators is very expensive. For other work, we actually hit 100x (?) throughput degradation by creating an (in most cases empty) iterator for every input record, and needed to find other ways to avoid creating an iterator per record. It really kills performance.
> > > > >>>
> > > > >>> I see the point about data expiration.
We could experiment with > > > > >>> punctuation to expire old data, or add a second "time-ordered > > store" > > > > >>> (which we already have at hand) which acts as an index into the > > main > > > > >>> store. -- Another possibility would be to add a new version of > > > > segmented > > > > >>> store with a different key-layout (ie, just store the plain > key). I > > > > >>> think with some refactoring, we might be able to re-use a lot of > > > > >>> existing code. > > > > >>> > > > > >>> > > > > >>> > > > > >>> 104: > > > > >>> > > > > >>>> This gets me wondering if this is a limitation of stateful > > processors > > > > >>>> in ALOS. For example a windowed aggregation with > `on_window_close` > > > > >>>> emit strategy may have the same limitation today (we receive a > > record, > > > > >>>> we update its aggregation result in the store, then crash before > > > > >> committing, > > > > >>>> then the record will be again reconsidered for aggregation). Is > > this > > > > >>>> correct ? > > > > >>> > > > > >>> Yes, this is correct, but it does not violate ALOS, because we > did > > not > > > > >>> lose the input record -- of course, the aggregation would contain > > the > > > > >>> input record twice (eg, over count), but this is ok under ALOS. > > > > >>> Unfortunately, for de-duplication this pattern breaks, because > > > > >>> de-duplication operator does a different "aggregation logic" > > depending > > > > >>> on its state (emit if no key found, but not emit if key found). > For > > > > >>> counting as an example, we increment the count and emit > > unconditionally > > > > >>> though. > > > > >>> > > > > >>> > > > > >>>> As a workaround, I think storing the record's offset inside the > > > > >>>> store's value can tell us whether the record has been already > > seen or > > > > >> not. > > > > >>>> If we receive a record whose deduplication id exists in the > store > > > > >>>> and the entry in the store has the same offset, it means the > > record > > > > >>>> is processed twice and we can go ahead and forward it. If the > > offset > > > > >>>> is different, it means it's a duplicate record, so we ignore it. > > > > >>> > > > > >>> Great idea. This might work... If we store the input record > > offset, we > > > > >>> can actually avoid that the "aggregation logic" changes for the > > same > > > > >>> input record. -- And yes, with ALOS potentially emitting a > > duplicate is > > > > >>> the-name-of-the-game, so no concerns on this part from my side. > > > > >>> > > > > >>> > > > > >>> > > > > >>> 105: picking the first offset with smallest ts sound good to me. > > The > > > > KIP > > > > >>> should be explicit about it. But as discussed above, it might be > > > > >>> simplest to not really have a window lookup, but just a plain > > > > key-lookup > > > > >>> and drop if the key exists in the store? -- For late records, it > > might > > > > >>> imply that they are not de-duplicated, but this is also the case > > for > > > > >>> in-order records if they are further apart than the > de-duplication > > > > >>> window size, right? Thus I would believe this is "more natural" > > > > compared > > > > >>> to discarding late records pro-actively, which would lead to > > missing > > > > >>> result records? > > > > >>> > > > > >>> We could also make this configurable if we are not sure what > users > > > > >>> really need -- or add such a configuration later in case the > > semantics > > > > >>> we pick don't work for some users. 
> > > > >>> Another line of thinking, that did serve us well in the past: when in doubt, keep a record -- users can add operators to drop records (in case they don't want to keep them), but if we drop a record, users have no way to resurrect it (thus, building a workaround to change semantics is possible for users if we default to keep records, but not the other way around).
> > > > >>>
> > > > >>> Would be good to get input from the broader community on this question though. In the end, it must be a use-case driven decision?
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 6/6/24 5:02 AM, Ayoub Omari wrote:
> > > > >>>> Hi Matthias,
> > > > >>>>
> > > > >>>> Thank you for your review!
> > > > >>>>
> > > > >>>> 100.
> > > > >>>> I agree. I changed the name of the parameter to "idSelector". Because this id may be computed, it is better to call it "id" rather than field or attribute.
> > > > >>>>
> > > > >>>> 101.
> > > > >>>> The reason I added the methods `keySerde()` and `valueSerde()` was to have the same capabilities as other serde classes (such as Grouped or Joined). As a Kafka-streams user, I usually use `with(keySerde, valueSerde)` as you suggested. But I am not sure if we don't want to have them for this processor?
> > > > >>>>
> > > > >>>> 102.
> > > > >>>> That's a good point! Because we know that the window store will contain at most one instance of a given key, I am not sure how the range fetch on WindowStore compares to a KeyValueStore `get()` in this case. Wondering if the fact that the record's key is the prefix of the underlying keyValueStore's key ("<dataKey,ts>") may provide comparable performance to the random access of KeyValueStore? Of course, the WindowStore fetch() would be less efficient because it may fetch from more than 1 segment, and because of some iterator overhead.
> > > > >>>>
> > > > >>>> The purpose of using a WindowStore is to automatically purge old data. For example, deduplicating a topic written at least once wouldn't require keeping a large history. This is not the case with a KeyValueStore, which would require regular scans to remove expired records. That might cause a sudden increase in latency whenever the cleanup is triggered.
> > > > >>>>
> > > > >>>> It would be good to hear from anyone who has done some analysis on RocksDB's range fetch.
> > > > >>>>
> > > > >>>> 103.
> > > > >>>> Sure, I can update it once we agree on the underlying semantics.
> > > > >>>>
> > > > >>>> 104.
> > > > >>>> Another good point!
> > > > >>>>
> > > > >>>>> In the end, de-duplication only makes sense when EOS is used
> > > > >>>>
> > > > >>>> I agree with that. And for me, the use case of deduplicating a topic written ALOS inside an EOS application might be the number one use case of deduplication.
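For reference, the setup Ayoub describes (an at-least-once-written input topic consumed by an exactly-once Streams application) is just the processing-guarantee setting on the consuming application. A minimal sketch, where the application id and bootstrap servers are placeholders and only the `StreamsConfig` constants are the real API:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-app");         // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Upstream producers may write the input topic at-least-once; the Streams
// application itself processes with EOS, so the deduplication operator's
// own output is not duplicated by the app's reprocessing.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```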
> > > > >>>>> all downstream processing happens, `context.forward()` returns and we update the state store, we could now crash w/o committing offsets
> > > > >>>>
> > > > >>>> This gets me wondering if this is a limitation of stateful processors in ALOS. For example a windowed aggregation with `on_window_close` emit strategy may have the same limitation today (we receive a record, we update its aggregation result in the store, then crash before committing, then the record will be reconsidered again for aggregation). Is this correct?
> > > > >>>>
> > > > >>>> As a workaround, I think storing the record's offset inside the store's value can tell us whether the record has already been seen or not. If we receive a record whose deduplication id exists in the store and the entry in the store has the same offset, it means the record is processed twice and we can go ahead and forward it. If the offset is different, it means it's a duplicate record, so we ignore it.
> > > > >>>>
> > > > >>>> As you said, we don't have any guarantee whether the initial record was forwarded or not in case of a crash before commit. In this solution we would forward the record twice, which goes against deduplication. But this is still an ALOS application, so it has the same semantics as any other such application. With this, I am not sure we can have "strict" deduplication for ALOS applications.
> > > > >>>>
> > > > >>>> 105.
> > > > >>>> For me, if there are two duplicate records, it means they are the same from the application's point of view, so it can choose either one. Thus, I would go with forwarding the record with the smallest offset.
> > > > >>>>
> > > > >>>>> Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store?
> > > > >>>>
> > > > >>>> This is actually related to the (suggested) windowed nature of deduplication. As in 102, we don't want to do a "forever" deduplication, which may be impossible for huge workloads where all records should be kept in the store. Hence the fetch over the timestamp range [ts-deduplicationInterval, ts+deduplicationInterval].
> > > > >>>>
> > > > >>>> About late records, this is again due to the windowed nature. Because the store won't save those late (i.e. expired) records, we have two options. Either we do not apply deduplication on them, and thus deduplication doesn't work on late records, or we discard them (which is the option I suggest). In the second case, it would be up to the user to choose a deduplicationInterval that sufficiently covers all their late data. What do you think?
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> Ayoub
> > > > >>>>
> > > > >>>> On Tue, Jun 4, 2024 at 23:58 Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>> Ayoub,
> > > > >>>>>
> > > > >>>>> thanks for resurrecting this KIP. I think a built-in de-duplication operator will be very useful.
> > > > >>>>> Couple of questions:
> > > > >>>>>
> > > > >>>>> 100: `deduplicationKeySelector`
> > > > >>>>>
> > > > >>>>> Is this the best name? It might indicate that we select a "key", which is an overloaded term... Maybe we could use `Field` or `Id` or `Attribute` instead of `Key` in the name? Just brainstorming. If we think `Key` is the best word, I am also ok with it.
> > > > >>>>>
> > > > >>>>> 101: `Deduplicated` class
> > > > >>>>>
> > > > >>>>> You propose to add static methods `keySerde()` and `valueSerde()` -- in other config classes, we use only `with(keySerde, valueSerde)` as we try to use the "builder" pattern, and avoid too many overloads. I would prefer to omit both methods you suggest and just use a single `with` for both serdes.
> > > > >>>>>
> > > > >>>>> Similarly, I think we don't want to add `with(...)` which takes all parameters at once (which should only be 3 parameters, not 4 as it's currently in the KIP)?
> > > > >>>>>
> > > > >>>>> 102: Usage of `WindowedStore`:
> > > > >>>>>
> > > > >>>>> Would this be efficient? The physical byte layout is "<dataKey,ts>" for the store key, so it would be difficult to do an efficient lookup for a given "de-duplication key" to discard duplicates, as we don't know the timestamp of the first record with the same "de-duplication key".
> > > > >>>>>
> > > > >>>>> This boils down to the actual de-duplication logic (some more comments below), but what you propose seems to require expensive range-scans, which could be cost-prohibitive in practice. I think we need to find a way to use efficient key-point-lookups to make this work.
> > > > >>>>>
> > > > >>>>> 103: "Processing logic":
> > > > >>>>>
> > > > >>>>> Might need some updates (cf. the 102 comment). I am not sure if I fully understand the logic: cf. 105 below.
> > > > >>>>>
> > > > >>>>> 104:
> > > > >>>>>
> > > > >>>>>> If no entries found → forward the record + save the record in the store
> > > > >>>>>
> > > > >>>>> This part is critical, and we should discuss in detail. In the end, de-duplication only makes sense when EOS is used, and we might want to call this out (e.g., in the JavaDocs)? But if used with ALOS, it's very difficult to ensure that we never lose data... Your proposal to first-forward goes in the right direction, but does not really solve the problem entirely:
> > > > >>>>>
> > > > >>>>> Even if we forward the message first, all downstream processing happens, `context.forward()` returns and we update the state store, we could now crash w/o committing offsets.
> > > > >>>>> For this case, we have no guarantee that the result records were published (as we did not flush the producer yet), but when re-reading from the input topic, we would find the record in the store and incorrectly drop it as a duplicate...
> > > > >>>>>
> > > > >>>>> I think the only solution to make this work would be to use TX-state stores in combination with ALOS as proposed via KIP-892?
> > > > >>>>>
> > > > >>>>> Using an in-memory store won't help much either? The producer could have sent the write into the changelog topic, but not into the result topic, and thus we could still not guarantee ALOS...?
> > > > >>>>>
> > > > >>>>> How do we want to go about this? We could also say this new operator only works with EOS. Would this be too restrictive? -- At least for now, until KIP-892 lands, and we could relax it?
> > > > >>>>>
> > > > >>>>> 105: "How to detect late records"
> > > > >>>>>
> > > > >>>>> In the end, it seems to boil down to determining which of the records to forward and which record to drop, for (1) the regular case and (2) the out-of-order data case.
> > > > >>>>>
> > > > >>>>> Regular case (no out-of-order data): For this case, offset and ts order is the same, and we can forward the first record we get. All later records within the "de-duplication period" with the same "de-duplication key" would be dropped. If a record with the same "de-duplication key" arrives after the "de-duplication period" has passed, we cannot drop it any longer, but would still forward it, as per the contract of the operator / de-duplication period.
> > > > >>>>>
> > > > >>>>> For the out-of-order case: The first question we need to answer is, do we want to forward the record with the smallest offset or the record with the smallest ts? Logically, forwarding with the smallest ts might be "more correct", however, it implies we could only forward it after the "de-duplication period" has passed, which might be undesired latency? Would this be desired/acceptable?
> > > > >>>>>
> > > > >>>>> In contrast, if we forward the record with the smallest offset (this is what you seem to propose) we don't have a latency issue, but of course the question of which records to drop is trickier to answer: it seems you propose to compare the time difference of the stored record to the current record, but I am wondering why? Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store? Would be good to get some more motivation and tradeoffs discussed about the different strategies we could use.
> > > > >>>>>
> > > > >>>>> You also propose to drop _any_ late record: I am also not sure if that's desired? Could this not lead to data loss? Assume we get a late record, but in fact there was never a duplicate? Why would we want to drop it? If there is a late record which is indeed a duplicate, but we purged the original record from the store already, it seems to be the same case as for the "no out-of-order case": after we purged we cannot de-duplicate and thus it's a regular case we can just accept?
> > > > >>>>>
> > > > >>>>> -Matthias
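The purging behavior this discussion leans on (entries leaving the store once the de-duplication period has passed, after which a late duplicate simply cannot be detected anymore) could be approximated with a stream-time punctuation over a plain key-value store, along the lines of Ayoub's B3 idea above. A hedged sketch, where the store layout (dedup id -> first-seen timestamp), the `seen` and `deduplicationPeriod` names, and the one-minute cadence are all assumptions:

```java
// Inside init(): schedule a periodic cleanup driven by stream time.
// Assumed fields: KeyValueStore<String, Long> seen; Duration deduplicationPeriod.
context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME, streamTime -> {
    try (final KeyValueIterator<String, Long> iter = seen.all()) {
        while (iter.hasNext()) {
            final KeyValue<String, Long> entry = iter.next();
            if (streamTime - entry.value > deduplicationPeriod.toMillis()) {
                seen.delete(entry.key); // past the period: no longer deduplicated
            }
        }
    }
});
```

Note that a full scan per punctuation is exactly the latency concern Ayoub raised under 102, which is why the thread gravitates toward a segmented store or a time-ordered index instead.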
> > > > >>>>> On 5/29/24 4:58 AM, Ayoub Omari wrote:
> > > > >>>>>> Hi everyone,
> > > > >>>>>>
> > > > >>>>>> I've just made a (small) change to this KIP about an implementation detail. Please let me know your thoughts.
> > > > >>>>>>
> > > > >>>>>> Thank you,
> > > > >>>>>> Ayoub
> > > > >>>>>>
> > > > >>>>>> On Mon, May 20, 2024 at 21:13 Ayoub <ayoubomar...@gmail.com> wrote:
> > > > >>>>>>> Hello,
> > > > >>>>>>>
> > > > >>>>>>> Following a discussion on the community Slack channel, I would like to revive the discussion on KIP-655, which is about adding a deduplication processor in kafka-streams.
> > > > >>>>>>>
> > > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > >>>>>>>
> > > > >>>>>>> Even though the motivation is not quite the same as the initial one, I updated the KIP rather than creating a new one, as I believe the end goal is the same.
> > > > >>>>>>>
> > > > >>>>>>> Thanks,
> > > > >>>>>>> Ayoub