Hi Ivan,

Sorry for the silence!

I have just re-read the proposal. To summarize, you are now only proposing that the zero-arg distinct() method be added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> Hello everyone,
>
> I would like to remind you about KIP-655 and KIP-759 just in case they got lost in your inbox.
>
> Now the initial proposal is split into two independent and smaller ones, so it must be easier to review them. Of course, if you have time.
>
> Regards,
>
> Ivan
>
>
> 24.06.2021 18:11, Ivan Ponomarev wrote:
> > Hello all,
> >
> > I have rewritten the KIP-655 summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive).
> >
> > I have also created KIP-759 (cancelRepartition operation) and started a discussion for it.
> >
> > Regards,
> >
> > Ivan.
> >
> > 04.06.2021 8:15, Matthias J. Sax wrote:
> >> Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)
> >>
> >> About the re-partitioning concerns, there are already two tickets for it:
> >>
> >> - https://issues.apache.org/jira/browse/KAFKA-4835
> >> - https://issues.apache.org/jira/browse/KAFKA-10844
> >>
> >> Thus, it seems best to exclude this topic from this KIP, and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long standing "issue" and we should resolve it in a general way I guess.
> >>
> >> (Did not yet read all responses in detail, so keeping this comment short.)
> >>
> >>
> >> -Matthias
> >>
> >> On 6/2/21 6:35 AM, John Roesler wrote:
> >>> Thanks, Ivan!
> >>>
> >>> That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.
> >>>
> >>> I agree hopping and sliding windows will actually have a duplicating effect.
> >>> We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can’t do much about that except document the issue.
> >>>
> >>> Thanks,
> >>> John
> >>>
> >>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> >>>> Hi John!
> >>>>
> >>>> I think that your proposal is just fantastic, it simplifies things a lot!
> >>>>
> >>>> I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option for me because of the issue with the unneeded repartitioning.
> >>>>
> >>>> The bold idea that we can just CANCEL the repartitioning didn't come to my mind.
> >>>>
> >>>> What seemed to me a single problem is in fact two unrelated problems: `distinct` operation and cancelling the unneeded repartitioning.
> >>>>
> >>>> > what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?
> >>>>
> >>>> I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but add a new operator `KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag for the upstream node. Of course, "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!
> >>>>
> >>>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.
> >>>>
> >>>> We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)
> >>>>
> >>>> Consider the following example (record times are in seconds):
> >>>>
> >>>> //three bursts of variously ordered records
> >>>> 4, 5, 6
> >>>> 23, 22, 24
> >>>> 34, 33, 32
> >>>> //'late arrivals'
> >>>> 7, 22, 35
> >>>>
> >>>>
> >>>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> >>>>
> >>>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> >>>>
> >>>> produces
> >>>>
> >>>> (key@[00000/10000], 4)
> >>>> (key@[20000/30000], 23)
> >>>> (key@[30000/40000], 34)
> >>>>
> >>>> -- that is, one record per epoch-aligned window.
> >>>>
> >>>> 2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.
> >>>>
> >>>> 3. SessionWindows work for 'data-aligned deduplication'.
> >>>>
> >>>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> >>>>
> >>>> produces only
> >>>>
> >>>> ([key@4000/4000], 4)
> >>>> ([key@23000/23000], 23)
> >>>>
> >>>> because all the records bigger than 7 are stuck together in one session. Setting inactivity gap to 9 seconds will return three records:
> >>>>
> >>>> ([key@4000/4000], 4)
> >>>> ([key@23000/23000], 23)
> >>>> ([key@34000/34000], 34)
> >>>>
> >>>> WDYT? If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we will choose for it).
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>> 24.05.2021 22:32, John Roesler wrote:
> >>>>> Hey there, Ivan!
> >>>>>
> >>>>> In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.
> >>>>>
> >>>>> It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.
> >>>>>
> >>>>> In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >>>>>> Hello everyone,
> >>>>>>
> >>>>>> let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this.
> >>>>>>
> >>>>>> Based on what was already discussed, I think that we can split the discussion into three topics for our convenience.
> >>>>>>
> >>>>>> The three topics are:
> >>>>>>
> >>>>>> - idExtractor (how should we extract the deduplication key for the record)
> >>>>>>
> >>>>>> - timeWindows (what time windows should we use)
> >>>>>>
> >>>>>> - miscellaneous (naming etc.)
> >>>>>>
> >>>>>> ---- idExtractor ----
> >>>>>>
> >>>>>> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose such a function that sets different IDs for records from different partitions, otherwise same IDs might not be co-partitioned (and not deduplicated as a result). Additional concern: what should we do when this function returns null?
> >>>>>>
> >>>>>> Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a particular identifier, we must `selectKey()` before. The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values infer different keys).
> >>>>>>
> >>>>>> So here we have a 'safety vs. performance' trade-off. But 'safe' variant is also not very convenient for developers, since we're forcing them to change the structure of their records.
> >>>>>>
> >>>>>> A 'golden mean' here might be using composite ID with its first component equal to k and its second component equal to some f(v) (f defaults to v -> null, and null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will have serializers only for types of k and f(v), and we must correctly serialize a tuple (k, f(v)), but of course this is doable.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> ---- timeWindows ----
> >>>>>>
> >>>>>> Originally I proposed TimeWindows only just because they solved my particular case :-) but agree with Matthias' and Sophie's objections.
> >>>>>>
> >>>>>> I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."
> >>>>>>
> >>>>>> I just cannot agree right away with Sophie's .groupByKey().windowedBy(...).distinct() proposal, as it implies the key-only deduplication -- see the previous topic.
> >>>>>>
> >>>>>> Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here. I mentioned in the KIP both tumbling and hopping windows just because both are possible for TimeWindows, but indeed I don't see any real use case for hopping windows, only tumbling windows make sense IMO.
> >>>>>>
> >>>>>> For data-aligned windows SlidingWindow interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. However, we cannot reuse SlidingWindow as is, because just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time, and are not opened while records fall into an already existing window. We definitely should make our own implementation, maybe we should call it ExpirationWindow? WDYT?
> >>>>>>
> >>>>>>
> >>>>>> ---- miscellaneous ----
> >>>>>>
> >>>>>> Persistent/in-memory stores. Matthias proposed to pass Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for extracted id).
> >>>>>> This is an absolutely valid point, I agree and I will fix it in the KIP.
> >>>>>>
> >>>>>> Naming. Sophie noted that the Streams DSL operators are typically named as verbs, so she proposes `deduplicate` instead of `distinct`. I think that while it's important to stick to the naming conventions, it is also important to think of the experience of those who come from different stacks/technologies. People who are familiar with SQL and Java Streams API must know for sure what 'distinct' means, while data deduplication in general is a more complex task and thus `deduplicate` might be misleading. But I'm ready to be convinced if the majority thinks otherwise.
> >>>>>>
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>>
> >>>>>> 14.09.2020 21:31, Sophie Blee-Goldman wrote:
> >>>>>>> Hey all,
> >>>>>>>
> >>>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all possible use cases. Both seem totally reasonable to me: data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval.
> >>>>>>>
> >>>>>>> Going a step further, though, what if you want just a single update per calendar month, or per year with accounting for leap years? Neither of those are serviced that well by the existing Windows specification to windowed aggregations, a well-known limitation of the current API.
> >>>>>>> There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible. Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits coming with KIP-645.
> >>>>>>>
> >>>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config from the DistinctParameters class, and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, ie:
> >>>>>>>
> >>>>>>> inputStream
> >>>>>>>     .groupByKey()
> >>>>>>>     .windowedBy()
> >>>>>>>     .distinct()
> >>>>>>>
> >>>>>>> Then we could use data-aligned windows if SlidingWindows is specified in the windowedBy(), and epoch-aligned (or some other kind of enumerable window) if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).
> >>>>>>>
> >>>>>>> *SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. This includes out-of-order records, ie if you have a SlidingWindows of size 10s and process records at time 15s, 20s, 14s then you would just forward the one at 15s.
> >>>>>>> Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes, you just want to deduplicate. If you do care about exact time boxing then you should use...
> >>>>>>>
> >>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated at [5,14], [15, 24], etc then you forward the record at 15s and also the record at 14s.
> >>>>>>>
> >>>>>>> Just an idea: not sure if the impedance mismatch would throw users off since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.
> >>>>>>>
> >>>>>>> I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface.
> >>>>>>>
> >>>>>>>
> >>>>>>> One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>
> >>>>>>>> Hi Matthias,
> >>>>>>>>
> >>>>>>>> Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.
> >>>>>>>>
> >>>>>>>> To simplify, let me explain my particular use case first so I can refer to it later.
> >>>>>>>>
> >>>>>>>> We have a system that collects information about ongoing live sporting events from different sources. The information sources have their IDs and these IDs are keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. Event ID is extracted from the message.
> >>>>>>>>
> >>>>>>>> We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:
> >>>>>>>>
> >>>>>>>> 1) each new event ID should be written to the database as soon as possible
> >>>>>>>>
> >>>>>>>> 2) although it's ok and sometimes even desired to repeat the notification about already known event ID, we wouldn’t like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).
> >>>>>>>>
> >>>>>>>> With this example in mind let me answer your questions
> >>>>>>>>
> >>>>>>>> > (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key?
> >>>>>>>> > If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
> >>>>>>>>
> >>>>>>>> Of course with 'key-only' deduplication + autorepartitioning we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if 'dedup ID' is different from the key, like in my example above. So here we have a sort of 'performance vs security' tradeoff.
> >>>>>>>>
> >>>>>>>> The 'golden middle way' here can be the following: we can form a deduplication ID as KEY + separator + idExtractor(VALUE). In case idExtractor is not provided, we deduplicate by key only (as in original proposal). Then idExtractor transforms only the value (and not the key) and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic in case the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.
> >>>>>>>>
> >>>>>>>> > (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
> >>>>>>>>
> >>>>>>>> Can't think of any use-cases.
> >>>>>>>> As it often happens, it just came with a copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>>>>>
> >>>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>>>>>
> >>>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of having deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no-deduplication' seems to be a reasonable assumption (we can't get or put null as a key to a KV store, so a record with null ID is always going to look 'new' for us).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> > (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
> >>>>>>>>
> >>>>>>>> Agree. It's probably not what we want. Once I thought that reusing TimeWindow is a clever idea, now I don't.
> >>>>>>>>
> >>>>>>>> Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need this. Epoch alignment is good for aggregation, but deduplication is a different story.
> >>>>>>>>
> >>>>>>>> Let me describe the semantics the way I see it now and tell me what you think:
> >>>>>>>>
> >>>>>>>> - the only parameter that defines the deduplication logic is 'expiration period'
> >>>>>>>>
> >>>>>>>> - when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp.
> >>>>>>>>
> >>>>>>>> - when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?)
> >>>>>>>>
> >>>>>>>> - when an ID with fresher timestamp arrives we check if it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store
> >>>>>>>>
> >>>>>>>> - the WindowStore retention mechanism should clean up very old records in order not to run out of space.
> >>>>>>>>
> >>>>>>>> > (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
> >>>>>>>>
> >>>>>>>> Fully agree! Users might also want to change the retention time.
> >>>>>>>>
> >>>>>>>> > (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
> >>>>>>>>
> >>>>>>>> Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e. g. this https://twitter.com/inponomarev/status/1265053286933159938 and following discussion with Tagir Valeev.
> >>>>>>>>
> >>>>>>>> When we came across the similar difficulties in KIP-418, we finally decided to add all the necessary overloads to parameter class.
> >>>>>>>> So I just reproduced that approach here.
> >>>>>>>>
> >>>>>>>> > (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high level overview how you intend to implement it (that would be easier to grok, compared to reading the PR).
> >>>>>>>>
> >>>>>>>> Well as with any operation on KStreamImpl level I'm building a store and a processor node.
> >>>>>>>>
> >>>>>>>> KStreamDistinct class is going to be the ProcessorSupplier, with the logic regarding the forwarding/muting of the records located in KStreamDistinct.KStreamDistinctProcessor#process
> >>>>>>>>
> >>>>>>>> ----
> >>>>>>>>
> >>>>>>>> Matthias, if you are still reading this :-) a gentle reminder: my PR for already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan
> >>>>>>>>
> >>>>>>>> 03.09.2020 4:20, Matthias J. Sax wrote:
> >>>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication operator is for sure a good addition.
> >>>>>>>>>
> >>>>>>>>> Couple of questions:
> >>>>>>>>>
> >>>>>>>>> (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
> >>>>>>>>>
> >>>>>>>>> (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
> >>>>>>>>>
> >>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
> >>>>>>>>>
> >>>>>>>>> It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping window? Each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time the window closes. Is this really a valid use case?
> >>>>>>>>>
> >>>>>>>>> It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs into the future) an operator dependent behavior?
> >>>>>>>>>
> >>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
> >>>>>>>>>
> >>>>>>>>> (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
> >>>>>>>>>
> >>>>>>>>> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high level overview how you intend to implement it (that would be easier to grok, compared to reading the PR).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> >>>>>>>>>>
> >>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev wrote:
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>>>>>
> >>>>>>>>>>> KIP-655:
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>>>>>
> >>>>>>>>>>> I also opened a proof-of-concept PR for you to experiment with the API:
> >>>>>>>>>>>
> >>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>>
> >>>>>>>>>>> Ivan Ponomarev
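
For reference, the 'epoch-aligned deduplication' example from the 26 May 2021 message in this thread (10-second tumbling windows over a single key) can be checked with a small plain-Java simulation. This is only a sketch under stated assumptions: it does not use Kafka Streams, the class and method names are hypothetical, timestamps are assumed to be non-negative seconds, and retention and grace periods are ignored. It only models "forward the first record that falls into a not-yet-seen window".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of Kafka Streams or of KIP-655.
public class EpochAlignedDistinctSketch {

    // Returns the timestamps (in seconds) that would be forwarded for one key,
    // in arrival order, if only the first record per tumbling window passes.
    static List<Long> distinctPerTumblingWindow(long[] timestampsSec, long windowSizeSec) {
        Set<Long> seenWindowStarts = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestampsSec) {
            // Epoch-aligned window start, e.g. 23 -> 20 for a 10-second window.
            long windowStart = ts - (ts % windowSizeSec);
            // Set.add returns true only for the first record seen in this window.
            if (seenWindowStarts.add(windowStart)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts of variously ordered records, then the 'late arrivals'.
        long[] arrivals = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println(distinctPerTumblingWindow(arrivals, 10)); // prints [4, 23, 34]
    }
}
```

The result agrees with the three records listed for the tumbling-window case in that message. The late arrivals 7, 22 and 35 are dropped because their windows have already been seen.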