Another minor tweak: instead of defining the window by its *size*, it will be defined by *timeDifference*, the maximum time difference between two events in the same window. Because both ends of the window are inclusive, this is a more precise definition, and it still lets users create the window they expect. It also fits the current examples, where a record at *10* falls into the window *[5,10]* for a time difference of *5*. That window contains any records at 5, 6, 7, 8, 9, and 10, which is 6 timestamps instead of 5. This semantic difference is why I've shifted from *size* to *timeDifference*.
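To make the inclusive-ends arithmetic concrete, here is a minimal sketch in plain Java with no Kafka dependency. The class and method names (`InclusiveWindowSketch`, `windowStart`, `slotCount`, `contains`) are made up for illustration and are not part of the KIP or the Streams API; the sketch assumes integer timestamps, as in the [5,10] example above.

```java
// Illustrative only: a hypothetical helper, not part of Kafka Streams.
final class InclusiveWindowSketch {

    // Start of the window that ends at recordTime, given the maximum time difference.
    static long windowStart(long recordTime, long timeDifference) {
        return recordTime - timeDifference;
    }

    // Number of distinct integer timestamps in [start, end], both ends inclusive.
    static long slotCount(long start, long end) {
        return end - start + 1;
    }

    // True if a record at timestamp ts falls into the inclusive window [start, end].
    static boolean contains(long start, long end, long ts) {
        return ts >= start && ts <= end;
    }

    public static void main(String[] args) {
        long end = 10;
        long timeDifference = 5;
        long start = windowStart(end, timeDifference);
        System.out.println("window = [" + start + "," + end + "]");
        System.out.println("timestamps covered = " + slotCount(start, end));
    }
}
```

With a time difference of 5 the window covers 6 timestamps, so calling the parameter *size* would be off by one for anyone counting the slots.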
The new builder will be *withTimeDifferenceAndGrace*, keeping with other conventions. Let me know if there are any concerns! The vote thread is open as well here:
http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser

Best,
Leah

On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:

> A small tweak - to make it more clear to users that grace is required, as
> well as cleaning up some of the confusing grammar semantics of windows, the
> main builder method for *slidingWindows* will be *withSizeAndGrace* instead
> of *of*. This looks like it'll be the last change (for now) on the
> public API. If anyone has any comments or thoughts let me know. Otherwise,
> I'll take this to vote shortly.
>
> Best,
> Leah
>
> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
>
>> To accommodate the change to a final class, I've added another
>> *windowedBy()* function in *CogroupedKStream.java* to handle
>> SlidingWindows.
>>
>> As far as the discussion goes, I think this is the last change we've
>> talked about. If anyone has other comments or concerns, please let me know!
>>
>> Cheers,
>> Leah
>>
>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
>>
>>> Thanks for the discussion about extending TimeWindows. I agree that
>>> making it future proof is important, and the implementation of
>>> SlidingWindows is unique enough that it seems logical to make it its own
>>> final class.
>>>
>>> On that note, I've updated the KIP to make SlidingWindows a stand alone
>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
>>> handle SlidingWindows. It seems that SlidingWindows would still be able to
>>> leverage *TimeWindowedKStream* by creating a SlidingWindows version of
>>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
>>> anyone sees issues with this implementation, please let me know.
>>> >>> Best, >>> Leah >>> >>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> >>> wrote: >>> >>>> Thanks for the reply, Sophie. >>>> >>>> That all sounds about right to me. >>>> >>>> The Windows “interface”/algorithm is quite flexible, so it makes sense >>>> for it to be extensible. Different implementations can (and do) enumerate >>>> different windows to suit different use cases. >>>> >>>> On the other hand, I can’t think of any way to extend SessionWindows to >>>> do something different using the same algorithm, so it makes sense for it >>>> to stay final. >>>> >>>> If we think SlidingWindows is similarly not usefully extensible, then >>>> we can make it final. It’s easy to remove final later, but adding it is not >>>> possible. Or we could go the other route and just make it an interface, on >>>> general principle. Both of these choices are safe API design. >>>> >>>> Thanks again, >>>> John >>>> >>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote: >>>> > > >>>> > > Users could pass in a custom `SessionWindows` as >>>> > > long as the session algorithm works correctly for it. >>>> > >>>> > >>>> > Well not really, SessionWindows is a final class. TimeWindows is also >>>> a >>>> > final class, so neither of these can be extended/customized. For a >>>> given >>>> > Windows then there would only be three (non-overlapping) >>>> possibilities: >>>> > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't >>>> > think there's any problem with determining what the user wants in >>>> this case >>>> > -- >>>> > we would just check if it's a SlidingWindows and connect the new >>>> processor, >>>> > or else connect the existing hopping/tumbling window processor. >>>> > >>>> > I'll admit that last sentence does leave a bad taste in my mouth. >>>> Part of >>>> > that >>>> > is probably the "leaking" API Matthias pointed out; we just assume the >>>> > hopping/tumbling window implementation fits all custom windows. 
But I >>>> guess >>>> > if you really needed to customize the algorithm any further you may >>>> as well >>>> > stick in a transformer and do it all yourself. >>>> > >>>> > Anyways, given what we have, it does seem weird to apply one algorithm >>>> > for most Windows types and then swap in a different one for one >>>> specific >>>> > extension of Windows. So adding a new #windowedBy(SlidingWindows) >>>> > sounds reasonable to me. >>>> > >>>> > I'm still not convinced that we need a whole new TimeWindowedKStream >>>> > equivalent class for sliding windows though. It seems like the >>>> > hopping/tumbling >>>> > window implementation could benefit just as much from a subtractor as >>>> the >>>> > sliding windows so the only future-proofing we get is the ability to >>>> be >>>> > lazy and >>>> > add the subtractor to one but not the other. Of course it would only >>>> be an >>>> > optimization so we could just not apply it to one type and nothing >>>> would >>>> > break. >>>> > It does make me nervous to go against the "future-proof" direction, >>>> though. >>>> > Are there any other examples of things we might want to add to one >>>> window >>>> > type but not to another? >>>> > >>>> > On another note, this actually brings up a new question: should >>>> > SlidingWindows >>>> > also be final? My take is "yes" since there's no reasonable >>>> customization of >>>> > sliding windows, at least not that I can think of. Thoughts? >>>> > >>>> > >>>> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> >>>> wrote: >>>> > >>>> > > Thanks, all, >>>> > > >>>> > > I can see how my conclusion was kind of a leap. >>>> > > >>>> > > What Matthias said is indeed what I was thinking. When you provide a >>>> > > window definition to the windowBy() method, you are selecting an >>>> algorithm >>>> > > that will be used to compute the windows from the input data. 
>>>> > > >>>> > > I didn’t mean the code implementation “algorithm”, but the >>>> high-level >>>> > > algorithm that describes how the input stream will be transformed >>>> into a >>>> > > sequence of windows. >>>> > > >>>> > > For example, the algorithm for Windows is something like “given the >>>> record >>>> > > timestamp, include the record in each of the enumerated windows”. >>>> Note that >>>> > > there can be a lot of variation in how the windows are enumerated, >>>> which is >>>> > > why there are at least a couple of implementations of Windows, and >>>> we are >>>> > > open to adding more (like for natural calendar boundaries). >>>> > > >>>> > > For SessionWindows, it’s more like “if any window is within the gap, >>>> > > extend its boundaries to include this record and if two windows >>>> touch, then >>>> > > merge them”. >>>> > > >>>> > > Clearly, the algorithm for SlidingWindows doesn’t fall into either >>>> > > category, so it seems inappropriate to claim that it does in the >>>> API (by >>>> > > implementing Windows with stubbed methods) and then cast internally >>>> to >>>> > > execute a completely different algorithm. >>>> > > >>>> > > To your other observation, that the DSL object resulting from >>>> windowBy >>>> > > would look the same for Windows and SessionWindows, maybe it makes >>>> sense >>>> > > for windowBy(SessionWindows) also to return a TimeWindowedKStream. >>>> > > >>>> > > i.e.: >>>> > > ======================= >>>> > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final >>>> Windows<W> >>>> > > windows); >>>> > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows); >>>> > > ======================= >>>> > > >>>> > > I can’t think of a reason this wouldn’t work. But then again, it >>>> would be >>>> > > more future-proof to go ahead and specify a different return type >>>> now, if >>>> > > we think we'll want to add subtractors and stuff later. 
I don't >>>> have a >>>> > > strong feeling about that part of the API. It seems to be >>>> independent of >>>> > > whether SlidingWindows extends Windows or not. >>>> > > >>>> > > Thanks, >>>> > > -John >>>> > > >>>> > > On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote: >>>> > > > I think what John tries to say is the following: >>>> > > > >>>> > > > We have `windowedBy(Windows)` that accept hopping/tumbling >>>> windows but >>>> > > > also custom window and we use a specific algorithm. Note, that >>>> custom >>>> > > > windows must "work" based on the used algorithm. >>>> > > > >>>> > > > For session windows we have `windowedBy(SessionWindows)` and >>>> apply a >>>> > > > different algorithm. Users could pass in a custom >>>> `SessionWindows` as >>>> > > > long as the session algorithm works correctly for it. >>>> > > > >>>> > > > For the new sliding windows, we want to use a different algorithm >>>> > > > compare to hopping/tumbling windows. If we let sliding window >>>> extend >>>> > > > `Windows`, we can decide at runtime if we need to use the >>>> > > > hopping/tumbling window algorithm for hopping/tumbling windows or >>>> the >>>> > > > new sliding window algorithm for sliding windows. However, if we >>>> get a >>>> > > > custom window, which algorithm do we pick now? The existing >>>> > > > tumbling/hopping window algorithm of the new sliding window >>>> algorithm? >>>> > > > Both a custom "time-window" and custom "sliding window" implement >>>> the >>>> > > > generic `Windows` class and thus we cannot make a decision as we >>>> don't >>>> > > > know the user's intent. 
>>>> > > > >>>> > > > As a matter of fact, even if the user might not be aware of it, >>>> the >>>> > > > algorithm we use does already leak into the API (if a user extends >>>> > > > `Windows` is must work with our hopping/tumbling window algorithm >>>> and if >>>> > > > a user extends `SessionWindows` it must work with our session >>>> algorithm) >>>> > > > and it seems we need to preserve this property for sliding window. >>>> > > > >>>> > > > >>>> > > > -Matthias >>>> > > > >>>> > > > On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote: >>>> > > > > Hey John, >>>> > > > > >>>> > > > > Just a few follow-up questions/comments about the whole Windows >>>> thing: >>>> > > > > >>>> > > > > That's a good way of looking at things; in particular the point >>>> about >>>> > > > > SessionWindows >>>> > > > > for example requiring a Merger while other "statically >>>> enumerable" >>>> > > windows >>>> > > > > require >>>> > > > > only an adder seems to touch on the heart of the matter. >>>> > > > > >>>> > > > > It seems like what Time and Universal (and any other Windows >>>> > > > >> implementation) have in common is that the windows are >>>> statically >>>> > > > >> enumerable. >>>> > > > >> As a consequence, they can all rely on an aggregation >>>> maintenence >>>> > > algorithm >>>> > > > >> that involves enumerating each of the windows and updating it. >>>> That >>>> > > > >> also means that their DSL object (TimeWindowedKStream) doesn't >>>> need >>>> > > > >> "subtractors" or "mergers", but only "adders"; again, this is a >>>> > > consequence >>>> > > > >> of the fact that the windows are enumerable. >>>> > > > > >>>> > > > > >>>> > > > > Given that, I'm a bit confused why you conclude that sliding >>>> windows >>>> > > are >>>> > > > > fundamentally >>>> > > > > different from the "statically enumerable" windows -- sliding >>>> windows >>>> > > > > require only an >>>> > > > > adder too. 
I'm not sure it's a consequence of being enumerable, >>>> or that >>>> > > > > being enumerable >>>> > > > > is the fundamental property that unites all Windows (ignoring >>>> > > JoinWindows >>>> > > > > here). Yes, it >>>> > > > > currently does apply to all Windows implementations, but we >>>> shouldn't >>>> > > > > assume that it >>>> > > > > *has *to be that way on the basis that it currently happens to >>>> be. >>>> > > > > >>>> > > > > Also, the fact that they can all rely on the same aggregation >>>> algorithm >>>> > > > > seems like an >>>> > > > > implementation detail and it would be weird to force a >>>> separate/new >>>> > > DSL API >>>> > > > > just because >>>> > > > > under the covers we swap in a different processor. >>>> > > > > >>>> > > > > To be fair, I don't think there's a strong reason *against* not >>>> > > extending >>>> > > > > Windows -- in the end >>>> > > > > it will just mean adding a new #windowedBy method and >>>> copy/pasting >>>> > > > > everything from >>>> > > > > TimeWindowedKStream pretty much word for word. But anytime you >>>> find >>>> > > > > yourself >>>> > > > > copying over code almost exactly, there should probably be a >>>> good >>>> > > reason >>>> > > > > why :) >>>> > > > > >>>> > > > > >>>> > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler < >>>> vvcep...@apache.org> >>>> > > wrote: >>>> > > > > >>>> > > > >> Thanks Leah! >>>> > > > >> >>>> > > > >> 5) Regarding the empty windows, I'm wondering if we should >>>> simply >>>> > > propose >>>> > > > >> that the windows should not be emitted downstream of the >>>> operator or >>>> > > > >> visible in IQ. Then, it'll be up to the implementation to make >>>> it >>>> > > happen. >>>> > > > >> I'm >>>> > > > >> personally not concerned about it, since it seems like there >>>> are >>>> > > multiple >>>> > > > >> ways to accomplish this. >>>> > > > >> >>>> > > > >> Note, the discrepancy Matthias pointed out is actually a >>>> design bug. 
>>>> > > The >>>> > > > >> windowed aggregation (like every operation in Streams) >>>> produces a >>>> > > "view", >>>> > > > >> which then forms the basis of downstream operations. When we >>>> pass the >>>> > > > >> Materialized option to the operation, all we're doing is >>>> saying to >>>> > > > >> "materialize" >>>> > > > >> the view (aka, actually store the computed view) and also make >>>> it >>>> > > > >> queriable. >>>> > > > >> It would be illegal for the "queriable, materialized view" to >>>> differ >>>> > > in >>>> > > > >> any way >>>> > > > >> from the "view". So, it seems we must either propose to emit >>>> the empty >>>> > > > >> windows AND make them visible in IQ, or propose NOT to emit >>>> the empty >>>> > > > >> windows AND NOT make them visible in IQ. >>>> > > > >> >>>> > > > >> 7) Regarding whether we can extend TimeWindows (or Windows): >>>> > > > >> I've been mulling this over more. I think it's worth asking the >>>> > > question of >>>> > > > >> what these classes even mean. For example, why is >>>> SessionWindows a >>>> > > > >> different thing from TimeWindows and UniversalWindows (which >>>> are both >>>> > > > >> Windows)? >>>> > > > >> >>>> > > > >> This conversation is extra complicated because of the >>>> incomplete and >>>> > > > >> mis-matched class hierarchy, but we can try to look past it >>>> for now. >>>> > > > >> >>>> > > > >> It seems like what Time and Universal (and any other Windows >>>> > > > >> implementation) have in common is that the windows are >>>> statically >>>> > > > >> enumerable. >>>> > > > >> As a consequence, they can all rely on an aggregation >>>> maintenence >>>> > > algorithm >>>> > > > >> that involves enumerating each of the windows and updating it. >>>> That >>>> > > > >> also means that their DSL object (TimeWindowedKStream) doesn't >>>> need >>>> > > > >> "subtractors" or "mergers", but only "adders"; again, this is a >>>> > > consequence >>>> > > > >> of the fact that the windows are enumerable. 
>>>> > > > >> >>>> > > > >> In contrast, session windows are data driven, so they are not >>>> > > statically >>>> > > > >> enumerable. Their algorithm has to rely on scans, and to do >>>> the scans, >>>> > > > >> it needs to know the "inactivity gap", which needs to be part >>>> of the >>>> > > window >>>> > > > >> definition. Likewise, session windows have the property that >>>> they need >>>> > > > >> to be merged, so their DSL object also requires mergers. >>>> > > > >> >>>> > > > >> It really seems like your new window definition doesn't fit >>>> into >>>> > > either >>>> > > > >> category. It uses a different algorithm, which relies on >>>> scans, but >>>> > > it is >>>> > > > >> also fixed in size, so it doesn't need mergers. In this >>>> situation, it >>>> > > seems >>>> > > > >> like the safe bet is to just create SessionWindows with no >>>> interface >>>> > > and >>>> > > > >> add a separate set of DSL operations and objects. It's a >>>> little extra >>>> > > code, >>>> > > > >> but it seems to keep everything tidier and more >>>> comprehensible, both >>>> > > > >> for us and for users. >>>> > > > >> >>>> > > > >> What do you think? >>>> > > > >> -John >>>> > > > >> >>>> > > > >> >>>> > > > >> >>>> > > > >> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote: >>>> > > > >>> Hi Matthias, >>>> > > > >>> >>>> > > > >>> Thanks for the suggestions, I've updated the KIP and child >>>> page >>>> > > > >> accordingly >>>> > > > >>> and addressed some below. >>>> > > > >>> >>>> > > > >>> 1) For the mandatory grace period, we should use a static >>>> builder >>>> > > method >>>> > > > >>>> that take two parameters. >>>> > > > >>>> >>>> > > > >>> >>>> > > > >>> That makes sense, I've changed that in the public API. >>>> > > > >>> >>>> > > > >>> Btw: this implementation actually raises an issue for IQ: >>>> those empty >>>> > > > >>>> windows would be returned. 
>>>> > > > >>> >>>> > > > >>> >>>> > > > >>> This is a great point, with the current implementation plan >>>> empty >>>> > > windows >>>> > > > >>> would be returned. I think creating a second window store >>>> would >>>> > > > >> definitely >>>> > > > >>> work, but there would be more overhead in having two stores >>>> and >>>> > > switching >>>> > > > >>> windows between the stores, as well as doing scans in both >>>> stores to >>>> > > find >>>> > > > >>> existing windows. There might be a way to do avoid emitting >>>> empty >>>> > > windows >>>> > > > >>> without creating a second window store, I'll look more into >>>> it. >>>> > > > >>> >>>> > > > >>> Cheers, >>>> > > > >>> Leah >>>> > > > >>> >>>> > > > >>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax < >>>> mj...@apache.org> >>>> > > > >> wrote: >>>> > > > >>> >>>> > > > >>>> Thanks for updating the KIP. >>>> > > > >>>> >>>> > > > >>>> Couple of follow up comments: >>>> > > > >>>> >>>> > > > >>>> 1) For the mandatory grace period, we should use a static >>>> builder >>>> > > > >> method >>>> > > > >>>> that take two parameters. This provides a better API as user >>>> cannot >>>> > > > >>>> forget to set the grace period. Throwing a runtime exception >>>> seems >>>> > > not >>>> > > > >>>> to be the best way to handle this case. >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 2) In Fig.2 you list 10 hopping windows. I believe it should >>>> > > actually >>>> > > > >> be >>>> > > > >>>> more? There first hopping window would be [-6,-4[ and the >>>> last one >>>> > > > >> would >>>> > > > >>>> be from [19,29[ -- hence, the cost saving are actually much >>>> higher. >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 3a) IQ: you are saying that the user need to compute the >>>> start time >>>> > > as >>>> > > > >>>> >>>> > > > >>>>> windowSize+the time they're looking at >>>> > > > >>>> >>>> > > > >>>> Should this be "targetTime - windowSize" instead? 
>>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 3b) IQ: in you example you say "window size of 10 minutes" >>>> with an >>>> > > > >>>> incident at 9:15. >>>> > > > >>>> >>>> > > > >>>>> they're looking for a window with the start time of 8:15. >>>> > > > >>>> >>>> > > > >>>> The example does not seem to add up? >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 4) For "Processing Windows": you describe a three step >>>> approach: I >>>> > > just >>>> > > > >>>> want to point out, that step (1) is not necessary for each >>>> input >>>> > > > >> record, >>>> > > > >>>> because timestamps are not guaranteed to be unique and thus a >>>> > > previous >>>> > > > >>>> record with the same key and timestamp might have create the >>>> windows >>>> > > > >>>> already. >>>> > > > >>>> >>>> > > > >>>> Nit: I am also not exactly sure what you mean by step (3) as >>>> you use >>>> > > > >> the >>>> > > > >>>> word "send". I guess you mean "put"? >>>> > > > >>>> >>>> > > > >>>> It seem there are actually more details in the sub-page: >>>> > > > >>>> >>>> > > > >>>>> A new record for SlidingWindows will always create two new >>>> windows. >>>> > > > >> If >>>> > > > >>>> either of those windows already exist in the windows store, >>>> their >>>> > > > >>>> aggregation will simply be updated to include the new >>>> record, but no >>>> > > > >>>> duplicate window will be added to the WindowStore. >>>> > > > >>>> >>>> > > > >>>> However, the first and second sentence contradict each other >>>> a >>>> > > little >>>> > > > >>>> bit. I think the first sentence is not correct. >>>> > > > >>>> >>>> > > > >>>> Nit: >>>> > > > >>>> >>>> > > > >>>>> For in-order records, the left window will always be empty. >>>> > > > >>>> >>>> > > > >>>> This should be "right window" ? 
>>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 5) "Emitting Results": it might be worth to point out, that a >>>> > > > >>>> second/future window of a new record is create with no >>>> records, and >>>> > > > >>>> thus, even if it's initialized it won't be emitted. Only if a >>>> > > > >>>> consecutive record falls into the window, the window would be >>>> > > updates >>>> > > > >>>> and the window result (for a window content of one record) >>>> would be >>>> > > > >> sent >>>> > > > >>>> downstream. >>>> > > > >>>> >>>> > > > >>>> Again, the sub-page contains this details. Might still be >>>> worth to >>>> > > add >>>> > > > >>>> to the top level page, too. >>>> > > > >>>> >>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: >>>> those >>>> > > empty >>>> > > > >>>> windows would be returned. Thus I am wondering if we need to >>>> use two >>>> > > > >>>> stores internally? One store for actual windows and one >>>> store for >>>> > > empty >>>> > > > >>>> windows? If an empty window is updated, it's move to the >>>> other >>>> > > store? >>>> > > > >>>> For IQ, we only allow to query the non-empty-window store? >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 6) On the sub-page: >>>> > > > >>>> >>>> > > > >>>>> The left window of in-order records and both windows for >>>> > > out-of-order >>>> > > > >>>> records need to be updated with the values of records that >>>> have >>>> > > already >>>> > > > >>>> been processed. >>>> > > > >>>> >>>> > > > >>>> Why "both windows for out-of-order records"? IMHO, we don't >>>> know how >>>> > > > >>>> many existing windows needs to be updated when processing an >>>> > > > >>>> out-of-order record. Of course, an out-of-order record could >>>> not >>>> > > fall >>>> > > > >>>> into any existing window but create two new windows, too. 
>>>> > > > >>>> >>>> > > > >>>>> Because each record creates one new window that includes >>>> itself >>>> > > and >>>> > > > >> one >>>> > > > >>>> window that does not >>>> > > > >>>> >>>> > > > >>>> As state above, this does not seem to hold. I understand why >>>> you >>>> > > mean, >>>> > > > >>>> but it would be good to be exact. >>>> > > > >>>> >>>> > > > >>>> Figure 2: You use the term "late" but you mean >>>> "out-of-order" I >>>> > > guess >>>> > > > >> -- >>>> > > > >>>> a record is _late_ if it's not processed any longer as the >>>> grace >>>> > > period >>>> > > > >>>> passed already. >>>> > > > >>>> >>>> > > > >>>> Figure 2: "Late" should be out-or-order. The example text >>>> say a >>>> > > window >>>> > > > >>>> [16,26] should be created but the figure shows the green >>>> window as >>>> > > > >> [15,20]. >>>> > > > >>>> >>>> > > > >>>> About the blue window: maybe add not that the blue window >>>> contains >>>> > > the >>>> > > > >>>> aggregate we need for the green window, _before_ the new >>>> record `a` >>>> > > is >>>> > > > >>>> added to the blue window. >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> 7) I am not really happy to extend TimeWindows and I think >>>> the >>>> > > argument >>>> > > > >>>> about JoinWindows is not the best (IMHO, JoinWindows do it >>>> already >>>> > > > >> wrong >>>> > > > >>>> and we just repeat the same mistake). However, it seems our >>>> window >>>> > > > >>>> hierarchy is "broken" already and it might be out of scope >>>> for this >>>> > > KIP >>>> > > > >>>> to fix it. Hence, I am ok that we bite the bullet for now >>>> and clean >>>> > > it >>>> > > > >>>> up later. >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> -Matthias >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote: >>>> > > > >>>>> Hi Leah, >>>> > > > >>>>> >>>> > > > >>>>> Thanks for the updated KIP. 
I agree that extending >>>> SlidingWindows >>>> > > > >> from >>>> > > > >>>>> Windows is fine for the sake of not introducing more public >>>> APIs >>>> > > (and >>>> > > > >>>> their >>>> > > > >>>>> internal xxxImpl classes), and its cons is small enough to >>>> tolerate >>>> > > > >> to >>>> > > > >>>> me. >>>> > > > >>>>> >>>> > > > >>>>> >>>> > > > >>>>> Guozhang >>>> > > > >>>>> >>>> > > > >>>>> >>>> > > > >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas < >>>> ltho...@confluent.io> >>>> > > > >>>> wrote: >>>> > > > >>>>> >>>> > > > >>>>>> Hi all, >>>> > > > >>>>>> >>>> > > > >>>>>> Thanks for the feedback on the KIP. I've updated the KIP >>>> page >>>> > > > >>>>>> < >>>> > > > >>>>>> >>>> > > > >>>> >>>> > > > >> >>>> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL >>>> > > > >>>>>>> >>>> > > > >>>>>> to address these points and have created a child page >>>> > > > >>>>>> < >>>> > > > >>>>>> >>>> > > > >>>> >>>> > > > >> >>>> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>> > > > >>>>>>> >>>> > > > >>>>>> to go more in depth on certain implementation details. >>>> > > > >>>>>> >>>> > > > >>>>>> *Grace Period:* >>>> > > > >>>>>> I think Sophie raises a good point that the default grace >>>> period >>>> > > of >>>> > > > >> 24 >>>> > > > >>>>>> hours is often too long and was chosen when retention time >>>> and >>>> > > grace >>>> > > > >>>> period >>>> > > > >>>>>> were the same. For SlidingWindows, I propose we make the >>>> grace >>>> > > > >> period >>>> > > > >>>>>> mandatory. To keep formatting consistent with other types >>>> of >>>> > > > >> windows, >>>> > > > >>>> grace >>>> > > > >>>>>> period won't be an additional parameter in the #of method, >>>> but >>>> > > will >>>> > > > >>>> still >>>> > > > >>>>>> look like it does in other use cases: >>>> > > > >>>>>> >>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds). 
>>>> > > If >>>> > > > >>>> grace >>>> > > > >>>>>> period isn't properly initialized, an error will be thrown >>>> through >>>> > > > >> the >>>> > > > >>>>>> process method. >>>> > > > >>>>>> >>>> > > > >>>>>> *Storage Layer + Aggregation:* >>>> > > > >>>>>> SlidingWindows will use a WindowStore because computation >>>> can be >>>> > > > >> done >>>> > > > >>>> with >>>> > > > >>>>>> the information stored in a WindowStore (window timestamp >>>> and >>>> > > > >> value). >>>> > > > >>>> Using >>>> > > > >>>>>> the WindowStore also simplifies computation as >>>> SlidingWindows can >>>> > > > >>>> leverage >>>> > > > >>>>>> existing processes. Because we are using a WindowStore, the >>>> > > > >> aggregation >>>> > > > >>>>>> process will be similar to that of a hopping window. As >>>> records >>>> > > > >> come in >>>> > > > >>>>>> their value is added to the aggregation that already >>>> exists, >>>> > > > >> following >>>> > > > >>>> the >>>> > > > >>>>>> same procedure as hopping windows. The aggregation >>>> difference >>>> > > > >> between >>>> > > > >>>>>> SlidingWindows and HoppingWindows comes in creating new >>>> windows >>>> > > for >>>> > > > >> a >>>> > > > >>>>>> SlidingWindow, where you need to find the existing records >>>> that >>>> > > > >> belong >>>> > > > >>>> to >>>> > > > >>>>>> the new window. This computation is similar to the >>>> aggregation in >>>> > > > >>>>>> SessionWindows and requires a scan to the WindowStore to >>>> find the >>>> > > > >> window >>>> > > > >>>>>> with the aggregation needed, which will always be >>>> pre-computed. >>>> > > The >>>> > > > >> scan >>>> > > > >>>>>> requires creating an iterator, but should have minimal >>>> performance >>>> > > > >>>> effects >>>> > > > >>>>>> as this strategy is already implemented in SessionWindows. 
>>>> More >>>> > > > >> details >>>> > > > >>>> on >>>> > > > >>>>>> finding the aggregation that needs to be put in a new >>>> window can >>>> > > be >>>> > > > >>>> found >>>> > > > >>>>>> on the implementation page >>>> > > > >>>>>> < >>>> > > > >>>>>> >>>> > > > >>>> >>>> > > > >> >>>> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>> > > > >>>>>>> >>>> > > > >>>>>> . >>>> > > > >>>>>> >>>> > > > >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing* >>>> > > > >>>>>> Because SlidingWindows are still defined by a windowSize >>>> (whereas >>>> > > > >>>>>> SessionWindows are defined purely by data), I think it >>>> makes sense >>>> > > > >> to >>>> > > > >>>>>> leverage the existing Window processes instead of creating >>>> a new >>>> > > > >> store >>>> > > > >>>> type >>>> > > > >>>>>> that would be very similar to the WindowStore. While it's >>>> true >>>> > > that >>>> > > > >> the >>>> > > > >>>>>> #windowsFor method isn't necessary for SlidingWindows, >>>> JoinWindows >>>> > > > >> also >>>> > > > >>>>>> extends Windows<Window> and throws an >>>> > > UnsupportedOperationException >>>> > > > >> in >>>> > > > >>>> the >>>> > > > >>>>>> #windowsFor method, which is what SlidingWindows can do. >>>> The >>>> > > > >> difference >>>> > > > >>>>>> between extending Windows<TimeWindow> or Windows<Window> is >>>> > > > >> minimal, as >>>> > > > >>>>>> both are ways to pass window parameters. Extending >>>> > > > >> Windows<TimeWindow> >>>> > > > >>>> will >>>> > > > >>>>>> give us more leverage in utilizing existing processes. >>>> > > > >>>>>> >>>> > > > >>>>>> *Emit Strategy* >>>> > > > >>>>>> I would argue that emitting for every update is still the >>>> best way >>>> > > > >> to go >>>> > > > >>>>>> for SlidingWindows because it mimics the other types of >>>> windows, >>>> > > and >>>> > > > >>>>>> suppression can be leveraged to limit what SlidingWindows >>>> emits. 
>>>> > > > >> While >>>> > > > >>>> some >>>> > > > >>>>>> users may only want to see the last value, others may want >>>> to see >>>> > > > >> more, >>>> > > > >>>> and >>>> > > > >>>>>> leaving the emit strategy to emit partial results allows >>>> both >>>> > > users >>>> > > > >> to >>>> > > > >>>>>> access what they want. >>>> > > > >>>>>> >>>> > > > >>>>>> *Additional Features* >>>> > > > >>>>>> Supporting sliding windows inherently, and shifting >>>> inefficient >>>> > > > >> hopping >>>> > > > >>>>>> windows to sliding windows, is an interesting idea and >>>> could be >>>> > > > >> built on >>>> > > > >>>>>> top of SlidingWindows when they are finished, but right >>>> now seems >>>> > > > >> out of >>>> > > > >>>>>> scope for the needs of this KIP. Similarly, including a >>>> > > > >> `subtraction` >>>> > > > >>>>>> feature could have performance improvements, but doesn't >>>> seem >>>> > > > >> necessary >>>> > > > >>>> for >>>> > > > >>>>>> the implementation of this KIP. >>>> > > > >>>>>> >>>> > > > >>>>>> Let me know what you think of the updates, >>>> > > > >>>>>> >>>> > > > >>>>>> Leah >>>> > > > >>>>>> >>>> > > > >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler < >>>> > > vvcep...@apache.org> >>>> > > > >>>> wrote: >>>> > > > >>>>>> >>>> > > > >>>>>>> Hello all, >>>> > > > >>>>>>> >>>> > > > >>>>>>> Thanks for the KIP, Leah! >>>> > > > >>>>>>> >>>> > > > >>>>>>> Regarding (1): I'd go farther actually. Making Windows an >>>> > > abstract >>>> > > > >>>>>>> class was a mistake from the beginning that led to us not >>>> being >>>> > > > >>>>>>> able to fix a very confusing situation for users around >>>> retention >>>> > > > >>>> times, >>>> > > > >>>>>>> final results emitting, etc. Thus, I would not suggest >>>> extending >>>> > > > >>>>>>> TimeWindows for sure, but would also not suggest extending >>>> > > Windows. 
>>>> > > > >>>>>>> >>>> > > > >>>>>>> The very simplest thing to do is follow the example of >>>> > > > >> SessionWindows, >>>> > > > >>>>>>> which is just a completely self-contained class. If we >>>> don't mess >>>> > > > >> with >>>> > > > >>>>>>> class inheritance, we won't ever have any of the problems >>>> related >>>> > > > >> to >>>> > > > >>>>>>> class inheritance. This is my preferred solution. >>>> > > > >>>>>>> >>>> > > > >>>>>>> Still, Sliding windows has a lot in common with >>>> TimeWindows and >>>> > > > >> other >>>> > > > >>>>>>> fixed-size windows, namely that the windows are fixed in >>>> size. If >>>> > > > >> we >>>> > > > >>>> want >>>> > > > >>>>>>> to preserve the current two-part windowing API in which >>>> you can >>>> > > > >> window >>>> > > > >>>>>>> by either "fixed" or "data driven" modes, I'd suggest we >>>> avoid >>>> > > > >>>> increasing >>>> > > > >>>>>>> the blast radius of Windows by taking the opportunity to >>>> replace >>>> > > it >>>> > > > >>>> with >>>> > > > >>>>>>> a proper interface and implement that interface instead. >>>> > > > >>>>>>> >>>> > > > >>>>>>> For example: >>>> > > > >>>>>>> https://github.com/apache/kafka/pull/9031 >>>> > > > >>>>>>> >>>> > > > >>>>>>> Then, SlidingWindows would just implement >>>> > > FixedSizeWindowDefinition >>>> > > > >>>>>>> >>>> > > > >>>>>>> ====== >>>> > > > >>>>>>> >>>> > > > >>>>>>> Regarding (2), it seems more straightforward as a user of >>>> Streams >>>> > > > >>>>>>> to just have one mental model. _All_ of our aggregation >>>> > > operations >>>> > > > >>>>>>> follow an eager emission model, in which we just emit an >>>> update >>>> > > > >>>> whenever >>>> > > > >>>>>>> an update is available. We already provided Suppression to >>>> > > > >> explicitly >>>> > > > >>>>>> apply >>>> > > > >>>>>>> different update semantics in the case it's required. 
Why >>>> should >>>> > > we >>>> > > > >>>>>> define >>>> > > > >>>>>>> a snowflake operation with completely different semantics >>>> from >>>> > > > >>>> everything >>>> > > > >>>>>>> else? I.e., systems are generally easier to use when they >>>> follow >>>> > > a >>>> > > > >> few >>>> > > > >>>>>>> simple, composable rules than when they have a lot of >>>> different, >>>> > > > >>>> specific >>>> > > > >>>>>>> rules. >>>> > > > >>>>>>> >>>> > > > >>>>>>> >>>> > > > >>>>>>> ====== >>>> > > > >>>>>>> >>>> > > > >>>>>>> New point: (4): >>>> > > > >>>>>>> It would be nice to include some examples of user code >>>> that would >>>> > > > >> use >>>> > > > >>>> the >>>> > > > >>>>>>> new API, which should include: >>>> > > > >>>>>>> 1. using the DSL with the sliding window definition >>>> > > > >>>>>>> 2. accessing the stored results of a sliding window >>>> aggregation >>>> > > > >> via IQ >>>> > > > >>>>>>> 3. defining a custom processor to access sliding windows >>>> in a >>>> > > store >>>> > > > >>>>>>> >>>> > > > >>>>>>> It generally helps reviewers wrap their heads around the >>>> > > proposal, >>>> > > > >> as >>>> > > > >>>>>> well >>>> > > > >>>>>>> as shaking out any design issues that would otherwise >>>> only come >>>> > > up >>>> > > > >>>> during >>>> > > > >>>>>>> implementation/testing/review. >>>> > > > >>>>>>> >>>> > > > >>>>>>> Thanks again for the awesome proposal! >>>> > > > >>>>>>> -John >>>> > > > >>>>>>> >>>> > > > >>>>>>> >>>> > > > >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote: >>>> > > > >>>>>>>> Hello Leah, >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> Thanks for the nice written KIP. A few thoughts: >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> 1) I echo the other reviewer's comments regarding the >>>> typing: >>>> > > why >>>> > > > >>>>>>> extending >>>> > > > >>>>>>>> TimeWindow instead of just extending Window? 
>>>> > > > >>>>>>>> >>>> > > > >>>>>>>> 2) I also feel that the emitting policy for this type of >>>> windowing >>>> > > > >>>>>>> aggregation >>>> > > > >>>>>>>> may be different from the existing ones. The existing >>>> emitting >>>> > > policy >>>> > > > >> is >>>> > > > >>>>>> very >>>> > > > >>>>>>>> simple: emit every time a window gets updated, and >>>> emit every >>>> > > > >> time >>>> > > > >>>> on >>>> > > > >>>>>>>> out-of-order data within the grace period. This is >>>> because for >>>> > > > >>>>>>> time-windows >>>> > > > >>>>>>>> the window close time strictly depends on the window >>>> start >>>> > > time >>>> > > > >>>> which >>>> > > > >>>>>>> is >>>> > > > >>>>>>>> fixed, while for session-windows although the window >>>> open/close >>>> > > > >> time >>>> > > > >>>> is >>>> > > > >>>>>>>> also data-dependent it is relatively infrequent compared >>>> to the >>>> > > > >>>>>>>> sliding-windows. For this KIP, since each new record would >>>> cause a >>>> > > > >>>>>>>> new sliding-window, the number of windows maintained >>>> logically could >>>> > > be >>>> > > > >>>> much >>>> > > > >>>>>>>> larger and hence emitting on each update may be too >>>> aggressive. >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> 3) Although the KIP itself should focus on user-facing >>>> > > > >> interfaces, I'd >>>> > > > >>>>>>>> suggest we create a child page of KIP-450 discussing >>>> > > its >>>> > > > >>>>>>>> implementation as well, since some of that may drive the >>>> > > > >> interface >>>> > > > >>>>>>> design. >>>> > > > >>>>>>>> E.g.
personally I think having a combiner interface in >>>> addition >>>> > > to >>>> > > > >>>>>>>> aggregator would be useful but that's based on my 2cents >>>> about >>>> > > the >>>> > > > >>>>>>>> implementation design (I once created a child page >>>> describing >>>> > > it: >>>> > > > >>>>>>>> >>>> > > > >>>>>>> >>>> > > > >>>>>> >>>> > > > >>>> >>>> > > > >> >>>> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation >>>> > > > >>>>>>>> ). >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> Guozhang >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna < >>>> > > br...@confluent.io >>>> > > > >>> >>>> > > > >>>>>>> wrote: >>>> > > > >>>>>>>> >>>> > > > >>>>>>>>> Hi Leah, >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> Thank you for the KIP! >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> Here is my feedback: >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 1. The KIP would benefit from some code examples that >>>> show how >>>> > > > >> to use >>>> > > > >>>>>>>>> sliding windows in aggregations. >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 2. The different sliding windows in Figure 1 and 2 are >>>> really >>>> > > > >> hard to >>>> > > > >>>>>>>>> distinguish. Could you please try to make them >>>> graphically >>>> > > better >>>> > > > >>>>>>>>> distinguishable? You could try to draw the frames of >>>> > > consecutive >>>> > > > >>>>>>>>> windows shifted to each other. >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 3. I agree with Matthias, that extending >>>> Windows<TimeWindow> >>>> > > > >> does not >>>> > > > >>>>>>>>> seem to be the best approach. What would be the result >>>> of >>>> > > > >>>>>>>>> windowsFor()? >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 4. In the section "Public Interfaces" you should remove >>>> > > > >>>>>> implementation >>>> > > > >>>>>>>>> details like private constructors and private fields. 
>>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 5. Do we need a new store interface or can we use >>>> WindowStore? >>>> > > > >> Some >>>> > > > >>>>>>>>> words about that would be informative. >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, >>>> I would >>>> > > > >> skip >>>> > > > >>>>>> it >>>> > > > >>>>>>>>> for now and add it later. >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> 7. I also agree that having a section that describes >>>> how to >>>> > > > >> handle >>>> > > > >>>>>>>>> out-of-order records would be good to understand what >>>> is still >>>> > > > >>>>>> missing >>>> > > > >>>>>>>>> and what we can reuse. >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> Best, >>>> > > > >>>>>>>>> Bruno >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax < >>>> > > > >> mj...@apache.org> >>>> > > > >>>>>>> wrote: >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> Leah, >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> thanks for your update. However, it does not >>>> completely answer >>>> > > > >> my >>>> > > > >>>>>>>>> question. >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> In our current window implementations, we emit a >>>> window result >>>> > > > >>>>>> update >>>> > > > >>>>>>>>>> record (ie, early/partial result) for each input >>>> record. When >>>> > > an >>>> > > > >>>>>>>>>> out-of-order record arrives, we just update the >>>> corresponding >>>> > > old >>>> > > > >>>>>>> window >>>> > > > >>>>>>>>>> and emit another update. >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> It's unclear from the KIP if you propose the same emit >>>> > > > >> strategy? -- >>>> > > > >>>>>>> For >>>> > > > >>>>>>>>>> sliding windows it might be worth considering a >>>> > > different >>>> > > > >>>>>> emit >>>> > > > >>>>>>>>>> strategy and only support emitting the final result >>>> (ie, >>>> > > > >> after >>>> > > > >>>>>>> the >>>> > > > >>>>>>>>>> grace period has passed)?
>>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> Boyang also raises a good point that relates to my >>>> point from >>>> > > > >>>>>> above >>>> > > > >>>>>>>>>> about pre-aggregations and storage layout. Our current >>>> time >>>> > > > >> windows >>>> > > > >>>>>>> are >>>> > > > >>>>>>>>>> all pre-aggregated and stored in parallel. We can also >>>> look up >>>> > > > >>>>>> windows >>>> > > > >>>>>>>>>> efficiently, as we can compute the windowed-key given >>>> the >>>> > > input >>>> > > > >>>>>>> record >>>> > > > >>>>>>>>>> key and timestamp based on the window definition. >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> However, for sliding windows, window boundaries are >>>> data >>>> > > > >> dependent >>>> > > > >>>>>>> and >>>> > > > >>>>>>>>>> thus we cannot compute them upfront. Thus, how can we >>>> "find" >>>> > > > >>>>>> existing >>>> > > > >>>>>>>>>> windows efficiently? Furthermore, out-of-order data >>>> would >>>> > > create >>>> > > > >> new >>>> > > > >>>>>>>>>> windows in the past and we need to be able to handle >>>> this >>>> > > case. >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> Thus, to handle out-of-order data correctly, we need >>>> to store >>>> > > > >> all >>>> > > > >>>>>> raw >>>> > > > >>>>>>>>>> input events. Additionally, we could also store >>>> pre-aggregated >>>> > > > >>>>>>> results >>>> > > > >>>>>>>>>> if we think it's beneficial. -- If we apply an "emit only >>>> final >>>> > > > >>>>>> results" >>>> > > > >>>>>>>>>> strategy, storing pre-aggregated results would not be >>>> necessary >>>> > > > >>>>>>> though.
>>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> Btw: for sliding windows it might also be useful to >>>> consider >>>> > > > >>>>>> allowing >>>> > > > >>>>>>>>>> users to supply a `Subtractor` -- this subtractor >>>> could be >>>> > > > >> applied >>>> > > > >>>>>> on >>>> > > > >>>>>>>>>> the current window result (in case we store it) if a >>>> record >>>> > > > >> drops >>>> > > > >>>>>>> out of >>>> > > > >>>>>>>>>> the window. Of course, not all aggregation functions >>>> are >>>> > > > >>>>>> subtractable >>>> > > > >>>>>>>>>> and we can consider this as a follow up task, too, and >>>> not >>>> > > > >> include >>>> > > > >>>>>> it in >>>> > > > >>>>>>>>>> this KIP for now. Thoughts? >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> I was also thinking about the type hierarchy. I am not >>>> sure if >>>> > > > >>>>>>> extending >>>> > > > >>>>>>>>>> TimeWindow is the best approach? For TimeWindows, we >>>> can >>>> > > > >>>>>> pre-compute >>>> > > > >>>>>>>>>> window boundaries (cf `windowsFor()`) while for a >>>> sliding >>>> > > window >>>> > > > >>>>>> the >>>> > > > >>>>>>>>>> boundaries are data dependent. Session windows are >>>> also data >>>> > > > >>>>>>> dependent >>>> > > > >>>>>>>>>> and thus they don't inherit from TimeWindow (Maybe >>>> check out >>>> > > the >>>> > > > >>>>>> KIP >>>> > > > >>>>>>>>>> that added session windows? It could provide some good >>>> > > > >> insights.) >>>> > > > >>>>>>> -- I >>>> > > > >>>>>>>>>> believe the same rationale applies to sliding windows? >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> -Matthias >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote: >>>> > > > >>>>>>>>>>> Thanks Leah and Sophie for the KIP. >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance >>>> time.
>>>> > > > >> Could >>>> > > > >>>>>> we >>>> > > > >>>>>>>>>>> elaborate how the storage layer is structured? >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching >>>> > > > >>>>>> aggregation >>>> > > > >>>>>>>>> results, >>>> > > > >>>>>>>>>>> since we couldn't pre-aggregate until the user asks >>>> for it. >>>> > > > >> Would >>>> > > > >>>>>>> be >>>> > > > >>>>>>>>> good >>>> > > > >>>>>>>>>>> to also discuss it. >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>> 3. We haven't discussed the possibility of supporting >>>> sliding >>>> > > > >>>>>>> windows >>>> > > > >>>>>>>>>>> inherently. For a user who actually uses a hopping >>>> window, >>>> > > > >>>>>> Streams >>>> > > > >>>>>>>>> could >>>> > > > >>>>>>>>>>> detect such an inefficiency doing a >>>> window_size/advance_time >>>> > > > >>>>>> ratio >>>> > > > >>>>>>> to >>>> > > > >>>>>>>>> reach >>>> > > > >>>>>>>>>>> a conclusion on whether the write amplification is >>>> too high >>>> > > > >>>>>>> compared >>>> > > > >>>>>>>>> with >>>> > > > >>>>>>>>>>> some configured threshold. The benefit of doing so is >>>> that >>>> > > > >>>>>> existing >>>> > > > >>>>>>>>> Streams >>>> > > > >>>>>>>>>>> users don't need to change their code, learn a new >>>> API, but >>>> > > > >> only >>>> > > > >>>>>> to >>>> > > > >>>>>>>>> upgrade >>>> > > > >>>>>>>>>>> Streams library to get benefits for their inefficient >>>> hopping >>>> > > > >>>>>>> window >>>> > > > >>>>>>>>>>> implementation. There might be some compatibility >>>> issues for >>>> > > > >>>>>> sure, >>>> > > > >>>>>>> but >>>> > > > >>>>>>>>>>> worth listing them out for trade-off. 
>>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>> Boyang >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas < >>>> > > > >>>>>> ltho...@confluent.io >>>> > > > >>>>>>>> >>>> > > > >>>>>>>>> wrote: >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>>>> Hey Matthias, >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> Thanks for pointing that out. I added the following >>>> to the >>>> > > > >>>>>> Propose >>>> > > > >>>>>>>>> Changes >>>> > > > >>>>>>>>>>>> section of the KIP: >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> "Records that come out of order will be processed >>>> the same >>>> > > way >>>> > > > >>>>>> as >>>> > > > >>>>>>>>> in-order >>>> > > > >>>>>>>>>>>> records, as long as they fall within the grace >>>> period. Any >>>> > > new >>>> > > > >>>>>>> windows >>>> > > > >>>>>>>>>>>> created by the late record will still be created, >>>> and the >>>> > > > >>>>>> existing >>>> > > > >>>>>>>>> windows >>>> > > > >>>>>>>>>>>> that are changed by the late record will be updated. >>>> Any >>>> > > > >> record >>>> > > > >>>>>>> that >>>> > > > >>>>>>>>> falls >>>> > > > >>>>>>>>>>>> outside of the grace period (either user defined or >>>> default) >>>> > > > >>>>>> will >>>> > > > >>>>>>> be >>>> > > > >>>>>>>>>>>> discarded. " >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> All the best, >>>> > > > >>>>>>>>>>>> Leah >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax < >>>> > > > >>>>>> mj...@apache.org> >>>> > > > >>>>>>>>> wrote: >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> Leah, >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> thanks a lot for the KIP. Very well written. >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> The KIP does not talk about the handling of >>>> out-of-order >>>> > > data >>>> > > > >>>>>>> though. >>>> > > > >>>>>>>>>>>>> How do you propose to address this? 
>>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> -Matthias >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote: >>>> > > > >>>>>>>>>>>>>> Hi all, >>>> > > > >>>>>>>>>>>>>> I'd like to kick-off the discussion for KIP-450, >>>> adding >>>> > > > >>>>>> sliding >>>> > > > >>>>>>>>> window >>>> > > > >>>>>>>>>>>>>> aggregation support to Kafka Streams. >>>> > > > >>>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>> >>>> > > > >>>>>>> >>>> > > > >>>>>> >>>> > > > >>>> >>>> > > > >> >>>> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL >>>> > > > >>>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>>> Let me know what you think, >>>> > > > >>>>>>>>>>>>>> Leah >>>> > > > >>>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>>> >>>> > > > >>>>>>>>>>>> >>>> > > > >>>>>>>>>>> >>>> > > > >>>>>>>>>> >>>> > > > >>>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> >>>> > > > >>>>>>>> -- >>>> > > > >>>>>>>> -- Guozhang >>>> > > > >>>>>>>> >>>> > > > >>>>>>> >>>> > > > >>>>>> >>>> > > > >>>>> >>>> > > > >>>>> >>>> > > > >>>> >>>> > > > >>>> >>>> > > > >>> >>>> > > > >> >>>> > > > > >>>> > > > >>>> > > > >>>> > > > Attachments: >>>> > > > * signature.asc >>>> > > >>>> > >>>> >>>
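Several reviewers in the thread ask for examples of the window semantics under discussion. The following is only a rough sketch in plain Java with no Kafka Streams dependency, meant to illustrate the inclusive window described above: a window ending at a record's timestamp and reaching back by the time difference. The class and method names (SlidingWindowSketch, windowStart, countInWindow) are hypothetical and are not part of the KIP or the Streams API. The sketch rescans all timestamps for each window and ignores grace periods, out-of-order handling, and storage layout entirely.

```java
import java.util.List;

// Hypothetical illustration only; not the KIP-450 implementation.
final class SlidingWindowSketch {

    /** Inclusive start of the window that ends at endTs. */
    static long windowStart(long endTs, long timeDifference) {
        return endTs - timeDifference;
    }

    /** Counts records whose timestamps lie in [endTs - timeDifference, endTs]. */
    static long countInWindow(List<Long> timestamps, long endTs, long timeDifference) {
        long start = windowStart(endTs, timeDifference);
        // Both ends are inclusive, so a time difference of 5 spans 6 distinct timestamps.
        return timestamps.stream()
                .filter(ts -> ts >= start && ts <= endTs)
                .count();
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(5L, 6L, 7L, 8L, 9L, 10L, 11L);
        // Window [5, 10] for a record at 10 with time difference 5.
        System.out.println(countInWindow(timestamps, 10L, 5L)); // prints 6
    }
}
```

Because each record defines its own window end, the number of logical windows grows with the number of distinct record timestamps, which is the concern behind the emit-strategy and storage questions raised in the thread.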