> To recap: Could it be that the idea to apply a DISTINCT-aggregation is
> for a different use-case than to remove duplicate messages from a KStream?
OK, imagine the following:

We have 100000 thermometers. Each second they transmit the current measured temperature. The id of the thermometer is the key, the temperature is the value. The temperature measured by a single thermometer changes slowly: on average, say, once per 30 seconds, so most of the messages from a given thermometer are duplicates. But we need to react to the change ASAP. And we need a materialized view: a relational database table with the columns 'thermometerID' and 'temperature'.

1) We don't want to send 100K updates per second to the database.
2) But it's still ok if some updates are duplicates -- the updates are idempotent.
3) We can even afford to lose a fraction of the data -- the data from each thermometer will eventually be 'fixed' by its fresh readings.

This is my use case (actually these are not thermometers and temperature, but it doesn't matter :-)
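
To make the requirement concrete, here is a minimal plain-Java sketch of one possible reading of it: forward a reading to the database only when it differs from the last value forwarded for that thermometer. This is not Kafka Streams code and not the operator proposed in the KIP; the class `ChangeOnlyForwarder` and its methods are hypothetical names used only for illustration, and the in-memory list stands in for the database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of Kafka Streams or of KIP-655.
class ChangeOnlyForwarder {
    // Last temperature seen per thermometer ID.
    private final Map<String, Integer> lastSeen = new HashMap<>();
    // Stand-in for the database: the updates that would have been sent.
    private final List<String> updates = new ArrayList<>();

    // Called once per incoming reading; forwards only new or changed values.
    void onReading(String thermometerId, int temperature) {
        Integer previous = lastSeen.put(thermometerId, temperature);
        if (previous == null || !previous.equals(temperature)) {
            updates.add(thermometerId + "=" + temperature);
        }
    }

    List<String> updates() {
        return updates;
    }

    public static void main(String[] args) {
        ChangeOnlyForwarder forwarder = new ChangeOnlyForwarder();
        forwarder.onReading("t1", 20);
        forwarder.onReading("t2", 18);
        forwarder.onReading("t1", 20); // duplicate, dropped
        forwarder.onReading("t2", 18); // duplicate, dropped
        forwarder.onReading("t1", 21); // change, forwarded
        forwarder.onReading("t1", 21); // duplicate, dropped
        System.out.println(forwarder.updates()); // prints [t1=20, t2=18, t1=21]
    }
}
```

Note that the sketch keeps state forever, which is exactly why a real operator needs some window or retention period to purge old entries.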
Is it conceptually different from what you were thinking about?

Regards,

Ivan

09.08.2021 3:05, Matthias J. Sax wrote:
Thanks for sharing your thoughts. I guess my first question about why using the key boils down to the use case, and maybe you have something else in mind than I do.

> I see it this way: we define 'distinct' operation as returning a single
> record per time window per selected key,

I believe this sentence explains your way of thinking about it. My way of thinking about it is different though: KStream de-duplication means to "filter/drop" duplicate records, and a record is by definition a duplicate if key AND value are the same. --- Or are there use cases for which there might be a "message ID", and even if the "message ID" is the same, the content of the message might be different? If this holds, do we really de-duplicate records (sounds more like "pick a random one")?

Just re-read some of my older replies, and I guess, back then I did just comment about your KeyExtractor idea, without considering the end-to-end picture. Thus, my reply below goes into a different direction now:

We only need to apply a window because we need to purge state eventually. To this end, I actually believe that applying a "sliding window" is the best way to go: for each message we encounter, we start the de-duplication window when the message arrives, don't emit any duplicates as long as the window is open, and purge the state afterwards.

Of course, internally, the implementation must be quite different compared to a regular aggregation: we need to pull the value into the key, to create a unique window for each message, but this seems to be an implementation detail. Thus, I am wondering if we should not try to put a square peg into a round hole: the current windowing/aggregation API is not designed for a KStream de-duplication use-case, because a de-duplication is no aggregation to begin with.
Why not use a different API:

    KStream<K,V> KStream<K,V>#deduplicate(final Duration windowSize);

Including some more overloads to allow configuring the internal state store (this state store should not be queryable, similar to KStream-KStream state stores...).

To recap: Could it be that the idea to apply a DISTINCT-aggregation is for a different use-case than to remove duplicate messages from a KStream?

-Matthias

On 8/6/21 4:12 AM, Ivan Ponomarev wrote:

> - Why restrict de-duplication for the key only? Should we not also
> consider the value (or make it somehow flexible and let the user choose?)

Wasn't it the first idea that we abandoned (I mean, to provide 'KeyExtractor' and so on)? In order to keep things simple we decided to make

    .selectKey(...) //here we select anything we need
    //add markAsPartitioned from KIP-759 to taste
    .groupByKey()
    .windowedBy(...)
    .distinct() //the only new operation that we add to the API, reusing
                //all the windowed aggregations' infrastructure

> - I am wondering if the return type should be `KStream` instead of a
> `KTable`? If I deduplicate a stream, I might expect a stream back? I
> don't really consider a stream de-duplication an aggregation with
> "mutable state"...

First, because it's going to be an operation on a Time/SessionWindowedKStream, and these operations usually return KTable<Windowed<...>, ...>. Then, it might be useful to know to which time window a deduplicated record actually belongs. And it is a trivial task to turn this table back to a stream.

> IMHO, an unordered stream and its ordered "cousin" should yield the
> same result? -- Given your example it seems you want to keep the first
> record based on offset order. Wondering why?

I see it this way: we define 'distinct' operation as returning a single record per time window per selected key, no matter what record. So it's ok if it yields different results for different orderings if its main property holds!
And since we can select any key we like, we can get any degree of 'deduplication granularity' and 'determinism'.

> While I agree that deduplication for overlapping window is questionable,
> I am still wondering if you plan to disallow it (by adding a runtime
> check and throwing an exception), or not?

Thanks for this point! I think that 'fail-fast' approach is good. We might need to throw an exception, I will add this into the KIP:

- SessionWindows -- OK
- SlidingWindows -- Exception
- TimeWindows --
    tumbling -- OK
    hopping -- Exception

Regards,

Ivan

04.08.2021 4:22, Matthias J. Sax wrote:

Couple of questions:

- Why restrict de-duplication for the key only? Should we not also consider the value (or make it somehow flexible and let the user choose?)

- I am wondering if the return type should be `KStream` instead of a `KTable`? If I deduplicate a stream, I might expect a stream back? I don't really consider a stream de-duplication an aggregation with "mutable state"...

Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want to consider the key, the value could be different, and thus, if there is out-of-order data, keeping the one or other value could make a difference? IMHO, an unordered stream and its ordered "cousin" should yield the same result? -- Given your example it seems you want to keep the first record based on offset order. Wondering why?

While I agree that deduplication for overlapping window is questionable, I am still wondering if you plan to disallow it (by adding a runtime check and throwing an exception), or not?

On 8/1/21 6:42 AM, Ivan Ponomarev wrote:

Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately your messages were put to spam folder, that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using equals: I think it must be clear now due to John's explanations.
Distinct is a stateful operation, thus we will need to use serialization. (Although AFAICS the in-memory storage might be a good practical solution in many cases).

> I do currently not see why it should not make sense in hopping
> windows... I do not understand the following sentence:
> "...one record can be multiplied instead of deduplication."

Ok, let me explain. As it's written in the KIP, "The distinct operation returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window."

Also it's worth remembering that the result of `distinct` is KTable<Windowed<K>, V>, not KStream<K, V>.

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a record (key, val) with timestamp 25 arrives, it will be forwarded three times ('multiplied') since it falls into the intersection of all three windows. The output will be

(key@[0/40], val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about `distinct` operation just like you reason about `sum` or `count`. When a record arrives that falls into a window, we update the aggregation on this window. For `distinct`, when extra records arrive into the same window, we also perform some sort of aggregation (we may even count them internally!), but, unlike sum or count, we will not forward anything since the counter is then strictly greater than zero.

You may refer to 'usage examples' of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples) to get a clearer idea of how it works.

> As I said earlier, I do not think that SQL and the Java Stream API are
> good arguments to not use a verb

This is an important matter. As we all know, naming is hard. However, `distinct` name is not used just in SQL and Java Streams.
It is a kind of a standard operation that is used in nearly all the data processing frameworks, see all the hyperlinked examples in 'Motivation' section of KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)

Please look at it and let me know what you think.

Regards,

Ivan

29.07.2021 4:49, John Roesler wrote:

Hi Bruno,

I had previously been thinking to use equals(), since I thought that this might be a stateless operation. Comparing the serialized form requires a serde and a fairly expensive serialization operation, so while byte equality is superior to equals(), we shouldn't use it in operations unless they already require serialization.

I changed my mind when I later realized I had been mistaken, and this operation is of course stateful.

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:

Hi Ivan and John,

1. John, could you clarify why comparing serialized values seems the way to go, now?

2. Ivan, could you please answer my questions that I posted earlier? I will repost it here:

Ivan, could you please make this matter a bit clearer in the KIP? Actually, thinking about it again, I do currently not see why it should not make sense in hopping windows. Regarding this, I do not understand the following sentence:

"hopping and sliding windows do not make much sense for distinct() because they produce multiple intersected windows, so that one record can be multiplied instead of deduplication."

Ivan, what do you mean with "multiplied"?

3. As I said earlier, I do not think that SQL and the Java Stream API are good arguments to not use a verb. However, if everybody else is fine with it, I can get behind it.

John, good catch about the missing overloads! BTW, the overload with Named should be there regardless of stateful or stateless.
Best,
Bruno

On 22.07.21 20:58, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was thinking of this operation as stateless, but now I'm not sure what I was thinking... This operator has to be stateful, right? In that case, I agree that comparing serialized values seems to be the way to do it.

2. Thanks for the confirmation.

3. I continue to be satisfied to let you all hash it out.

Thanks,
-John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually I always thought about the serialized byte array only -- at least this is what local stores depend upon, and what Kafka itself depends upon when doing log compaction.

I can imagine a case where two different byte arrays deserialize to objects which are `equals` to each other. But I think we can ignore this for now because IMO the risks related to buggy `equals` implementations outweigh the potential benefits.

I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to the rule is appropriate here, because the name `distinct()` is ubiquitous.

It's not only about Java Streams API (or .NET LINQ, which appeared earlier and also has `Distinct`): Spark's DataFrame has `distinct()` method, Hazelcast Jet has `distinct()` method, and I bet I can find more examples if I search. When we teach KStreams, we always say that KStreams are just like other streaming APIs, and they have roots in SQL queries. Users know what `distinct` is and they expect it to be in the API.

Regards,

Ivan

13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I’d like to chime in with additional context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose to use the serialized byte array instead of the equals() method, but I think the situation in KIP-655 is a bit different.
I think it might make sense to use the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't make a ton of sense to use it, which I agree with, but it doesn't seem like that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP!

Some aspects are not clear to me from the KIP and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on a hopping window. On the DSL level, I do not see how you can avoid that users apply distinct() on a hopping window, i.e., you cannot avoid it at compile time, you need to check it at runtime and throw an exception. Is this correct or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use deduplicate() instead of distinct(), since the other DSL operations are also verbs. I do not think that SQL and the Java Stream API are good arguments to not use a verb.

Best,
Bruno

On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence! I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case they got lost in your inbox.

Now the initial proposal is split into two independent and smaller ones, so it must be easier to review them. Of course, if you have time.

Regards,

Ivan

24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten the KIP-655 summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive).
I have also created KIP-759 (cancelRepartition operation) and started a discussion for it.

Regards,

Ivan.

04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long standing "issue" and we should resolve it in a general way I guess.

(Did not yet read all responses in detail, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can’t do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot!

I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option for me because of the issue with the unneeded repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to my mind.

What seemed to me a single problem is in fact two unrelated problems: `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that
> the caller asserts that the new key does _not_ change the data partitioning?
I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but add a new operator `KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag for the upstream node. Of course, "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.

We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)). ;-)

Consider the following example (record times are in seconds):

    //three bursts of variously ordered records
    4, 5, 6
    23, 22, 24
    34, 33, 32
    //'late arrivals'
    7, 22, 35

1. 'Epoch-aligned deduplication' using tumbling windows:

    .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

    (key@[00000/10000], 4)
    (key@[20000/30000], 23)
    (key@[30000/40000], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

    .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

    ([key@4000/4000], 4)
    ([key@23000/23000], 23)

because all the records bigger than 7 are stuck together in one session. Setting inactivity gap to 9 seconds will return three records:

    ([key@4000/4000], 4)
    ([key@23000/23000], 23)
    ([key@34000/34000], 34)

WDYT?
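
The tumbling-window case and the 'multiplication' effect of overlapping windows can be checked with a small plain-Java simulation. This is only a sketch under stated assumptions: it handles a single key, treats windows as half-open intervals [start, start + size) aligned to multiples of the advance, ignores retention and grace periods, and uses the hypothetical names `WindowedDistinctSketch`, `windowStartsFor` and `distinct`. It is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation for a single key; NOT Kafka Streams code.
// Class and method names are hypothetical.
class WindowedDistinctSketch {

    // Start times of all windows [start, start + size) that contain ts,
    // where windows begin at multiples of 'advance' (tumbling if advance == size).
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - size + advance) / advance * advance;
        for (long start = first; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // Forwards a record once for every window it is the first to fall into.
    static List<String> distinct(long[] timestamps, long size, long advance) {
        Set<Long> seenWindows = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (long ts : timestamps) {
            for (long start : windowStartsFor(ts, size, advance)) {
                if (seenWindows.add(start)) {
                    out.add(ts + "@[" + start + "/" + (start + size) + "]");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The example above: three bursts plus late arrivals, 10-second tumbling windows.
        long[] records = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println(distinct(records, 10, 10));
        // prints [4@[0/10], 23@[20/30], 34@[30/40]]

        // One record in 40-second windows advancing by 10 seconds (numbers chosen for illustration).
        System.out.println(distinct(new long[] {25}, 40, 10));
        // prints [25@[0/40], 25@[10/50], 25@[20/60]]
    }
}
```

With tumbling windows each record belongs to exactly one window, so the output is one record per window; with overlapping windows a single record opens several windows at once and is emitted once per window.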
If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we will choose for it).

Regards,

Ivan

24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.

In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.

What do you think?

Thanks,
John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:

Hello everyone,

let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this.

Based on what was already discussed, I think that we can split the discussion into three topics for our convenience.

The three topics are:

- idExtractor (how should we extract the deduplication key for the record)
- timeWindows (what time windows should we use)
- miscellaneous (naming etc.)

---- idExtractor ----

Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose such a function that sets different IDs for records from different partitions, otherwise same IDs might be not co-partitioned (and not deduplicated as a result).
Additional concern: what should we do when this function returns null?

Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a particular identifier, we must `selectKey()` before. The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values imply different keys).

So here we have a 'safety vs. performance' trade-off. But 'safe' variant is also not very convenient for developers, since we're forcing them to change the structure of their records.

A 'golden mean' here might be using composite ID with its first component equal to k and its second component equal to some f(v) (f defaults to v -> null, and null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will have serializers only for types of k and f(v), and we must correctly serialize a tuple (k, f(v)), but of course this is doable.

What do you think?

---- timeWindows ----

Originally I proposed TimeWindows only just because they solved my particular case :-) but agree with Matthias' and Sophie's objections.

I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."

I just cannot agree right away with Sophie's

    .groupByKey().windowedBy(...).distinct()

proposal, as it implies the key-only deduplication -- see the previous topic.

Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here.
I mentioned in the KIP both tumbling and hopping windows just because both are possible for TimeWindows, but indeed I don't see any real use case for hopping windows, only tumbling windows make sense IMO.

For data-aligned windows SlidingWindow interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. However, we cannot reuse SlidingWindow as is, because just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time, and are not opened while records fall into an already existing window. We definitely should make our own implementation, maybe we should call it ExpirationWindow? WDYT?

---- miscellaneous ----

Persistent/in-memory stores. Matthias proposed to pass Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for extracted id). This is an absolutely valid point, I agree and I will fix it in the KIP.

Naming. Sophie noted that the Streams DSL operators are typically named as verbs, so she proposes `deduplicate` in favour of `distinct`. I think that while it's important to stick to the naming conventions, it is also important to think of the experience of those who come from different stacks/technologies. People who are familiar with SQL and Java Streams API must know for sure what 'distinct' means, while data deduplication in general is a more complex task and thus `deduplicate` might be misleading. But I'm ready to be convinced if the majority thinks otherwise.

Regards,

Ivan

14.09.2020 21:31, Sophie Blee-Goldman wrote:

Hey all,

I'm not convinced either epoch-aligned or data-aligned will fit all possible use cases.
Both seem totally reasonable to me: data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval.

Going a step further, though, what if you want just a single update per calendar month, or per year with accounting for leap years? Neither of those are serviced that well by the existing Windows specification for windowed aggregations, a well-known limitation of the current API. There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible. Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits coming with KIP-645.

Specifically, I'm proposing to remove the TimeWindows/etc config from the DistinctParameters class, and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, ie:

    inputStream
        .groupByKey()
        .windowedBy()
        .distinct()

Then we could use data-aligned windows if SlidingWindows is specified in the windowedBy(), and epoch-aligned (or some other kind of enumerable window) if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).

*SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. This includes out-of-order records, ie if you have a SlidingWindows of size 10s and process records at time 15s, 20s, 14s then you would just forward the one at 15s.
Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes, you just want to deduplicate. If you do care about exact time boxing then you should use...

*EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated at [5,14], [15, 24], etc then you forward the record at 15s and also the record at 14s.

Just an idea: not sure if the impedance mismatch would throw users off since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.

I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface.

One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.

WDYT?

On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Matthias,

Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.

To simplify, let me explain my particular use case first so I can refer to it later.

We have a system that collects information about ongoing live sporting events from different sources. The information sources have their IDs and these IDs are keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. Event ID is extracted from the message.
We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:

1) each new event ID should be written to the database as soon as possible

2) although it's ok and sometimes even desired to repeat the notification about an already known event ID, we wouldn’t like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).

With this example in mind let me answer your questions

> (1) Using the `idExtractor` has the issue that data might not be
> co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> might be better to do deduplication only on the key? If one sets a new
> key upstream (ie, extracts the deduplication id into the key), the
> `distinct` operator could automatically repartition the data and thus we
> would avoid user errors.

Of course with 'key-only' deduplication + autorepartitioning we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if 'dedup ID' is different from the key, like in my example above. So here we have a sort of 'performance vs security' tradeoff.

The 'golden middle way' here can be the following: we can form a deduplication ID as KEY + separator + idExtractor(VALUE). In case idExtractor is not provided, we deduplicate by key only (as in original proposal). Then idExtractor transforms only the value (and not the key) and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic in case the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.

> (2) What is the motivation for allowing the `idExtractor` to return
> `null`?
> Might be good to have some use-case examples for this feature.

Can't think of any use-cases. As it often happens, it just came with a copy-paste from StackOverflow -- see Michael Noll's answer here: https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions

But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of having deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no-deduplication' seems to be a reasonable assumption (we can't get or put null as a key to a KV store, so a record with null ID is always going to look 'new' for us).

> (2) Is using a `TimeWindow` really what we want? I was wondering if a
> `SlidingWindow` might be better? Or maybe we need a new type of window?

Agree. It's probably not what we want. Once I thought that reusing TimeWindow is a clever idea, now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need this. Epoch alignment is good for aggregation, but deduplication is a different story.

Let me describe the semantics the way I see them now and tell me what you think:

- the only parameter that defines the deduplication logic is 'expiration period'

- when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp.

- when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?)

- when an ID with fresher timestamp arrives we check if it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store

- the WindowStore retention mechanism should clean up very old records in order not to run out of space.
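
The rules above can be sketched in plain Java as follows. This is an illustration only, not the proposed implementation: it uses an in-memory `HashMap` in place of a WindowStore, leaves out the retention cleanup from the last rule, drops out-of-order records (one of the two options left open above), and assumes that a record whose timestamp is exactly one expiration period after the stored one counts as expired. The class `ExpirationDeduplicator` and the method `shouldForward` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the 'expiration period' semantics; not Kafka Streams code.
class ExpirationDeduplicator {
    private final long expirationMs;
    // Deduplication ID -> timestamp of the freshest record seen for that ID.
    private final Map<String, Long> lastSeen = new HashMap<>();

    ExpirationDeduplicator(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    // Returns true if the record should be forwarded downstream.
    boolean shouldForward(String id, long timestampMs) {
        Long stored = lastSeen.get(id);
        if (stored == null) {
            // Unknown ID: forward and remember its timestamp.
            lastSeen.put(id, timestampMs);
            return true;
        }
        if (timestampMs <= stored) {
            // Out-of-order (or same-timestamp) record: assumption, drop it.
            return false;
        }
        // Fresher record: update the stored timestamp in both cases,
        // forward only if the expiration period has passed.
        lastSeen.put(id, timestampMs);
        return timestampMs - stored >= expirationMs;
    }

    public static void main(String[] args) {
        ExpirationDeduplicator dedup = new ExpirationDeduplicator(15 * 60 * 1000L);
        System.out.println(dedup.shouldForward("event-1", 0L));         // true, first time seen
        System.out.println(dedup.shouldForward("event-1", 60_000L));    // false, within the period
        System.out.println(dedup.shouldForward("event-1", 30_000L));    // false, out of order
        System.out.println(dedup.shouldForward("event-1", 1_000_000L)); // true, period expired
    }
}
```

One consequence of updating the timestamp on every fresher record is that a steady stream of duplicates, each arriving within the expiration period of the previous one, is never forwarded again, much like a session that never closes.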
> (3) `isPersistent` -- instead of using this flag, it seems better to
> allow users to pass in a `Materialized` parameter next to
> `DistinctParameters` to configure the state store?

Fully agree! Users might also want to change the retention time.

> (4) I am wondering if we should really have 4 overloads for
> `DistinctParameters.with()`? It might be better to have one overload
> with all required parameters, and add optional parameters using the
> builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e.g. this https://twitter.com/inponomarev/status/1265053286933159938 and following discussion with Tagir Valeev.

When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.

> (5) Even if it might be an implementation detail (and maybe the KIP
> itself does not need to mention it), can you give a high level overview
> how you intend to implement it (that would be easier to grok, compared
> to reading the PR).

Well, as with any operation on KStreamImpl level, I'm building a store and a processor node.

KStreamDistinct class is going to be the ProcessorSupplier, with the logic regarding the forwarding/muting of the records located in KStreamDistinct.KStreamDistinctProcessor#process

----

Matthias, if you are still reading this :-) a gentle reminder: my PR for already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)

Regards,

Ivan

03.09.2020 4:20, Matthias J. Sax wrote:

Thanks for the KIP Ivan. Having a built-in deduplication operator is for sure a good addition.
Couple of questions:

(1) Using the `idExtractor` has the issue that data might not be co-partitioned, as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?

It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows? Each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time a window closes. Is this really a valid use case?

It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" has passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs into the future) an operator-dependent behavior?

(3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?

(4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`?
It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.

(5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high level overview how you intend to implement it (that would be easier to grok, compared to reading the PR).

-Matthias

On 8/23/20 4:29 PM, Ivan Ponomarev wrote:

Sorry, I forgot to add the [DISCUSS] tag to the topic

24.08.2020 2:27, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-655.

KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,
Ivan Ponomarev