To accommodate the change to a final class, I've added another *windowedBy()* function in *CogroupedKStream.java* to handle SlidingWindows.
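The sketch below shows the overload approach in code. It is a stdlib-only illustration that uses simplified stand-ins for the real types. `WindowDefinition`, `CustomWindows`, `SlidingWindowsSketch`, `withTimeDifferenceAndGrace` and `GroupedStreamSketch` are hypothetical names, not the Kafka Streams API. The sketch combines two ideas from this thread:

- A final sliding-window class whose static factory requires both the time difference and the grace period, so the grace period cannot be forgotten.
- A dedicated `windowedBy` overload that the compiler selects, so no runtime `instanceof` check is needed to pick the algorithm.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for the extensible Windows base class
// (hopping, tumbling, or custom window definitions).
abstract class WindowDefinition {
    abstract long sizeMs();
}

// Hypothetical custom window definition supplied by a user.
final class CustomWindows extends WindowDefinition {
    @Override
    long sizeMs() {
        return 5_000L;
    }
}

// Standalone final class: it does not extend WindowDefinition, so it can
// never be routed to the hopping/tumbling algorithm by accident.
final class SlidingWindowsSketch {
    private final long timeDifferenceMs;
    private final long graceMs;

    private SlidingWindowsSketch(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // Both arguments are required, so omitting the grace period is a compile
    // error instead of a runtime exception thrown later from process().
    static SlidingWindowsSketch withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("timeDifference and grace must not be negative");
        }
        return new SlidingWindowsSketch(timeDifference.toMillis(), grace.toMillis());
    }

    long timeDifferenceMs() {
        return timeDifferenceMs;
    }

    long graceMs() {
        return graceMs;
    }
}

// Hypothetical stand-in for KGroupedStream / CogroupedKStream.
final class GroupedStreamSketch {
    // Existing entry point: every WindowDefinition, custom ones included,
    // gets the hopping/tumbling algorithm.
    String windowedBy(WindowDefinition windows) {
        return "hopping/tumbling processor";
    }

    // New overload: the compiler picks it for sliding windows, so no
    // instanceof check is needed at runtime.
    String windowedBy(SlidingWindowsSketch windows) {
        return "sliding-window processor";
    }
}
```

Take `grouped.windowedBy(SlidingWindowsSketch.withTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(50)))` as an example: it resolves to the sliding overload. Any `WindowDefinition` subclass, including a custom one, resolves to the existing overload.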
As far as the discussion goes, I think this is the last change we've talked about. If anyone has other comments or concerns, please let me know!

Cheers,
Leah

On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:

Thanks for the discussion about extending TimeWindows. I agree that making it future-proof is important, and the implementation of SlidingWindows is unique enough that it seems logical to make it its own final class.

On that note, I've updated the KIP to make SlidingWindows a standalone final class, and added the *windowedBy()* API in *KGroupedStream* to handle SlidingWindows. It seems that SlidingWindows would still be able to leverage *TimeWindowedKStream* by creating a SlidingWindows version of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If anyone sees issues with this implementation, please let me know.

Best,
Leah

On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the reply, Sophie.

That all sounds about right to me.

The Windows “interface”/algorithm is quite flexible, so it makes sense for it to be extensible. Different implementations can (and do) enumerate different windows to suit different use cases.

On the other hand, I can’t think of any way to extend SessionWindows to do something different using the same algorithm, so it makes sense for it to stay final.

If we think SlidingWindows is similarly not usefully extensible, then we can make it final. It’s easy to remove final later, but adding it later is not possible. Or we could go the other route and just make it an interface, on general principle. Both of these choices are safe API design.

Thanks again,
John

On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:

> Users could pass in a custom `SessionWindows` as long as the session algorithm works correctly for it.
Well, not really: SessionWindows is a final class. TimeWindows is also a final class, so neither of these can be extended/customized. For a given Windows, then, there would only be three (non-overlapping) possibilities: it's either TimeWindows, SlidingWindows, or a custom Windows. I don't think there's any problem with determining what the user wants in this case -- we would just check if it's a SlidingWindows and connect the new processor, or else connect the existing hopping/tumbling window processor.

I'll admit that last sentence does leave a bad taste in my mouth. Part of that is probably the "leaking" API Matthias pointed out; we just assume the hopping/tumbling window implementation fits all custom windows. But I guess if you really needed to customize the algorithm any further, you may as well stick in a transformer and do it all yourself.

Anyway, given what we have, it does seem weird to apply one algorithm for most Windows types and then swap in a different one for one specific extension of Windows. So adding a new #windowedBy(SlidingWindows) sounds reasonable to me.

I'm still not convinced that we need a whole new TimeWindowedKStream-equivalent class for sliding windows, though. It seems like the hopping/tumbling window implementation could benefit just as much from a subtractor as the sliding windows, so the only future-proofing we get is the ability to be lazy and add the subtractor to one but not the other. Of course, it would only be an optimization, so we could just not apply it to one type and nothing would break. It does make me nervous to go against the "future-proof" direction, though. Are there any other examples of things we might want to add to one window type but not to another?
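Sophie's point about subtractors can be illustrated with a small sketch. It assumes an invertible aggregate (a sum), in-order timestamps, and windows [t - timeDifference, t] that are inclusive at both ends. `SubtractingSlidingSum` is a hypothetical name, not Kafka Streams API.

Under those assumptions, the aggregate for the window ending at the latest record can be maintained incrementally:

- The adder applies each new value.
- The subtractor removes values that fall out of range.

This avoids re-aggregating from scratch. The same trick would apply to hopping windows, which is the point being made.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a running sum over [t - timeDifferenceMs, t] for in-order
// records, using an "adder" for new values and a "subtractor" for expired ones.
final class SubtractingSlidingSum {
    private final long timeDifferenceMs;
    // Records currently inside the window, oldest first: {timestamp, value}.
    private final Deque<long[]> inWindow = new ArrayDeque<>();
    private long sum = 0L;

    SubtractingSlidingSum(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    // Assumes timestamps arrive in non-decreasing order (no out-of-order handling).
    long add(long timestamp, long value) {
        inWindow.addLast(new long[] {timestamp, value});
        sum += value; // adder
        long windowStart = timestamp - timeDifferenceMs;
        // Subtract every record that is now older than the window start.
        while (!inWindow.isEmpty() && inWindow.peekFirst()[0] < windowStart) {
            sum -= inWindow.pollFirst()[1]; // subtractor
        }
        return sum;
    }
}
```

Each record is added once and subtracted at most once, so the per-record cost stays constant on average regardless of how many records the window holds.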
On another note, this actually brings up a new question: should SlidingWindows also be final? My take is "yes", since there's no reasonable customization of sliding windows, at least not that I can think of. Thoughts?

On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> wrote:

Thanks, all,

I can see how my conclusion was kind of a leap.

What Matthias said is indeed what I was thinking. When you provide a window definition to the windowedBy() method, you are selecting an algorithm that will be used to compute the windows from the input data.

I didn’t mean the code implementation “algorithm”, but the high-level algorithm that describes how the input stream will be transformed into a sequence of windows.

For example, the algorithm for Windows is something like “given the record timestamp, include the record in each of the enumerated windows”. Note that there can be a lot of variation in how the windows are enumerated, which is why there are at least a couple of implementations of Windows, and we are open to adding more (like for natural calendar boundaries).

For SessionWindows, it’s more like “if any window is within the gap, extend its boundaries to include this record, and if two windows touch, then merge them”.

Clearly, the algorithm for SlidingWindows doesn’t fall into either category, so it seems inappropriate to claim that it does in the API (by implementing Windows with stubbed methods) and then cast internally to execute a completely different algorithm.

To your other observation, that the DSL object resulting from windowedBy would look the same for Windows and SlidingWindows, maybe it makes sense for windowedBy(SlidingWindows) also to return a TimeWindowedKStream.
i.e.:
=======================
<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
=======================

I can’t think of a reason this wouldn’t work. But then again, it would be more future-proof to go ahead and specify a different return type now, if we think we'll want to add subtractors and stuff later. I don't have a strong feeling about that part of the API. It seems to be independent of whether SlidingWindows extends Windows or not.

Thanks,
-John

On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:

I think what John is trying to say is the following:

We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but also custom windows, and we use a specific algorithm. Note that custom windows must "work" with the algorithm we use.

For session windows, we have `windowedBy(SessionWindows)` and apply a different algorithm. Users could pass in a custom `SessionWindows` as long as the session algorithm works correctly for it.

For the new sliding windows, we want to use a different algorithm compared to hopping/tumbling windows. If we let sliding windows extend `Windows`, we can decide at runtime whether to use the hopping/tumbling window algorithm (for hopping/tumbling windows) or the new sliding window algorithm (for sliding windows). However, if we get a custom window, which algorithm do we pick now? The existing tumbling/hopping window algorithm or the new sliding window algorithm?
Both a custom "time window" and a custom "sliding window" implement the generic `Windows` class, and thus we cannot make a decision, as we don't know the user's intent.

As a matter of fact, even if the user might not be aware of it, the algorithm we use already leaks into the API (if a user extends `Windows`, it must work with our hopping/tumbling window algorithm, and if a user extends `SessionWindows`, it must work with our session algorithm), and it seems we need to preserve this property for sliding windows.

-Matthias

On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:

Hey John,

Just a few follow-up questions/comments about the whole Windows thing:

That's a good way of looking at things. In particular, the point about SessionWindows requiring a Merger while other "statically enumerable" windows require only an adder seems to touch on the heart of the matter.

> It seems like what Time and Universal (and any other Windows implementation) have in common is that the windows are statically enumerable. As a consequence, they can all rely on an aggregation maintenance algorithm that involves enumerating each of the windows and updating it. That also means that their DSL object (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but only "adders"; again, this is a consequence of the fact that the windows are enumerable.

Given that, I'm a bit confused about why you conclude that sliding windows are fundamentally different from the "statically enumerable" windows -- sliding windows require only an adder too.
I'm not sure it's a consequence of being enumerable, or that being enumerable is the fundamental property that unites all Windows (ignoring JoinWindows here). Yes, it currently does apply to all Windows implementations, but we shouldn't assume that it *has* to be that way just because it currently happens to be.

Also, the fact that they can all rely on the same aggregation algorithm seems like an implementation detail, and it would be weird to force a separate/new DSL API just because under the covers we swap in a different processor.

To be fair, I don't think there's a strong reason *against* not extending Windows -- in the end it will just mean adding a new #windowedBy method and copy/pasting everything from TimeWindowedKStream pretty much word for word. But anytime you find yourself copying over code almost exactly, there should probably be a good reason why :)

On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:

Thanks Leah!

5) Regarding the empty windows, I'm wondering if we should simply propose that the windows should not be emitted downstream of the operator or be visible in IQ. Then, it'll be up to the implementation to make it happen. I'm personally not concerned about it, since it seems like there are multiple ways to accomplish this.

Note, the discrepancy Matthias pointed out is actually a design bug.
The windowed aggregation (like every operation in Streams) produces a "view", which then forms the basis of downstream operations. When we pass the Materialized option to the operation, all we're doing is saying to "materialize" the view (aka, actually store the computed view) and also make it queryable. It would be illegal for the "queryable, materialized view" to differ in any way from the "view". So, it seems we must either propose to emit the empty windows AND make them visible in IQ, or propose NOT to emit the empty windows AND NOT make them visible in IQ.

7) Regarding whether we can extend TimeWindows (or Windows):
I've been mulling this over more. I think it's worth asking the question of what these classes even mean. For example, why is SessionWindows a different thing from TimeWindows and UniversalWindows (which are both Windows)?

This conversation is extra complicated because of the incomplete and mismatched class hierarchy, but we can try to look past it for now.

It seems like what Time and Universal (and any other Windows implementation) have in common is that the windows are statically enumerable. As a consequence, they can all rely on an aggregation maintenance algorithm that involves enumerating each of the windows and updating it. That also means that their DSL object (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but only "adders"; again, this is a consequence of the fact that the windows are enumerable.
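The "statically enumerable" property can be seen in how fixed-size hopping windows are enumerated for a record timestamp. The sketch is a simplified, stdlib-only illustration with three assumptions:

- Windows are aligned to the epoch, so start times are multiples of the advance.
- Bounds are half-open: [start, start + size).
- Negative window starts are not produced.

`HoppingWindowEnumerator` is a hypothetical name. Kafka's `TimeWindows#windowsFor` follows a similar idea, but this is not its source.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowEnumerator {
    // Returns the start time of every window [start, start + sizeMs) that contains
    // the timestamp, for windows whose starts are multiples of advanceMs.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advanceMs <= sizeMs");
        }
        // Earliest aligned start whose window still contains the timestamp,
        // clamped at 0 so no negative window starts are produced.
        long firstStart = (Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (long start = firstStart; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

The set of windows is a pure function of the timestamp and the window definition. As a result, the processor only ever adds a value to each enumerated window; no scan of previously seen records is needed.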
In contrast, session windows are data-driven, so they are not statically enumerable. Their algorithm has to rely on scans, and to do the scans, it needs to know the "inactivity gap", which needs to be part of the window definition. Likewise, session windows have the property that they need to be merged, so their DSL object also requires mergers.

It really seems like your new window definition doesn't fit into either category. It uses a different algorithm, which relies on scans, but it is also fixed in size, so it doesn't need mergers. In this situation, it seems like the safe bet is to just create SlidingWindows as a standalone class with no interface, like SessionWindows, and add a separate set of DSL operations and objects. It's a little extra code, but it seems to keep everything tidier and more comprehensible, both for us and for users.

What do you think?
-John

On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:

Hi Matthias,

Thanks for the suggestions. I've updated the KIP and child page accordingly and addressed some of them below.

> 1) For the mandatory grace period, we should use a static builder method that takes two parameters.

That makes sense, I've changed that in the public API.

> Btw: this implementation actually raises an issue for IQ: those empty windows would be returned.

This is a great point; with the current implementation plan, empty windows would be returned.
I think creating a second window store would definitely work. However, it would add overhead: two stores to maintain, windows to move between them, and scans in both stores to find existing windows. There might be a way to avoid emitting empty windows without creating a second window store; I'll look more into it.

Cheers,
Leah

On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for updating the KIP.

A couple of follow-up comments:

1) For the mandatory grace period, we should use a static builder method that takes two parameters. This provides a better API, as users cannot forget to set the grace period. Throwing a runtime exception seems not to be the best way to handle this case.

2) In Fig. 2 you list 10 hopping windows. I believe it should actually be more? The first hopping window would be [-6,4[ and the last one would be [19,29[ -- hence, the cost savings are actually much higher.

3a) IQ: you are saying that the user needs to compute the start time as

> windowSize+the time they're looking at

Should this be "targetTime - windowSize" instead?

3b) IQ: in your example you say "window size of 10 minutes" with an incident at 9:15.

> they're looking for a window with the start time of 8:15.

The example does not seem to add up?
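Questions 3a and 3b come down to simple arithmetic. The sketch assumes the window of interest ends at the target time and spans `windowSize`, so its start is `targetTime - windowSize`. `SlidingWindowQueryMath` is a hypothetical name used only for illustration.

```java
import java.time.Duration;
import java.time.LocalTime;

final class SlidingWindowQueryMath {
    // Start time of the window that ends at targetTime and spans windowSize.
    static LocalTime windowStartFor(LocalTime targetTime, Duration windowSize) {
        return targetTime.minus(windowSize);
    }
}
```

With a 10-minute window and an incident at 9:15, the start comes out as 9:05. A start of 8:15 would only match a 60-minute window.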
4) For "Processing Windows", you describe a three-step approach. I just want to point out that step (1) is not necessary for each input record. Timestamps are not guaranteed to be unique, so a previous record with the same key and timestamp might have created the windows already.

Nit: I am also not exactly sure what you mean by step (3), as you use the word "send". I guess you mean "put"?

It seems there are actually more details in the sub-page:

> A new record for SlidingWindows will always create two new windows. If either of those windows already exists in the window store, their aggregation will simply be updated to include the new record, but no duplicate window will be added to the WindowStore.

However, the first and second sentences contradict each other a little bit. I think the first sentence is not correct.

Nit:

> For in-order records, the left window will always be empty.

This should be "right window"?

5) "Emitting Results": it might be worth pointing out the following. A second/future window of a new record is created with no records, so even if it's initialized, it won't be emitted. Only if a subsequent record falls into the window is the window updated, and only then is the window result (for a window content of one record) sent downstream.

Again, the sub-page contains these details.
It might still be worth adding them to the top-level page, too.

Btw: this implementation actually raises an issue for IQ: those empty windows would be returned. Thus, I am wondering if we need to use two stores internally? One store for actual windows and one store for empty windows? If an empty window is updated, it's moved to the other store? For IQ, we would only allow querying the non-empty-window store?

6) On the sub-page:

> The left window of in-order records and both windows for out-of-order records need to be updated with the values of records that have already been processed.

Why "both windows for out-of-order records"? IMHO, we don't know how many existing windows need to be updated when processing an out-of-order record. Of course, an out-of-order record might also fall into no existing window and instead create two new windows.

> Because each record creates one new window that includes itself and one window that does not

As stated above, this does not seem to hold. I understand what you mean, but it would be good to be exact.

Figure 2: You use the term "late", but I guess you mean "out-of-order" -- a record is _late_ if it's no longer processed because the grace period has already passed. Also, the example text says a window [16,26] should be created, but the figure shows the green window as [15,20].
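The two-store idea can be sketched with in-memory maps standing in for window stores. This is a hypothetical illustration (`TwoStoreWindows` is not part of Kafka's `WindowStore` API), and it works as follows:

- Windows are keyed by start time for a single key, and a sum serves as the aggregate.
- Empty windows live in a separate map.
- A window moves to the non-empty map on its first update.
- The IQ-style query reads only the non-empty map.

Leah's reply notes the trade-off: extra bookkeeping, plus scans across two stores.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: per-key storage with separate maps for empty and non-empty windows.
final class TwoStoreWindows {
    private final NavigableMap<Long, Long> nonEmpty = new TreeMap<>(); // window start -> sum
    private final NavigableMap<Long, Long> empty = new TreeMap<>();    // window start -> initial value

    // Create a window that has no records yet (e.g., a new record's future window).
    void createEmpty(long windowStart) {
        if (!nonEmpty.containsKey(windowStart)) {
            empty.putIfAbsent(windowStart, 0L);
        }
    }

    // Add a value; an empty window is moved to the non-empty store on its first update.
    void add(long windowStart, long value) {
        Long initial = empty.remove(windowStart);
        long base = nonEmpty.getOrDefault(windowStart, initial == null ? 0L : initial);
        nonEmpty.put(windowStart, base + value);
    }

    // IQ-style range query over window starts: only non-empty windows are visible.
    Map<Long, Long> fetch(long fromStart, long toStart) {
        return nonEmpty.subMap(fromStart, true, toStart, true);
    }

    int emptyCount() {
        return empty.size();
    }
}
```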
About the blue window: maybe add a note that the blue window contains the aggregate we need for the green window, _before_ the new record `a` is added to the blue window.

7) I am not really happy about extending TimeWindows, and I think the argument about JoinWindows is not the best (IMHO, JoinWindows already does it wrong and we would just repeat the same mistake). However, it seems our window hierarchy is "broken" already, and it might be out of scope for this KIP to fix it. Hence, I am ok with biting the bullet for now and cleaning it up later.

-Matthias

On 7/20/20 5:18 PM, Guozhang Wang wrote:

Hi Leah,

Thanks for the updated KIP. I agree that extending SlidingWindows from Windows is fine for the sake of not introducing more public APIs (and their internal xxxImpl classes), and to me its cons are small enough to tolerate.

Guozhang

On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> wrote:

Hi all,

Thanks for the feedback on the KIP.
I've updated the KIP page <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL> to address these points. I've also created a child page <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450> that goes more in depth on certain implementation details.

*Grace Period:*
I think Sophie raises a good point: the default grace period of 24 hours is often too long, and it was chosen when retention time and grace period were the same. For SlidingWindows, I propose we make the grace period mandatory. To keep formatting consistent with other types of windows, the grace period won't be an additional parameter in the #of method, but will still look like it does in other use cases: .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If the grace period isn't properly initialized, an error will be thrown through the process method.

*Storage Layer + Aggregation:*
SlidingWindows will use a WindowStore, because the computation can be done with the information stored in a WindowStore (window timestamp and value). Using the WindowStore also simplifies computation, as SlidingWindows can leverage existing processes.
Because we are using a WindowStore, the aggregation process will be similar to that of a hopping window. As records come in, their value is added to the aggregation that already exists, following the same procedure as hopping windows. The aggregation difference between SlidingWindows and HoppingWindows lies in creating new windows: for a SlidingWindow, you need to find the existing records that belong to the new window. This computation is similar to the aggregation in SessionWindows. It requires a scan of the WindowStore to find the window holding the needed aggregation, which will always be pre-computed. The scan requires creating an iterator, but it should have minimal performance impact, as SessionWindows already uses this strategy. More details on finding the aggregation that needs to be put in a new window can be found on the implementation page <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.

*Extending Windows<TimeWindow>, Windows<Window> or nothing*
SlidingWindows are still defined by a windowSize, whereas SessionWindows are defined purely by data. Because of that, I think it makes sense to leverage the existing Window processes instead of creating a new store type that would be very similar to the WindowStore.
It's true that the #windowsFor method isn't necessary for SlidingWindows. However, JoinWindows also extends Windows<Window> and throws an UnsupportedOperationException in the #windowsFor method, and SlidingWindows can do the same. The difference between extending Windows<TimeWindow> and Windows<Window> is minimal, as both are ways to pass window parameters. Extending Windows<TimeWindow> will give us more leverage in utilizing existing processes.

*Emit Strategy*
I would argue that emitting on every update is still the best way to go for SlidingWindows, because it mimics the other types of windows, and suppression can be leveraged to limit what SlidingWindows emits. While some users may only want to see the last value, others may want to see more, and an emit strategy that emits partial results lets both groups of users access what they want.

*Additional Features*
Supporting sliding windows inherently, and shifting inefficient hopping windows to sliding windows, is an interesting idea and could be built on top of SlidingWindows once they are finished, but right now it seems out of scope for the needs of this KIP. Similarly, including a `subtraction` feature could bring performance improvements, but doesn't seem necessary for the implementation of this KIP.
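The emit-on-every-update strategy can be contrasted with a last-value-only view in a short sketch. The names are hypothetical, and it does not use or model Kafka's `suppress()` operator. It works as follows:

- Every update to a window produces one emitted record.
- A separate view keeps only the latest value per window. This is roughly what a user who only wants the last value would end up seeing once the window is complete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of eager emission for a windowed sum.
final class EagerWindowedSum {
    private final Map<Long, Long> sums = new HashMap<>();
    private final List<Map.Entry<Long, Long>> emitted = new ArrayList<>();

    // Adds a value to the window with the given start and eagerly emits the new aggregate.
    void add(long windowStart, long value) {
        long updated = sums.merge(windowStart, value, Long::sum);
        emitted.add(Map.entry(windowStart, updated));
    }

    // Everything sent downstream under the emit-on-every-update strategy.
    List<Map.Entry<Long, Long>> emitted() {
        return emitted;
    }

    // Last-value-only view: the latest emitted aggregate per window,
    // in the order windows were first seen.
    Map<Long, Long> lastPerWindow() {
        Map<Long, Long> last = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> e : emitted) {
            last.put(e.getKey(), e.getValue());
        }
        return last;
    }
}
```

Both views derive from the same stream of updates. Emitting eagerly and narrowing afterwards keeps both kinds of users served by a single model.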
Let me know what you think of the updates,
Leah

On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:

Hello all,

Thanks for the KIP, Leah!

Regarding (1): I'd go further, actually. Making Windows an abstract class was a mistake from the beginning. It left us unable to fix a very confusing situation for users around retention times, final results emitting, etc. Thus, I would not suggest extending TimeWindows for sure, but would also not suggest extending Windows.

The very simplest thing to do is follow the example of SessionWindows, which is just a completely self-contained class. If we don't mess with class inheritance, we won't ever have any of the problems related to class inheritance. This is my preferred solution.

Still, SlidingWindows has a lot in common with TimeWindows and other fixed-size windows, namely that the windows are fixed in size. If we want to preserve the current two-part windowing API, in which you can window by either "fixed" or "data-driven" modes, I'd suggest we avoid increasing the blast radius of Windows. We could take the opportunity to replace it with a proper interface and implement that interface instead.
For example:
https://github.com/apache/kafka/pull/9031

Then, SlidingWindows would just implement FixedSizeWindowDefinition.

======

Regarding (2), it seems more straightforward as a user of Streams to just have one mental model. _All_ of our aggregation operations follow an eager emission model, in which we just emit an update whenever an update is available. We already provide Suppression to explicitly apply different update semantics when that's required. Why should we define a snowflake operation with completely different semantics from everything else? I.e., systems are generally easier to use when they follow a few simple, composable rules than when they have a lot of different, specific rules.

======

New point: (4):
It would be nice to include some examples of user code that would use the new API, which should include:
1. using the DSL with the sliding window definition
2. accessing the stored results of a sliding window aggregation via IQ
3. defining a custom processor to access sliding windows in a store

It generally helps reviewers wrap their heads around the proposal. It also shakes out any design issues that would otherwise only come up during implementation/testing/review.
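John's suggestion in point (1) is to replace the abstract Windows class with an interface and have SlidingWindows implement it. A minimal sketch of that shape follows. Only the interface name comes from the message; its methods (`sizeMs()`, `gracePeriodMs()`) and the `WindowDefinitions.closeTimeMs` helper are assumptions for illustration. They are not the contents of the linked PR.

```java
// Hypothetical shape of the interface; the linked PR may define it differently.
interface FixedSizeWindowDefinition {
    // Length of every window, in milliseconds.
    long sizeMs();

    // How long after a window ends out-of-order records are still accepted.
    long gracePeriodMs();
}

// Sliding windows implement the interface instead of extending a base class.
final class SlidingWindowDefinition implements FixedSizeWindowDefinition {
    private final long timeDifferenceMs;
    private final long graceMs;

    SlidingWindowDefinition(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    @Override
    public long sizeMs() {
        return timeDifferenceMs;
    }

    @Override
    public long gracePeriodMs() {
        return graceMs;
    }
}

// Tumbling windows can implement the same interface independently.
final class TumblingWindowDefinition implements FixedSizeWindowDefinition {
    private final long sizeMs;
    private final long graceMs;

    TumblingWindowDefinition(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    @Override
    public long sizeMs() {
        return sizeMs;
    }

    @Override
    public long gracePeriodMs() {
        return graceMs;
    }
}

final class WindowDefinitions {
    // Shared logic depends only on the interface. Simplified assumption: a window
    // starting at windowStartMs stops accepting records at start + size + grace.
    static long closeTimeMs(FixedSizeWindowDefinition windows, long windowStartMs) {
        return windowStartMs + windows.sizeMs() + windows.gracePeriodMs();
    }
}
```

Because the implementations share no base class, a change to one cannot leak into the other through inheritance. Shared behavior lives in code written against the interface.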
Thanks again for the awesome proposal!
-John


On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:

Hello Leah,

Thanks for the nicely written KIP. A few thoughts:

1) I echo the other reviewers' comments regarding the typing: why extend TimeWindow instead of just extending Window?

2) I also feel that the emitting policy for this type of windowed aggregation may be different from the existing ones. The existing emitting policy is very simple: emit every time a window gets updated, and emit every time on out-of-order data within the grace period. This is because for time windows the window close time strictly depends on the window start time, which is fixed, while for session windows, although the window open/close time is also data-dependent, it changes relatively infrequently compared to sliding windows. For this KIP, since each new record would cause a new sliding window, the number of windows maintained logically could be much larger, and hence emitting on each update may be too aggressive.

3) Although the KIP itself should focus on user-facing interfaces, I'd suggest we create a child page of KIP-450 discussing its implementation as well, since some of that may drive the interface design. E.g.
personally, I think having a combiner interface in addition to the aggregator would be useful, but that's based on my two cents about the implementation design (I once created a child page describing it:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
).


Guozhang


On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi Leah,

Thank you for the KIP!

Here is my feedback:

1. The KIP would benefit from some code examples that show how to use sliding windows in aggregations.

2. The different sliding windows in Figures 1 and 2 are really hard to distinguish. Could you please try to make them graphically more distinguishable? You could try to draw the frames of consecutive windows shifted relative to each other.

3. I agree with Matthias that extending Windows<TimeWindow> does not seem to be the best approach. What would be the result of windowsFor()?

4. In the section "Public Interfaces" you should remove implementation details like private constructors and private fields.

5. Do we need a new store interface or can we use WindowStore?
Some words about that would be informative.

6. @Matthias, if the subtractor is not strictly needed, I would skip it for now and add it later.

7. I also agree that a section describing how to handle out-of-order records would help us understand what is still missing and what we can reuse.

Best,
Bruno

On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update record (i.e., early/partial result) for each input record. When an out-of-order record arrives, we just update the corresponding old window and emit another update.

It's unclear from the KIP whether you propose the same emit strategy. -- For sliding windows it might be worth considering a different emit strategy that only supports emitting the final result (i.e., after the grace period has passed)?



Boyang also raises a good point that relates to my point from above about pre-aggregations and storage layout.
Our current time windows are all pre-aggregated and stored in parallel. We can also look up windows efficiently, as we can compute the windowed key from the input record's key and timestamp based on the window definition.

However, for sliding windows, window boundaries are data dependent and thus we cannot compute them upfront. Thus, how can we "find" existing windows efficiently? Furthermore, out-of-order data would create new windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw input events. Additionally, we could also store pre-aggregated results if we think it's beneficial. -- If we apply the "emit only final results" strategy, storing pre-aggregated results would not be necessary though.


Btw: for sliding windows it might also be useful to consider allowing users to supply a `Subtractor` -- this subtractor could be applied to the current window result (in case we store it) when a record drops out of the window. Of course, not all aggregation functions are subtractable, and we could consider this a follow-up task and not include it in this KIP for now. Thoughts?
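Matthias's `Subtractor` idea can be sketched in plain Java as follows. The class and method names are hypothetical, and the sketch assumes in-order input and a window covering `[t - timeDifferenceMs, t]` (both ends inclusive) for each record time `t`. It keeps the raw events still inside the window, which the discussion above notes must be stored anyway, and when a record drops out it applies the subtractor to the running result instead of re-aggregating every remaining record.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiFunction;

// Incremental sliding-window aggregation with a user-supplied subtractor.
// Assumptions: records arrive in timestamp order, and the window for a record at
// time t covers [t - timeDifferenceMs, t], both ends inclusive.
final class SubtractingSlidingAggregate<V, A> {
    private static final class Event<T> {
        final long ts;
        final T value;

        Event(final long ts, final T value) {
            this.ts = ts;
            this.value = value;
        }
    }

    private final long timeDifferenceMs;
    private final BiFunction<A, V, A> adder;      // folds a record into the result
    private final BiFunction<A, V, A> subtractor; // removes a record's contribution
    private final Deque<Event<V>> inWindow = new ArrayDeque<>(); // raw events, oldest first
    private A current;

    SubtractingSlidingAggregate(final long timeDifferenceMs, final A initial,
                                final BiFunction<A, V, A> adder,
                                final BiFunction<A, V, A> subtractor) {
        if (timeDifferenceMs < 0) {
            throw new IllegalArgumentException("timeDifferenceMs must be non-negative");
        }
        this.timeDifferenceMs = timeDifferenceMs;
        this.current = initial;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Adds a record and returns the result for the window ending at its timestamp.
    A add(final long ts, final V value) {
        if (!inWindow.isEmpty() && ts < inWindow.peekLast().ts) {
            throw new IllegalArgumentException("this sketch only handles in-order input");
        }
        inWindow.addLast(new Event<>(ts, value));
        current = adder.apply(current, value);
        // Records older than the window start drop out: subtract them instead of
        // re-aggregating everything that is still in the window.
        while (inWindow.peekFirst().ts < ts - timeDifferenceMs) {
            current = subtractor.apply(current, inWindow.pollFirst().value);
        }
        return current;
    }
}
```

For a sum with `timeDifferenceMs = 10`, adding (0, 5), (4, 3), (10, 2), (11, 1), (25, 7) returns 5, 8, 10, 6, 7. A subtractor only exists for invertible aggregations such as sum or count; a max, for example, cannot be "un-applied" and would have to be recomputed from the stored raw events, which fits treating the subtractor as an optional follow-up. Handling out-of-order input would additionally require a timestamp-sorted structure instead of the simple deque used here.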



I was also thinking about the type hierarchy. I am not sure extending TimeWindow is the best approach. For TimeWindows, we can pre-compute window boundaries (cf. `windowsFor()`), while for a sliding window the boundaries are data dependent. Session windows are also data dependent and thus they don't inherit from TimeWindow (maybe check out the KIP that added session windows? It could provide some good insights.) -- I believe the same rationale applies to sliding windows?



-Matthias




On 7/10/20 12:47 PM, Boyang Chen wrote:

Thanks Leah and Sophie for the KIP.

1. I'm a bit surprised that we don't have an advance time. Could we elaborate on how the storage layer is structured?

2. IIUC, there will be extra cost in terms of fetching aggregation results, since we couldn't pre-aggregate until the user asks for it. It would be good to also discuss this.

3. We haven't discussed the possibility of supporting sliding windows inherently.
For a user who actually uses a hopping window, Streams could detect such an inefficiency by computing the window_size/advance_time ratio and deciding whether the write amplification is too high compared with some configured threshold. The benefit of doing so is that existing Streams users don't need to change their code or learn a new API; they only need to upgrade the Streams library to get the benefits for their inefficient hopping window implementation. There might be some compatibility issues for sure, but it's worth listing them out to weigh the trade-off.

Boyang

On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:

Hey Matthias,

Thanks for pointing that out. I added the following to the Proposed Changes section of the KIP:

"Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. Any new windows created by the late record will still be created, and the existing windows that are changed by the late record will be updated.
Any record that falls outside of the grace period (either user-defined or default) will be discarded."

All the best,
Leah

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:

Leah,

thanks a lot for the KIP. Very well written.

The KIP does not talk about the handling of out-of-order data though. How do you propose to address this?


-Matthias

On 7/8/20 5:33 PM, Leah Thomas wrote:
Hi all,
I'd like to kick off the discussion for KIP-450, adding sliding window aggregation support to Kafka Streams.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Let me know what you think,
Leah
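Leah's answer to Matthias's out-of-order question earlier in the thread (late records within the grace period are processed like in-order records; anything outside it is discarded) can be sketched as a filter placed in front of the aggregation. This is plain Java with hypothetical names, and it assumes "within the grace period" means a record is at most `graceMs` behind stream time, the highest timestamp seen so far; the exact close rule used by Kafka Streams may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the late-record rule from the KIP discussion.
// Assumption: a record is within the grace period if it is at most graceMs behind
// stream time (the highest timestamp observed so far).
final class GracePeriodFilter {
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    final List<Long> accepted = new ArrayList<>(); // records passed on to the aggregation
    final List<Long> dropped = new ArrayList<>();  // records discarded as too late

    GracePeriodFilter(final long graceMs) {
        if (graceMs < 0) {
            throw new IllegalArgumentException("grace must be non-negative");
        }
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed (creating or updating windows).
    boolean offer(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        final long lateness = streamTime - timestamp; // 0 for in-order records
        if (lateness > graceMs) {
            dropped.add(timestamp);
            return false;
        }
        accepted.add(timestamp);
        return true;
    }
}
```

With a grace period of 5 ms, the sequence 10, 20, 16, 14, 15, 30, 24 accepts 16 and 15 (4 and 5 ms late) but drops 14 and 24 (6 ms late each).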