Couple of questions:

- Why restrict de-duplication to the key only? Should we not also consider the value (or make it somehow flexible and let the user choose)?
- I am wondering if the return type should be `KStream` instead of a `KTable`. If I deduplicate a stream, I might expect a stream back? I don't really consider a stream de-duplication an aggregation with "mutable state"... Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want to consider the key, the value could be different, and thus, if there is out-of-order data, keeping one value or the other could make a difference. IMHO, an unordered stream and its ordered "cousin" should yield the same result? -- Given your example, it seems you want to keep the first record based on offset order. Wondering why?

While I agree that deduplication for overlapping windows is questionable, I am still wondering whether or not you plan to disallow it (by adding a runtime check and throwing an exception).

On 8/1/21 6:42 AM, Ivan Ponomarev wrote:

Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately your messages were put in the spam folder, that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using equals: I think it must be clear now due to John's explanations. Distinct is a stateful operation, thus we will need to use serialization. (Although AFAICS the in-memory storage might be a good practical solution in many cases.)

> I do currently not see why it should not make sense in hopping windows... I do not understand the following sentence: "...one record can be multiplied instead of deduplication."

Ok, let me explain.

As it's written in the KIP, "The distinct operation returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window."

It's also worth remembering that the result of `distinct` is `KTable<Windowed<K>, V>`, not `KStream<K, V>`.
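As an aside to the serialized-values-versus-equals() point in this thread, the sketch below shows what "compare by serialized form" means in practice. It is plain Java, not Kafka Streams code: `SerializedKeyDedup`, `firstOccurrence`, and `serializeSet` are hypothetical names, and a `Function<K, byte[]>` stands in for a real serde. Two details are illustrated: a raw `byte[]` has identity equality, so it needs a value-equality wrapper (here `java.nio.ByteBuffer`) before it can be stored in a set, and two objects that are `equals()` can still count as different keys if they serialize to different bytes, which is the case Ivan mentions in his 20 July message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Kafka Streams code): remembers keys by their
 * serialized bytes, the way a byte-oriented state store would compare them.
 */
final class SerializedKeyDedup<K> {
    private final Function<K, byte[]> serializer;
    // byte[] uses identity equals/hashCode, so each array is wrapped in a
    // ByteBuffer, whose equals/hashCode compare the remaining bytes instead.
    private final Set<ByteBuffer> seen = new HashSet<>();

    SerializedKeyDedup(Function<K, byte[]> serializer) {
        this.serializer = serializer;
    }

    /** Returns true only the first time a given serialized form is observed. */
    boolean firstOccurrence(K key) {
        return seen.add(ByteBuffer.wrap(serializer.apply(key)));
    }

    /** Example serializer: a set's toString() as UTF-8, which depends on iteration order. */
    static byte[] serializeSet(Set<String> set) {
        return set.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

For example, `LinkedHashSet`s built from `["a", "b"]` and `["b", "a"]` are `equals()`, but `serializeSet` turns them into `[a, b]` and `[b, a]`, so a byte-comparing operator would forward both.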

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a record (key, val) with timestamp 25 arrives, it will be forwarded three times ('multiplied'), since it falls into the intersection of all three windows. The output will be

(key@[0/40], val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about `sum` or `count`. When a record arrives that falls into a window, we update the aggregation for this window. For `distinct`, when extra records arrive in the same window, we also perform some sort of aggregation (we may even count them internally!), but, unlike `sum` or `count`, we will not forward anything, since the counter is already strictly greater than zero.

You may refer to the 'usage examples' of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples) to get a clearer idea of how it works.

> As I said earlier, I do not think that SQL and the Java Stream API are good arguments to not use a verb

This is an important matter. As we all know, naming is hard.

However, the name `distinct` is not used just in SQL and Java Streams. It is a kind of standard operation that is used in nearly all data processing frameworks; see all the hyperlinked examples in the 'Motivation' section of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation).

Please look at it and let me know what you think.

Regards,

Ivan

29.07.2021 4:49, John Roesler wrote:

Hi Bruno,

I had previously been thinking to use equals(), since I thought that this might be a stateless operation.
Comparing the serialized form requires a serde and a fairly expensive serialization operation, so while byte equality is superior to equals(), we shouldn't use it in operations unless they already require serialization.

I changed my mind when I later realized I had been mistaken, and this operation is of course stateful.

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:

Hi Ivan and John,

1. John, could you clarify why comparing serialized values now seems the way to go?

2. Ivan, could you please answer the questions that I posted earlier? I will repost them here:
Ivan, could you please make this matter a bit clearer in the KIP? Actually, thinking about it again, I do not currently see why it should not make sense in hopping windows. Regarding this, I do not understand the following sentence:

"hopping and sliding windows do not make much sense for distinct() because they produce multiple intersected windows, so that one record can be multiplied instead of deduplication."

Ivan, what do you mean by "multiplied"?

3. As I said earlier, I do not think that SQL and the Java Stream API are good arguments to not use a verb. However, if everybody else is fine with it, I can get behind it.

John, good catch about the missing overloads!
BTW, the overload with Named should be there regardless of whether the operation is stateful or stateless.

Best,
Bruno

On 22.07.21 20:58, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was thinking of this operation as stateless, but now I'm not sure what I was thinking... This operator has to be stateful, right? In that case, I agree that comparing serialized values seems to be the way to do it.

2. Thanks for the confirmation.

3.
I continue to be satisfied to let you all hash it out.

Thanks,
-John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually, I always thought about the serialized byte array only -- at least this is what local stores depend upon, and what Kafka itself depends upon when doing log compaction.

I can imagine a case where two different byte arrays deserialize to objects which are `equals` to each other. But I think we can ignore this for now because IMO the risks related to buggy `equals` implementations outweigh the potential benefits.

I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to the rule is appropriate here, because the name `distinct()` is ubiquitous.

It's not only about the Java Streams API (or .NET LINQ, which appeared earlier and also has `Distinct`): Spark's DataFrame has a `distinct()` method, Hazelcast Jet has a `distinct()` method, and I bet I can find more examples if I search. When we teach KStreams, we always say that KStreams are just like other streaming APIs, and they have roots in SQL queries. Users know what `distinct` is and they expect it to be in the API.

Regards,

Ivan

13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I’d like to chime in with additional context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose to use the serialized byte array instead of the equals() method, but I think the situation in KIP-655 is a bit different. I think it might make sense to use the equals() method here, but am curious what Ivan thinks.

2.
I figured we'd do nothing. I thought Ivan was just saying that it doesn't make a ton of sense to use it, which I agree with, but that doesn't seem to mean we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP!

Some aspects are not clear to me from the KIP, and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could you add a definition of a duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied to a hopping window. On the DSL level, I do not see how you can prevent users from applying distinct() to a hopping window, i.e., you cannot prevent it at compile time; you need to check it at runtime and throw an exception. Is this correct or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use deduplicate() instead of distinct(), since the other DSL operations are also verbs. I do not think that SQL and the Java Stream API are good arguments to not use a verb.

Best,
Bruno

On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal.
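Ivan's hopping-window example ([0, 40], [10, 50], [20, 60] and a record at timestamp 25) and Bruno's question about a runtime check can be explored with the following single-key simulation. It is a sketch in plain Java, not Kafka Streams code: `HoppingDistinct` and its methods are hypothetical, and the window-assignment rule (windows [start, start + size) whose start is a non-negative multiple of the advance) is our understanding of how epoch-aligned `TimeWindows` are enumerated. `isTumbling()` shows the kind of condition a runtime guard could check.

```java
import java.util.*;

/**
 * Hypothetical single-key simulation (not Kafka Streams code) of distinct()
 * over epoch-aligned time windows [start, start + size), where start is a
 * non-negative multiple of advance.
 */
final class HoppingDistinct {
    private final long size;
    private final long advance;
    private final Set<Long> seenWindows = new HashSet<>(); // starts of windows that already forwarded

    HoppingDistinct(long size, long advance) {
        if (advance <= 0 || advance > size) {
            throw new IllegalArgumentException("expected 0 < advance <= size");
        }
        this.size = size;
        this.advance = advance;
    }

    /** Tumbling windows are the special case advance == size. */
    boolean isTumbling() {
        return advance == size;
    }

    /** Starts of every window that contains the timestamp (assumes ts >= 0). */
    List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        // Earliest start whose window still covers ts, rounded to the advance grid.
        long start = (Math.max(0, ts - size + advance) / advance) * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    /** Forwards the record once for each window it opens; existing windows swallow it. */
    List<String> process(long ts) {
        List<String> out = new ArrayList<>();
        for (long start : windowStartsFor(ts)) {
            if (seenWindows.add(start)) {
                out.add("[" + start + "/" + (start + size) + "]");
            }
        }
        return out;
    }
}
```

With size 40 and advance 10, `process(25)` yields the three outputs from Ivan's example, a later record at 27 yields nothing, and a record at 35 yields only `[30/70]`; with size and advance both 10, the record at 25 is forwarded once, to `[20/30]`.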

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759 just in case they got lost in your inbox.

Now the initial proposal is split into two independent and smaller ones, so it should be easier to review them. Of course, if you have time.

Regards,

Ivan

24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten KIP-655, summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive).

I have also created KIP-759 (cancelRepartition operation) and started a discussion for it.

Regards,

Ivan.

04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP, and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess.

(I have not yet read all responses in detail, so I am keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can’t do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot!

I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to my mind.

What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`: if we use the `XXXWindowedKStream` facilities, then changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.

We can now define `distinct` as an operation that returns only the first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35

1.
'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[00000/10000], 4)
(key@[20000/30000], 23)
(key@[30000/40000], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 are stuck together in one session. Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will rewrite KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we choose for it).

Regards,

Ivan

24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish proposal.
I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.

In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.

What do you think?

Thanks,
John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:

Hello everyone,

Let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this.

Based on what was already discussed, I think that we can split the discussion into three topics for our convenience.
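Ivan's three-bursts example from his 26 May message can be replayed in plain Java for a single key, taking each record's value to be its timestamp, as in his output. This is a hypothetical simulation, not the Kafka Streams implementation: `tumbling` mirrors the `reduce((l, r) -> STOP)` plus `filterNot` trick he mentions, and `session` assumes that a record merges with every session whose end is at least `ts - gap` and whose start is at most `ts + gap`, with only a brand-new session forwarding anything. The thread does not specify what `distinct()` should emit when sessions merge; here a merge simply forwards nothing.

```java
import java.util.*;

/**
 * Hypothetical single-key replay (not Kafka Streams code) of Ivan's example.
 * Each record's value is taken to be its timestamp, as in his output.
 */
final class DistinctReplay {
    static final long[] INPUT = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
    private static final long STOP = Long.MIN_VALUE; // sentinel aggregate

    /** Tumbling windows: reduce((l, r) -> STOP) followed by filterNot(STOP). */
    static List<Long> tumbling(long[] input, long size) {
        Map<Long, Long> aggregates = new HashMap<>(); // window start -> current aggregate
        List<Long> out = new ArrayList<>();
        for (long ts : input) {
            long start = ts - ts % size;
            // reduce: a window's first value becomes its aggregate; any later value reduces to STOP
            long agg = aggregates.containsKey(start) ? STOP : ts;
            aggregates.put(start, agg);
            if (agg != STOP) { // filterNot drops the STOP updates
                out.add(agg);
            }
        }
        return out;
    }

    /** Session windows: only a record that starts a brand-new session is forwarded. */
    static List<Long> session(long[] input, long gap) {
        List<long[]> sessions = new ArrayList<>(); // each entry is {start, end}
        List<Long> out = new ArrayList<>();
        for (long ts : input) {
            long start = ts;
            long end = ts;
            boolean merged = false;
            for (Iterator<long[]> it = sessions.iterator(); it.hasNext(); ) {
                long[] s = it.next();
                if (s[1] >= ts - gap && s[0] <= ts + gap) { // within the inactivity gap
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    it.remove();
                    merged = true;
                }
            }
            sessions.add(new long[] {start, end});
            if (!merged) {
                out.add(ts);
            }
        }
        return out;
    }
}
```

The replay reproduces Ivan's outputs: 4, 23, 34 for 10-second tumbling windows; 4, 23 for a 10-second session gap; and 4, 23, 34 for a 9-second gap. Feeding the middle burst as 22, 23, 24 instead of 23, 22, 24 makes the tumbling variant emit 22 rather than 23, which illustrates the arrival-order sensitivity Matthias asks about at the top of the thread.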

The three topics are:

- idExtractor (how should we extract the deduplication key for the record)

- timeWindows (what time windows should we use)

- miscellaneous (naming etc.)

---- idExtractor ----

Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose a function that sets different IDs for records from different partitions; otherwise, the same IDs might not be co-partitioned (and not deduplicated as a result). Additional concern: what should we do when this function returns null?

Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a particular identifier, we must `selectKey()` before. The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values imply different keys).

So here we have a 'safety vs. performance' trade-off. But the 'safe' variant is also not very convenient for developers, since we're forcing them to change the structure of their records.
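The co-partitioning drawback of a free-form `(k, v) -> f(k, v)` extractor can be illustrated with a toy model in which each partition keeps its own local store of seen IDs. Everything here is hypothetical: `PerPartitionDedup` is not a Kafka class, and `partitionFor` uses `String.hashCode()` as a stand-in for Kafka's default partitioner, which hashes the serialized key differently.

```java
import java.util.*;
import java.util.function.BiFunction;

/**
 * Hypothetical toy model (not Kafka Streams code): records are routed to a
 * partition by key, and each partition deduplicates against its own local store.
 */
final class PerPartitionDedup {
    private final int numPartitions;
    private final BiFunction<String, String, String> idExtractor; // (key, value) -> dedup ID
    private final List<Set<String>> localStores = new ArrayList<>();

    PerPartitionDedup(int numPartitions, BiFunction<String, String, String> idExtractor) {
        this.numPartitions = numPartitions;
        this.idExtractor = idExtractor;
        for (int i = 0; i < numPartitions; i++) {
            localStores.add(new HashSet<>());
        }
    }

    /** Stand-in partitioner; Kafka's default partitioner hashes the serialized key instead. */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Returns true if the record is forwarded, i.e. its ID is new to its partition's store. */
    boolean process(String key, String value) {
        return localStores.get(partitionFor(key)).add(idExtractor.apply(key, value));
    }
}
```

With two partitions, keys "a" and "b" land on different partitions, so an ID taken from the value alone (`(k, v) -> v`) is forwarded once per partition rather than once overall.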

A 'golden mean' here might be using a composite ID with its first component equal to k and its second component equal to some f(v) (f defaults to v -> null, and a null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will have serializers only for the types of k and f(v), and we must correctly serialize a tuple (k, f(v)), but of course this is doable.

What do you think?

---- timeWindows ----

Originally I proposed TimeWindows only because they solved my particular case :-) but I agree with Matthias' and Sophie's objections.

I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."

I just cannot agree right away with Sophie's .groupByKey().windowedBy(...).distinct() proposal, as it implies key-only deduplication -- see the previous topic.

Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here.
I mentioned both tumbling and hopping windows in the KIP just because both are possible for TimeWindows, but indeed I don't see any real use case for hopping windows; only tumbling windows make sense IMO.

For data-aligned windows, the SlidingWindows interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. However, we cannot reuse SlidingWindows as is, because, just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time and are not opened while records fall into an already existing window. We definitely should make our own implementation; maybe we should call it ExpirationWindow? WDYT?

---- miscellaneous ----

Persistent/in-memory stores. Matthias proposed to pass a Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for the extracted id). This is an absolutely valid point, I agree, and I will fix it in the KIP.

Naming. Sophie noted that the Streams DSL operators are typically named as verbs, so she proposes `deduplicate` in favour of `distinct`.
I >>>>>>>>>>>>>>> think >>>>>>>>>>>>>>> that while it's important to stick to the naming >>>>>>>>>>>>>>> conventions, it >>>>>>>>>>>>>>> is also >>>>>>>>>>>>>>> important to think of the experience of those who come >>>>>>>>>>>>>>> from different >>>>>>>>>>>>>>> stacks/technologies. People who are familiar with SQL and >>>>>>>>>>>>>>> Java >>>>>>>>>>>>>>> Streams >>>>>>>>>>>>>>> API must know for sure what does 'distinct' mean, while data >>>>>>>>>>>>>>> deduplication in general is a more complex task and thus >>>>>>>>>>>>>>> `deduplicate` >>>>>>>>>>>>>>> might be misleading. But I'm ready to be convinced if the >>>>>>>>>>>>>>> majority >>>>>>>>>>>>>>> thinks otherwise. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Ivan >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет: >>>>>>>>>>>>>>>> Hey all, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I'm not convinced either epoch-aligned or data-aligned >>>>>>>>>>>>>>>> will fit all >>>>>>>>>>>>>>>> possible use cases. >>>>>>>>>>>>>>>> Both seem totally reasonable to me: data-aligned is >>>>>>>>>>>>>>>> useful for >>>>>>>>>>>>>>>> example when >>>>>>>>>>>>>>>> you know >>>>>>>>>>>>>>>> that a large number of updates to a single key will >>>>>>>>>>>>>>>> occur in >>>>>>>>>>>>>>>> short bursts, >>>>>>>>>>>>>>>> and epoch- >>>>>>>>>>>>>>>> aligned when you specifically want to get just a single >>>>>>>>>>>>>>>> update >>>>>>>>>>>>>>>> per discrete >>>>>>>>>>>>>>>> time >>>>>>>>>>>>>>>> interval. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Going a step further, though, what if you want just a >>>>>>>>>>>>>>>> single >>>>>>>>>>>>>>>> update per >>>>>>>>>>>>>>>> calendar >>>>>>>>>>>>>>>> month, or per year with accounting for leap years? 
>>>>>>>>>>>>>>>> Neither of >>>>>>>>>>>>>>>> those are >>>>>>>>>>>>>>>> serviced that >>>>>>>>>>>>>>>> well by the existing Windows specification to windowed >>>>>>>>>>>>>>>> aggregations, a >>>>>>>>>>>>>>>> well-known >>>>>>>>>>>>>>>> limitation of the current API. There is actually a KIP >>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> going >>>>>>>>>>>>>>>> on in parallel to fix this >>>>>>>>>>>>>>>> exact issue and make the windowing interface much more >>>>>>>>>>>>>>>> flexible. >>>>>>>>>>>>>>>> Maybe >>>>>>>>>>>>>>>> instead >>>>>>>>>>>>>>>> of re-implementing this windowing interface in a similarly >>>>>>>>>>>>>>>> limited fashion >>>>>>>>>>>>>>>> for the >>>>>>>>>>>>>>>> Distinct operator, we could leverage it here and get all >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> benefits >>>>>>>>>>>>>>>> coming with >>>>>>>>>>>>>>>> KIP-645. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Specifically, I'm proposing to remove the >>>>>>>>>>>>>>>> TimeWindows/etc config >>>>>>>>>>>>>>>> from the >>>>>>>>>>>>>>>> DistinctParameters class, and move the distinct() method >>>>>>>>>>>>>>>> from the >>>>>>>>>>>>>>>> KStream >>>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>> to the TimeWindowedKStream interface. 
Since it's >>>>>>>>>>>>>>>> semantically >>>>>>>>>>>>>>>> similar to a >>>>>>>>>>>>>>>> kind of >>>>>>>>>>>>>>>> windowed aggregation, it makes sense to align it with the >>>>>>>>>>>>>>>> existing windowing >>>>>>>>>>>>>>>> framework, ie: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> inputStream >>>>>>>>>>>>>>>> .groupKyKey() >>>>>>>>>>>>>>>> .windowedBy() >>>>>>>>>>>>>>>> .distinct() >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Then we could use data-aligned windows if SlidingWindows is >>>>>>>>>>>>>>>> specified in >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> windowedBy(), and epoch-aligned (or some other kind of >>>>>>>>>>>>>>>> enumerable >>>>>>>>>>>>>>>> window) >>>>>>>>>>>>>>>> if a Windows is specified in windowedBy() (or an >>>>>>>>>>>>>>>> EnumerableWindowDefinition >>>>>>>>>>>>>>>> once KIP-645 is implemented to replace Windows). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *SlidingWindows*: should forward a record once when it's >>>>>>>>>>>>>>>> first >>>>>>>>>>>>>>>> seen, and >>>>>>>>>>>>>>>> then not again >>>>>>>>>>>>>>>> for any identical records that fall into the next N >>>>>>>>>>>>>>>> timeUnits. This >>>>>>>>>>>>>>>> includes out-of-order >>>>>>>>>>>>>>>> records, ie if you have a SlidingWindows of size 10s and >>>>>>>>>>>>>>>> process >>>>>>>>>>>>>>>> records at >>>>>>>>>>>>>>>> time >>>>>>>>>>>>>>>> 15s, 20s, 14s then you would just forward the one at 15s. >>>>>>>>>>>>>>>> Presumably, if >>>>>>>>>>>>>>>> you're >>>>>>>>>>>>>>>> using SlidingWindows, you don't care about what falls >>>>>>>>>>>>>>>> into exact >>>>>>>>>>>>>>>> time >>>>>>>>>>>>>>>> boxes, you just >>>>>>>>>>>>>>>> want to deduplicate. If you do care about exact time >>>>>>>>>>>>>>>> boxing then >>>>>>>>>>>>>>>> you should >>>>>>>>>>>>>>>> use... >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should >>>>>>>>>>>>>>>> forward >>>>>>>>>>>>>>>> only one >>>>>>>>>>>>>>>> record >>>>>>>>>>>>>>>> per enumerated time window. 
If you get records at 15s, 20s, 14s where the windows are enumerated at [5, 14], [15, 24], etc., then you forward the record at 15s and also the record at 14s.

Just an idea: not sure if the impedance mismatch would throw users off, since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.

I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface.

One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.

WDYT?
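Sophie's two examples (records at 15s, 20s, 14s) can be checked with the following single-key sketch in plain Java; the class and method names are hypothetical. `dataAligned` encodes our reading of her *SlidingWindows* rule: a record is suppressed if it lies strictly within `size` of an already forwarded record in either direction, which is what catches the out-of-order 14s record (the strict inequality at the boundary is an assumption). `epochAligned` forwards the first record of each enumerated window, with an `offset` so that windows can start at 5, 15, and so on, as in her [5, 14], [15, 24] example.

```java
import java.util.*;

/**
 * Hypothetical single-key sketch (not Kafka Streams code) of the two
 * deduplication semantics described above.
 */
final class DistinctSemantics {
    /** Data-aligned: suppress a record lying strictly within `size` of any forwarded record, in either direction. */
    static List<Long> dataAligned(long[] times, long size) {
        List<Long> forwarded = new ArrayList<>();
        for (long t : times) {
            boolean covered = false;
            for (long f : forwarded) {
                if (Math.abs(t - f) < size) {
                    covered = true;
                    break;
                }
            }
            if (!covered) {
                forwarded.add(t);
            }
        }
        return forwarded;
    }

    /** Epoch-aligned: forward the first record of each window [offset + k*size, offset + (k+1)*size). */
    static List<Long> epochAligned(long[] times, long size, long offset) {
        Set<Long> seenWindows = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long t : times) {
            long windowStart = Math.floorDiv(t - offset, size) * size + offset;
            if (seenWindows.add(windowStart)) {
                forwarded.add(t);
            }
        }
        return forwarded;
    }
}
```

`dataAligned({15, 20, 14}, 10)` forwards only 15, while `epochAligned({15, 20, 14}, 10, 5)` forwards 15 and 14, matching the outcomes she describes.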

On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Matthias,

Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.

To simplify, let me explain my particular use case first so I can refer to it later.

We have a system that collects information about ongoing live sporting events from different sources. The information sources have their IDs, and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.

We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities).
The requirements are: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1) each new event ID should be written to the database >>>>>>>>>>>>>>>>> as soon >>>>>>>>>>>>>>>>> as possible >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2) although it's ok and sometimes even desired to >>>>>>>>>>>>>>>>> repeat the >>>>>>>>>>>>>>>>> notification about already known event ID, but we >>>>>>>>>>>>>>>>> wouldn’t like our >>>>>>>>>>>>>>>>> database to be bothered by the same event ID more often >>>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>> once in a >>>>>>>>>>>>>>>>> given period of time (say, 15 minutes). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> With this example in mind let me answer your questions >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > (1) Using the `idExtractor` has the issue >>>>>>>>>>>>>>>>> that data might >>>>>>>>>>>>>>>>> not be >>>>>>>>>>>>>>>>> > co-partitioned as you mentioned in the KIP. >>>>>>>>>>>>>>>>> Thus, I am >>>>>>>>>>>>>>>>> wondering if it >>>>>>>>>>>>>>>>> > might be better to do deduplication only on >>>>>>>>>>>>>>>>> the key? If >>>>>>>>>>>>>>>>> one sets a new >>>>>>>>>>>>>>>>> > key upstream (ie, extracts the deduplication >>>>>>>>>>>>>>>>> id into the >>>>>>>>>>>>>>>>> key), the >>>>>>>>>>>>>>>>> > `distinct` operator could automatically >>>>>>>>>>>>>>>>> repartition the >>>>>>>>>>>>>>>>> data and thus we >>>>>>>>>>>>>>>>> > would avoid user errors. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Of course with 'key-only' deduplication + >>>>>>>>>>>>>>>>> autorepartitioning we >>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>> never cause problems with co-partitioning. But in >>>>>>>>>>>>>>>>> practice, we >>>>>>>>>>>>>>>>> often >>>>>>>>>>>>>>>>> don't need repartitioning even if 'dedup ID' is >>>>>>>>>>>>>>>>> different from >>>>>>>>>>>>>>>>> the key, >>>>>>>>>>>>>>>>> like in my example above. So here we have a sort of >>>>>>>>>>>>>>>>> 'performance vs >>>>>>>>>>>>>>>>> security' tradeoff. 
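
A plain-Java sketch can pin down the behavior of these two requirements. It follows the expiration-period semantics Ivan spells out further down in this reply. It is a hypothetical illustration rather than the KIP's implementation, and it makes several assumptions:

- `ExpiringDeduplicator` and `shouldForward` are made-up names.
- A `HashMap` stands in for the state store, and WindowStore retention is left out.
- A gap of exactly one period counts as expired.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of expiration-period deduplication (not a Kafka Streams API).
 * A HashMap stands in for the state store; retention/cleanup of old IDs is omitted.
 */
final class ExpiringDeduplicator<K> {
    private final long expirationMs;
    // Deduplication ID -> timestamp of the latest in-order record seen for it.
    private final Map<K, Long> lastSeen = new HashMap<>();

    ExpiringDeduplicator(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    /** Returns true if the record with this ID and timestamp should be forwarded. */
    boolean shouldForward(K id, long timestampMs) {
        Long stored = lastSeen.get(id);
        if (stored == null) {
            // Unknown ID: forward it and remember its timestamp.
            lastSeen.put(id, timestampMs);
            return true;
        }
        if (timestampMs < stored) {
            // Out-of-order record older than what we already saw: drop it, keep the store as-is.
            return false;
        }
        // In-order record: forward only if the expiration period has elapsed (assumed: gap >= period),
        // and refresh the stored timestamp in either case.
        boolean expired = timestampMs - stored >= expirationMs;
        lastSeen.put(id, timestampMs);
        return expired;
    }
}
```

The stored timestamp is refreshed on every in-order arrival, as the bullet list proposes. As a result, an ID that keeps recurring more often than the period is never forwarded again. Storing only the timestamp of the last *forwarded* record would instead re-emit the ID at most once per period, which matches requirement 2) more literally.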

A good middle ground here could be the following: we form a deduplication ID as KEY + separator + idExtractor(VALUE). If idExtractor is not provided, we deduplicate by key only (as in the original proposal). idExtractor transforms only the value (and not the key), and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs, so all the data will be co-partitioned. As with any stateful operation, we will repartition the topic if the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.

> (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.

I can't think of any use cases. As often happens, it just came with a copy-paste from StackOverflow -- see Michael Noll's answer here:

https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions

But, jokes aside, we'll have to decide what to do with nulls.
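
The KEY + separator + idExtractor(VALUE) scheme can be sketched as a small helper. Everything here is hypothetical:

- `DedupIds`, `dedupId` and the `|` separator are made-up names.
- Keys are `String`s for readability.
- A null extractor result is treated as "no postfix", which is one of the two options discussed next.

```java
import java.util.function.Function;

/**
 * Hypothetical sketch of the "KEY + separator + idExtractor(VALUE)" deduplication ID.
 * String keys are used for readability; a real operator would work on serialized keys.
 */
final class DedupIds {
    static final String SEPARATOR = "|"; // assumed separator, not from the KIP

    static <V> String dedupId(String key, V value, Function<? super V, String> idExtractor) {
        if (idExtractor == null) {
            return key; // no extractor: deduplicate by key only
        }
        String postfix = idExtractor.apply(value);
        // Option discussed in the thread: a null extractor result means "no postfix".
        return postfix == null ? key : key + SEPARATOR + postfix;
    }
}
```

The key is always the prefix, so two records with the same deduplication ID necessarily share a key and therefore a partition. This is the co-partitioning argument made above. If keys may themselves contain the separator, a real implementation would need an unambiguous encoding, such as length-prefixing the key.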

If we accept the above proposal of having the deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no deduplication' seems to be a reasonable assumption (we can't get or put null as a key to a KV store, so a record with a null ID is always going to look 'new' to us).

> (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?

Agree. It's probably not what we want. At one point I thought that reusing TimeWindow was a clever idea; now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need it. Epoch alignment is good for aggregation, but deduplication is a different story.

Let me describe the semantics the way I see them now, and tell me what you think:

- the only parameter that defines the deduplication logic is the 'expiration period';

- when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp;

- when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?);

- when an ID with a fresher timestamp arrives, we check if it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store;

- the WindowStore retention mechanism should clean up very old records in order not to run out of space.

> (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?

Fully agree! Users might also want to change the retention time.

> (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain.
We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see, e.g., https://twitter.com/inponomarev/status/1265053286933159938 and the following discussion with Tagir Valeev.

When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.

> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).

Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node.

The KStreamDistinct class is going to be the ProcessorSupplier, with the logic for forwarding/muting the records located in KStreamDistinct.KStreamDistinctProcessor#process.

----

Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)

Regards,

Ivan

03.09.2020 4:20, Matthias J. Sax wrote:

Thanks for the KIP Ivan. Having a built-in deduplication operator is for sure a good addition.

Couple of questions:

(1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?

It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows?
Each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time the window closes. Is this really a valid use case?

It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" has passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs. into the future) an operator-dependent behavior?

(3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?

(4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.

(5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).

-Matthias

On 8/23/20 4:29 PM, Ivan Ponomarev wrote:

Sorry, I forgot to add the [DISCUSS] tag to the topic.

24.08.2020 2:27, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-655.

KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev