Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)
About the re-partitioning concerns, there are already two tickets for it:

 - https://issues.apache.org/jira/browse/KAFKA-4835
 - https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess.

(Have not yet read all responses in detail, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:
> Thanks, Ivan!
>
> That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.
>
> I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.
>
> Thanks,
> John
>
> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
>> Hi John!
>>
>> I think that your proposal is just fantastic, it simplifies things a lot!
>>
>> I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning.
>>
>> The bold idea that we can just CANCEL the repartitioning didn't come to my mind.
>>
>> What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.
>>
>> > what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?
>>
>> I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!
>>
>> Concerning `distinct()`: if we use the `XXXWindowedKStream` facilities, then changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.
>>
>> We can now define `distinct` as an operation that returns only the first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)
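For concreteness, a minimal sketch of how that `reduce`/`filterNot` trick could be written with the existing DSL, assuming String values, a sentinel STOP value that never occurs in real data, and caching disabled (e.g. `cache.max.bytes.buffering = 0`, or a `TopologyTestDriver` run) so that the first update per window is forwarded eagerly; the topic name and window size are placeholders:

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class DistinctViaReduce {
        static final String STOP = "__STOP__";  // sentinel assumed never to appear in real values

        public static KStream<String, String> distinctPerWindow(StreamsBuilder builder) {
            KStream<String, String> input = builder.stream("events");  // placeholder topic
            return input
                    .groupByKey()
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    .reduce((l, r) -> STOP)               // first value per window is kept, later ones collapse to STOP
                    .toStream()
                    .filterNot((k, v) -> STOP.equals(v))  // drop the collapsed updates
                    .selectKey((windowedKey, v) -> windowedKey.key());  // unwrap the Windowed<> key again
        }
    }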
>>
>> Consider the following example (record times are in seconds):
>>
>> //three bursts of variously ordered records
>> 4, 5, 6
>> 23, 22, 24
>> 34, 33, 32
>> //'late arrivals'
>> 7, 22, 35
>>
>> 1. 'Epoch-aligned deduplication' using tumbling windows:
>>
>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
>>
>> produces
>>
>> (key@[00000/10000], 4)
>> (key@[20000/30000], 23)
>> (key@[30000/40000], 34)
>>
>> -- that is, one record per epoch-aligned window.
>>
>> 2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.
>>
>> 3. SessionWindows work for 'data-aligned deduplication'.
>>
>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
>>
>> produces only
>>
>> ([key@4000/4000], 4)
>> ([key@23000/23000], 23)
>>
>> because all the records bigger than 7 are stuck together in one session. Setting the inactivity gap to 9 seconds will return three records:
>>
>> ([key@4000/4000], 4)
>> ([key@23000/23000], 23)
>> ([key@34000/34000], 34)
>>
>> WDYT? If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we will choose for it).
>>
>> Regards,
>>
>> Ivan
>>
>> 24.05.2021 22:32, John Roesler wrote:
>>> Hey there, Ivan!
>>>
>>> In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.
>>>
>>> It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.
>>>
>>> In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.
>>>
>>> What do you think?
>>>
>>> Thanks,
>>> John
>>>
>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
>>>> Hello everyone,
>>>>
>>>> let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this.
>>>>
>>>> Based on what was already discussed, I think that we can split the discussion into three topics for our convenience.
>>>>
>>>> The three topics are:
>>>>
>>>> - idExtractor (how should we extract the deduplication key for the record)
>>>>
>>>> - timeWindows (what time windows should we use)
>>>>
>>>> - miscellaneous (naming etc.)
>>>>
>>>> ---- idExtractor ----
>>>>
>>>> Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose such a function that sets different IDs for records from different partitions, otherwise the same IDs might not be co-partitioned (and not deduplicated as a result). Additional concern: what should we do when this function returns null?
>>>>
>>>> Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a particular identifier, we must `selectKey()` before. The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values infer different keys).
>>>>
>>>> So here we have a 'safety vs. performance' trade-off. But the 'safe' variant is also not very convenient for developers, since we're forcing them to change the structure of their records.
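To make that trade-off concrete, a sketch of the key-only variant: note that `distinct()` is the operator proposed by this KIP, not an existing API, and the `events` stream and `extractEventId` mapper are hypothetical.

    // Hypothetical shape -- distinct() does not exist yet; this only illustrates the repartitioning cost.
    events
        .selectKey((k, v) -> extractEventId(v))            // key change marks the topology as key-changing...
        .groupByKey()                                      // ...so a repartition topic is inserted before the stateful operator,
        .windowedBy(TimeWindows.of(Duration.ofMinutes(15)))
        .distinct();                                       // ...even when the new key is derived per source partition
                                                           // and co-partitioning would have been preserved anyway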
>>>>
>>>> A 'golden mean' here might be using a composite ID with its first component equal to k and its second component equal to some f(v) (f defaults to v -> null, and a null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will have serializers only for the types of k and f(v), and we must correctly serialize a tuple (k, f(v)), but of course this is doable.
>>>>
>>>> What do you think?
>>>>
>>>> ---- timeWindows ----
>>>>
>>>> Originally I proposed TimeWindows only just because they solved my particular case :-) but I agree with Matthias' and Sophie's objections.
>>>>
>>>> I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."
>>>>
>>>> I just cannot agree right away with Sophie's .groupByKey().windowedBy(...).distinct() proposal, as it implies the key-only deduplication -- see the previous topic.
>>>>
>>>> Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here. I mentioned in the KIP both tumbling and hopping windows just because both are possible for TimeWindows, but indeed I don't see any real use case for hopping windows; only tumbling windows make sense IMO.
>>>>
>>>> For data-aligned windows the SlidingWindow interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. However, we cannot reuse SlidingWindow as is, because just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time and are not opened while records fall into an already existing window. We definitely should make our own implementation; maybe we should call it ExpirationWindow? WDYT?
>>>>
>>>> ---- miscellaneous ----
>>>>
>>>> Persistent/in-memory stores. Matthias proposed to pass a Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for the extracted id). This is an absolutely valid point, I agree and I will fix it in the KIP.
>>>>
>>>> Naming. Sophie noted that the Streams DSL operators are typically named as verbs, so she proposes `deduplicate` in favour of `distinct`. I think that while it's important to stick to the naming conventions, it is also important to think of the experience of those who come from different stacks/technologies. People who are familiar with SQL and the Java Streams API must know for sure what 'distinct' means, while data deduplication in general is a more complex task and thus `deduplicate` might be misleading. But I'm ready to be convinced if the majority thinks otherwise.
>>>>
>>>> Regards,
>>>>
>>>> Ivan
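The data-aligned behaviour described above (forward a record the first time its ID is seen, then suppress identical IDs for the next N time units) can already be approximated today with the Processor API and a window store. A rough sketch follows, assuming the record key is already the deduplication ID; the topic name, store name, and expiration period are placeholders, and out-of-order/grace-period subtleties are deliberately ignored:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class ExpirationDedup {
        static final String STORE = "dedup-store";                 // placeholder store name
        static final Duration EXPIRATION = Duration.ofMinutes(15); // the 'expiration period' from the discussion

        public static KStream<String, String> deduplicate(StreamsBuilder builder) {
            builder.addStateStore(Stores.windowStoreBuilder(
                    Stores.persistentWindowStore(STORE, EXPIRATION.multipliedBy(2), EXPIRATION, false),
                    Serdes.String(), Serdes.Long()));

            KStream<String, String> input = builder.stream("events");  // placeholder topic
            return input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;
                private WindowStore<String, Long> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.store = (WindowStore<String, Long>) context.getStateStore(STORE);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    long now = context.timestamp();
                    // Was this ID already seen within the expiration period?
                    try (WindowStoreIterator<Long> seen =
                                 store.fetch(key, now - EXPIRATION.toMillis(), now)) {
                        boolean duplicate = seen.hasNext();
                        store.put(key, now, now);                              // remember (or refresh) the ID
                        return duplicate ? null : KeyValue.pair(key, value);   // forward only the first sight
                    }
                }

                @Override
                public void close() {}
            }, STORE);
        }
    }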
>>>>
>>>> 14.09.2020 21:31, Sophie Blee-Goldman wrote:
>>>>> Hey all,
>>>>>
>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all possible use cases. Both seem totally reasonable to me: data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval.
>>>>>
>>>>> Going a step further, though, what if you want just a single update per calendar month, or per year with accounting for leap years? Neither of those are serviced that well by the existing Windows specification for windowed aggregations, a well-known limitation of the current API. There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible. Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits coming with KIP-645.
>>>>>
>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config from the DistinctParameters class, and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, ie:
>>>>>
>>>>> inputStream
>>>>>   .groupByKey()
>>>>>   .windowedBy()
>>>>>   .distinct()
>>>>>
>>>>> Then we could use data-aligned windows if SlidingWindows is specified in the windowedBy(), and epoch-aligned (or some other kind of enumerable window) if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).
>>>>>
>>>>> *SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. This includes out-of-order records, ie if you have a SlidingWindows of size 10s and process records at time 15s, 20s, 14s then you would just forward the one at 15s. Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes, you just want to deduplicate. If you do care about exact time boxing then you should use...
>>>>>
>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated at [5, 14], [15, 24], etc then you forward the record at 15s and also the record at 14s.
>>>>>
>>>>> Just an idea: not sure if the impedance mismatch would throw users off since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.
>>>>>
>>>>> I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface.
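A compact sketch of the two shapes described above, assuming `grouped` is a `KGroupedStream<String, String>`; `distinct()` is the hypothetical operator under discussion and the window sizes are placeholders:

    // data-aligned: emit on first sight, then suppress identical records for the next 10s (hypothetical distinct())
    grouped.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(10), Duration.ZERO))
           .distinct();

    // epoch-aligned: at most one record per enumerated 10-second window (hypothetical distinct())
    grouped.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
           .distinct();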
>>>>>
>>>>> One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.
>>>>>>
>>>>>> To simplify, let me explain my particular use case first so I can refer to it later.
>>>>>>
>>>>>> We have a system that collects information about ongoing live sporting events from different sources. The information sources have their IDs and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.
>>>>>>
>>>>>> We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:
>>>>>>
>>>>>> 1) each new event ID should be written to the database as soon as possible
>>>>>>
>>>>>> 2) although it's ok and sometimes even desired to repeat the notification about an already known event ID, we wouldn't like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).
>>>>>>
>>>>>> With this example in mind let me answer your questions.
>>>>>>
>>>>>> > (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
>>>>>>
>>>>>> Of course with 'key-only' deduplication + auto-repartitioning we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if the 'dedup ID' is different from the key, like in my example above. So here we have a sort of 'performance vs. safety' trade-off.
>>>>>>
>>>>>> The 'golden middle way' here can be the following: we can form a deduplication ID as KEY + separator + idExtractor(VALUE). In case idExtractor is not provided, we deduplicate by key only (as in the original proposal). Then idExtractor transforms only the value (and not the key) and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic in case the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.
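A tiny sketch of how such a composite deduplication ID could be formed inside the operator (the separator and names are purely illustrative, not part of any existing or proposed API surface):

    // The composite ID is only used inside the operator's state store and never re-keys the stream,
    // so records from different partitions cannot collide and co-partitioning is preserved.
    String dedupId = (idExtractor == null)
            ? key
            : key + "\u0000" + idExtractor.apply(value);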
>>>>>>
>>>>>> > (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
>>>>>>
>>>>>> Can't think of any use-cases. As it often happens, it just came from a copy-paste from StackOverflow -- see Michael Noll's answer here:
>>>>>>
>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
>>>>>>
>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of having the deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no deduplication' seems to be a reasonable assumption (we can't get or put null as a key in a KV store, so a record with a null ID is always going to look 'new' to us).
>>>>>>
>>>>>> > (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
>>>>>>
>>>>>> Agree. It's probably not what we want. Once I thought that reusing TimeWindow was a clever idea; now I don't.
>>>>>>
>>>>>> Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need this. Epoch alignment is good for aggregation, but deduplication is a different story.
>>>>>>
>>>>>> Let me describe the semantics the way I see them now and tell me what you think:
>>>>>>
>>>>>> - the only parameter that defines the deduplication logic is the 'expiration period'
>>>>>>
>>>>>> - when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp.
>>>>>>
>>>>>> - when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?)
>>>>>>
>>>>>> - when an ID with a fresher timestamp arrives, we check if it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store
>>>>>>
>>>>>> - the WindowStore retention mechanism should clean up very old records in order not to run out of space.
>>>>>>
>>>>>> > (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
>>>>>>
>>>>>> Fully agree! Users might also want to change the retention time.
>>>>>>
>>>>>> > (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
>>>>>>
>>>>>> Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e.g. this https://twitter.com/inponomarev/status/1265053286933159938 and the following discussion with Tagir Valeev.
>>>>>>
>>>>>> When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.
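For context on the type-inference limitation mentioned above, a self-contained toy example (nothing here is Kafka API; it only mirrors the shape of a generic parameter-object builder):

    import java.util.function.BiFunction;

    public class InferenceDemo {
        // A minimal stand-in for a generic parameter class with a fluent builder.
        static class Params<K, V, I> {
            static <K, V, I> Params<K, V, I> with(BiFunction<K, V, I> idExtractor) { return new Params<>(); }
            Params<K, V, I> withSomeOption(String opt) { return this; }
        }

        public static void main(String[] args) {
            // Does NOT compile: the chained call hides the target type, K/V/I cannot be inferred
            // from the implicitly typed lambda, and `v.length()` cannot be resolved:
            //   Params<String, String, Integer> p = Params.with((k, v) -> v.length()).withSomeOption("x");

            // Compiles only with explicit type witnesses -- which is why dedicated overloads that take
            // all type-revealing arguments at once can be friendlier than a builder:
            Params<String, String, Integer> p =
                    Params.<String, String, Integer>with((k, v) -> v.length()).withSomeOption("x");
            System.out.println(p != null);
        }
    }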
>>>>>>
>>>>>> > (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high level overview how you intend to implement it (that would be easier to grok, compared to reading the PR).
>>>>>>
>>>>>> Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node.
>>>>>>
>>>>>> The KStreamDistinct class is going to be the ProcessorSupplier, with the logic regarding the forwarding/muting of the records located in KStreamDistinct.KStreamDistinctProcessor#process.
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ivan
>>>>>>
>>>>>> 03.09.2020 4:20, Matthias J. Sax wrote:
>>>>>>> Thanks for the KIP, Ivan. Having a built-in deduplication operator is for sure a good addition.
>>>>>>>
>>>>>>> Couple of questions:
>>>>>>>
>>>>>>> (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
>>>>>>>
>>>>>>> (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
>>>>>>>
>>>>>>> (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
>>>>>>>
>>>>>>> It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows: each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time the window closes. Is this really a valid use case?
>>>>>>>
>>>>>>> It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs into the future) an operator-dependent behavior?
>>>>>>>
>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
>>>>>>>
>>>>>>> (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
>>>>>>>
>>>>>>> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high level overview how you intend to implement it (that would be easier to grok, compared to reading the PR).
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
>>>>>>>> Sorry, I forgot to add the [DISCUSS] tag to the topic.
>>>>>>>>
>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'd like to start a discussion for KIP-655.
>>>>>>>>>
>>>>>>>>> KIP-655:
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
>>>>>>>>>
>>>>>>>>> I also opened a proof-of-concept PR for you to experiment with the API:
>>>>>>>>>
>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Ivan Ponomarev