Thanks for the discussion about extending TimeWindows. I agree that making it future proof is important, and the implementation of SlidingWindows is unique enough that it seems logical to make it its own final class.
On that note, I've updated the KIP to make SlidingWindows a stand alone final class, and added the *windowedBy() *API in *KGroupedStream *to handle SlidingWindows. It seems that SlidingWindows would still be able to leverage *TimeWindowedKStream by* creating a SlidingWindows version of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream. *If anyone sees issues with this implementation, please let me know. Best, Leah On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> wrote: > Thanks for the reply, Sophie. > > That all sounds about right to me. > > The Windows “interface”/algorithm is quite flexible, so it makes sense for > it to be extensible. Different implementations can (and do) enumerate > different windows to suit different use cases. > > On the other hand, I can’t think of any way to extend SessionWindows to do > something different using the same algorithm, so it makes sense for it to > stay final. > > If we think SlidingWindows is similarly not usefully extensible, then we > can make it final. It’s easy to remove final later, but adding it is not > possible. Or we could go the other route and just make it an interface, on > general principle. Both of these choices are safe API design. > > Thanks again, > John > > On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote: > > > > > > Users could pass in a custom `SessionWindows` as > > > long as the session algorithm works correctly for it. > > > > > > Well not really, SessionWindows is a final class. TimeWindows is also a > > final class, so neither of these can be extended/customized. For a given > > Windows then there would only be three (non-overlapping) possibilities: > > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't > > think there's any problem with determining what the user wants in this > case > > -- > > we would just check if it's a SlidingWindows and connect the new > processor, > > or else connect the existing hopping/tumbling window processor. > > > > I'll admit that last sentence does leave a bad taste in my mouth. Part of > > that > > is probably the "leaking" API Matthias pointed out; we just assume the > > hopping/tumbling window implementation fits all custom windows. But I > guess > > if you really needed to customize the algorithm any further you may as > well > > stick in a transformer and do it all yourself. > > > > Anyways, given what we have, it does seem weird to apply one algorithm > > for most Windows types and then swap in a different one for one specific > > extension of Windows. So adding a new #windowedBy(SlidingWindows) > > sounds reasonable to me. > > > > I'm still not convinced that we need a whole new TimeWindowedKStream > > equivalent class for sliding windows though. It seems like the > > hopping/tumbling > > window implementation could benefit just as much from a subtractor as the > > sliding windows so the only future-proofing we get is the ability to be > > lazy and > > add the subtractor to one but not the other. Of course it would only be > an > > optimization so we could just not apply it to one type and nothing would > > break. > > It does make me nervous to go against the "future-proof" direction, > though. > > Are there any other examples of things we might want to add to one window > > type but not to another? > > > > On another note, this actually brings up a new question: should > > SlidingWindows > > also be final? My take is "yes" since there's no reasonable > customization of > > sliding windows, at least not that I can think of. Thoughts? > > > > > > On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> > wrote: > > > > > Thanks, all, > > > > > > I can see how my conclusion was kind of a leap. > > > > > > What Matthias said is indeed what I was thinking. When you provide a > > > window definition to the windowBy() method, you are selecting an > algorithm > > > that will be used to compute the windows from the input data. > > > > > > I didn’t mean the code implementation “algorithm”, but the high-level > > > algorithm that describes how the input stream will be transformed into > a > > > sequence of windows. > > > > > > For example, the algorithm for Windows is something like “given the > record > > > timestamp, include the record in each of the enumerated windows”. Note > that > > > there can be a lot of variation in how the windows are enumerated, > which is > > > why there are at least a couple of implementations of Windows, and we > are > > > open to adding more (like for natural calendar boundaries). > > > > > > For SessionWindows, it’s more like “if any window is within the gap, > > > extend its boundaries to include this record and if two windows touch, > then > > > merge them”. > > > > > > Clearly, the algorithm for SlidingWindows doesn’t fall into either > > > category, so it seems inappropriate to claim that it does in the API > (by > > > implementing Windows with stubbed methods) and then cast internally to > > > execute a completely different algorithm. > > > > > > To your other observation, that the DSL object resulting from windowBy > > > would look the same for Windows and SessionWindows, maybe it makes > sense > > > for windowBy(SessionWindows) also to return a TimeWindowedKStream. > > > > > > i.e.: > > > ======================= > > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final > Windows<W> > > > windows); > > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows); > > > ======================= > > > > > > I can’t think of a reason this wouldn’t work. But then again, it > would be > > > more future-proof to go ahead and specify a different return type now, > if > > > we think we'll want to add subtractors and stuff later. I don't have a > > > strong feeling about that part of the API. It seems to be independent > of > > > whether SlidingWindows extends Windows or not. > > > > > > Thanks, > > > -John > > > > > > On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote: > > > > I think what John tries to say is the following: > > > > > > > > We have `windowedBy(Windows)` that accept hopping/tumbling windows > but > > > > also custom window and we use a specific algorithm. Note, that custom > > > > windows must "work" based on the used algorithm. > > > > > > > > For session windows we have `windowedBy(SessionWindows)` and apply a > > > > different algorithm. Users could pass in a custom `SessionWindows` as > > > > long as the session algorithm works correctly for it. > > > > > > > > For the new sliding windows, we want to use a different algorithm > > > > compare to hopping/tumbling windows. If we let sliding window extend > > > > `Windows`, we can decide at runtime if we need to use the > > > > hopping/tumbling window algorithm for hopping/tumbling windows or the > > > > new sliding window algorithm for sliding windows. However, if we get > a > > > > custom window, which algorithm do we pick now? The existing > > > > tumbling/hopping window algorithm of the new sliding window > algorithm? > > > > Both a custom "time-window" and custom "sliding window" implement the > > > > generic `Windows` class and thus we cannot make a decision as we > don't > > > > know the user's intent. > > > > > > > > As a matter of fact, even if the user might not be aware of it, the > > > > algorithm we use does already leak into the API (if a user extends > > > > `Windows` is must work with our hopping/tumbling window algorithm > and if > > > > a user extends `SessionWindows` it must work with our session > algorithm) > > > > and it seems we need to preserve this property for sliding window. > > > > > > > > > > > > -Matthias > > > > > > > > On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote: > > > > > Hey John, > > > > > > > > > > Just a few follow-up questions/comments about the whole Windows > thing: > > > > > > > > > > That's a good way of looking at things; in particular the point > about > > > > > SessionWindows > > > > > for example requiring a Merger while other "statically enumerable" > > > windows > > > > > require > > > > > only an adder seems to touch on the heart of the matter. > > > > > > > > > > It seems like what Time and Universal (and any other Windows > > > > >> implementation) have in common is that the windows are statically > > > > >> enumerable. > > > > >> As a consequence, they can all rely on an aggregation maintenence > > > algorithm > > > > >> that involves enumerating each of the windows and updating it. > That > > > > >> also means that their DSL object (TimeWindowedKStream) doesn't > need > > > > >> "subtractors" or "mergers", but only "adders"; again, this is a > > > consequence > > > > >> of the fact that the windows are enumerable. > > > > > > > > > > > > > > > Given that, I'm a bit confused why you conclude that sliding > windows > > > are > > > > > fundamentally > > > > > different from the "statically enumerable" windows -- sliding > windows > > > > > require only an > > > > > adder too. I'm not sure it's a consequence of being enumerable, or > that > > > > > being enumerable > > > > > is the fundamental property that unites all Windows (ignoring > > > JoinWindows > > > > > here). Yes, it > > > > > currently does apply to all Windows implementations, but we > shouldn't > > > > > assume that it > > > > > *has *to be that way on the basis that it currently happens to be. > > > > > > > > > > Also, the fact that they can all rely on the same aggregation > algorithm > > > > > seems like an > > > > > implementation detail and it would be weird to force a separate/new > > > DSL API > > > > > just because > > > > > under the covers we swap in a different processor. > > > > > > > > > > To be fair, I don't think there's a strong reason *against* not > > > extending > > > > > Windows -- in the end > > > > > it will just mean adding a new #windowedBy method and copy/pasting > > > > > everything from > > > > > TimeWindowedKStream pretty much word for word. But anytime you > find > > > > > yourself > > > > > copying over code almost exactly, there should probably be a good > > > reason > > > > > why :) > > > > > > > > > > > > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> > > > wrote: > > > > > > > > > >> Thanks Leah! > > > > >> > > > > >> 5) Regarding the empty windows, I'm wondering if we should simply > > > propose > > > > >> that the windows should not be emitted downstream of the operator > or > > > > >> visible in IQ. Then, it'll be up to the implementation to make it > > > happen. > > > > >> I'm > > > > >> personally not concerned about it, since it seems like there are > > > multiple > > > > >> ways to accomplish this. > > > > >> > > > > >> Note, the discrepancy Matthias pointed out is actually a design > bug. > > > The > > > > >> windowed aggregation (like every operation in Streams) produces a > > > "view", > > > > >> which then forms the basis of downstream operations. When we pass > the > > > > >> Materialized option to the operation, all we're doing is saying to > > > > >> "materialize" > > > > >> the view (aka, actually store the computed view) and also make it > > > > >> queriable. > > > > >> It would be illegal for the "queriable, materialized view" to > differ > > > in > > > > >> any way > > > > >> from the "view". So, it seems we must either propose to emit the > empty > > > > >> windows AND make them visible in IQ, or propose NOT to emit the > empty > > > > >> windows AND NOT make them visible in IQ. > > > > >> > > > > >> 7) Regarding whether we can extend TimeWindows (or Windows): > > > > >> I've been mulling this over more. I think it's worth asking the > > > question of > > > > >> what these classes even mean. For example, why is SessionWindows a > > > > >> different thing from TimeWindows and UniversalWindows (which are > both > > > > >> Windows)? > > > > >> > > > > >> This conversation is extra complicated because of the incomplete > and > > > > >> mis-matched class hierarchy, but we can try to look past it for > now. > > > > >> > > > > >> It seems like what Time and Universal (and any other Windows > > > > >> implementation) have in common is that the windows are statically > > > > >> enumerable. > > > > >> As a consequence, they can all rely on an aggregation maintenence > > > algorithm > > > > >> that involves enumerating each of the windows and updating it. > That > > > > >> also means that their DSL object (TimeWindowedKStream) doesn't > need > > > > >> "subtractors" or "mergers", but only "adders"; again, this is a > > > consequence > > > > >> of the fact that the windows are enumerable. > > > > >> > > > > >> In contrast, session windows are data driven, so they are not > > > statically > > > > >> enumerable. Their algorithm has to rely on scans, and to do the > scans, > > > > >> it needs to know the "inactivity gap", which needs to be part of > the > > > window > > > > >> definition. Likewise, session windows have the property that they > need > > > > >> to be merged, so their DSL object also requires mergers. > > > > >> > > > > >> It really seems like your new window definition doesn't fit into > > > either > > > > >> category. It uses a different algorithm, which relies on scans, > but > > > it is > > > > >> also fixed in size, so it doesn't need mergers. In this > situation, it > > > seems > > > > >> like the safe bet is to just create SessionWindows with no > interface > > > and > > > > >> add a separate set of DSL operations and objects. It's a little > extra > > > code, > > > > >> but it seems to keep everything tidier and more comprehensible, > both > > > > >> for us and for users. > > > > >> > > > > >> What do you think? > > > > >> -John > > > > >> > > > > >> > > > > >> > > > > >> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote: > > > > >>> Hi Matthias, > > > > >>> > > > > >>> Thanks for the suggestions, I've updated the KIP and child page > > > > >> accordingly > > > > >>> and addressed some below. > > > > >>> > > > > >>> 1) For the mandatory grace period, we should use a static builder > > > method > > > > >>>> that take two parameters. > > > > >>>> > > > > >>> > > > > >>> That makes sense, I've changed that in the public API. > > > > >>> > > > > >>> Btw: this implementation actually raises an issue for IQ: those > empty > > > > >>>> windows would be returned. > > > > >>> > > > > >>> > > > > >>> This is a great point, with the current implementation plan empty > > > windows > > > > >>> would be returned. I think creating a second window store would > > > > >> definitely > > > > >>> work, but there would be more overhead in having two stores and > > > switching > > > > >>> windows between the stores, as well as doing scans in both > stores to > > > find > > > > >>> existing windows. There might be a way to do avoid emitting empty > > > windows > > > > >>> without creating a second window store, I'll look more into it. > > > > >>> > > > > >>> Cheers, > > > > >>> Leah > > > > >>> > > > > >>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax < > mj...@apache.org> > > > > >> wrote: > > > > >>> > > > > >>>> Thanks for updating the KIP. > > > > >>>> > > > > >>>> Couple of follow up comments: > > > > >>>> > > > > >>>> 1) For the mandatory grace period, we should use a static > builder > > > > >> method > > > > >>>> that take two parameters. This provides a better API as user > cannot > > > > >>>> forget to set the grace period. Throwing a runtime exception > seems > > > not > > > > >>>> to be the best way to handle this case. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 2) In Fig.2 you list 10 hopping windows. I believe it should > > > actually > > > > >> be > > > > >>>> more? There first hopping window would be [-6,-4[ and the last > one > > > > >> would > > > > >>>> be from [19,29[ -- hence, the cost saving are actually much > higher. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 3a) IQ: you are saying that the user need to compute the start > time > > > as > > > > >>>> > > > > >>>>> windowSize+the time they're looking at > > > > >>>> > > > > >>>> Should this be "targetTime - windowSize" instead? > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 3b) IQ: in you example you say "window size of 10 minutes" with > an > > > > >>>> incident at 9:15. > > > > >>>> > > > > >>>>> they're looking for a window with the start time of 8:15. > > > > >>>> > > > > >>>> The example does not seem to add up? > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 4) For "Processing Windows": you describe a three step > approach: I > > > just > > > > >>>> want to point out, that step (1) is not necessary for each input > > > > >> record, > > > > >>>> because timestamps are not guaranteed to be unique and thus a > > > previous > > > > >>>> record with the same key and timestamp might have create the > windows > > > > >>>> already. > > > > >>>> > > > > >>>> Nit: I am also not exactly sure what you mean by step (3) as > you use > > > > >> the > > > > >>>> word "send". I guess you mean "put"? > > > > >>>> > > > > >>>> It seem there are actually more details in the sub-page: > > > > >>>> > > > > >>>>> A new record for SlidingWindows will always create two new > windows. > > > > >> If > > > > >>>> either of those windows already exist in the windows store, > their > > > > >>>> aggregation will simply be updated to include the new record, > but no > > > > >>>> duplicate window will be added to the WindowStore. > > > > >>>> > > > > >>>> However, the first and second sentence contradict each other a > > > little > > > > >>>> bit. I think the first sentence is not correct. > > > > >>>> > > > > >>>> Nit: > > > > >>>> > > > > >>>>> For in-order records, the left window will always be empty. > > > > >>>> > > > > >>>> This should be "right window" ? > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 5) "Emitting Results": it might be worth to point out, that a > > > > >>>> second/future window of a new record is create with no records, > and > > > > >>>> thus, even if it's initialized it won't be emitted. Only if a > > > > >>>> consecutive record falls into the window, the window would be > > > updates > > > > >>>> and the window result (for a window content of one record) > would be > > > > >> sent > > > > >>>> downstream. > > > > >>>> > > > > >>>> Again, the sub-page contains this details. Might still be worth > to > > > add > > > > >>>> to the top level page, too. > > > > >>>> > > > > >>>> Btw: this implementation actually raises an issue for IQ: those > > > empty > > > > >>>> windows would be returned. Thus I am wondering if we need to > use two > > > > >>>> stores internally? One store for actual windows and one store > for > > > empty > > > > >>>> windows? If an empty window is updated, it's move to the other > > > store? > > > > >>>> For IQ, we only allow to query the non-empty-window store? > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 6) On the sub-page: > > > > >>>> > > > > >>>>> The left window of in-order records and both windows for > > > out-of-order > > > > >>>> records need to be updated with the values of records that have > > > already > > > > >>>> been processed. > > > > >>>> > > > > >>>> Why "both windows for out-of-order records"? IMHO, we don't > know how > > > > >>>> many existing windows needs to be updated when processing an > > > > >>>> out-of-order record. Of course, an out-of-order record could not > > > fall > > > > >>>> into any existing window but create two new windows, too. > > > > >>>> > > > > >>>>> Because each record creates one new window that includes > itself > > > and > > > > >> one > > > > >>>> window that does not > > > > >>>> > > > > >>>> As state above, this does not seem to hold. I understand why you > > > mean, > > > > >>>> but it would be good to be exact. > > > > >>>> > > > > >>>> Figure 2: You use the term "late" but you mean "out-of-order" I > > > guess > > > > >> -- > > > > >>>> a record is _late_ if it's not processed any longer as the grace > > > period > > > > >>>> passed already. > > > > >>>> > > > > >>>> Figure 2: "Late" should be out-or-order. The example text say a > > > window > > > > >>>> [16,26] should be created but the figure shows the green window > as > > > > >> [15,20]. > > > > >>>> > > > > >>>> About the blue window: maybe add not that the blue window > contains > > > the > > > > >>>> aggregate we need for the green window, _before_ the new record > `a` > > > is > > > > >>>> added to the blue window. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> 7) I am not really happy to extend TimeWindows and I think the > > > argument > > > > >>>> about JoinWindows is not the best (IMHO, JoinWindows do it > already > > > > >> wrong > > > > >>>> and we just repeat the same mistake). However, it seems our > window > > > > >>>> hierarchy is "broken" already and it might be out of scope for > this > > > KIP > > > > >>>> to fix it. Hence, I am ok that we bite the bullet for now and > clean > > > it > > > > >>>> up later. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> -Matthias > > > > >>>> > > > > >>>> > > > > >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote: > > > > >>>>> Hi Leah, > > > > >>>>> > > > > >>>>> Thanks for the updated KIP. I agree that extending > SlidingWindows > > > > >> from > > > > >>>>> Windows is fine for the sake of not introducing more public > APIs > > > (and > > > > >>>> their > > > > >>>>> internal xxxImpl classes), and its cons is small enough to > tolerate > > > > >> to > > > > >>>> me. > > > > >>>>> > > > > >>>>> > > > > >>>>> Guozhang > > > > >>>>> > > > > >>>>> > > > > >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas < > ltho...@confluent.io> > > > > >>>> wrote: > > > > >>>>> > > > > >>>>>> Hi all, > > > > >>>>>> > > > > >>>>>> Thanks for the feedback on the KIP. I've updated the KIP page > > > > >>>>>> < > > > > >>>>>> > > > > >>>> > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL > > > > >>>>>>> > > > > >>>>>> to address these points and have created a child page > > > > >>>>>> < > > > > >>>>>> > > > > >>>> > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 > > > > >>>>>>> > > > > >>>>>> to go more in depth on certain implementation details. > > > > >>>>>> > > > > >>>>>> *Grace Period:* > > > > >>>>>> I think Sophie raises a good point that the default grace > period > > > of > > > > >> 24 > > > > >>>>>> hours is often too long and was chosen when retention time and > > > grace > > > > >>>> period > > > > >>>>>> were the same. For SlidingWindows, I propose we make the grace > > > > >> period > > > > >>>>>> mandatory. To keep formatting consistent with other types of > > > > >> windows, > > > > >>>> grace > > > > >>>>>> period won't be an additional parameter in the #of method, but > > > will > > > > >>>> still > > > > >>>>>> look like it does in other use cases: > > > > >>>>>> > .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds). > > > If > > > > >>>> grace > > > > >>>>>> period isn't properly initialized, an error will be thrown > through > > > > >> the > > > > >>>>>> process method. > > > > >>>>>> > > > > >>>>>> *Storage Layer + Aggregation:* > > > > >>>>>> SlidingWindows will use a WindowStore because computation can > be > > > > >> done > > > > >>>> with > > > > >>>>>> the information stored in a WindowStore (window timestamp and > > > > >> value). > > > > >>>> Using > > > > >>>>>> the WindowStore also simplifies computation as SlidingWindows > can > > > > >>>> leverage > > > > >>>>>> existing processes. Because we are using a WindowStore, the > > > > >> aggregation > > > > >>>>>> process will be similar to that of a hopping window. As > records > > > > >> come in > > > > >>>>>> their value is added to the aggregation that already exists, > > > > >> following > > > > >>>> the > > > > >>>>>> same procedure as hopping windows. The aggregation difference > > > > >> between > > > > >>>>>> SlidingWindows and HoppingWindows comes in creating new > windows > > > for > > > > >> a > > > > >>>>>> SlidingWindow, where you need to find the existing records > that > > > > >> belong > > > > >>>> to > > > > >>>>>> the new window. This computation is similar to the > aggregation in > > > > >>>>>> SessionWindows and requires a scan to the WindowStore to find > the > > > > >> window > > > > >>>>>> with the aggregation needed, which will always be > pre-computed. > > > The > > > > >> scan > > > > >>>>>> requires creating an iterator, but should have minimal > performance > > > > >>>> effects > > > > >>>>>> as this strategy is already implemented in SessionWindows. > More > > > > >> details > > > > >>>> on > > > > >>>>>> finding the aggregation that needs to be put in a new window > can > > > be > > > > >>>> found > > > > >>>>>> on the implementation page > > > > >>>>>> < > > > > >>>>>> > > > > >>>> > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 > > > > >>>>>>> > > > > >>>>>> . > > > > >>>>>> > > > > >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing* > > > > >>>>>> Because SlidingWindows are still defined by a windowSize > (whereas > > > > >>>>>> SessionWindows are defined purely by data), I think it makes > sense > > > > >> to > > > > >>>>>> leverage the existing Window processes instead of creating a > new > > > > >> store > > > > >>>> type > > > > >>>>>> that would be very similar to the WindowStore. While it's true > > > that > > > > >> the > > > > >>>>>> #windowsFor method isn't necessary for SlidingWindows, > JoinWindows > > > > >> also > > > > >>>>>> extends Windows<Window> and throws an > > > UnsupportedOperationException > > > > >> in > > > > >>>> the > > > > >>>>>> #windowsFor method, which is what SlidingWindows can do. The > > > > >> difference > > > > >>>>>> between extending Windows<TimeWindow> or Windows<Window> is > > > > >> minimal, as > > > > >>>>>> both are ways to pass window parameters. Extending > > > > >> Windows<TimeWindow> > > > > >>>> will > > > > >>>>>> give us more leverage in utilizing existing processes. > > > > >>>>>> > > > > >>>>>> *Emit Strategy* > > > > >>>>>> I would argue that emitting for every update is still the > best way > > > > >> to go > > > > >>>>>> for SlidingWindows because it mimics the other types of > windows, > > > and > > > > >>>>>> suppression can be leveraged to limit what SlidingWindows > emits. > > > > >> While > > > > >>>> some > > > > >>>>>> users may only want to see the last value, others may want to > see > > > > >> more, > > > > >>>> and > > > > >>>>>> leaving the emit strategy to emit partial results allows both > > > users > > > > >> to > > > > >>>>>> access what they want. > > > > >>>>>> > > > > >>>>>> *Additional Features* > > > > >>>>>> Supporting sliding windows inherently, and shifting > inefficient > > > > >> hopping > > > > >>>>>> windows to sliding windows, is an interesting idea and could > be > > > > >> built on > > > > >>>>>> top of SlidingWindows when they are finished, but right now > seems > > > > >> out of > > > > >>>>>> scope for the needs of this KIP. Similarly, including a > > > > >> `subtraction` > > > > >>>>>> feature could have performance improvements, but doesn't seem > > > > >> necessary > > > > >>>> for > > > > >>>>>> the implementation of this KIP. > > > > >>>>>> > > > > >>>>>> Let me know what you think of the updates, > > > > >>>>>> > > > > >>>>>> Leah > > > > >>>>>> > > > > >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler < > > > vvcep...@apache.org> > > > > >>>> wrote: > > > > >>>>>> > > > > >>>>>>> Hello all, > > > > >>>>>>> > > > > >>>>>>> Thanks for the KIP, Leah! > > > > >>>>>>> > > > > >>>>>>> Regarding (1): I'd go farther actually. Making Windows an > > > abstract > > > > >>>>>>> class was a mistake from the beginning that led to us not > being > > > > >>>>>>> able to fix a very confusing situation for users around > retention > > > > >>>> times, > > > > >>>>>>> final results emitting, etc. Thus, I would not suggest > extending > > > > >>>>>>> TimeWindows for sure, but would also not suggest extending > > > Windows. > > > > >>>>>>> > > > > >>>>>>> The very simplest thing to do is follow the example of > > > > >> SessionWindows, > > > > >>>>>>> which is just a completely self-contained class. If we don't > mess > > > > >> with > > > > >>>>>>> class inheritance, we won't ever have any of the problems > related > > > > >> to > > > > >>>>>>> class inheritance. This is my preferred solution. > > > > >>>>>>> > > > > >>>>>>> Still, Sliding windows has a lot in common with TimeWindows > and > > > > >> other > > > > >>>>>>> fixed-size windows, namely that the windows are fixed in > size. If > > > > >> we > > > > >>>> want > > > > >>>>>>> to preserve the current two-part windowing API in which you > can > > > > >> window > > > > >>>>>>> by either "fixed" or "data driven" modes, I'd suggest we > avoid > > > > >>>> increasing > > > > >>>>>>> the blast radius of Windows by taking the opportunity to > replace > > > it > > > > >>>> with > > > > >>>>>>> a proper interface and implement that interface instead. > > > > >>>>>>> > > > > >>>>>>> For example: > > > > >>>>>>> https://github.com/apache/kafka/pull/9031 > > > > >>>>>>> > > > > >>>>>>> Then, SlidingWindows would just implement > > > FixedSizeWindowDefinition > > > > >>>>>>> > > > > >>>>>>> ====== > > > > >>>>>>> > > > > >>>>>>> Regarding (2), it seems more straightforward as a user of > Streams > > > > >>>>>>> to just have one mental model. _All_ of our aggregation > > > operations > > > > >>>>>>> follow an eager emission model, in which we just emit an > update > > > > >>>> whenever > > > > >>>>>>> an update is available. We already provided Suppression to > > > > >> explicitly > > > > >>>>>> apply > > > > >>>>>>> different update semantics in the case it's required. Why > should > > > we > > > > >>>>>> define > > > > >>>>>>> a snowflake operation with completely different semantics > from > > > > >>>> everything > > > > >>>>>>> else? I.e., systems are generally easier to use when they > follow > > > a > > > > >> few > > > > >>>>>>> simple, composable rules than when they have a lot of > different, > > > > >>>> specific > > > > >>>>>>> rules. > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> ====== > > > > >>>>>>> > > > > >>>>>>> New point: (4): > > > > >>>>>>> It would be nice to include some examples of user code that > would > > > > >> use > > > > >>>> the > > > > >>>>>>> new API, which should include: > > > > >>>>>>> 1. using the DSL with the sliding window definition > > > > >>>>>>> 2. accessing the stored results of a sliding window > aggregation > > > > >> via IQ > > > > >>>>>>> 3. defining a custom processor to access sliding windows in a > > > store > > > > >>>>>>> > > > > >>>>>>> It generally helps reviewers wrap their heads around the > > > proposal, > > > > >> as > > > > >>>>>> well > > > > >>>>>>> as shaking out any design issues that would otherwise only > come > > > up > > > > >>>> during > > > > >>>>>>> implementation/testing/review. > > > > >>>>>>> > > > > >>>>>>> Thanks again for the awesome proposal! > > > > >>>>>>> -John > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote: > > > > >>>>>>>> Hello Leah, > > > > >>>>>>>> > > > > >>>>>>>> Thanks for the nice written KIP. A few thoughts: > > > > >>>>>>>> > > > > >>>>>>>> 1) I echo the other reviewer's comments regarding the > typing: > > > why > > > > >>>>>>> extending > > > > >>>>>>>> TimeWindow instead of just extending Window? > > > > >>>>>>>> > > > > >>>>>>>> 2) I also feel that emitting policy for this type of > windowing > > > > >>>>>>> aggregation > > > > >>>>>>>> may be different from the existing ones. Existing emitting > > > policy > > > > >> is > > > > >>>>>> very > > > > >>>>>>>> simple: emit every time when window get updates, and emit > every > > > > >> time > > > > >>>> on > > > > >>>>>>>> out-of-ordering data within grace period, this is because > for > > > > >>>>>>> time-windows > > > > >>>>>>>> the window close time is strictly depend on the window start > > > time > > > > >>>> which > > > > >>>>>>> is > > > > >>>>>>>> fixed, while for session-windows although the window > open/close > > > > >> time > > > > >>>> is > > > > >>>>>>>> also data-dependent it is relatively infrequent compared to > the > > > > >>>>>>>> sliding-windows. For this KIP, since each new data would > cause a > > > > >>>>>>>> new sliding-window, the num. windows maintained logically > could > > > be > > > > >>>> much > > > > >>>>>>>> larger and hence emitting on each update may be too > aggressive. > > > > >>>>>>>> > > > > >>>>>>>> 3) Although KIP itself should be focusing on user face > > > > >> interfaces, I'd > > > > >>>>>>>> suggest we create a children page of KIP-450 discussing > about > > > its > > > > >>>>>>>> implementations as well, since some of that may drive the > > > > >> interface > > > > >>>>>>> design. > > > > >>>>>>>> E.g. personally I think having a combiner interface in > addition > > > to > > > > >>>>>>>> aggregator would be useful but that's based on my 2cents > about > > > the > > > > >>>>>>>> implementation design (I once created a child page > describing > > > it: > > > > >>>>>>>> > > > > >>>>>>> > > > > >>>>>> > > > > >>>> > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation > > > > >>>>>>>> ). > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> Guozhang > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna < > > > br...@confluent.io > > > > >>> > > > > >>>>>>> wrote: > > > > >>>>>>>> > > > > >>>>>>>>> Hi Leah, > > > > >>>>>>>>> > > > > >>>>>>>>> Thank you for the KIP! > > > > >>>>>>>>> > > > > >>>>>>>>> Here is my feedback: > > > > >>>>>>>>> > > > > >>>>>>>>> 1. The KIP would benefit from some code examples that show > how > > > > >> to use > > > > >>>>>>>>> sliding windows in aggregations. > > > > >>>>>>>>> > > > > >>>>>>>>> 2. The different sliding windows in Figure 1 and 2 are > really > > > > >> hard to > > > > >>>>>>>>> distinguish. Could you please try to make them graphically > > > better > > > > >>>>>>>>> distinguishable? You could try to draw the frames of > > > consecutive > > > > >>>>>>>>> windows shifted to each other. > > > > >>>>>>>>> > > > > >>>>>>>>> 3. I agree with Matthias, that extending > Windows<TimeWindow> > > > > >> does not > > > > >>>>>>>>> seem to be the best approach. What would be the result of > > > > >>>>>>>>> windowsFor()? > > > > >>>>>>>>> > > > > >>>>>>>>> 4. In the section "Public Interfaces" you should remove > > > > >>>>>> implementation > > > > >>>>>>>>> details like private constructors and private fields. > > > > >>>>>>>>> > > > > >>>>>>>>> 5. Do we need a new store interface or can we use > WindowStore? > > > > >> Some > > > > >>>>>>>>> words about that would be informative. > > > > >>>>>>>>> > > > > >>>>>>>>> 6. @Matthias, if the subtrator is not strictly needed, I > would > > > > >> skip > > > > >>>>>> it > > > > >>>>>>>>> for now and add it later. > > > > >>>>>>>>> > > > > >>>>>>>>> 7. I also agree that having a section that describes how to > > > > >> handle > > > > >>>>>>>>> out-of-order records would be good to understand what is > still > > > > >>>>>> missing > > > > >>>>>>>>> and what we can reuse. > > > > >>>>>>>>> > > > > >>>>>>>>> Best, > > > > >>>>>>>>> Bruno > > > > >>>>>>>>> > > > > >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax < > > > > >> mj...@apache.org> > > > > >>>>>>> wrote: > > > > >>>>>>>>>> > > > > >>>>>>>>>> Leah, > > > > >>>>>>>>>> > > > > >>>>>>>>>> thanks for your update. However, it does not completely > answer > > > > >> my > > > > >>>>>>>>> question. > > > > >>>>>>>>>> > > > > >>>>>>>>>> In our current window implementations, we emit a window > result > > > > >>>>>> update > > > > >>>>>>>>>> record (ie, early/partial result) for each input record. > When > > > an > > > > >>>>>>>>>> out-of-order record arrives, we just update to > corresponding > > > old > > > > >>>>>>> window > > > > >>>>>>>>>> and emit another update. > > > > >>>>>>>>>> > > > > >>>>>>>>>> It's unclear from the KIP if you propose the same emit > > > > >> strategy? -- > > > > >>>>>>> For > > > > >>>>>>>>>> sliding windows it might be worth to consider to use a > > > different > > > > >>>>>> emit > > > > >>>>>>>>>> strategy and only support emitting the final result only > (ie, > > > > >> after > > > > >>>>>>> the > > > > >>>>>>>>>> grace period passed)? > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> Boyang, also raises a good point that relates to my point > from > > > > >>>>>> above > > > > >>>>>>>>>> about pre-aggregations and storage layout. Our current > time > > > > >> windows > > > > >>>>>>> are > > > > >>>>>>>>>> all pre-aggregated and stored in parallel. We can also > lookup > > > > >>>>>> windows > > > > >>>>>>>>>> efficiently, as we can compute the windowed-key given the > > > input > > > > >>>>>>> record > > > > >>>>>>>>>> key and timestamp based on the window definition. > > > > >>>>>>>>>> > > > > >>>>>>>>>> However, for sliding windows, window boundaries are data > > > > >> dependent > > > > >>>>>>> and > > > > >>>>>>>>>> thus we cannot compute them upfront. Thus, how can we > "find" > > > > >>>>>> existing > > > > >>>>>>>>>> window efficiently? Furthermore, out-of-order data would > > > create > > > > >> new > > > > >>>>>>>>>> windows in the past and we need to be able to handle this > > > case. > > > > >>>>>>>>>> > > > > >>>>>>>>>> Thus, to handle out-of-order data correctly, we need to > store > > > > >> all > > > > >>>>>> raw > > > > >>>>>>>>>> input events. Additionally, we could also store > pre-aggregated > > > > >>>>>>> results > > > > >>>>>>>>>> if we thinks it's benfitial. -- If we apply "emit only > final > > > > >>>>>> results" > > > > >>>>>>>>>> strategy, storing pre-aggregated result would not be > necessary > > > > >>>>>>> though. > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> Btw: for sliding windows it might also be useful to > consider > > > > >>>>>> allowing > > > > >>>>>>>>>> users to supply a `Subtractor` -- this subtractor could be > > > > >> applied > > > > >>>>>> on > > > > >>>>>>>>>> the current window result (in case we store it) if a > record > > > > >> drops > > > > >>>>>>> out of > > > > >>>>>>>>>> the window. Of course, not all aggregation functions are > > > > >>>>>> subtractable > > > > >>>>>>>>>> and we can consider this as a follow up task, too, and not > > > > >> include > > > > >>>>>> in > > > > >>>>>>>>>> this KIP for now. Thoughts? > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> I was also thinking about the type hierarchy. I am not > sure if > > > > >>>>>>> extending > > > > >>>>>>>>>> TimeWindow is the best approach? For TimeWindows, we can > > > > >>>>>> pre-compute > > > > >>>>>>>>>> window boundaries (cf `windowsFor()`) while for a sliding > > > window > > > > >>>>>> the > > > > >>>>>>>>>> boundaries are data dependent. Session windows are also > data > > > > >>>>>>> dependent > > > > >>>>>>>>>> and thus they don't inherit from TimeWindow (Maybe check > out > > > the > > > > >>>>>> KIP > > > > >>>>>>>>>> that added session windows? It could provides some good > > > > >> insights.) > > > > >>>>>>> -- I > > > > >>>>>>>>>> believe the same rational applies to sliding windows? > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> -Matthias > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote: > > > > >>>>>>>>>>> Thanks Leah and Sophie for the KIP. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance > time. > > > > >> Could > > > > >>>>>> we > > > > >>>>>>>>>>> elaborate how the storage layer is structured? > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching > > > > >>>>>> aggregation > > > > >>>>>>>>> results, > > > > >>>>>>>>>>> since we couldn't pre-aggregate until the user asks for > it. > > > > >> Would > > > > >>>>>>> be > > > > >>>>>>>>> good > > > > >>>>>>>>>>> to also discuss it. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 3. We haven't discussed the possibility of supporting > sliding > > > > >>>>>>> windows > > > > >>>>>>>>>>> inherently. For a user who actually uses a hopping > window, > > > > >>>>>> Streams > > > > >>>>>>>>> could > > > > >>>>>>>>>>> detect such an inefficiency doing a > window_size/advance_time > > > > >>>>>> ratio > > > > >>>>>>> to > > > > >>>>>>>>> reach > > > > >>>>>>>>>>> a conclusion on whether the write amplification is too > high > > > > >>>>>>> compared > > > > >>>>>>>>> with > > > > >>>>>>>>>>> some configured threshold. The benefit of doing so is > that > > > > >>>>>> existing > > > > >>>>>>>>> Streams > > > > >>>>>>>>>>> users don't need to change their code, learn a new API, > but > > > > >> only > > > > >>>>>> to > > > > >>>>>>>>> upgrade > > > > >>>>>>>>>>> Streams library to get benefits for their inefficient > hopping > > > > >>>>>>> window > > > > >>>>>>>>>>> implementation. There might be some compatibility issues > for > > > > >>>>>> sure, > > > > >>>>>>> but > > > > >>>>>>>>>>> worth listing them out for trade-off. > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Boyang > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas < > > > > >>>>>> ltho...@confluent.io > > > > >>>>>>>> > > > > >>>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>> Hey Matthias, > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Thanks for pointing that out. I added the following to > the > > > > >>>>>> Propose > > > > >>>>>>>>> Changes > > > > >>>>>>>>>>>> section of the KIP: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> "Records that come out of order will be processed the > same > > > way > > > > >>>>>> as > > > > >>>>>>>>> in-order > > > > >>>>>>>>>>>> records, as long as they fall within the grace period. > Any > > > new > > > > >>>>>>> windows > > > > >>>>>>>>>>>> created by the late record will still be created, and > the > > > > >>>>>> existing > > > > >>>>>>>>> windows > > > > >>>>>>>>>>>> that are changed by the late record will be updated. Any > > > > >> record > > > > >>>>>>> that > > > > >>>>>>>>> falls > > > > >>>>>>>>>>>> outside of the grace period (either user defined or > default) > > > > >>>>>> will > > > > >>>>>>> be > > > > >>>>>>>>>>>> discarded. " > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> All the best, > > > > >>>>>>>>>>>> Leah > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax < > > > > >>>>>> mj...@apache.org> > > > > >>>>>>>>> wrote: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>>> Leah, > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> thanks a lot for the KIP. Very well written. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> The KIP does not talk about the handling of > out-of-order > > > data > > > > >>>>>>> though. > > > > >>>>>>>>>>>>> How do you propose to address this? > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> -Matthias > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote: > > > > >>>>>>>>>>>>>> Hi all, > > > > >>>>>>>>>>>>>> I'd like to kick-off the discussion for KIP-450, > adding > > > > >>>>>> sliding > > > > >>>>>>>>> window > > > > >>>>>>>>>>>>>> aggregation support to Kafka Streams. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>> > > > > >>>>>> > > > > >>>> > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Let me know what you think, > > > > >>>>>>>>>>>>>> Leah > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> -- > > > > >>>>>>>> -- Guozhang > > > > >>>>>>>> > > > > >>>>>>> > > > > >>>>>> > > > > >>>>> > > > > >>>>> > > > > >>>> > > > > >>>> > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > Attachments: > > > > * signature.asc > > > > > >