Thanks for the update, Leah -- I think that all makes sense.

Cheers,
Sophie
On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:
> Another minor tweak: instead of defining the window by its *size*, it will be defined by *timeDifference*, which is the maximum time difference between two events. Because the window ends are inclusive, this is a more precise way to define a window, while still letting users create the window they expect. This definition fits the current examples, where a record at *10* falls into a window of time difference *5*, namely *[5,10]*. This window contains any records at 5, 6, 7, 8, 9, and 10, which is 6 distinct timestamps instead of 5. This semantic difference is why I've changed *size* to *timeDifference*.
>
> The new builder will be *withTimeDifferenceAndGrace*, keeping with other conventions.
>
> Let me know if there are any concerns! The vote thread is open as well here:
> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
>
> Best,
> Leah
>
> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
> > A small tweak: to make it clearer to users that grace is required, and to clean up some of the confusing grammar around windows, the main builder method for *SlidingWindows* will be *withSizeAndGrace* instead of *of*. This looks like it'll be the last change (for now) to the public API. If anyone has any comments or thoughts, let me know. Otherwise, I'll take this to a vote shortly.
> >
> > Best,
> > Leah
> >
> > On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
> >> To accommodate the change to a final class, I've added another *windowedBy()* method in *CogroupedKStream.java* to handle SlidingWindows.
> >>
> >> As far as the discussion goes, I think this is the last change we've talked about. If anyone has other comments or concerns, please let me know!
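Leah's time-difference arithmetic is easy to check by modeling a single window directly. The sketch below is minimal and assumes integer millisecond timestamps with both window ends inclusive. `InclusiveWindow` and its methods are hypothetical names for illustration, not Kafka Streams API:

```java
// Hypothetical helper, not part of Kafka Streams: models the window
// [t - timeDifference, t] that ends at a record's timestamp t, both ends inclusive.
final class InclusiveWindow {
    final long start; // inclusive
    final long end;   // inclusive

    private InclusiveWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The window that ends at the record and reaches back timeDifferenceMs.
    static InclusiveWindow endingAt(long recordTimestampMs, long timeDifferenceMs) {
        if (timeDifferenceMs < 0) {
            throw new IllegalArgumentException("timeDifferenceMs must be non-negative");
        }
        return new InclusiveWindow(recordTimestampMs - timeDifferenceMs, recordTimestampMs);
    }

    boolean contains(long timestampMs) {
        return start <= timestampMs && timestampMs <= end;
    }

    // Distinct millisecond timestamps covered: one more than the time difference.
    long distinctTimestamps() {
        return end - start + 1;
    }
}
```

For `endingAt(10, 5)`, the bounds are [5,10] and six distinct timestamps fit inside. That is why calling the parameter a "size" of 5 would undercount by one.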
> >>
> >> Cheers,
> >> Leah
> >>
> >> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>> Thanks for the discussion about extending TimeWindows. I agree that making it future-proof is important, and the implementation of SlidingWindows is unique enough that it seems logical to make it its own final class.
> >>>
> >>> On that note, I've updated the KIP to make SlidingWindows a standalone final class, and added the *windowedBy()* API in *KGroupedStream* to handle SlidingWindows. It seems that SlidingWindows would still be able to leverage *TimeWindowedKStream* by creating a SlidingWindows version of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If anyone sees issues with this implementation, please let me know.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> wrote:
> >>>> Thanks for the reply, Sophie.
> >>>>
> >>>> That all sounds about right to me.
> >>>>
> >>>> The Windows “interface”/algorithm is quite flexible, so it makes sense for it to be extensible. Different implementations can (and do) enumerate different windows to suit different use cases.
> >>>>
> >>>> On the other hand, I can’t think of any way to extend SessionWindows to do something different using the same algorithm, so it makes sense for it to stay final.
> >>>>
> >>>> If we think SlidingWindows is similarly not usefully extensible, then we can make it final. It’s easy to remove final later, but adding it later is not possible. Or we could go the other route and just make it an interface, on general principle. Both of these choices are safe API design.
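Two API decisions in this part of the thread fit into one pattern: a standalone final class, and a grace period that users cannot forget. The pattern is a final class with a private constructor whose only entry point is a two-argument static factory. This is a minimal sketch. `SlidingWindowSpec` is a hypothetical stand-in, and only its factory name mirrors the proposed *withTimeDifferenceAndGrace*. Rejecting negative durations is also an assumption:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical sketch, not the Kafka Streams class: a final window definition
// whose only factory takes both values, so the grace period cannot be omitted.
final class SlidingWindowSpec {
    private final long timeDifferenceMs;
    private final long graceMs;

    // Private constructor plus final class: no subclasses, no other way to build one.
    private SlidingWindowSpec(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    static SlidingWindowSpec withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        // Assumed validation rule for this sketch: both durations must be non-negative.
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("timeDifference and grace must be non-negative");
        }
        return new SlidingWindowSpec(timeDifference.toMillis(), grace.toMillis());
    }

    long timeDifferenceMs() { return timeDifferenceMs; }

    long gracePeriodMs() { return graceMs; }
}
```

Because the class is final, opening it up later only means deleting the modifier. Adding `final` to a published class, by contrast, would break existing subclasses.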
> >>>>
> >>>> Thanks again,
> >>>> John
> >>>>
> >>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >>>> > > Users could pass in a custom `SessionWindows` as long as the session algorithm works correctly for it.
> >>>> >
> >>>> > Well, not really: SessionWindows is a final class. TimeWindows is also a final class, so neither of these can be extended or customized. For a given Windows, then, there would only be three (non-overlapping) possibilities: it's a TimeWindows, a SlidingWindows, or a custom Windows. I don't think there's any problem with determining what the user wants in this case -- we would just check whether it's a SlidingWindows and connect the new processor, or else connect the existing hopping/tumbling window processor.
> >>>> >
> >>>> > I'll admit that last sentence does leave a bad taste in my mouth. Part of that is probably the "leaking" API Matthias pointed out; we just assume the hopping/tumbling window implementation fits all custom windows. But I guess if you really needed to customize the algorithm any further, you might as well stick in a transformer and do it all yourself.
> >>>> >
> >>>> > Anyway, given what we have, it does seem weird to apply one algorithm for most Windows types and then swap in a different one for one specific extension of Windows. So adding a new #windowedBy(SlidingWindows) sounds reasonable to me.
> >>>> >
> >>>> > I'm still not convinced that we need a whole new TimeWindowedKStream-equivalent class for sliding windows, though.
> >>>> > It seems like the hopping/tumbling window implementation could benefit just as much from a subtractor as the sliding windows, so the only future-proofing we get is the ability to be lazy and add the subtractor to one but not the other. Of course, it would only be an optimization, so we could just not apply it to one type and nothing would break. It does make me nervous to go against the "future-proof" direction, though. Are there any other examples of things we might want to add to one window type but not to another?
> >>>> >
> >>>> > On another note, this actually brings up a new question: should SlidingWindows also be final? My take is "yes", since there's no reasonable customization of sliding windows, at least none that I can think of. Thoughts?
> >>>> >
> >>>> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> wrote:
> >>>> > > Thanks, all,
> >>>> > >
> >>>> > > I can see how my conclusion was kind of a leap.
> >>>> > >
> >>>> > > What Matthias said is indeed what I was thinking. When you provide a window definition to the windowedBy() method, you are selecting an algorithm that will be used to compute the windows from the input data.
> >>>> > >
> >>>> > > I didn’t mean the code implementation “algorithm”, but the high-level algorithm that describes how the input stream will be transformed into a sequence of windows.
> >>>> > >
> >>>> > > For example, the algorithm for Windows is something like “given the record timestamp, include the record in each of the enumerated windows”.
> >>>> > > Note that there can be a lot of variation in how the windows are enumerated, which is why there are at least a couple of implementations of Windows, and we are open to adding more (like for natural calendar boundaries).
> >>>> > >
> >>>> > > For SessionWindows, it’s more like “if any window is within the gap, extend its boundaries to include this record, and if two windows touch, then merge them”.
> >>>> > >
> >>>> > > Clearly, the algorithm for SlidingWindows doesn’t fall into either category, so it seems inappropriate to claim that it does in the API (by implementing Windows with stubbed methods) and then cast internally to execute a completely different algorithm.
> >>>> > >
> >>>> > > To your other observation, that the DSL object resulting from windowedBy would look the same for Windows and SlidingWindows, maybe it makes sense for windowedBy(SlidingWindows) also to return a TimeWindowedKStream.
> >>>> > >
> >>>> > > i.e.:
> >>>> > > =======================
> >>>> > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> >>>> > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> >>>> > > =======================
> >>>> > >
> >>>> > > I can’t think of a reason this wouldn’t work. But then again, it would be more future-proof to go ahead and specify a different return type now, if we think we'll want to add subtractors and stuff later. I don't have a strong feeling about that part of the API. It seems to be independent of whether SlidingWindows extends Windows or not.
> >>>> > >
> >>>> > > Thanks,
> >>>> > > -John
> >>>> > >
> >>>> > > On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> >>>> > > > I think what John is trying to say is the following:
> >>>> > > >
> >>>> > > > We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but also custom windows, and we use a specific algorithm for it. Note that custom windows must "work" with the algorithm we use.
> >>>> > > >
> >>>> > > > For session windows, we have `windowedBy(SessionWindows)` and apply a different algorithm. Users could pass in a custom `SessionWindows` as long as the session algorithm works correctly for it.
> >>>> > > >
> >>>> > > > For the new sliding windows, we want to use a different algorithm compared to hopping/tumbling windows. If we let sliding windows extend `Windows`, we can decide at runtime whether we need to use the hopping/tumbling window algorithm (for hopping/tumbling windows) or the new sliding window algorithm (for sliding windows). However, if we get a custom window, which algorithm do we pick: the existing tumbling/hopping window algorithm or the new sliding window algorithm? Both a custom "time window" and a custom "sliding window" would implement the generic `Windows` class, and thus we cannot make a decision, because we don't know the user's intent.
> >>>> > > >
> >>>> > > > As a matter of fact, even if the user might not be aware of it, the algorithm we use already leaks into the API (if a user extends `Windows`, it must work with our hopping/tumbling window algorithm, and if a user extends `SessionWindows`, it must work with our session algorithm), and it seems we need to preserve this property for sliding windows.
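Java overloads make Matthias's dispatch problem easy to see. This is a minimal sketch. `WindowsDef`, `HoppingDef`, `SlidingDef` and `Dsl` are hypothetical stand-ins, and the returned strings only name which algorithm would run:

```java
// Hypothetical stand-ins for the window definition types (not Kafka Streams classes).
abstract class WindowsDef {
    // Extensible on purpose, like Windows<W>: users may subclass it.
}

final class HoppingDef extends WindowsDef {
    // A built-in implementation of the extensible definition.
}

final class SlidingDef {
    // Standalone final class: deliberately shares no parent with WindowsDef.
}

final class Dsl {
    // Any WindowsDef, including a user's custom subclass, gets the enumerating algorithm.
    static String windowedBy(WindowsDef windows) {
        return "enumerate-windows";
    }

    // Sliding windows get their own overload, so the algorithm is fixed at compile
    // time and no runtime instanceof check is needed to guess the user's intent.
    static String windowedBy(SlidingDef windows) {
        return "sliding-scan";
    }
}
```

A user's custom `WindowsDef` subclass can only ever reach the first overload. That is the "algorithm leaks into the API" property described above, now preserved for sliding windows without any runtime guessing.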
> >>>> > > >
> >>>> > > > -Matthias
> >>>> > > >
> >>>> > > > On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> >>>> > > > > Hey John,
> >>>> > > > >
> >>>> > > > > Just a few follow-up questions/comments about the whole Windows thing:
> >>>> > > > >
> >>>> > > > > That's a good way of looking at things. In particular, the point that SessionWindows, for example, requires a Merger while other "statically enumerable" windows require only an adder seems to touch on the heart of the matter.
> >>>> > > > >
> >>>> > > > >> It seems like what Time and Universal (and any other Windows implementation) have in common is that the windows are statically enumerable. As a consequence, they can all rely on an aggregation maintenance algorithm that involves enumerating each of the windows and updating it. That also means that their DSL object (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but only "adders"; again, this is a consequence of the fact that the windows are enumerable.
> >>>> > > > >
> >>>> > > > > Given that, I'm a bit confused why you conclude that sliding windows are fundamentally different from the "statically enumerable" windows -- sliding windows require only an adder too. I'm not sure it's a consequence of being enumerable, or that being enumerable is the fundamental property that unites all Windows (ignoring JoinWindows here).
> >>>> > > > > Yes, it currently does apply to all Windows implementations, but we shouldn't assume that it *has* to be that way just because it currently happens to be.
> >>>> > > > >
> >>>> > > > > Also, the fact that they can all rely on the same aggregation algorithm seems like an implementation detail, and it would be weird to force a separate/new DSL API just because we swap in a different processor under the covers.
> >>>> > > > >
> >>>> > > > > To be fair, I don't think there's a strong reason *against* not extending Windows -- in the end, it will just mean adding a new #windowedBy method and copy/pasting everything from TimeWindowedKStream pretty much word for word. But any time you find yourself copying over code almost exactly, there should probably be a good reason why :)
> >>>> > > > >
> >>>> > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
> >>>> > > > >> Thanks Leah!
> >>>> > > > >>
> >>>> > > > >> 5) Regarding the empty windows, I'm wondering if we should simply propose that the windows should not be emitted downstream of the operator or visible in IQ. Then it'll be up to the implementation to make it happen. I'm personally not concerned about it, since it seems like there are multiple ways to accomplish this.
> >>>> > > > >>
> >>>> > > > >> Note, the discrepancy Matthias pointed out is actually a design bug.
> >>>> > > > >> The windowed aggregation (like every operation in Streams) produces a "view", which then forms the basis of downstream operations. When we pass the Materialized option to the operation, all we're doing is saying to "materialize" the view (aka, actually store the computed view) and also make it queryable. It would be illegal for the "queryable, materialized view" to differ in any way from the "view". So, it seems we must either propose to emit the empty windows AND make them visible in IQ, or propose NOT to emit the empty windows AND NOT make them visible in IQ.
> >>>> > > > >>
> >>>> > > > >> 7) Regarding whether we can extend TimeWindows (or Windows): I've been mulling this over more. I think it's worth asking the question of what these classes even mean. For example, why is SessionWindows a different thing from TimeWindows and UniversalWindows (which are both Windows)?
> >>>> > > > >>
> >>>> > > > >> This conversation is extra complicated because of the incomplete and mismatched class hierarchy, but we can try to look past it for now.
> >>>> > > > >>
> >>>> > > > >> It seems like what Time and Universal (and any other Windows implementation) have in common is that the windows are statically enumerable. As a consequence, they can all rely on an aggregation maintenance algorithm that involves enumerating each of the windows and updating it.
> >>>> > > > >> That also means that their DSL object (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but only "adders"; again, this is a consequence of the fact that the windows are enumerable.
> >>>> > > > >>
> >>>> > > > >> In contrast, session windows are data-driven, so they are not statically enumerable. Their algorithm has to rely on scans, and to do the scans, it needs to know the "inactivity gap", which needs to be part of the window definition. Likewise, session windows have the property that they need to be merged, so their DSL object also requires mergers.
> >>>> > > > >>
> >>>> > > > >> It really seems like your new window definition doesn't fit into either category. It uses a different algorithm, which relies on scans, but it is also fixed in size, so it doesn't need mergers. In this situation, it seems like the safe bet is to just create SlidingWindows with no interface and add a separate set of DSL operations and objects. It's a little extra code, but it seems to keep everything tidier and more comprehensible, both for us and for users.
> >>>> > > > >>
> >>>> > > > >> What do you think?
> >>>> > > > >> -John
> >>>> > > > >>
> >>>> > > > >> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> >>>> > > > >>> Hi Matthias,
> >>>> > > > >>>
> >>>> > > > >>> Thanks for the suggestions. I've updated the KIP and child page accordingly and addressed some below.
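John's "statically enumerable" property can be sketched for hopping windows. Arithmetic alone determines which windows contain a given timestamp, and maintaining the aggregate only ever adds. This is a minimal sketch that assumes non-negative timestamps and window starts aligned to multiples of the advance. `HoppingEnumeration` is hypothetical and is not Kafka's `Windows#windowsFor` implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "statically enumerable" windows: hopping windows of
// sizeMs that advance by advanceMs, each covering [start, start + sizeMs).
final class HoppingEnumeration {
    // Starts of all windows containing t, in ascending order.
    // Assumes t >= 0, advanceMs > 0, and starts aligned to multiples of advanceMs.
    static List<Long> windowStartsFor(long t, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latestStart = t - (t % advanceMs); // latest aligned start <= t
        for (long start = latestStart; start > t - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(0, start); // prepend to keep ascending order
        }
        return starts;
    }

    // Adder-only maintenance: add the value to every enumerated window's aggregate.
    static void add(Map<Long, Long> sumsByWindowStart, long t, long value, long sizeMs, long advanceMs) {
        for (long start : windowStartsFor(t, sizeMs, advanceMs)) {
            sumsByWindowStart.merge(start, value, Long::sum);
        }
    }
}
```

With a size of 10 and an advance of 5, a record at 12 lands in the windows starting at 5 and 10. No scan of existing state is needed to find them.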
> >>>> > > > >>>
> >>>> > > > >>>> 1) For the mandatory grace period, we should use a static builder method that takes two parameters.
> >>>> > > > >>>
> >>>> > > > >>> That makes sense; I've changed that in the public API.
> >>>> > > > >>>
> >>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: those empty windows would be returned.
> >>>> > > > >>>
> >>>> > > > >>> This is a great point; with the current implementation plan, empty windows would be returned. I think creating a second window store would definitely work, but having two stores adds overhead: windows have to be switched between the stores, and finding existing windows requires scans of both stores. There might be a way to avoid emitting empty windows without creating a second window store; I'll look into it more.
> >>>> > > > >>>
> >>>> > > > >>> Cheers,
> >>>> > > > >>> Leah
> >>>> > > > >>>
> >>>> > > > >>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>> Thanks for updating the KIP.
> >>>> > > > >>>>
> >>>> > > > >>>> A couple of follow-up comments:
> >>>> > > > >>>>
> >>>> > > > >>>> 1) For the mandatory grace period, we should use a static builder method that takes two parameters. This provides a better API, as users cannot forget to set the grace period. Throwing a runtime exception does not seem to be the best way to handle this case.
> >>>> > > > >>>>
> >>>> > > > >>>> 2) In Fig. 2 you list 10 hopping windows.
> >>>> > > > >>>> I believe it should actually be more? The first hopping window would be [-6,-4[ and the last one would be [19,29[ -- hence, the cost savings are actually much higher.
> >>>> > > > >>>>
> >>>> > > > >>>> 3a) IQ: you are saying that the user needs to compute the start time as
> >>>> > > > >>>>
> >>>> > > > >>>>> windowSize+the time they're looking at
> >>>> > > > >>>>
> >>>> > > > >>>> Should this be "targetTime - windowSize" instead?
> >>>> > > > >>>>
> >>>> > > > >>>> 3b) IQ: in your example, you say "window size of 10 minutes" with an incident at 9:15.
> >>>> > > > >>>>
> >>>> > > > >>>>> they're looking for a window with the start time of 8:15.
> >>>> > > > >>>>
> >>>> > > > >>>> The example does not seem to add up?
> >>>> > > > >>>>
> >>>> > > > >>>> 4) For "Processing Windows", you describe a three-step approach. I just want to point out that step (1) is not necessary for each input record: timestamps are not guaranteed to be unique, so a previous record with the same key and timestamp might have created the windows already.
> >>>> > > > >>>>
> >>>> > > > >>>> Nit: I am also not exactly sure what you mean by step (3), as you use the word "send". I guess you mean "put"?
> >>>> > > > >>>>
> >>>> > > > >>>> It seems there are actually more details in the sub-page:
> >>>> > > > >>>>
> >>>> > > > >>>>> A new record for SlidingWindows will always create two new windows.
> >>>> > > > >>>>> If either of those windows already exists in the window store, its aggregation will simply be updated to include the new record, but no duplicate window will be added to the WindowStore.
> >>>> > > > >>>>
> >>>> > > > >>>> However, the first and second sentences contradict each other a little bit. I think the first sentence is not correct.
> >>>> > > > >>>>
> >>>> > > > >>>> Nit:
> >>>> > > > >>>>
> >>>> > > > >>>>> For in-order records, the left window will always be empty.
> >>>> > > > >>>>
> >>>> > > > >>>> Should this be "right window"?
> >>>> > > > >>>>
> >>>> > > > >>>> 5) "Emitting Results": it might be worth pointing out that the second/future window of a new record is created with no records, and thus, even if it's initialized, it won't be emitted. Only if a subsequent record falls into the window would the window be updated and its result (for a window containing one record) be sent downstream.
> >>>> > > > >>>>
> >>>> > > > >>>> Again, the sub-page contains these details, but they might still be worth adding to the top-level page, too.
> >>>> > > > >>>>
> >>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: those empty windows would be returned. Thus, I am wondering if we need to use two stores internally: one store for actual windows and one store for empty windows? If an empty window is updated, it's moved to the other store? For IQ, we would only allow querying the non-empty-window store?
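Points 4 and 5 can be checked against a brute-force reference model: duplicate timestamps must not create duplicate windows, and windows without records should not surface. In this minimal sketch, each record proposes a left window ending at its timestamp and a right window starting just after it. The sketch assumes integer timestamps and the inclusive [start, start + timeDifference] bounds Leah describes at the top of the thread. `SlidingReference` is hypothetical and rescans every record, so it is a test oracle rather than the incremental algorithm in the KIP:

```java
import java.util.TreeMap;

// Hypothetical brute-force reference model (not the KIP's incremental algorithm):
// returns every distinct non-empty sliding window, keyed by start, with its record
// count. Each window covers [start, start + timeDifferenceMs], both ends inclusive.
final class SlidingReference {
    static TreeMap<Long, Integer> countsByWindowStart(long[] timestamps, long timeDifferenceMs) {
        TreeMap<Long, Integer> result = new TreeMap<>();
        for (long t : timestamps) {
            // Left window ends at the record; right window starts just after it.
            long[] candidateStarts = {t - timeDifferenceMs, t + 1};
            for (long start : candidateStarts) {
                int count = 0;
                for (long other : timestamps) {
                    if (start <= other && other <= start + timeDifferenceMs) {
                        count++;
                    }
                }
                // Empty windows are neither emitted nor queryable in this model;
                // a repeated start simply overwrites the same entry.
                if (count > 0) {
                    result.put(start, count);
                }
            }
        }
        return result;
    }
}
```

For records at 10 and 12 with a time difference of 5, the model yields [5,10], [7,12] and [11,16]. The right window of the record at 12, [13,18], holds no records and is dropped.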
> >>>> > > > >>>>
> >>>> > > > >>>> 6) On the sub-page:
> >>>> > > > >>>>
> >>>> > > > >>>>> The left window of in-order records and both windows for out-of-order records need to be updated with the values of records that have already been processed.
> >>>> > > > >>>>
> >>>> > > > >>>> Why "both windows for out-of-order records"? IMHO, we don't know how many existing windows need to be updated when processing an out-of-order record. Of course, an out-of-order record might also fall into no existing window and just create two new windows.
> >>>> > > > >>>>
> >>>> > > > >>>>> Because each record creates one new window that includes itself and one window that does not
> >>>> > > > >>>>
> >>>> > > > >>>> As stated above, this does not seem to hold. I understand what you mean, but it would be good to be exact.
> >>>> > > > >>>>
> >>>> > > > >>>> Figure 2: You use the term "late", but I guess you mean "out-of-order" -- a record is _late_ if it is no longer processed because the grace period has already passed.
> >>>> > > > >>>>
> >>>> > > > >>>> Figure 2: "Late" should be "out-of-order". The example text says a window [16,26] should be created, but the figure shows the green window as [15,20].
> >>>> > > > >>>>
> >>>> > > > >>>> About the blue window: maybe add a note that the blue window contains the aggregate we need for the green window _before_ the new record `a` is added to the blue window.
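Matthias's distinction between out-of-order and late records can be written as a small classifier. This is a minimal sketch. `Arrival` and `ArrivalClassifier` are hypothetical. The lateness rule (stream time more than the grace period past the record's timestamp) is an assumption for illustration, not necessarily the exact rule Kafka Streams applies:

```java
// Hypothetical classification for illustration only. Assumption: a record is
// "late" once stream time has moved past the record's own timestamp by more
// than the grace period; Kafka Streams' exact drop rule may differ.
enum Arrival { IN_ORDER, OUT_OF_ORDER, LATE }

final class ArrivalClassifier {
    // streamTimeMs is the largest timestamp observed before this record.
    static Arrival classify(long recordTimestampMs, long streamTimeMs, long gracePeriodMs) {
        if (recordTimestampMs >= streamTimeMs) {
            return Arrival.IN_ORDER;      // not older than anything seen so far
        }
        if (recordTimestampMs + gracePeriodMs < streamTimeMs) {
            return Arrival.LATE;          // older, and the grace period has passed
        }
        return Arrival.OUT_OF_ORDER;      // older, but still within the grace period
    }
}
```

Under this rule, with stream time at 15 and a grace period of 5, a record at 12 is out-of-order but still processed, while a record at 9 is late.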
> >>>> > > > >>>>
> >>>> > > > >>>> 7) I am not really happy about extending TimeWindows, and I think the argument about JoinWindows is not the best one (IMHO, JoinWindows already does it wrong, and we would just repeat the same mistake). However, it seems our window hierarchy is "broken" already, and fixing it might be out of scope for this KIP. Hence, I am OK with biting the bullet for now and cleaning it up later.
> >>>> > > > >>>>
> >>>> > > > >>>> -Matthias
> >>>> > > > >>>>
> >>>> > > > >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
> >>>> > > > >>>>> Hi Leah,
> >>>> > > > >>>>>
> >>>> > > > >>>>> Thanks for the updated KIP. I agree that extending SlidingWindows from Windows is fine for the sake of not introducing more public APIs (and their internal xxxImpl classes), and its downside is small enough for me to tolerate.
> >>>> > > > >>>>>
> >>>> > > > >>>>> Guozhang
> >>>> > > > >>>>>
> >>>> > > > >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>> > > > >>>>>> Hi all,
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> Thanks for the feedback on the KIP.
> >>>> > > > >>>>>> I've updated the KIP page <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL> to address these points and have created a child page <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450> to go into more depth on certain implementation details.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Grace Period:*
> >>>> > > > >>>>>> I think Sophie raises a good point that the default grace period of 24 hours is often too long and was chosen when retention time and grace period were the same. For SlidingWindows, I propose we make the grace period mandatory. To keep formatting consistent with other types of windows, the grace period won't be an additional parameter in the #of method, but will still look like it does in other use cases: .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If the grace period isn't properly initialized, an error will be thrown through the process method.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Storage Layer + Aggregation:*
> >>>> > > > >>>>>> SlidingWindows will use a WindowStore, because computation can be done with the information stored in a WindowStore (window timestamp and value).
> >>>> > > > >>>>>> Using the WindowStore also simplifies computation, as SlidingWindows can leverage existing processes. Because we are using a WindowStore, the aggregation process will be similar to that of a hopping window: as records come in, their value is added to the aggregation that already exists, following the same procedure as hopping windows. The aggregation difference between SlidingWindows and hopping windows comes in creating new windows for a SlidingWindow, where you need to find the existing records that belong to the new window. This computation is similar to the aggregation in SessionWindows and requires a scan of the WindowStore to find the window with the needed aggregation, which will always be pre-computed. The scan requires creating an iterator, but should have minimal performance effects, as this strategy is already implemented in SessionWindows. More details on finding the aggregation that needs to be put in a new window can be found on the implementation page <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
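A range query over windows keyed by start time illustrates two points from this thread: the scan Leah describes, and Matthias's remark that an out-of-order record may touch any number of existing windows. With inclusive windows [start, start + timeDifference], exactly the windows starting in [t - timeDifference, t] contain a record at t. This is a minimal sketch. A `TreeMap` stands in for the WindowStore, `WindowScan` is hypothetical, and the sketch leaves out creating the record's own new windows:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a window store: window start -> sum aggregate,
// where each window covers [start, start + timeDifferenceMs] inclusive.
final class WindowScan {
    // Adds value to every existing window that contains t and returns how many
    // were updated. Those windows are exactly the ones whose start lies in
    // [t - timeDifferenceMs, t], so a single range scan finds them all.
    static int addToExistingWindows(TreeMap<Long, Long> sumsByStart, long t, long value, long timeDifferenceMs) {
        NavigableMap<Long, Long> hits = sumsByStart.subMap(t - timeDifferenceMs, true, t, true);
        for (Map.Entry<Long, Long> window : hits.entrySet()) {
            window.setValue(window.getValue() + value); // writes through to sumsByStart
        }
        return hits.size();
    }
}
```

With existing windows starting at 5, 6, 7 and 11 and a time difference of 5, a record at 10 updates three windows. That is more than the "both windows" the sub-page mentions.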
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
> >>>> > > > >>>>>> Because SlidingWindows are still defined by a windowSize (whereas SessionWindows are defined purely by data), I think it makes sense to leverage the existing Window processes instead of creating a new store type that would be very similar to the WindowStore. While it's true that the #windowsFor method isn't necessary for SlidingWindows, JoinWindows also extends Windows<Window> and throws an UnsupportedOperationException in the #windowsFor method, which is what SlidingWindows can do. The difference between extending Windows<TimeWindow> and Windows<Window> is minimal, as both are ways to pass window parameters. Extending Windows<TimeWindow> will give us more leverage in utilizing existing processes.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Emit Strategy*
> >>>> > > > >>>>>> I would argue that emitting for every update is still the best way to go for SlidingWindows, because it mimics the other types of windows, and suppression can be leveraged to limit what SlidingWindows emits. While some users may only want to see the last value, others may want to see more, and keeping an emit strategy that emits partial results lets both kinds of users access what they want.
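The emit-strategy trade-off can be modeled on a list of window updates. This is a minimal sketch. `EmitStrategies` and `Update` are hypothetical. `finalOnly` only approximates what suppression (for example `Suppressed.untilWindowCloses`) is meant to give users who want just the last value:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two emit strategies; not the Kafka Streams API.
final class EmitStrategies {
    // One update = (window start, new aggregate value for that window).
    static final class Update {
        final long windowStart;
        final long value;

        Update(long windowStart, long value) {
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    // Eager emission: every update is forwarded downstream as soon as it happens.
    static List<Update> eager(List<Update> updates) {
        return new ArrayList<>(updates);
    }

    // Final-result view: only the last value per window survives.
    static Map<Long, Long> finalOnly(List<Update> updates) {
        Map<Long, Long> last = new LinkedHashMap<>();
        for (Update u : updates) {
            last.put(u.windowStart, u.value); // later updates overwrite earlier ones
        }
        return last;
    }
}
```

The eager list can always be reduced to the final-only view downstream, but not the other way around. This is one way to read the argument for emitting every update by default.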
> >>>> > > > >>>>>> *Additional Features*
> >>>> > > > >>>>>> Supporting sliding windows inherently, and shifting inefficient hopping windows to sliding windows, is an interesting idea and could be built on top of SlidingWindows when they are finished, but right now it seems out of scope for the needs of this KIP. Similarly, including a `subtraction` feature could have performance improvements, but doesn't seem necessary for the implementation of this KIP.
> >>>> > > > >>>>>> Let me know what you think of the updates,
> >>>> > > > >>>>>> Leah
> >>>> > > > >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:
> >>>> > > > >>>>>>> Hello all,
> >>>> > > > >>>>>>> Thanks for the KIP, Leah!
> >>>> > > > >>>>>>> Regarding (1): I'd go further, actually. Making Windows an abstract class was a mistake from the beginning that led to us not being able to fix a very confusing situation for users around retention times, final results emitting, etc. Thus, I would not suggest extending TimeWindows for sure, but would also not suggest extending Windows.
> >>>> > > > >>>>>>> The very simplest thing to do is follow the example of SessionWindows, which is just a completely self-contained class.
> >>>> > > > >>>>>>> If we don't mess with class inheritance, we won't ever have any of the problems related to class inheritance. This is my preferred solution.
> >>>> > > > >>>>>>> Still, sliding windows have a lot in common with TimeWindows and other fixed-size windows, namely that the windows are fixed in size. If we want to preserve the current two-part windowing API in which you can window by either "fixed" or "data driven" modes, I'd suggest we avoid increasing the blast radius of Windows by taking the opportunity to replace it with a proper interface and implement that interface instead.
> >>>> > > > >>>>>>> For example: https://github.com/apache/kafka/pull/9031
> >>>> > > > >>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition.
> >>>> > > > >>>>>>> ======
> >>>> > > > >>>>>>> Regarding (2), it seems more straightforward as a user of Streams to just have one mental model. _All_ of our aggregation operations follow an eager emission model, in which we just emit an update whenever an update is available. We already provided Suppression to explicitly apply different update semantics in the case it's required. Why should we define a snowflake operation with completely different semantics from everything else?
> >>>> > > > >>>>>>> I.e., systems are generally easier to use when they follow a few simple, composable rules than when they have a lot of different, specific rules.
> >>>> > > > >>>>>>> ======
> >>>> > > > >>>>>>> New point (4):
> >>>> > > > >>>>>>> It would be nice to include some examples of user code that would use the new API, which should include:
> >>>> > > > >>>>>>> 1. using the DSL with the sliding window definition
> >>>> > > > >>>>>>> 2. accessing the stored results of a sliding window aggregation via IQ
> >>>> > > > >>>>>>> 3. defining a custom processor to access sliding windows in a store
> >>>> > > > >>>>>>> It generally helps reviewers wrap their heads around the proposal, as well as shaking out any design issues that would otherwise only come up during implementation/testing/review.
> >>>> > > > >>>>>>> Thanks again for the awesome proposal!
> >>>> > > > >>>>>>> -John
> >>>> > > > >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
> >>>> > > > >>>>>>>> Hello Leah,
> >>>> > > > >>>>>>>> Thanks for the nicely written KIP. A few thoughts:
> >>>> > > > >>>>>>>> 1) I echo the other reviewers' comments regarding the typing: why extend TimeWindow instead of just extending Window?
> >>>> > > > >>>>>>>> 2) I also feel that the emitting policy for this type of windowing aggregation may be different from the existing ones.
> >>>> > > > >>>>>>>> The existing emitting policy is very simple: emit every time the window gets updated, and emit every time out-of-order data arrives within the grace period. This is because for time windows, the window close time strictly depends on the window start time, which is fixed, while for session windows, although the window open/close time is also data-dependent, it changes relatively infrequently compared to sliding windows. For this KIP, since each new record would cause a new sliding window, the number of windows maintained logically could be much larger, and hence emitting on each update may be too aggressive.
> >>>> > > > >>>>>>>> 3) Although the KIP itself should focus on user-facing interfaces, I'd suggest we create a child page of KIP-450 discussing its implementation as well, since some of that may drive the interface design. E.g., personally I think having a combiner interface in addition to the aggregator would be useful, but that's based on my 2 cents about the implementation design (I once created a child page describing it: https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation ).
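Guozhang's contrast between time windows, whose boundaries are fixed, and sliding windows, whose boundaries depend on the data, is easiest to see with a `windowsFor`-style enumeration. The sketch below is hypothetical plain Java written in the spirit of `TimeWindows#windowsFor`, not copied from it; `WindowsForSketch` and its methods are invented names. Hopping windows aligned to the epoch can be listed from a timestamp alone, while the sliding variant throws `UnsupportedOperationException`, as Leah proposed for `#windowsFor` earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; not the Kafka Streams TimeWindows or SlidingWindows classes. */
final class WindowsForSketch {
    /**
     * Start times of all hopping windows [start, start + size) that contain the timestamp.
     * Windows are aligned to the epoch, so they follow from the timestamp alone.
     */
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Sliding window boundaries depend on which records arrive, so they cannot be enumerated up front. */
    static List<Long> slidingWindowStarts(long timestampMs) {
        throw new UnsupportedOperationException("sliding window boundaries are data dependent");
    }

    public static void main(String[] args) {
        // size 10, advance 5: the windows [5, 15) and [10, 20) contain timestamp 12
        System.out.println(hoppingWindowStarts(12, 10, 5)); // prints [5, 10]
    }
}
```

For a size of 10 and an advance of 5, the timestamp 12 maps to the windows starting at 5 and 10; no such lookup exists for sliding windows, which is why the proposal relies on a store scan instead.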
> >>>> > > > >>>>>>>> Guozhang
> >>>> > > > >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>> > > > >>>>>>>>> Hi Leah,
> >>>> > > > >>>>>>>>> Thank you for the KIP!
> >>>> > > > >>>>>>>>> Here is my feedback:
> >>>> > > > >>>>>>>>> 1. The KIP would benefit from some code examples that show how to use sliding windows in aggregations.
> >>>> > > > >>>>>>>>> 2. The different sliding windows in Figures 1 and 2 are really hard to distinguish. Could you please try to make them graphically easier to tell apart? You could try to draw the frames of consecutive windows offset from each other.
> >>>> > > > >>>>>>>>> 3. I agree with Matthias that extending Windows<TimeWindow> does not seem to be the best approach. What would be the result of windowsFor()?
> >>>> > > > >>>>>>>>> 4. In the section "Public Interfaces" you should remove implementation details like private constructors and private fields.
> >>>> > > > >>>>>>>>> 5. Do we need a new store interface or can we use WindowStore? Some words about that would be informative.
> >>>> > > > >>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, I would skip it for now and add it later.
> >>>> > > > >>>>>>>>> 7. I also agree that having a section that describes how to handle out-of-order records would be good to understand what is still missing and what we can reuse.
> >>>> > > > >>>>>>>>> Best,
> >>>> > > > >>>>>>>>> Bruno
> >>>> > > > >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>>>>>>>> Leah,
> >>>> > > > >>>>>>>>>> thanks for your update. However, it does not completely answer my question.
> >>>> > > > >>>>>>>>>> In our current window implementations, we emit a window result update record (i.e., an early/partial result) for each input record. When an out-of-order record arrives, we just update the corresponding old window and emit another update.
> >>>> > > > >>>>>>>>>> It's unclear from the KIP whether you propose the same emit strategy. For sliding windows, it might be worth considering a different emit strategy that only emits the final result (i.e., after the grace period has passed)?
> >>>> > > > >>>>>>>>>> Boyang also raises a good point that relates to my point from above about pre-aggregations and storage layout.
> >>>> > > > >>>>>>>>>> Our current time windows are all pre-aggregated and stored in parallel. We can also look up windows efficiently, as we can compute the windowed key given the input record key and timestamp based on the window definition.
> >>>> > > > >>>>>>>>>> However, for sliding windows, window boundaries are data dependent, and thus we cannot compute them upfront. So how can we "find" existing windows efficiently? Furthermore, out-of-order data would create new windows in the past, and we need to be able to handle this case.
> >>>> > > > >>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store all raw input events. Additionally, we could also store pre-aggregated results if we think it's beneficial. If we apply an "emit only final results" strategy, storing pre-aggregated results would not be necessary, though.
> >>>> > > > >>>>>>>>>> Btw: for sliding windows it might also be useful to consider allowing users to supply a `Subtractor`. This subtractor could be applied to the current window result (in case we store it) if a record drops out of the window.
> >>>> > > > >>>>>>>>>> Of course, not all aggregation functions are subtractable, and we can consider this as a follow-up task, too, and not include it in this KIP for now. Thoughts?
> >>>> > > > >>>>>>>>>> I was also thinking about the type hierarchy. I am not sure extending TimeWindow is the best approach. For TimeWindows, we can pre-compute window boundaries (cf. `windowsFor()`), while for a sliding window the boundaries are data dependent. Session windows are also data dependent, and thus they don't inherit from TimeWindow (maybe check out the KIP that added session windows? It could provide some good insights). I believe the same rationale applies to sliding windows?
> >>>> > > > >>>>>>>>>> -Matthias
> >>>> > > > >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
> >>>> > > > >>>>>>>>>>> Thanks Leah and Sophie for the KIP.
> >>>> > > > >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time. Could we elaborate on how the storage layer is structured?
> >>>> > > > >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching aggregation results, since we couldn't pre-aggregate until the user asks for it. It would be good to also discuss it.
> >>>> > > > >>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding windows inherently. For a user who actually uses a hopping window, Streams could detect such an inefficiency by using the window_size/advance_time ratio to decide whether the write amplification is too high compared with some configured threshold. The benefit of doing so is that existing Streams users don't need to change their code or learn a new API, but only need to upgrade the Streams library to get the benefits for their inefficient hopping window implementations. There might be some compatibility issues for sure, but they are worth listing out for the trade-off.
> >>>> > > > >>>>>>>>>>> Boyang
> >>>> > > > >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>> > > > >>>>>>>>>>>> Hey Matthias,
> >>>> > > > >>>>>>>>>>>> Thanks for pointing that out.
> >>>> > > > >>>>>>>>>>>> I added the following to the Proposed Changes section of the KIP:
> >>>> > > > >>>>>>>>>>>> "Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. Any new windows created by the late record will still be created, and the existing windows that are changed by the late record will be updated. Any record that falls outside of the grace period (either user defined or default) will be discarded."
> >>>> > > > >>>>>>>>>>>> All the best,
> >>>> > > > >>>>>>>>>>>> Leah
> >>>> > > > >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>>>>>>>>>>> Leah,
> >>>> > > > >>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
> >>>> > > > >>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data, though. How do you propose to address this?
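The out-of-order rule quoted above can be sketched as a simple admission check. This is a simplified, hypothetical model (`GracePeriodSketch` is an invented name): it tracks stream time as the largest timestamp seen so far and assumes a record is processed while its timestamp is at most the grace period behind stream time. The actual sliding-window implementation may measure lateness against window end times instead, so treat this only as an illustration of the "within grace, process; outside grace, discard" rule.

```java
/** Hypothetical, simplified model of the grace-period rule; not Kafka Streams code. */
final class GracePeriodSketch {
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // largest timestamp seen so far

    GracePeriodSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Returns true if the record is processed (possibly out of order), false if it is discarded. */
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        // Simplifying assumption: a record is on time while it is at most graceMs behind stream time.
        return timestampMs >= streamTimeMs - graceMs;
    }

    public static void main(String[] args) {
        GracePeriodSketch grace = new GracePeriodSketch(5);
        long[] timestamps = {10, 7, 4, 12};
        for (long ts : timestamps) {
            System.out.println(ts + " -> " + (grace.accept(ts) ? "processed" : "discarded"));
        }
    }
}
```

With a grace period of 5, the records 10, 7, 4, 12 are processed, processed, discarded, processed: 7 is out of order but within 5 of stream time 10, while 4 is not.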
> >>>> > > > >>>>>>>>>>>>> -Matthias
> >>>> > > > >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
> >>>> > > > >>>>>>>>>>>>>> Hi all,
> >>>> > > > >>>>>>>>>>>>>> I'd like to kick off the discussion for KIP-450, adding sliding window aggregation support to Kafka Streams.
> >>>> > > > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>> > > > >>>>>>>>>>>>>> Let me know what you think,
> >>>> > > > >>>>>>>>>>>>>> Leah