Thanks for the nits Matthias, I've updated the examples and language accordingly.
Leah

On Tue, Jul 28, 2020 at 6:43 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks Leah. Overall LGTM.
>
> A few nits:
>
> - the first figure shows window [9,19] but the window is not aligned
>   properly (it should be 1ms to the right; right now, it's aligned to
>   window [8,18])
>
> - in the second figure, a hopping window would create more windows, i.e.,
>   the first window would be [-6,4) and the last window would be [19,29),
>   thus it's not just 10 windows but 26 windows (if I did not miscount)
>
> - "Two new windows will be created by the late record"
>
>   late -> out-of-order
>
>
> -Matthias
>
> On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:
> > Thanks for the update Leah -- I think that all makes sense.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:
> >
> >> Another minor tweak: instead of defining the window by the *size*, it
> >> will be defined by *timeDifference*, which is the maximum time difference
> >> between two events. This is a more precise way to define a window due to
> >> its inclusive ends, while allowing the user to create the window they
> >> expect. This definition fits with the current examples, where a record at
> >> *10* would fall into a window of time difference *5* from *[5,10]*. This
> >> window contains any records at 5, 6, 7, 8, 9, and 10, which is 6
> >> instances instead of 5. This semantic difference is why I've shifted
> >> *size* to *timeDifference*.
> >>
> >> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
> >> conventions.
> >>
> >> Let me know if there are any concerns!
> >> The vote thread is open as well here:
> >> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
> >>
> >> Best,
> >> Leah
> >>
> >> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>
> >>> A small tweak - to make it more clear to users that grace is required,
> >>> as well as cleaning up some of the confusing grammar semantics of
> >>> windows, the main builder method for *slidingWindows* will be
> >>> *withSizeAndGrace* instead of *of*. This looks like it'll be the last
> >>> change (for now) on the public API. If anyone has any comments or
> >>> thoughts let me know. Otherwise, I'll take this to vote shortly.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>
> >>>> To accommodate the change to a final class, I've added another
> >>>> *windowedBy()* function in *CogroupedKStream.java* to handle
> >>>> SlidingWindows.
> >>>>
> >>>> As far as the discussion goes, I think this is the last change we've
> >>>> talked about. If anyone has other comments or concerns, please let me
> >>>> know!
> >>>>
> >>>> Cheers,
> >>>> Leah
> >>>>
> >>>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>>
> >>>>> Thanks for the discussion about extending TimeWindows. I agree that
> >>>>> making it future proof is important, and the implementation of
> >>>>> SlidingWindows is unique enough that it seems logical to make it its
> >>>>> own final class.
> >>>>>
> >>>>> On that note, I've updated the KIP to make SlidingWindows a standalone
> >>>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
> >>>>> handle SlidingWindows. It seems that SlidingWindows would still be able
> >>>>> to leverage *TimeWindowedKStream* by creating a SlidingWindows version
> >>>>> of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
> >>>>> anyone sees issues with this implementation, please let me know.
> >>>>>
> >>>>> Best,
> >>>>> Leah
> >>>>>
> >>>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the reply, Sophie.
> >>>>>>
> >>>>>> That all sounds about right to me.
> >>>>>>
> >>>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
> >>>>>> for it to be extensible. Different implementations can (and do)
> >>>>>> enumerate different windows to suit different use cases.
> >>>>>>
> >>>>>> On the other hand, I can’t think of any way to extend SessionWindows
> >>>>>> to do something different using the same algorithm, so it makes sense
> >>>>>> for it to stay final.
> >>>>>>
> >>>>>> If we think SlidingWindows is similarly not usefully extensible, then
> >>>>>> we can make it final. It’s easy to remove final later, but adding it
> >>>>>> is not possible. Or we could go the other route and just make it an
> >>>>>> interface, on general principle. Both of these choices are safe API
> >>>>>> design.
> >>>>>>
> >>>>>> Thanks again,
> >>>>>> John
> >>>>>>
> >>>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >>>>>>>>
> >>>>>>>> Users could pass in a custom `SessionWindows` as
> >>>>>>>> long as the session algorithm works correctly for it.
> >>>>>>>
> >>>>>>>
> >>>>>>> Well not really, SessionWindows is a final class. TimeWindows is also
> >>>>>>> a final class, so neither of these can be extended/customized. For a
> >>>>>>> given Windows then there would only be three (non-overlapping)
> >>>>>>> possibilities: either it's TimeWindows, SlidingWindows, or a custom
> >>>>>>> Windows.
> >>>>>>> I don't think there's any problem with determining what the user
> >>>>>>> wants in this case -- we would just check if it's a SlidingWindows
> >>>>>>> and connect the new processor, or else connect the existing
> >>>>>>> hopping/tumbling window processor.
> >>>>>>>
> >>>>>>> I'll admit that last sentence does leave a bad taste in my mouth.
> >>>>>>> Part of that is probably the "leaking" API Matthias pointed out; we
> >>>>>>> just assume the hopping/tumbling window implementation fits all
> >>>>>>> custom windows. But I guess if you really needed to customize the
> >>>>>>> algorithm any further you may as well stick in a transformer and do
> >>>>>>> it all yourself.
> >>>>>>>
> >>>>>>> Anyways, given what we have, it does seem weird to apply one
> >>>>>>> algorithm for most Windows types and then swap in a different one for
> >>>>>>> one specific extension of Windows. So adding a new
> >>>>>>> #windowedBy(SlidingWindows) sounds reasonable to me.
> >>>>>>>
> >>>>>>> I'm still not convinced that we need a whole new TimeWindowedKStream
> >>>>>>> equivalent class for sliding windows though. It seems like the
> >>>>>>> hopping/tumbling window implementation could benefit just as much
> >>>>>>> from a subtractor as the sliding windows so the only future-proofing
> >>>>>>> we get is the ability to be lazy and add the subtractor to one but
> >>>>>>> not the other. Of course it would only be an optimization so we could
> >>>>>>> just not apply it to one type and nothing would break.
> >>>>>>> It does make me nervous to go against the "future-proof" direction,
> >>>>>>> though. Are there any other examples of things we might want to add
> >>>>>>> to one window type but not to another?
> >>>>>>>
> >>>>>>> On another note, this actually brings up a new question: should
> >>>>>>> SlidingWindows also be final? My take is "yes" since there's no
> >>>>>>> reasonable customization of sliding windows, at least not that I can
> >>>>>>> think of. Thoughts?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks, all,
> >>>>>>>>
> >>>>>>>> I can see how my conclusion was kind of a leap.
> >>>>>>>>
> >>>>>>>> What Matthias said is indeed what I was thinking. When you provide a
> >>>>>>>> window definition to the windowedBy() method, you are selecting an
> >>>>>>>> algorithm that will be used to compute the windows from the input
> >>>>>>>> data.
> >>>>>>>>
> >>>>>>>> I didn’t mean the code implementation “algorithm”, but the
> >>>>>>>> high-level algorithm that describes how the input stream will be
> >>>>>>>> transformed into a sequence of windows.
> >>>>>>>>
> >>>>>>>> For example, the algorithm for Windows is something like “given the
> >>>>>>>> record timestamp, include the record in each of the enumerated
> >>>>>>>> windows”. Note that there can be a lot of variation in how the
> >>>>>>>> windows are enumerated, which is why there are at least a couple of
> >>>>>>>> implementations of Windows, and we are open to adding more (like for
> >>>>>>>> natural calendar boundaries).
> >>>>>>>>
> >>>>>>>> For SessionWindows, it’s more like “if any window is within the gap,
> >>>>>>>> extend its boundaries to include this record and if two windows
> >>>>>>>> touch, then merge them”.
> >>>>>>>>
> >>>>>>>> Clearly, the algorithm for SlidingWindows doesn’t fall into either
> >>>>>>>> category, so it seems inappropriate to claim that it does in the API
> >>>>>>>> (by implementing Windows with stubbed methods) and then cast
> >>>>>>>> internally to execute a completely different algorithm.
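
A rough way to see the "enumerated windows" algorithm described above, as a minimal sketch in plain Java. This is not the Kafka Streams implementation: `HoppingWindowSketch` and `windowStartsFor` are hypothetical names, and the sketch assumes half-open hopping windows [start, start + size) whose starts are aligned to multiples of the advance.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {
    // Start times of every hopping window [start, start + size) that
    // contains the given record timestamp (hypothetical helper).
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = Math.floorDiv(timestamp - size + advance, advance) * advance;
        for (long start = first; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10, advance 5: the record at 10 belongs to [5,15) and [10,20).
        System.out.println(windowStartsFor(10L, 10L, 5L)); // prints [5, 10]
        // Size 10, advance 1: the same record belongs to 10 windows.
        System.out.println(windowStartsFor(10L, 10L, 1L).size()); // prints 10
    }
}
```

With an advance of 1 every record is added to `size` windows, which is the kind of per-record enumeration the sliding-window algorithm is trying to avoid.
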
> >>>>>>>>
> >>>>>>>> To your other observation, that the DSL object resulting from
> >>>>>>>> windowedBy would look the same for Windows and SlidingWindows, maybe
> >>>>>>>> it makes sense for windowedBy(SlidingWindows) also to return a
> >>>>>>>> TimeWindowedKStream.
> >>>>>>>>
> >>>>>>>> i.e.:
> >>>>>>>> =======================
> >>>>>>>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> >>>>>>>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> >>>>>>>> =======================
> >>>>>>>>
> >>>>>>>> I can’t think of a reason this wouldn’t work. But then again, it
> >>>>>>>> would be more future-proof to go ahead and specify a different
> >>>>>>>> return type now, if we think we'll want to add subtractors and stuff
> >>>>>>>> later. I don't have a strong feeling about that part of the API. It
> >>>>>>>> seems to be independent of whether SlidingWindows extends Windows or
> >>>>>>>> not.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> >>>>>>>>> I think what John tries to say is the following:
> >>>>>>>>>
> >>>>>>>>> We have `windowedBy(Windows)` that accepts hopping/tumbling windows
> >>>>>>>>> but also custom windows, and we use a specific algorithm. Note that
> >>>>>>>>> custom windows must "work" based on the used algorithm.
> >>>>>>>>>
> >>>>>>>>> For session windows we have `windowedBy(SessionWindows)` and apply
> >>>>>>>>> a different algorithm. Users could pass in a custom
> >>>>>>>>> `SessionWindows` as long as the session algorithm works correctly
> >>>>>>>>> for it.
> >>>>>>>>>
> >>>>>>>>> For the new sliding windows, we want to use a different algorithm
> >>>>>>>>> compared to hopping/tumbling windows.
> >>>>>>>>> If we let sliding windows extend `Windows`, we can decide at
> >>>>>>>>> runtime if we need to use the hopping/tumbling window algorithm for
> >>>>>>>>> hopping/tumbling windows or the new sliding window algorithm for
> >>>>>>>>> sliding windows. However, if we get a custom window, which
> >>>>>>>>> algorithm do we pick now? The existing tumbling/hopping window
> >>>>>>>>> algorithm or the new sliding window algorithm? Both a custom
> >>>>>>>>> "time-window" and custom "sliding window" implement the generic
> >>>>>>>>> `Windows` class and thus we cannot make a decision as we don't know
> >>>>>>>>> the user's intent.
> >>>>>>>>>
> >>>>>>>>> As a matter of fact, even if the user might not be aware of it, the
> >>>>>>>>> algorithm we use does already leak into the API (if a user extends
> >>>>>>>>> `Windows` it must work with our hopping/tumbling window algorithm
> >>>>>>>>> and if a user extends `SessionWindows` it must work with our
> >>>>>>>>> session algorithm) and it seems we need to preserve this property
> >>>>>>>>> for sliding windows.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> >>>>>>>>>> Hey John,
> >>>>>>>>>>
> >>>>>>>>>> Just a few follow-up questions/comments about the whole Windows
> >>>>>>>>>> thing:
> >>>>>>>>>>
> >>>>>>>>>> That's a good way of looking at things; in particular the point
> >>>>>>>>>> about SessionWindows for example requiring a Merger while other
> >>>>>>>>>> "statically enumerable" windows require only an adder seems to
> >>>>>>>>>> touch on the heart of the matter.
> >>>>>>>>>>
> >>>>>>>>>>> It seems like what Time and Universal (and any other Windows
> >>>>>>>>>>> implementation) have in common is that the windows are statically
> >>>>>>>>>>> enumerable.
> >>>>>>>>>>> As a consequence, they can all rely on an aggregation maintenance
> >>>>>>>>>>> algorithm that involves enumerating each of the windows and
> >>>>>>>>>>> updating it. That also means that their DSL object
> >>>>>>>>>>> (TimeWindowedKStream) doesn't need "subtractors" or "mergers",
> >>>>>>>>>>> but only "adders"; again, this is a consequence of the fact that
> >>>>>>>>>>> the windows are enumerable.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Given that, I'm a bit confused why you conclude that sliding
> >>>>>>>>>> windows are fundamentally different from the "statically
> >>>>>>>>>> enumerable" windows -- sliding windows require only an adder too.
> >>>>>>>>>> I'm not sure it's a consequence of being enumerable, or that being
> >>>>>>>>>> enumerable is the fundamental property that unites all Windows
> >>>>>>>>>> (ignoring JoinWindows here). Yes, it currently does apply to all
> >>>>>>>>>> Windows implementations, but we shouldn't assume that it *has* to
> >>>>>>>>>> be that way on the basis that it currently happens to be.
> >>>>>>>>>>
> >>>>>>>>>> Also, the fact that they can all rely on the same aggregation
> >>>>>>>>>> algorithm seems like an implementation detail and it would be
> >>>>>>>>>> weird to force a separate/new DSL API just because under the
> >>>>>>>>>> covers we swap in a different processor.
> >>>>>>>>>>
> >>>>>>>>>> To be fair, I don't think there's a strong reason *against* not
> >>>>>>>>>> extending Windows -- in the end it will just mean adding a new
> >>>>>>>>>> #windowedBy method and copy/pasting everything from
> >>>>>>>>>> TimeWindowedKStream pretty much word for word.
> >>>>>>>>>> But anytime you find yourself copying over code almost exactly,
> >>>>>>>>>> there should probably be a good reason why :)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Leah!
> >>>>>>>>>>>
> >>>>>>>>>>> 5) Regarding the empty windows, I'm wondering if we should simply
> >>>>>>>>>>> propose that the windows should not be emitted downstream of the
> >>>>>>>>>>> operator or visible in IQ. Then, it'll be up to the
> >>>>>>>>>>> implementation to make it happen. I'm personally not concerned
> >>>>>>>>>>> about it, since it seems like there are multiple ways to
> >>>>>>>>>>> accomplish this.
> >>>>>>>>>>>
> >>>>>>>>>>> Note, the discrepancy Matthias pointed out is actually a design
> >>>>>>>>>>> bug. The windowed aggregation (like every operation in Streams)
> >>>>>>>>>>> produces a "view", which then forms the basis of downstream
> >>>>>>>>>>> operations. When we pass the Materialized option to the
> >>>>>>>>>>> operation, all we're doing is saying to "materialize" the view
> >>>>>>>>>>> (aka, actually store the computed view) and also make it
> >>>>>>>>>>> queriable. It would be illegal for the "queriable, materialized
> >>>>>>>>>>> view" to differ in any way from the "view". So, it seems we must
> >>>>>>>>>>> either propose to emit the empty windows AND make them visible in
> >>>>>>>>>>> IQ, or propose NOT to emit the empty windows AND NOT make them
> >>>>>>>>>>> visible in IQ.
> >>>>>>>>>>>
> >>>>>>>>>>> 7) Regarding whether we can extend TimeWindows (or Windows):
> >>>>>>>>>>> I've been mulling this over more.
> >>>>>>>>>>> I think it's worth asking the question of what these classes even
> >>>>>>>>>>> mean. For example, why is SessionWindows a different thing from
> >>>>>>>>>>> TimeWindows and UniversalWindows (which are both Windows)?
> >>>>>>>>>>>
> >>>>>>>>>>> This conversation is extra complicated because of the incomplete
> >>>>>>>>>>> and mis-matched class hierarchy, but we can try to look past it
> >>>>>>>>>>> for now.
> >>>>>>>>>>>
> >>>>>>>>>>> It seems like what Time and Universal (and any other Windows
> >>>>>>>>>>> implementation) have in common is that the windows are statically
> >>>>>>>>>>> enumerable. As a consequence, they can all rely on an aggregation
> >>>>>>>>>>> maintenance algorithm that involves enumerating each of the
> >>>>>>>>>>> windows and updating it. That also means that their DSL object
> >>>>>>>>>>> (TimeWindowedKStream) doesn't need "subtractors" or "mergers",
> >>>>>>>>>>> but only "adders"; again, this is a consequence of the fact that
> >>>>>>>>>>> the windows are enumerable.
> >>>>>>>>>>>
> >>>>>>>>>>> In contrast, session windows are data driven, so they are not
> >>>>>>>>>>> statically enumerable. Their algorithm has to rely on scans, and
> >>>>>>>>>>> to do the scans, it needs to know the "inactivity gap", which
> >>>>>>>>>>> needs to be part of the window definition. Likewise, session
> >>>>>>>>>>> windows have the property that they need to be merged, so their
> >>>>>>>>>>> DSL object also requires mergers.
> >>>>>>>>>>>
> >>>>>>>>>>> It really seems like your new window definition doesn't fit into
> >>>>>>>>>>> either category. It uses a different algorithm, which relies on
> >>>>>>>>>>> scans, but it is also fixed in size, so it doesn't need mergers.
> >>>>>>>>>>> In this situation, it seems like the safe bet is to just create
> >>>>>>>>>>> SlidingWindows with no interface and add a separate set of DSL
> >>>>>>>>>>> operations and objects. It's a little extra code, but it seems to
> >>>>>>>>>>> keep everything tidier and more comprehensible, both for us and
> >>>>>>>>>>> for users.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think?
> >>>>>>>>>>> -John
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> >>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the suggestions, I've updated the KIP and child page
> >>>>>>>>>>>> accordingly and addressed some below.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static
> >>>>>>>>>>>>> builder method that take two parameters.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> That makes sense, I've changed that in the public API.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
> >>>>>>>>>>>>> empty windows would be returned.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is a great point, with the current implementation plan
> >>>>>>>>>>>> empty windows would be returned. I think creating a second
> >>>>>>>>>>>> window store would definitely work, but there would be more
> >>>>>>>>>>>> overhead in having two stores and switching windows between the
> >>>>>>>>>>>> stores, as well as doing scans in both stores to find existing
> >>>>>>>>>>>> windows. There might be a way to avoid emitting empty windows
> >>>>>>>>>>>> without creating a second window store, I'll look more into it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Leah
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Couple of follow up comments:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static
> >>>>>>>>>>>>> builder method that takes two parameters. This provides a
> >>>>>>>>>>>>> better API as users cannot forget to set the grace period.
> >>>>>>>>>>>>> Throwing a runtime exception seems not to be the best way to
> >>>>>>>>>>>>> handle this case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) In Fig.2 you list 10 hopping windows. I believe it should
> >>>>>>>>>>>>> actually be more? The first hopping window would be [-6,4[ and
> >>>>>>>>>>>>> the last one would be [19,29[ -- hence, the cost savings are
> >>>>>>>>>>>>> actually much higher.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3a) IQ: you are saying that the user needs to compute the start
> >>>>>>>>>>>>> time as
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> windowSize+the time they're looking at
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Should this be "targetTime - windowSize" instead?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3b) IQ: in your example you say "window size of 10 minutes"
> >>>>>>>>>>>>> with an incident at 9:15.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> they're looking for a window with the start time of 8:15.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The example does not seem to add up?
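
For 3a/3b, the arithmetic Matthias suggests can be sketched as follows. This assumes the window of interest ends at the target time, so its start is targetTime - windowSize; `WindowStartSketch` and `windowStart` are hypothetical names for illustration, not part of any Kafka API.

```java
import java.time.Duration;
import java.time.LocalTime;

class WindowStartSketch {
    // Start of the window that ends at targetTime (hypothetical helper).
    static LocalTime windowStart(LocalTime targetTime, Duration windowSize) {
        return targetTime.minus(windowSize);
    }

    public static void main(String[] args) {
        // A 10-minute window ending at the 9:15 incident starts at 9:05.
        System.out.println(windowStart(LocalTime.of(9, 15), Duration.ofMinutes(10))); // prints 09:05
    }
}
```

Under that assumption a 10-minute window for an incident at 9:15 starts at 9:05, which is why the 8:15 in the example does not add up.
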
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4) For "Processing Windows": you describe a three step
> >>>>>>>>>>>>> approach. I just want to point out that step (1) is not
> >>>>>>>>>>>>> necessary for each input record, because timestamps are not
> >>>>>>>>>>>>> guaranteed to be unique and thus a previous record with the
> >>>>>>>>>>>>> same key and timestamp might have created the windows already.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nit: I am also not exactly sure what you mean by step (3) as
> >>>>>>>>>>>>> you use the word "send". I guess you mean "put"?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems there are actually more details in the sub-page:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> A new record for SlidingWindows will always create two new
> >>>>>>>>>>>>>> windows. If either of those windows already exist in the
> >>>>>>>>>>>>>> windows store, their aggregation will simply be updated to
> >>>>>>>>>>>>>> include the new record, but no duplicate window will be added
> >>>>>>>>>>>>>> to the WindowStore.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, the first and second sentence contradict each other a
> >>>>>>>>>>>>> little bit. I think the first sentence is not correct.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nit:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> For in-order records, the left window will always be empty.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This should be "right window"?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5) "Emitting Results": it might be worth pointing out that a
> >>>>>>>>>>>>> second/future window of a new record is created with no
> >>>>>>>>>>>>> records, and thus, even if it's initialized it won't be
> >>>>>>>>>>>>> emitted.
> >>>>>>>>>>>>> Only if a consecutive record falls into the window would the
> >>>>>>>>>>>>> window be updated and the window result (for a window content
> >>>>>>>>>>>>> of one record) be sent downstream.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Again, the sub-page contains these details. Might still be
> >>>>>>>>>>>>> worth adding to the top level page, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
> >>>>>>>>>>>>> empty windows would be returned. Thus I am wondering if we need
> >>>>>>>>>>>>> to use two stores internally? One store for actual windows and
> >>>>>>>>>>>>> one store for empty windows? If an empty window is updated,
> >>>>>>>>>>>>> it's moved to the other store? For IQ, we only allow querying
> >>>>>>>>>>>>> the non-empty-window store?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 6) On the sub-page:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> The left window of in-order records and both windows for
> >>>>>>>>>>>>>> out-of-order records need to be updated with the values of
> >>>>>>>>>>>>>> records that have already been processed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Why "both windows for out-of-order records"? IMHO, we don't
> >>>>>>>>>>>>> know how many existing windows need to be updated when
> >>>>>>>>>>>>> processing an out-of-order record. Of course, an out-of-order
> >>>>>>>>>>>>> record could also fall into no existing window and just create
> >>>>>>>>>>>>> two new windows.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Because each record creates one new window that includes
> >>>>>>>>>>>>>> itself and one window that does not
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As stated above, this does not seem to hold.
> >>>>>>>>>>>>> I understand what you mean, but it would be good to be exact.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Figure 2: You use the term "late" but you mean "out-of-order" I
> >>>>>>>>>>>>> guess -- a record is _late_ if it's not processed any longer as
> >>>>>>>>>>>>> the grace period passed already.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Figure 2: "Late" should be out-of-order. The example text says
> >>>>>>>>>>>>> a window [16,26] should be created but the figure shows the
> >>>>>>>>>>>>> green window as [15,20].
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> About the blue window: maybe add a note that the blue window
> >>>>>>>>>>>>> contains the aggregate we need for the green window, _before_
> >>>>>>>>>>>>> the new record `a` is added to the blue window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 7) I am not really happy to extend TimeWindows and I think the
> >>>>>>>>>>>>> argument about JoinWindows is not the best (IMHO, JoinWindows
> >>>>>>>>>>>>> do it already wrong and we just repeat the same mistake).
> >>>>>>>>>>>>> However, it seems our window hierarchy is "broken" already and
> >>>>>>>>>>>>> it might be out of scope for this KIP to fix it. Hence, I am ok
> >>>>>>>>>>>>> that we bite the bullet for now and clean it up later.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>> Hi Leah,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the updated KIP.
> >>>>>>>>>>>>>> I agree that extending SlidingWindows from Windows is fine for
> >>>>>>>>>>>>>> the sake of not introducing more public APIs (and their
> >>>>>>>>>>>>>> internal xxxImpl classes), and its cons is small enough to
> >>>>>>>>>>>>>> tolerate to me.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >>>>>>>>>>>>>>> to address these points and have created a child page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
> >>>>>>>>>>>>>>> to go more in depth on certain implementation details.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Grace Period:*
> >>>>>>>>>>>>>>> I think Sophie raises a good point that the default grace
> >>>>>>>>>>>>>>> period of 24 hours is often too long and was chosen when
> >>>>>>>>>>>>>>> retention time and grace period were the same. For
> >>>>>>>>>>>>>>> SlidingWindows, I propose we make the grace period mandatory.
> >>>>>>>>>>>>>>> To keep formatting consistent with other types of windows,
> >>>>>>>>>>>>>>> grace period won't be an additional parameter in the #of
> >>>>>>>>>>>>>>> method, but will still look like it does in other use cases:
> >>>>>>>>>>>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)).
> >>>>>>>>>>>>>>> If grace period isn't properly initialized, an error will be
> >>>>>>>>>>>>>>> thrown through the process method.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Storage Layer + Aggregation:*
> >>>>>>>>>>>>>>> SlidingWindows will use a WindowStore because computation can
> >>>>>>>>>>>>>>> be done with the information stored in a WindowStore (window
> >>>>>>>>>>>>>>> timestamp and value). Using the WindowStore also simplifies
> >>>>>>>>>>>>>>> computation as SlidingWindows can leverage existing
> >>>>>>>>>>>>>>> processes. Because we are using a WindowStore, the
> >>>>>>>>>>>>>>> aggregation process will be similar to that of a hopping
> >>>>>>>>>>>>>>> window. As records come in their value is added to the
> >>>>>>>>>>>>>>> aggregation that already exists, following the same procedure
> >>>>>>>>>>>>>>> as hopping windows. The aggregation difference between
> >>>>>>>>>>>>>>> SlidingWindows and HoppingWindows comes in creating new
> >>>>>>>>>>>>>>> windows for a SlidingWindow, where you need to find the
> >>>>>>>>>>>>>>> existing records that belong to the new window.
> >>>>>>>>>>>>>>> This computation is similar to the aggregation in
> >>>>>>>>>>>>>>> SessionWindows and requires a scan of the WindowStore to find
> >>>>>>>>>>>>>>> the window with the aggregation needed, which will always be
> >>>>>>>>>>>>>>> pre-computed. The scan requires creating an iterator, but
> >>>>>>>>>>>>>>> should have minimal performance effects as this strategy is
> >>>>>>>>>>>>>>> already implemented in SessionWindows. More details on
> >>>>>>>>>>>>>>> finding the aggregation that needs to be put in a new window
> >>>>>>>>>>>>>>> can be found on the implementation page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
> >>>>>>>>>>>>>>> Because SlidingWindows are still defined by a windowSize
> >>>>>>>>>>>>>>> (whereas SessionWindows are defined purely by data), I think
> >>>>>>>>>>>>>>> it makes sense to leverage the existing Window processes
> >>>>>>>>>>>>>>> instead of creating a new store type that would be very
> >>>>>>>>>>>>>>> similar to the WindowStore. While it's true that the
> >>>>>>>>>>>>>>> #windowsFor method isn't necessary for SlidingWindows,
> >>>>>>>>>>>>>>> JoinWindows also extends Windows<Window> and throws an
> >>>>>>>>>>>>>>> UnsupportedOperationException in the #windowsFor method,
> >>>>>>>>>>>>>>> which is what SlidingWindows can do.
The difference between extending Windows<TimeWindow> or Windows<Window> is minimal, as both are ways to pass window parameters. Extending Windows<TimeWindow> will give us more leverage in utilizing existing processes.

*Emit Strategy*
I would argue that emitting for every update is still the best way to go for SlidingWindows because it mimics the other types of windows, and suppression can be leveraged to limit what SlidingWindows emits. While some users may only want to see the last value, others may want to see more, and leaving the emit strategy to emit partial results allows both users to access what they want.

*Additional Features*
Supporting sliding windows inherently, and shifting inefficient hopping windows to sliding windows, is an interesting idea and could be built on top of SlidingWindows when they are finished, but right now seems out of scope for the needs of this KIP. Similarly, including a `subtraction` feature could have performance improvements, but doesn't seem necessary for the implementation of this KIP.
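To put a rough number on the "inefficient hopping windows" mentioned above, here is a small stand-alone Java sketch. It is not Kafka Streams code: `HoppingWindowSketch` and `windowStartsFor` are hypothetical names, and the sketch only assumes the usual hopping-window convention of aligned, half-open windows [start, start + size) with starts clamped at 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class HoppingWindowSketch {

    // Returns the start times of every hopping window [start, start + sizeMs)
    // that contains the given timestamp. Starts are multiples of advanceMs
    // and are never negative (assumption of this sketch).
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }
}
```

With a size of 10 ms, a record at timestamp 25 lands in 10 windows when the advance is 1 ms (starts 16 through 25) but in only 2 windows when the advance is 5 ms (starts 20 and 25). Each window is a separate store update for the same record, which is the write amplification a small advance causes.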
Let me know what you think of the updates,

Leah

On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:

Hello all,

Thanks for the KIP, Leah!

Regarding (1): I'd go farther actually. Making Windows an abstract class was a mistake from the beginning that led to us not being able to fix a very confusing situation for users around retention times, final results emitting, etc. Thus, I would not suggest extending TimeWindows for sure, but would also not suggest extending Windows.

The very simplest thing to do is follow the example of SessionWindows, which is just a completely self-contained class. If we don't mess with class inheritance, we won't ever have any of the problems related to class inheritance. This is my preferred solution.

Still, Sliding windows has a lot in common with TimeWindows and other fixed-size windows, namely that the windows are fixed in size.
If we want to preserve the current two-part windowing API in which you can window by either "fixed" or "data driven" modes, I'd suggest we avoid increasing the blast radius of Windows by taking the opportunity to replace it with a proper interface and implement that interface instead.

For example:
https://github.com/apache/kafka/pull/9031

Then, SlidingWindows would just implement FixedSizeWindowDefinition

======

Regarding (2), it seems more straightforward as a user of Streams to just have one mental model. _All_ of our aggregation operations follow an eager emission model, in which we just emit an update whenever an update is available. We already provided Suppression to explicitly apply different update semantics in the case it's required. Why should we define a snowflake operation with completely different semantics from everything else? I.e., systems are generally easier to use when they follow a few simple, composable rules than when they have a lot of different, specific rules.
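As a toy illustration of the two emission behaviours being contrasted in (2), the following stand-alone Java sketch models a single window with a running sum. It does not use the Streams API: `EmitSketch`, `eagerUpdates` and `finalOnly` are made-up names, and `finalOnly` is only a loose stand-in for what a downstream consumer would see if everything but the last update were suppressed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not part of the Kafka Streams API.
final class EmitSketch {

    // Eager emission: one downstream update per input record,
    // each carrying the running sum for the window so far.
    static List<Long> eagerUpdates(List<Long> values) {
        List<Long> updates = new ArrayList<>();
        long sum = 0;
        for (long value : values) {
            sum += value;
            updates.add(sum);
        }
        return updates;
    }

    // Final-only view: the same computation, but only the last
    // update is let through (empty input yields no update).
    static List<Long> finalOnly(List<Long> values) {
        List<Long> updates = eagerUpdates(values);
        if (updates.isEmpty()) {
            return updates;
        }
        return new ArrayList<>(updates.subList(updates.size() - 1, updates.size()));
    }
}
```

For inputs 1, 2, 3 the eager model emits 1, 3, 6 while the final-only view emits just 6. The aggregation itself is identical in both cases; only the filtering of updates differs, which is why it can live in a separate, composable step.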
======

New point: (4):
It would be nice to include some examples of user code that would use the new API, which should include:
1. using the DSL with the sliding window definition
2. accessing the stored results of a sliding window aggregation via IQ
3. defining a custom processor to access sliding windows in a store

It generally helps reviewers wrap their heads around the proposal, as well as shaking out any design issues that would otherwise only come up during implementation/testing/review.

Thanks again for the awesome proposal!
-John

On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:

Hello Leah,

Thanks for the nicely written KIP. A few thoughts:

1) I echo the other reviewer's comments regarding the typing: why extend TimeWindow instead of just extending Window?

2) I also feel that the emitting policy for this type of windowing aggregation may be different from the existing ones.
The existing emitting policy is very simple: emit every time a window gets updated, and emit every time out-of-order data arrives within the grace period. This is because for time windows the window close time depends strictly on the window start time, which is fixed, while for session windows, although the window open/close time is also data-dependent, changes are relatively infrequent compared to sliding windows. For this KIP, since each new record would cause a new sliding window, the number of windows maintained logically could be much larger, and hence emitting on each update may be too aggressive.

3) Although the KIP itself should focus on user-facing interfaces, I'd suggest we create a child page of KIP-450 discussing its implementation as well, since some of that may drive the interface design. E.g. personally I think having a combiner interface in addition to the aggregator would be useful, but that's based on my 2 cents about the implementation design (I once created a child page describing it: https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation ).
Guozhang

On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Leah,

Thank you for the KIP!

Here is my feedback:

1. The KIP would benefit from some code examples that show how to use sliding windows in aggregations.

2. The different sliding windows in Figure 1 and 2 are really hard to distinguish. Could you please try to make them graphically better distinguishable? You could try to draw the frames of consecutive windows shifted to each other.

3. I agree with Matthias, that extending Windows<TimeWindow> does not seem to be the best approach. What would be the result of windowsFor()?

4. In the section "Public Interfaces" you should remove implementation details like private constructors and private fields.

5. Do we need a new store interface or can we use WindowStore? Some words about that would be informative.

6. @Matthias, if the subtractor is not strictly needed, I would skip it for now and add it later.

7.
I also agree that having a section that describes how to handle out-of-order records would be good to understand what is still missing and what we can reuse.

Best,
Bruno

On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update record (ie, early/partial result) for each input record. When an out-of-order record arrives, we just update the corresponding old window and emit another update.

It's unclear from the KIP if you propose the same emit strategy? -- For sliding windows it might be worth considering a different emit strategy and only support emitting the final result (ie, after the grace period passed)?

Boyang also raises a good point that relates to my point from above about pre-aggregations and storage layout. Our current time windows are all pre-aggregated and stored in parallel.
We can also look up windows efficiently, as we can compute the windowed-key given the input record key and timestamp based on the window definition.

However, for sliding windows, window boundaries are data dependent and thus we cannot compute them upfront. Thus, how can we "find" existing windows efficiently? Furthermore, out-of-order data would create new windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw input events. Additionally, we could also store pre-aggregated results if we think it's beneficial. -- If we apply an "emit only final results" strategy, storing pre-aggregated results would not be necessary though.

Btw: for sliding windows it might also be useful to consider allowing users to supply a `Subtractor` -- this subtractor could be applied on the current window result (in case we store it) if a record drops out of the window.
Of course, not all aggregation functions are subtractable and we can consider this as a follow up task, too, and not include it in this KIP for now. Thoughts?

I was also thinking about the type hierarchy. I am not sure if extending TimeWindow is the best approach? For TimeWindows, we can pre-compute window boundaries (cf `windowsFor()`) while for a sliding window the boundaries are data dependent. Session windows are also data dependent and thus they don't inherit from TimeWindow (Maybe check out the KIP that added session windows? It could provide some good insights.) -- I believe the same rationale applies to sliding windows?

-Matthias

On 7/10/20 12:47 PM, Boyang Chen wrote:

Thanks Leah and Sophie for the KIP.

1. I'm a bit surprised that we don't have an advance time. Could we elaborate how the storage layer is structured?

2. IIUC, there will be extra cost in terms of fetching aggregation results, since we couldn't pre-aggregate until the user asks for it.
Would be good to also discuss it.

3. We haven't discussed the possibility of supporting sliding windows inherently. For a user who actually uses a hopping window, Streams could detect such an inefficiency by computing the window_size/advance_time ratio and checking whether the write amplification is too high compared with some configured threshold. The benefit of doing so is that existing Streams users don't need to change their code or learn a new API; they only need to upgrade the Streams library to get benefits for their inefficient hopping window implementation. There might be some compatibility issues for sure, but it's worth listing them out for the trade-off.

Boyang

On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:

Hey Matthias,

Thanks for pointing that out.
I added the following to the Proposed Changes section of the KIP:

"Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. Any new windows created by the late record will still be created, and the existing windows that are changed by the late record will be updated. Any record that falls outside of the grace period (either user defined or default) will be discarded."

All the best,
Leah

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks a lot for the KIP. Very well written.

The KIP does not talk about the handling of out-of-order data though. How do you propose to address this?

-Matthias

On 7/8/20 5:33 PM, Leah Thomas wrote:

Hi all,
I'd like to kick off the discussion for KIP-450, adding sliding window aggregation support to Kafka Streams.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Let me know what you think,
Leah