Thanks Leah. Overall LGTM. A few nits:
- the first figure shows window [9,19] but the window is not aligned
  properly (it should be 1ms to the right; right now, it's aligned to
  window [8,18])

- in the second figure, a hopping window would create more windows, i.e.,
  the first window would be [-6,4) and the last window would be [19,29),
  thus it's not just 10 windows but 26 windows (if I did not miscount)

- "Two new windows will be created by the late record"

  late -> out-of-order


-Matthias

On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:
> Thanks for the update Leah -- I think that all makes sense.
>
> Cheers,
> Sophie
>
> On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:
>
>> Another minor tweak, instead of defining the window by the *size*, it will
>> be defined by *timeDifference*, which is the maximum time difference
>> between two events. This is a more precise way to define a window due to
>> its inclusive ends, while allowing the user to create the window they
>> expect. This definition fits with the current examples, where a record at
>> *10* would fall into a window of time difference *5* from *[5,10]*. This
>> window contains any records at 5, 6, 7, 8, 9, and 10, which is 6 instances
>> instead of 5. This semantic difference is why I've shifted *size* to
>> *timeDifference*.
>>
>> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
>> conventions.
>>
>> Let me know if there are any concerns! The vote thread is open as well
>> here:
>> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
>>
>> Best,
>> Leah
>>
>> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
>>
>>> A small tweak - to make it more clear to users that grace is required, as
>>> well as cleaning up some of the confusing grammar semantics of windows, the
>>> main builder method for *slidingWindows* will be *withSizeAndGrace* instead
>>> of *of*. This looks like it'll be the last change (for now) on the
>>> public API.
If anyone has any comments or thoughts let me know. >> Otherwise, >>> I'll take this to vote shortly. >>> >>> Best, >>> Leah >>> >>> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> >> wrote: >>> >>>> To accommodate the change to a final class, I've added another >>>> *windowedBy()* function in *CogroupedKStream.java *to handle >>>> SlidingWindows. >>>> >>>> As far as the discussion goes, I think this is the last change we've >>>> talked about. If anyone has other comments or concerns, please let me >> know! >>>> >>>> Cheers, >>>> Leah >>>> >>>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> >> wrote: >>>> >>>>> Thanks for the discussion about extending TimeWindows. I agree that >>>>> making it future proof is important, and the implementation of >>>>> SlidingWindows is unique enough that it seems logical to make it its >> own >>>>> final class. >>>>> >>>>> On that note, I've updated the KIP to make SlidingWindows a stand alone >>>>> final class, and added the *windowedBy() *API in *KGroupedStream *to >>>>> handle SlidingWindows. It seems that SlidingWindows would still be >> able to >>>>> leverage *TimeWindowedKStream by* creating a SlidingWindows version of >>>>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream. *If >>>>> anyone sees issues with this implementation, please let me know. >>>>> >>>>> Best, >>>>> Leah >>>>> >>>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> >>>>> wrote: >>>>> >>>>>> Thanks for the reply, Sophie. >>>>>> >>>>>> That all sounds about right to me. >>>>>> >>>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense >>>>>> for it to be extensible. Different implementations can (and do) >> enumerate >>>>>> different windows to suit different use cases. >>>>>> >>>>>> On the other hand, I can’t think of any way to extend SessionWindows >> to >>>>>> do something different using the same algorithm, so it makes sense >> for it >>>>>> to stay final. 
>>>>>> >>>>>> If we think SlidingWindows is similarly not usefully extensible, then >>>>>> we can make it final. It’s easy to remove final later, but adding it >> is not >>>>>> possible. Or we could go the other route and just make it an >> interface, on >>>>>> general principle. Both of these choices are safe API design. >>>>>> >>>>>> Thanks again, >>>>>> John >>>>>> >>>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote: >>>>>>>> >>>>>>>> Users could pass in a custom `SessionWindows` as >>>>>>>> long as the session algorithm works correctly for it. >>>>>>> >>>>>>> >>>>>>> Well not really, SessionWindows is a final class. TimeWindows is >> also >>>>>> a >>>>>>> final class, so neither of these can be extended/customized. For a >>>>>> given >>>>>>> Windows then there would only be three (non-overlapping) >>>>>> possibilities: >>>>>>> either it's TimeWindows, SlidingWindows, or a custom Windows. I >> don't >>>>>>> think there's any problem with determining what the user wants in >>>>>> this case >>>>>>> -- >>>>>>> we would just check if it's a SlidingWindows and connect the new >>>>>> processor, >>>>>>> or else connect the existing hopping/tumbling window processor. >>>>>>> >>>>>>> I'll admit that last sentence does leave a bad taste in my mouth. >>>>>> Part of >>>>>>> that >>>>>>> is probably the "leaking" API Matthias pointed out; we just assume >> the >>>>>>> hopping/tumbling window implementation fits all custom windows. But >> I >>>>>> guess >>>>>>> if you really needed to customize the algorithm any further you may >>>>>> as well >>>>>>> stick in a transformer and do it all yourself. >>>>>>> >>>>>>> Anyways, given what we have, it does seem weird to apply one >> algorithm >>>>>>> for most Windows types and then swap in a different one for one >>>>>> specific >>>>>>> extension of Windows. So adding a new #windowedBy(SlidingWindows) >>>>>>> sounds reasonable to me. 
>>>>>>> >>>>>>> I'm still not convinced that we need a whole new TimeWindowedKStream >>>>>>> equivalent class for sliding windows though. It seems like the >>>>>>> hopping/tumbling >>>>>>> window implementation could benefit just as much from a subtractor >> as >>>>>> the >>>>>>> sliding windows so the only future-proofing we get is the ability to >>>>>> be >>>>>>> lazy and >>>>>>> add the subtractor to one but not the other. Of course it would only >>>>>> be an >>>>>>> optimization so we could just not apply it to one type and nothing >>>>>> would >>>>>>> break. >>>>>>> It does make me nervous to go against the "future-proof" direction, >>>>>> though. >>>>>>> Are there any other examples of things we might want to add to one >>>>>> window >>>>>>> type but not to another? >>>>>>> >>>>>>> On another note, this actually brings up a new question: should >>>>>>> SlidingWindows >>>>>>> also be final? My take is "yes" since there's no reasonable >>>>>> customization of >>>>>>> sliding windows, at least not that I can think of. Thoughts? >>>>>>> >>>>>>> >>>>>>> On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> >>>>>> wrote: >>>>>>> >>>>>>>> Thanks, all, >>>>>>>> >>>>>>>> I can see how my conclusion was kind of a leap. >>>>>>>> >>>>>>>> What Matthias said is indeed what I was thinking. When you >> provide a >>>>>>>> window definition to the windowBy() method, you are selecting an >>>>>> algorithm >>>>>>>> that will be used to compute the windows from the input data. >>>>>>>> >>>>>>>> I didn’t mean the code implementation “algorithm”, but the >>>>>> high-level >>>>>>>> algorithm that describes how the input stream will be transformed >>>>>> into a >>>>>>>> sequence of windows. >>>>>>>> >>>>>>>> For example, the algorithm for Windows is something like “given >> the >>>>>> record >>>>>>>> timestamp, include the record in each of the enumerated windows”. 
>>>>>> Note that >>>>>>>> there can be a lot of variation in how the windows are enumerated, >>>>>> which is >>>>>>>> why there are at least a couple of implementations of Windows, and >>>>>> we are >>>>>>>> open to adding more (like for natural calendar boundaries). >>>>>>>> >>>>>>>> For SessionWindows, it’s more like “if any window is within the >> gap, >>>>>>>> extend its boundaries to include this record and if two windows >>>>>> touch, then >>>>>>>> merge them”. >>>>>>>> >>>>>>>> Clearly, the algorithm for SlidingWindows doesn’t fall into either >>>>>>>> category, so it seems inappropriate to claim that it does in the >>>>>> API (by >>>>>>>> implementing Windows with stubbed methods) and then cast >> internally >>>>>> to >>>>>>>> execute a completely different algorithm. >>>>>>>> >>>>>>>> To your other observation, that the DSL object resulting from >>>>>> windowBy >>>>>>>> would look the same for Windows and SessionWindows, maybe it makes >>>>>> sense >>>>>>>> for windowBy(SessionWindows) also to return a TimeWindowedKStream. >>>>>>>> >>>>>>>> i.e.: >>>>>>>> ======================= >>>>>>>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final >>>>>> Windows<W> >>>>>>>> windows); >>>>>>>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows >> windows); >>>>>>>> ======================= >>>>>>>> >>>>>>>> I can’t think of a reason this wouldn’t work. But then again, it >>>>>> would be >>>>>>>> more future-proof to go ahead and specify a different return type >>>>>> now, if >>>>>>>> we think we'll want to add subtractors and stuff later. I don't >>>>>> have a >>>>>>>> strong feeling about that part of the API. It seems to be >>>>>> independent of >>>>>>>> whether SlidingWindows extends Windows or not. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> -John >>>>>>>> >>>>>>>> On Wed, Jul 22, 2020, at 19:41, Matthias J. 
Sax wrote:
>>>>>>>>> I think what John tries to say is the following:
>>>>>>>>>
>>>>>>>>> We have `windowedBy(Windows)` that accepts hopping/tumbling windows but
>>>>>>>>> also custom windows, and we use a specific algorithm. Note, that custom
>>>>>>>>> windows must "work" based on the used algorithm.
>>>>>>>>>
>>>>>>>>> For session windows we have `windowedBy(SessionWindows)` and apply a
>>>>>>>>> different algorithm. Users could pass in a custom `SessionWindows` as
>>>>>>>>> long as the session algorithm works correctly for it.
>>>>>>>>>
>>>>>>>>> For the new sliding windows, we want to use a different algorithm
>>>>>>>>> compared to hopping/tumbling windows. If we let sliding window extend
>>>>>>>>> `Windows`, we can decide at runtime if we need to use the
>>>>>>>>> hopping/tumbling window algorithm for hopping/tumbling windows or the
>>>>>>>>> new sliding window algorithm for sliding windows. However, if we get a
>>>>>>>>> custom window, which algorithm do we pick now? The existing
>>>>>>>>> tumbling/hopping window algorithm or the new sliding window algorithm?
>>>>>>>>> Both a custom "time-window" and custom "sliding window" implement the
>>>>>>>>> generic `Windows` class and thus we cannot make a decision as we don't
>>>>>>>>> know the user's intent.
>>>>>>>>>
>>>>>>>>> As a matter of fact, even if the user might not be aware of it, the
>>>>>>>>> algorithm we use does already leak into the API (if a user extends
>>>>>>>>> `Windows` it must work with our hopping/tumbling window algorithm and if
>>>>>>>>> a user extends `SessionWindows` it must work with our session algorithm)
>>>>>>>>> and it seems we need to preserve this property for sliding window.
>>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote: >>>>>>>>>> Hey John, >>>>>>>>>> >>>>>>>>>> Just a few follow-up questions/comments about the whole >> Windows >>>>>> thing: >>>>>>>>>> >>>>>>>>>> That's a good way of looking at things; in particular the >> point >>>>>> about >>>>>>>>>> SessionWindows >>>>>>>>>> for example requiring a Merger while other "statically >>>>>> enumerable" >>>>>>>> windows >>>>>>>>>> require >>>>>>>>>> only an adder seems to touch on the heart of the matter. >>>>>>>>>> >>>>>>>>>> It seems like what Time and Universal (and any other Windows >>>>>>>>>>> implementation) have in common is that the windows are >>>>>> statically >>>>>>>>>>> enumerable. >>>>>>>>>>> As a consequence, they can all rely on an aggregation >>>>>> maintenence >>>>>>>> algorithm >>>>>>>>>>> that involves enumerating each of the windows and updating >> it. >>>>>> That >>>>>>>>>>> also means that their DSL object (TimeWindowedKStream) >> doesn't >>>>>> need >>>>>>>>>>> "subtractors" or "mergers", but only "adders"; again, this >> is a >>>>>>>> consequence >>>>>>>>>>> of the fact that the windows are enumerable. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Given that, I'm a bit confused why you conclude that sliding >>>>>> windows >>>>>>>> are >>>>>>>>>> fundamentally >>>>>>>>>> different from the "statically enumerable" windows -- sliding >>>>>> windows >>>>>>>>>> require only an >>>>>>>>>> adder too. I'm not sure it's a consequence of being >> enumerable, >>>>>> or that >>>>>>>>>> being enumerable >>>>>>>>>> is the fundamental property that unites all Windows (ignoring >>>>>>>> JoinWindows >>>>>>>>>> here). Yes, it >>>>>>>>>> currently does apply to all Windows implementations, but we >>>>>> shouldn't >>>>>>>>>> assume that it >>>>>>>>>> *has *to be that way on the basis that it currently happens to >>>>>> be. 
>>>>>>>>>> >>>>>>>>>> Also, the fact that they can all rely on the same aggregation >>>>>> algorithm >>>>>>>>>> seems like an >>>>>>>>>> implementation detail and it would be weird to force a >>>>>> separate/new >>>>>>>> DSL API >>>>>>>>>> just because >>>>>>>>>> under the covers we swap in a different processor. >>>>>>>>>> >>>>>>>>>> To be fair, I don't think there's a strong reason *against* >> not >>>>>>>> extending >>>>>>>>>> Windows -- in the end >>>>>>>>>> it will just mean adding a new #windowedBy method and >>>>>> copy/pasting >>>>>>>>>> everything from >>>>>>>>>> TimeWindowedKStream pretty much word for word. But anytime >> you >>>>>> find >>>>>>>>>> yourself >>>>>>>>>> copying over code almost exactly, there should probably be a >>>>>> good >>>>>>>> reason >>>>>>>>>> why :) >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, Jul 22, 2020 at 3:48 PM John Roesler < >>>>>> vvcep...@apache.org> >>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks Leah! >>>>>>>>>>> >>>>>>>>>>> 5) Regarding the empty windows, I'm wondering if we should >>>>>> simply >>>>>>>> propose >>>>>>>>>>> that the windows should not be emitted downstream of the >>>>>> operator or >>>>>>>>>>> visible in IQ. Then, it'll be up to the implementation to >> make >>>>>> it >>>>>>>> happen. >>>>>>>>>>> I'm >>>>>>>>>>> personally not concerned about it, since it seems like there >>>>>> are >>>>>>>> multiple >>>>>>>>>>> ways to accomplish this. >>>>>>>>>>> >>>>>>>>>>> Note, the discrepancy Matthias pointed out is actually a >>>>>> design bug. >>>>>>>> The >>>>>>>>>>> windowed aggregation (like every operation in Streams) >>>>>> produces a >>>>>>>> "view", >>>>>>>>>>> which then forms the basis of downstream operations. When we >>>>>> pass the >>>>>>>>>>> Materialized option to the operation, all we're doing is >>>>>> saying to >>>>>>>>>>> "materialize" >>>>>>>>>>> the view (aka, actually store the computed view) and also >> make >>>>>> it >>>>>>>>>>> queriable. 
>>>>>>>>>>> It would be illegal for the "queriable, materialized view" to >>>>>> differ >>>>>>>> in >>>>>>>>>>> any way >>>>>>>>>>> from the "view". So, it seems we must either propose to emit >>>>>> the empty >>>>>>>>>>> windows AND make them visible in IQ, or propose NOT to emit >>>>>> the empty >>>>>>>>>>> windows AND NOT make them visible in IQ. >>>>>>>>>>> >>>>>>>>>>> 7) Regarding whether we can extend TimeWindows (or Windows): >>>>>>>>>>> I've been mulling this over more. I think it's worth asking >> the >>>>>>>> question of >>>>>>>>>>> what these classes even mean. For example, why is >>>>>> SessionWindows a >>>>>>>>>>> different thing from TimeWindows and UniversalWindows (which >>>>>> are both >>>>>>>>>>> Windows)? >>>>>>>>>>> >>>>>>>>>>> This conversation is extra complicated because of the >>>>>> incomplete and >>>>>>>>>>> mis-matched class hierarchy, but we can try to look past it >>>>>> for now. >>>>>>>>>>> >>>>>>>>>>> It seems like what Time and Universal (and any other Windows >>>>>>>>>>> implementation) have in common is that the windows are >>>>>> statically >>>>>>>>>>> enumerable. >>>>>>>>>>> As a consequence, they can all rely on an aggregation >>>>>> maintenence >>>>>>>> algorithm >>>>>>>>>>> that involves enumerating each of the windows and updating >> it. >>>>>> That >>>>>>>>>>> also means that their DSL object (TimeWindowedKStream) >> doesn't >>>>>> need >>>>>>>>>>> "subtractors" or "mergers", but only "adders"; again, this >> is a >>>>>>>> consequence >>>>>>>>>>> of the fact that the windows are enumerable. >>>>>>>>>>> >>>>>>>>>>> In contrast, session windows are data driven, so they are not >>>>>>>> statically >>>>>>>>>>> enumerable. Their algorithm has to rely on scans, and to do >>>>>> the scans, >>>>>>>>>>> it needs to know the "inactivity gap", which needs to be part >>>>>> of the >>>>>>>> window >>>>>>>>>>> definition. 
Likewise, session windows have the property that they need
>>>>>>>>>>> to be merged, so their DSL object also requires mergers.
>>>>>>>>>>>
>>>>>>>>>>> It really seems like your new window definition doesn't fit into either
>>>>>>>>>>> category. It uses a different algorithm, which relies on scans, but it is
>>>>>>>>>>> also fixed in size, so it doesn't need mergers. In this situation, it seems
>>>>>>>>>>> like the safe bet is to just create SlidingWindows with no interface and
>>>>>>>>>>> add a separate set of DSL operations and objects. It's a little extra code,
>>>>>>>>>>> but it seems to keep everything tidier and more comprehensible, both
>>>>>>>>>>> for us and for users.
>>>>>>>>>>>
>>>>>>>>>>> What do you think?
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the suggestions, I've updated the KIP and child page accordingly
>>>>>>>>>>>> and addressed some below.
>>>>>>>>>>>>
>>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder method
>>>>>>>>>>>>> that take two parameters.
>>>>>>>>>>>>
>>>>>>>>>>>> That makes sense, I've changed that in the public API.
>>>>>>>>>>>>
>>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those empty
>>>>>>>>>>>>> windows would be returned.
>>>>>>>>>>>>
>>>>>>>>>>>> This is a great point, with the current implementation plan empty windows
>>>>>>>>>>>> would be returned. I think creating a second window store would definitely
>>>>>>>>>>>> work, but there would be more overhead in having two stores and switching
>>>>>>>>>>>> windows between the stores, as well as doing scans in both stores to find
>>>>>>>>>>>> existing windows.
There might be a way to avoid emitting empty windows
>>>>>>>>>>>> without creating a second window store, I'll look more into it.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Leah
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Couple of follow up comments:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder method
>>>>>>>>>>>>> that takes two parameters. This provides a better API as the user cannot
>>>>>>>>>>>>> forget to set the grace period. Throwing a runtime exception seems not
>>>>>>>>>>>>> to be the best way to handle this case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2) In Fig.2 you list 10 hopping windows. I believe it should actually be
>>>>>>>>>>>>> more? The first hopping window would be [-6,4[ and the last one would
>>>>>>>>>>>>> be [19,29[ -- hence, the cost savings are actually much higher.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3a) IQ: you are saying that the user needs to compute the start time as
>>>>>>>>>>>>>
>>>>>>>>>>>>>> windowSize+the time they're looking at
>>>>>>>>>>>>>
>>>>>>>>>>>>> Should this be "targetTime - windowSize" instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3b) IQ: in your example you say "window size of 10 minutes" with an
>>>>>>>>>>>>> incident at 9:15.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> they're looking for a window with the start time of 8:15.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The example does not seem to add up?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4) For "Processing Windows": you describe a three step approach: I just
>>>>>>>>>>>>> want to point out, that step (1) is not necessary for each input record,
>>>>>>>>>>>>> because timestamps are not guaranteed to be unique and thus a previous
>>>>>>>>>>>>> record with the same key and timestamp might have created the windows
>>>>>>>>>>>>> already.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nit: I am also not exactly sure what you mean by step (3) as you use the
>>>>>>>>>>>>> word "send". I guess you mean "put"?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems there are actually more details in the sub-page:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> A new record for SlidingWindows will always create two new windows. If
>>>>>>>>>>>>>> either of those windows already exist in the windows store, their
>>>>>>>>>>>>>> aggregation will simply be updated to include the new record, but no
>>>>>>>>>>>>>> duplicate window will be added to the WindowStore.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, the first and second sentence contradict each other a little
>>>>>>>>>>>>> bit. I think the first sentence is not correct.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nit:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For in-order records, the left window will always be empty.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This should be "right window" ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5) "Emitting Results": it might be worth to point out, that a
>>>>>>>>>>>>> second/future window of a new record is created with no records, and
>>>>>>>>>>>>> thus, even if it's initialized it won't be emitted. Only if a
>>>>>>>>>>>>> consecutive record falls into the window, the window would be updated
>>>>>>>>>>>>> and the window result (for a window content of one record) would be sent
>>>>>>>>>>>>> downstream.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Again, the sub-page contains these details. Might still be worth to add
>>>>>>>>>>>>> to the top level page, too.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those empty
>>>>>>>>>>>>> windows would be returned. Thus I am wondering if we need to use two
>>>>>>>>>>>>> stores internally? One store for actual windows and one store for empty
>>>>>>>>>>>>> windows? If an empty window is updated, it's moved to the other store?
>>>>>>>>>>>>> For IQ, we only allow to query the non-empty-window store?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 6) On the sub-page:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The left window of in-order records and both windows for out-of-order
>>>>>>>>>>>>>> records need to be updated with the values of records that have already
>>>>>>>>>>>>>> been processed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Why "both windows for out-of-order records"? IMHO, we don't know how
>>>>>>>>>>>>> many existing windows need to be updated when processing an
>>>>>>>>>>>>> out-of-order record. Of course, an out-of-order record could also fall
>>>>>>>>>>>>> into no existing window and just create two new windows.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because each record creates one new window that includes itself and one
>>>>>>>>>>>>>> window that does not
>>>>>>>>>>>>>
>>>>>>>>>>>>> As stated above, this does not seem to hold. I understand what you mean,
>>>>>>>>>>>>> but it would be good to be exact.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Figure 2: You use the term "late" but you mean "out-of-order" I guess --
>>>>>>>>>>>>> a record is _late_ if it's not processed any longer as the grace period
>>>>>>>>>>>>> passed already.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Figure 2: "Late" should be out-of-order.
The example text >>>>>> say a >>>>>>>> window >>>>>>>>>>>>> [16,26] should be created but the figure shows the green >>>>>> window as >>>>>>>>>>> [15,20]. >>>>>>>>>>>>> >>>>>>>>>>>>> About the blue window: maybe add not that the blue window >>>>>> contains >>>>>>>> the >>>>>>>>>>>>> aggregate we need for the green window, _before_ the new >>>>>> record `a` >>>>>>>> is >>>>>>>>>>>>> added to the blue window. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> 7) I am not really happy to extend TimeWindows and I think >>>>>> the >>>>>>>> argument >>>>>>>>>>>>> about JoinWindows is not the best (IMHO, JoinWindows do it >>>>>> already >>>>>>>>>>> wrong >>>>>>>>>>>>> and we just repeat the same mistake). However, it seems our >>>>>> window >>>>>>>>>>>>> hierarchy is "broken" already and it might be out of scope >>>>>> for this >>>>>>>> KIP >>>>>>>>>>>>> to fix it. Hence, I am ok that we bite the bullet for now >>>>>> and clean >>>>>>>> it >>>>>>>>>>>>> up later. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On 7/20/20 5:18 PM, Guozhang Wang wrote: >>>>>>>>>>>>>> Hi Leah, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the updated KIP. I agree that extending >>>>>> SlidingWindows >>>>>>>>>>> from >>>>>>>>>>>>>> Windows is fine for the sake of not introducing more >> public >>>>>> APIs >>>>>>>> (and >>>>>>>>>>>>> their >>>>>>>>>>>>>> internal xxxImpl classes), and its cons is small enough to >>>>>> tolerate >>>>>>>>>>> to >>>>>>>>>>>>> me. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas < >>>>>> ltho...@confluent.io> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the feedback on the KIP. 
I've updated the KIP >>>>>> page >>>>>>>>>>>>>>> < >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> to address these points and have created a child page >>>>>>>>>>>>>>> < >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> to go more in depth on certain implementation details. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *Grace Period:* >>>>>>>>>>>>>>> I think Sophie raises a good point that the default grace >>>>>> period >>>>>>>> of >>>>>>>>>>> 24 >>>>>>>>>>>>>>> hours is often too long and was chosen when retention >> time >>>>>> and >>>>>>>> grace >>>>>>>>>>>>> period >>>>>>>>>>>>>>> were the same. For SlidingWindows, I propose we make the >>>>>> grace >>>>>>>>>>> period >>>>>>>>>>>>>>> mandatory. To keep formatting consistent with other types >>>>>> of >>>>>>>>>>> windows, >>>>>>>>>>>>> grace >>>>>>>>>>>>>>> period won't be an additional parameter in the #of >> method, >>>>>> but >>>>>>>> will >>>>>>>>>>>>> still >>>>>>>>>>>>>>> look like it does in other use cases: >>>>>>>>>>>>>>> >>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds). >>>>>>>> If >>>>>>>>>>>>> grace >>>>>>>>>>>>>>> period isn't properly initialized, an error will be >> thrown >>>>>> through >>>>>>>>>>> the >>>>>>>>>>>>>>> process method. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *Storage Layer + Aggregation:* >>>>>>>>>>>>>>> SlidingWindows will use a WindowStore because computation >>>>>> can be >>>>>>>>>>> done >>>>>>>>>>>>> with >>>>>>>>>>>>>>> the information stored in a WindowStore (window timestamp >>>>>> and >>>>>>>>>>> value). >>>>>>>>>>>>> Using >>>>>>>>>>>>>>> the WindowStore also simplifies computation as >>>>>> SlidingWindows can >>>>>>>>>>>>> leverage >>>>>>>>>>>>>>> existing processes. 
Because we are using a WindowStore, >> the >>>>>>>>>>> aggregation >>>>>>>>>>>>>>> process will be similar to that of a hopping window. As >>>>>> records >>>>>>>>>>> come in >>>>>>>>>>>>>>> their value is added to the aggregation that already >>>>>> exists, >>>>>>>>>>> following >>>>>>>>>>>>> the >>>>>>>>>>>>>>> same procedure as hopping windows. The aggregation >>>>>> difference >>>>>>>>>>> between >>>>>>>>>>>>>>> SlidingWindows and HoppingWindows comes in creating new >>>>>> windows >>>>>>>> for >>>>>>>>>>> a >>>>>>>>>>>>>>> SlidingWindow, where you need to find the existing >> records >>>>>> that >>>>>>>>>>> belong >>>>>>>>>>>>> to >>>>>>>>>>>>>>> the new window. This computation is similar to the >>>>>> aggregation in >>>>>>>>>>>>>>> SessionWindows and requires a scan to the WindowStore to >>>>>> find the >>>>>>>>>>> window >>>>>>>>>>>>>>> with the aggregation needed, which will always be >>>>>> pre-computed. >>>>>>>> The >>>>>>>>>>> scan >>>>>>>>>>>>>>> requires creating an iterator, but should have minimal >>>>>> performance >>>>>>>>>>>>> effects >>>>>>>>>>>>>>> as this strategy is already implemented in >> SessionWindows. >>>>>> More >>>>>>>>>>> details >>>>>>>>>>>>> on >>>>>>>>>>>>>>> finding the aggregation that needs to be put in a new >>>>>> window can >>>>>>>> be >>>>>>>>>>>>> found >>>>>>>>>>>>>>> on the implementation page >>>>>>>>>>>>>>> < >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> . 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *Extending Windows<TimeWindow>, Windows<Window> or >> nothing* >>>>>>>>>>>>>>> Because SlidingWindows are still defined by a windowSize >>>>>> (whereas >>>>>>>>>>>>>>> SessionWindows are defined purely by data), I think it >>>>>> makes sense >>>>>>>>>>> to >>>>>>>>>>>>>>> leverage the existing Window processes instead of >> creating >>>>>> a new >>>>>>>>>>> store >>>>>>>>>>>>> type >>>>>>>>>>>>>>> that would be very similar to the WindowStore. While it's >>>>>> true >>>>>>>> that >>>>>>>>>>> the >>>>>>>>>>>>>>> #windowsFor method isn't necessary for SlidingWindows, >>>>>> JoinWindows >>>>>>>>>>> also >>>>>>>>>>>>>>> extends Windows<Window> and throws an >>>>>>>> UnsupportedOperationException >>>>>>>>>>> in >>>>>>>>>>>>> the >>>>>>>>>>>>>>> #windowsFor method, which is what SlidingWindows can do. >>>>>> The >>>>>>>>>>> difference >>>>>>>>>>>>>>> between extending Windows<TimeWindow> or Windows<Window> >> is >>>>>>>>>>> minimal, as >>>>>>>>>>>>>>> both are ways to pass window parameters. Extending >>>>>>>>>>> Windows<TimeWindow> >>>>>>>>>>>>> will >>>>>>>>>>>>>>> give us more leverage in utilizing existing processes. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *Emit Strategy* >>>>>>>>>>>>>>> I would argue that emitting for every update is still the >>>>>> best way >>>>>>>>>>> to go >>>>>>>>>>>>>>> for SlidingWindows because it mimics the other types of >>>>>> windows, >>>>>>>> and >>>>>>>>>>>>>>> suppression can be leveraged to limit what SlidingWindows >>>>>> emits. >>>>>>>>>>> While >>>>>>>>>>>>> some >>>>>>>>>>>>>>> users may only want to see the last value, others may >> want >>>>>> to see >>>>>>>>>>> more, >>>>>>>>>>>>> and >>>>>>>>>>>>>>> leaving the emit strategy to emit partial results allows >>>>>> both >>>>>>>> users >>>>>>>>>>> to >>>>>>>>>>>>>>> access what they want. 
*Additional Features*
Supporting sliding windows inherently, and shifting inefficient hopping windows to sliding windows, is an interesting idea and could be built on top of SlidingWindows when they are finished, but right now seems out of scope for the needs of this KIP. Similarly, including a `subtraction` feature could have performance improvements, but doesn't seem necessary for the implementation of this KIP.

Let me know what you think of the updates,

Leah

On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:

Hello all,

Thanks for the KIP, Leah!

Regarding (1): I'd go farther actually. Making Windows an abstract class was a mistake from the beginning that led to us not being able to fix a very confusing situation for users around retention times, final results emitting, etc. Thus, I would not suggest extending TimeWindows for sure, but would also not suggest extending Windows.

The very simplest thing to do is follow the example of SessionWindows, which is just a completely self-contained class. If we don't mess with class inheritance, we won't ever have any of the problems related to class inheritance. This is my preferred solution.
Still, sliding windows have a lot in common with TimeWindows and other fixed-size windows, namely that the windows are fixed in size. If we want to preserve the current two-part windowing API in which you can window by either "fixed" or "data driven" modes, I'd suggest we avoid increasing the blast radius of Windows by taking the opportunity to replace it with a proper interface and implement that interface instead.

For example:
https://github.com/apache/kafka/pull/9031

Then, SlidingWindows would just implement FixedSizeWindowDefinition

======

Regarding (2), it seems more straightforward as a user of Streams to just have one mental model. _All_ of our aggregation operations follow an eager emission model, in which we just emit an update whenever an update is available. We already provided Suppression to explicitly apply different update semantics in the case it's required. Why should we define a snowflake operation with completely different semantics from everything else? I.e., systems are generally easier to use when they follow a few simple, composable rules than when they have a lot of different, specific rules.
======

New point: (4):
It would be nice to include some examples of user code that would use the new API, which should include:
1. using the DSL with the sliding window definition
2. accessing the stored results of a sliding window aggregation via IQ
3. defining a custom processor to access sliding windows in a store

It generally helps reviewers wrap their heads around the proposal, as well as shaking out any design issues that would otherwise only come up during implementation/testing/review.

Thanks again for the awesome proposal!
-John

On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:

Hello Leah,

Thanks for the nicely written KIP. A few thoughts:

1) I echo the other reviewer's comments regarding the typing: why extend TimeWindow instead of just extending Window?

2) I also feel that the emitting policy for this type of windowing aggregation may be different from the existing ones. The existing emitting policy is very simple: emit every time a window gets updated, and emit every time on out-of-order data within the grace period. This is because for time-windows the window close time strictly depends on the window start time, which is fixed, while for session-windows, although the window open/close time is also data-dependent, it changes relatively infrequently compared to sliding-windows. For this KIP, since each new record would cause a new sliding-window, the number of windows maintained logically could be much larger and hence emitting on each update may be too aggressive.

3) Although the KIP itself should be focusing on user-facing interfaces, I'd suggest we create a child page of KIP-450 discussing its implementation as well, since some of that may drive the interface design. E.g. personally I think having a combiner interface in addition to aggregator would be useful but that's based on my 2cents about the implementation design (I once created a child page describing it: https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation).
Guozhang

On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Leah,

Thank you for the KIP!

Here is my feedback:

1. The KIP would benefit from some code examples that show how to use sliding windows in aggregations.

2. The different sliding windows in Figure 1 and 2 are really hard to distinguish. Could you please try to make them graphically better distinguishable? You could try to draw the frames of consecutive windows shifted to each other.

3. I agree with Matthias, that extending Windows<TimeWindow> does not seem to be the best approach. What would be the result of windowsFor()?

4. In the section "Public Interfaces" you should remove implementation details like private constructors and private fields.

5. Do we need a new store interface or can we use WindowStore? Some words about that would be informative.

6. @Matthias, if the subtractor is not strictly needed, I would skip it for now and add it later.

7. I also agree that having a section that describes how to handle out-of-order records would be good to understand what is still missing and what we can reuse.

Best,
Bruno

On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update record (ie, early/partial result) for each input record. When an out-of-order record arrives, we just update the corresponding old window and emit another update.

It's unclear from the KIP if you propose the same emit strategy? -- For sliding windows it might be worth considering a different emit strategy and only supporting emitting the final result (ie, after the grace period passed)?

Boyang also raises a good point that relates to my point from above about pre-aggregations and storage layout. Our current time windows are all pre-aggregated and stored in parallel.
We can also look up windows efficiently, as we can compute the windowed-key given the input record key and timestamp based on the window definition.

However, for sliding windows, window boundaries are data dependent and thus we cannot compute them upfront. Thus, how can we "find" existing windows efficiently? Furthermore, out-of-order data would create new windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw input events. Additionally, we could also store pre-aggregated results if we think it's beneficial. -- If we apply an "emit only final results" strategy, storing pre-aggregated results would not be necessary though.

Btw: for sliding windows it might also be useful to consider allowing users to supply a `Subtractor` -- this subtractor could be applied on the current window result (in case we store it) if a record drops out of the window. Of course, not all aggregation functions are subtractable and we can consider this as a follow up task, too, and not include it in this KIP for now. Thoughts?
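The `Subtractor` idea can be illustrated with a minimal plain-Java sketch. It assumes records arrive in timestamp order and that the aggregate is a sum, which is subtractable. The `SubtractorSketch` class is hypothetical and not part of any proposed API. The subtraction is inlined as `sum -= ...` rather than exposed as an interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SubtractorSketch {
    private final long timeDifference;
    // Records currently inside the window, oldest first, as {timestamp, value}.
    private final Deque<long[]> inWindow = new ArrayDeque<>();
    private long sum = 0;

    SubtractorSketch(long timeDifference) {
        this.timeDifference = timeDifference;
    }

    /** Adds an in-order record; returns the sum over [timestamp - timeDifference, timestamp]. */
    long add(long timestamp, long value) {
        // Subtract records that dropped out of the window instead of re-aggregating.
        while (!inWindow.isEmpty() && inWindow.peekFirst()[0] < timestamp - timeDifference) {
            sum -= inWindow.pollFirst()[1];
        }
        inWindow.addLast(new long[] {timestamp, value});
        sum += value;
        return sum;
    }

    public static void main(String[] args) {
        SubtractorSketch window = new SubtractorSketch(5);
        System.out.println(window.add(10, 1));
        System.out.println(window.add(13, 2));
        System.out.println(window.add(16, 4)); // the record at 10 has dropped out here
    }
}
```

Aggregations such as min or max have no inverse, so a sketch like this would not carry over to them. That matches the caveat above that not all aggregation functions are subtractable.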
I was also thinking about the type hierarchy. I am not sure if extending TimeWindow is the best approach? For TimeWindows, we can pre-compute window boundaries (cf `windowsFor()`) while for a sliding window the boundaries are data dependent. Session windows are also data dependent and thus they don't inherit from TimeWindow (Maybe check out the KIP that added session windows? It could provide some good insights.) -- I believe the same rationale applies to sliding windows?

-Matthias

On 7/10/20 12:47 PM, Boyang Chen wrote:

Thanks Leah and Sophie for the KIP.

1. I'm a bit surprised that we don't have an advance time. Could we elaborate how the storage layer is structured?

2. IIUC, there will be extra cost in terms of fetching aggregation results, since we couldn't pre-aggregate until the user asks for it. Would be good to also discuss it.

3. We haven't discussed the possibility of supporting sliding windows inherently.
For a user who actually uses a hopping window, Streams could detect such an inefficiency by computing the window_size/advance_time ratio to reach a conclusion on whether the write amplification is too high compared with some configured threshold. The benefit of doing so is that existing Streams users don't need to change their code or learn a new API, but only need to upgrade the Streams library to get benefits for their inefficient hopping window implementation. There might be some compatibility issues for sure, but they are worth listing out for the trade-off.

Boyang

On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:

Hey Matthias,

Thanks for pointing that out. I added the following to the Proposed Changes section of the KIP:

"Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. Any new windows created by the late record will still be created, and the existing windows that are changed by the late record will be updated.
Any record that falls outside of the grace period (either user defined or default) will be discarded."

All the best,
Leah

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks a lot for the KIP. Very well written.

The KIP does not talk about the handling of out-of-order data though. How do you propose to address this?

-Matthias

On 7/8/20 5:33 PM, Leah Thomas wrote:

Hi all,
I'd like to kick-off the discussion for KIP-450, adding sliding window aggregation support to Kafka Streams.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Let me know what you think,
Leah