I think what John is trying to say is the following: We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but also custom windows, and we use one specific algorithm for it. Note that custom windows must "work" with that algorithm.
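To make the coupling concrete, here is a toy sketch. It is not the actual Streams code: `ToyWindows`, `ToyTumblingWindows`, `ToyOffsetWindows` and `EnumeratingAggregator` are made-up names, and the real `Windows` class looks different. It only shows that a single enumerate-and-update algorithm serves every subclass, so a custom window type is accepted exactly as long as it can be enumerated that way.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy stand-in for the abstract `Windows` class; NOT the real Streams API.
abstract class ToyWindows {
    // Start times of all windows that contain the given timestamp.
    abstract List<Long> windowStartsFor(long timestamp);
}

// Built-in style tumbling windows: [0,size), [size,2*size), ...
final class ToyTumblingWindows extends ToyWindows {
    private final long size;

    ToyTumblingWindows(long size) {
        this.size = size;
    }

    @Override
    List<Long> windowStartsFor(long timestamp) {
        return List.of(Math.floorDiv(timestamp, size) * size);
    }
}

// A hypothetical user-defined window type: tumbling windows shifted by an offset.
final class ToyOffsetWindows extends ToyWindows {
    private final long size;
    private final long offset;

    ToyOffsetWindows(long size, long offset) {
        this.size = size;
        this.offset = offset;
    }

    @Override
    List<Long> windowStartsFor(long timestamp) {
        return List.of(Math.floorDiv(timestamp - offset, size) * size + offset);
    }
}

final class EnumeratingAggregator {
    // The single algorithm behind the toy `windowedBy(ToyWindows)`: for each record,
    // enumerate the windows it belongs to and add it to each of them (a count here).
    static Map<Long, Long> countPerWindow(ToyWindows windows, long[] timestamps) {
        Map<Long, Long> counts = new TreeMap<>(); // window start -> record count
        for (long ts : timestamps) {
            for (long start : windows.windowStartsFor(ts)) {
                counts.merge(start, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

With size 10 and timestamps 1, 5, 12, the tumbling definition yields the counts {0=2, 10=1}, and the shifted definition with offset 3 yields {-7=1, 3=2}. The aggregator never needs to know which subclass it was handed, which is the sense in which a custom window has to "work" with the algorithm.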
For session windows we have `windowedBy(SessionWindows)` and apply a different algorithm. Users could pass in a custom `SessionWindows` as long as the session algorithm works correctly for it.

For the new sliding windows, we want to use a different algorithm compared to hopping/tumbling windows. If we let sliding windows extend `Windows`, we can decide at runtime whether to use the hopping/tumbling window algorithm for hopping/tumbling windows or the new sliding window algorithm for sliding windows. However, if we get a custom window, which algorithm do we pick? The existing tumbling/hopping window algorithm or the new sliding window algorithm? Both a custom "time-window" and a custom "sliding window" implement the generic `Windows` class, and thus we cannot make a decision because we don't know the user's intent.

As a matter of fact, even if the user might not be aware of it, the algorithm we use does already leak into the API (if a user extends `Windows`, it must work with our hopping/tumbling window algorithm, and if a user extends `SessionWindows`, it must work with our session algorithm), and it seems we need to preserve this property for sliding windows.

-Matthias

On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> Hey John,
>
> Just a few follow-up questions/comments about the whole Windows thing:
>
> That's a good way of looking at things; in particular the point about
> SessionWindows for example requiring a Merger while other "statically
> enumerable" windows require only an adder seems to touch on the heart of
> the matter.
>
>> It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
>> As a consequence, they can all rely on an aggregation maintenance algorithm
>> that involves enumerating each of the windows and updating it.
>> That also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
>
> Given that, I'm a bit confused why you conclude that sliding windows are
> fundamentally different from the "statically enumerable" windows -- sliding
> windows require only an adder too. I'm not sure it's a consequence of being
> enumerable, or that being enumerable is the fundamental property that unites
> all Windows (ignoring JoinWindows here). Yes, it currently does apply to all
> Windows implementations, but we shouldn't assume that it *has* to be that way
> on the basis that it currently happens to be.
>
> Also, the fact that they can all rely on the same aggregation algorithm
> seems like an implementation detail and it would be weird to force a
> separate/new DSL API just because under the covers we swap in a different
> processor.
>
> To be fair, I don't think there's a strong reason *against* not extending
> Windows -- in the end it will just mean adding a new #windowedBy method and
> copy/pasting everything from TimeWindowedKStream pretty much word for word.
> But anytime you find yourself copying over code almost exactly, there should
> probably be a good reason why :)
>
>
> On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Thanks Leah!
>>
>> 5) Regarding the empty windows, I'm wondering if we should simply propose
>> that the windows should not be emitted downstream of the operator or
>> visible in IQ. Then, it'll be up to the implementation to make it happen.
>> I'm personally not concerned about it, since it seems like there are
>> multiple ways to accomplish this.
>>
>> Note, the discrepancy Matthias pointed out is actually a design bug. The
>> windowed aggregation (like every operation in Streams) produces a "view",
>> which then forms the basis of downstream operations.
>> When we pass the Materialized option to the operation, all we're doing is
>> saying to "materialize" the view (aka, actually store the computed view)
>> and also make it queriable.
>> It would be illegal for the "queriable, materialized view" to differ in
>> any way from the "view". So, it seems we must either propose to emit the
>> empty windows AND make them visible in IQ, or propose NOT to emit the empty
>> windows AND NOT make them visible in IQ.
>>
>> 7) Regarding whether we can extend TimeWindows (or Windows):
>> I've been mulling this over more. I think it's worth asking the question of
>> what these classes even mean. For example, why is SessionWindows a
>> different thing from TimeWindows and UniversalWindows (which are both
>> Windows)?
>>
>> This conversation is extra complicated because of the incomplete and
>> mis-matched class hierarchy, but we can try to look past it for now.
>>
>> It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
>> As a consequence, they can all rely on an aggregation maintenance algorithm
>> that involves enumerating each of the windows and updating it. That
>> also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
>>
>> In contrast, session windows are data driven, so they are not statically
>> enumerable. Their algorithm has to rely on scans, and to do the scans,
>> it needs to know the "inactivity gap", which needs to be part of the window
>> definition. Likewise, session windows have the property that they need
>> to be merged, so their DSL object also requires mergers.
>>
>> It really seems like your new window definition doesn't fit into either
>> category. It uses a different algorithm, which relies on scans, but it is
>> also fixed in size, so it doesn't need mergers.
>> In this situation, it seems like the safe bet is to just create
>> SlidingWindows with no interface and add a separate set of DSL operations
>> and objects. It's a little extra code, but it seems to keep everything
>> tidier and more comprehensible, both for us and for users.
>>
>> What do you think?
>> -John
>>
>>
>>
>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
>>> Hi Matthias,
>>>
>>> Thanks for the suggestions, I've updated the KIP and child page
>>> accordingly and addressed some below.
>>>
>>>> 1) For the mandatory grace period, we should use a static builder method
>>>> that take two parameters.
>>>
>>> That makes sense, I've changed that in the public API.
>>>
>>>> Btw: this implementation actually raises an issue for IQ: those empty
>>>> windows would be returned.
>>>
>>> This is a great point, with the current implementation plan empty windows
>>> would be returned. I think creating a second window store would definitely
>>> work, but there would be more overhead in having two stores and switching
>>> windows between the stores, as well as doing scans in both stores to find
>>> existing windows. There might be a way to do avoid emitting empty windows
>>> without creating a second window store, I'll look more into it.
>>>
>>> Cheers,
>>> Leah
>>>
>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> Thanks for updating the KIP.
>>>>
>>>> Couple of follow up comments:
>>>>
>>>> 1) For the mandatory grace period, we should use a static builder method
>>>> that take two parameters. This provides a better API as user cannot
>>>> forget to set the grace period. Throwing a runtime exception seems not
>>>> to be the best way to handle this case.
>>>>
>>>>
>>>>
>>>> 2) In Fig.2 you list 10 hopping windows. I believe it should actually be
>>>> more? There first hopping window would be [-6,-4[ and the last one would
>>>> be from [19,29[ -- hence, the cost saving are actually much higher.
>>>> >>>> >>>> >>>> 3a) IQ: you are saying that the user need to compute the start time as >>>> >>>>> windowSize+the time they're looking at >>>> >>>> Should this be "targetTime - windowSize" instead? >>>> >>>> >>>> >>>> 3b) IQ: in you example you say "window size of 10 minutes" with an >>>> incident at 9:15. >>>> >>>>> they're looking for a window with the start time of 8:15. >>>> >>>> The example does not seem to add up? >>>> >>>> >>>> >>>> 4) For "Processing Windows": you describe a three step approach: I just >>>> want to point out, that step (1) is not necessary for each input >> record, >>>> because timestamps are not guaranteed to be unique and thus a previous >>>> record with the same key and timestamp might have create the windows >>>> already. >>>> >>>> Nit: I am also not exactly sure what you mean by step (3) as you use >> the >>>> word "send". I guess you mean "put"? >>>> >>>> It seem there are actually more details in the sub-page: >>>> >>>>> A new record for SlidingWindows will always create two new windows. >> If >>>> either of those windows already exist in the windows store, their >>>> aggregation will simply be updated to include the new record, but no >>>> duplicate window will be added to the WindowStore. >>>> >>>> However, the first and second sentence contradict each other a little >>>> bit. I think the first sentence is not correct. >>>> >>>> Nit: >>>> >>>>> For in-order records, the left window will always be empty. >>>> >>>> This should be "right window" ? >>>> >>>> >>>> >>>> 5) "Emitting Results": it might be worth to point out, that a >>>> second/future window of a new record is create with no records, and >>>> thus, even if it's initialized it won't be emitted. Only if a >>>> consecutive record falls into the window, the window would be updates >>>> and the window result (for a window content of one record) would be >> sent >>>> downstream. >>>> >>>> Again, the sub-page contains this details. 
Might still be worth to add >>>> to the top level page, too. >>>> >>>> Btw: this implementation actually raises an issue for IQ: those empty >>>> windows would be returned. Thus I am wondering if we need to use two >>>> stores internally? One store for actual windows and one store for empty >>>> windows? If an empty window is updated, it's move to the other store? >>>> For IQ, we only allow to query the non-empty-window store? >>>> >>>> >>>> >>>> 6) On the sub-page: >>>> >>>>> The left window of in-order records and both windows for out-of-order >>>> records need to be updated with the values of records that have already >>>> been processed. >>>> >>>> Why "both windows for out-of-order records"? IMHO, we don't know how >>>> many existing windows needs to be updated when processing an >>>> out-of-order record. Of course, an out-of-order record could not fall >>>> into any existing window but create two new windows, too. >>>> >>>>> Because each record creates one new window that includes itself and >> one >>>> window that does not >>>> >>>> As state above, this does not seem to hold. I understand why you mean, >>>> but it would be good to be exact. >>>> >>>> Figure 2: You use the term "late" but you mean "out-of-order" I guess >> -- >>>> a record is _late_ if it's not processed any longer as the grace period >>>> passed already. >>>> >>>> Figure 2: "Late" should be out-or-order. The example text say a window >>>> [16,26] should be created but the figure shows the green window as >> [15,20]. >>>> >>>> About the blue window: maybe add not that the blue window contains the >>>> aggregate we need for the green window, _before_ the new record `a` is >>>> added to the blue window. >>>> >>>> >>>> >>>> 7) I am not really happy to extend TimeWindows and I think the argument >>>> about JoinWindows is not the best (IMHO, JoinWindows do it already >> wrong >>>> and we just repeat the same mistake). 
However, it seems our window >>>> hierarchy is "broken" already and it might be out of scope for this KIP >>>> to fix it. Hence, I am ok that we bite the bullet for now and clean it >>>> up later. >>>> >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote: >>>>> Hi Leah, >>>>> >>>>> Thanks for the updated KIP. I agree that extending SlidingWindows >> from >>>>> Windows is fine for the sake of not introducing more public APIs (and >>>> their >>>>> internal xxxImpl classes), and its cons is small enough to tolerate >> to >>>> me. >>>>> >>>>> >>>>> Guozhang >>>>> >>>>> >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> >>>> wrote: >>>>> >>>>>> Hi all, >>>>>> >>>>>> Thanks for the feedback on the KIP. I've updated the KIP page >>>>>> < >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL >>>>>>> >>>>>> to address these points and have created a child page >>>>>> < >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>>>>> >>>>>> to go more in depth on certain implementation details. >>>>>> >>>>>> *Grace Period:* >>>>>> I think Sophie raises a good point that the default grace period of >> 24 >>>>>> hours is often too long and was chosen when retention time and grace >>>> period >>>>>> were the same. For SlidingWindows, I propose we make the grace >> period >>>>>> mandatory. To keep formatting consistent with other types of >> windows, >>>> grace >>>>>> period won't be an additional parameter in the #of method, but will >>>> still >>>>>> look like it does in other use cases: >>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds). If >>>> grace >>>>>> period isn't properly initialized, an error will be thrown through >> the >>>>>> process method. 
>>>>>> >>>>>> *Storage Layer + Aggregation:* >>>>>> SlidingWindows will use a WindowStore because computation can be >> done >>>> with >>>>>> the information stored in a WindowStore (window timestamp and >> value). >>>> Using >>>>>> the WindowStore also simplifies computation as SlidingWindows can >>>> leverage >>>>>> existing processes. Because we are using a WindowStore, the >> aggregation >>>>>> process will be similar to that of a hopping window. As records >> come in >>>>>> their value is added to the aggregation that already exists, >> following >>>> the >>>>>> same procedure as hopping windows. The aggregation difference >> between >>>>>> SlidingWindows and HoppingWindows comes in creating new windows for >> a >>>>>> SlidingWindow, where you need to find the existing records that >> belong >>>> to >>>>>> the new window. This computation is similar to the aggregation in >>>>>> SessionWindows and requires a scan to the WindowStore to find the >> window >>>>>> with the aggregation needed, which will always be pre-computed. The >> scan >>>>>> requires creating an iterator, but should have minimal performance >>>> effects >>>>>> as this strategy is already implemented in SessionWindows. More >> details >>>> on >>>>>> finding the aggregation that needs to be put in a new window can be >>>> found >>>>>> on the implementation page >>>>>> < >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450 >>>>>>> >>>>>> . >>>>>> >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing* >>>>>> Because SlidingWindows are still defined by a windowSize (whereas >>>>>> SessionWindows are defined purely by data), I think it makes sense >> to >>>>>> leverage the existing Window processes instead of creating a new >> store >>>> type >>>>>> that would be very similar to the WindowStore. 
While it's true that >> the >>>>>> #windowsFor method isn't necessary for SlidingWindows, JoinWindows >> also >>>>>> extends Windows<Window> and throws an UnsupportedOperationException >> in >>>> the >>>>>> #windowsFor method, which is what SlidingWindows can do. The >> difference >>>>>> between extending Windows<TimeWindow> or Windows<Window> is >> minimal, as >>>>>> both are ways to pass window parameters. Extending >> Windows<TimeWindow> >>>> will >>>>>> give us more leverage in utilizing existing processes. >>>>>> >>>>>> *Emit Strategy* >>>>>> I would argue that emitting for every update is still the best way >> to go >>>>>> for SlidingWindows because it mimics the other types of windows, and >>>>>> suppression can be leveraged to limit what SlidingWindows emits. >> While >>>> some >>>>>> users may only want to see the last value, others may want to see >> more, >>>> and >>>>>> leaving the emit strategy to emit partial results allows both users >> to >>>>>> access what they want. >>>>>> >>>>>> *Additional Features* >>>>>> Supporting sliding windows inherently, and shifting inefficient >> hopping >>>>>> windows to sliding windows, is an interesting idea and could be >> built on >>>>>> top of SlidingWindows when they are finished, but right now seems >> out of >>>>>> scope for the needs of this KIP. Similarly, including a >> `subtraction` >>>>>> feature could have performance improvements, but doesn't seem >> necessary >>>> for >>>>>> the implementation of this KIP. >>>>>> >>>>>> Let me know what you think of the updates, >>>>>> >>>>>> Leah >>>>>> >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> >>>> wrote: >>>>>> >>>>>>> Hello all, >>>>>>> >>>>>>> Thanks for the KIP, Leah! >>>>>>> >>>>>>> Regarding (1): I'd go farther actually. 
Making Windows an abstract >>>>>>> class was a mistake from the beginning that led to us not being >>>>>>> able to fix a very confusing situation for users around retention >>>> times, >>>>>>> final results emitting, etc. Thus, I would not suggest extending >>>>>>> TimeWindows for sure, but would also not suggest extending Windows. >>>>>>> >>>>>>> The very simplest thing to do is follow the example of >> SessionWindows, >>>>>>> which is just a completely self-contained class. If we don't mess >> with >>>>>>> class inheritance, we won't ever have any of the problems related >> to >>>>>>> class inheritance. This is my preferred solution. >>>>>>> >>>>>>> Still, Sliding windows has a lot in common with TimeWindows and >> other >>>>>>> fixed-size windows, namely that the windows are fixed in size. If >> we >>>> want >>>>>>> to preserve the current two-part windowing API in which you can >> window >>>>>>> by either "fixed" or "data driven" modes, I'd suggest we avoid >>>> increasing >>>>>>> the blast radius of Windows by taking the opportunity to replace it >>>> with >>>>>>> a proper interface and implement that interface instead. >>>>>>> >>>>>>> For example: >>>>>>> https://github.com/apache/kafka/pull/9031 >>>>>>> >>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition >>>>>>> >>>>>>> ====== >>>>>>> >>>>>>> Regarding (2), it seems more straightforward as a user of Streams >>>>>>> to just have one mental model. _All_ of our aggregation operations >>>>>>> follow an eager emission model, in which we just emit an update >>>> whenever >>>>>>> an update is available. We already provided Suppression to >> explicitly >>>>>> apply >>>>>>> different update semantics in the case it's required. Why should we >>>>>> define >>>>>>> a snowflake operation with completely different semantics from >>>> everything >>>>>>> else? 
I.e., systems are generally easier to use when they follow a >> few >>>>>>> simple, composable rules than when they have a lot of different, >>>> specific >>>>>>> rules. >>>>>>> >>>>>>> >>>>>>> ====== >>>>>>> >>>>>>> New point: (4): >>>>>>> It would be nice to include some examples of user code that would >> use >>>> the >>>>>>> new API, which should include: >>>>>>> 1. using the DSL with the sliding window definition >>>>>>> 2. accessing the stored results of a sliding window aggregation >> via IQ >>>>>>> 3. defining a custom processor to access sliding windows in a store >>>>>>> >>>>>>> It generally helps reviewers wrap their heads around the proposal, >> as >>>>>> well >>>>>>> as shaking out any design issues that would otherwise only come up >>>> during >>>>>>> implementation/testing/review. >>>>>>> >>>>>>> Thanks again for the awesome proposal! >>>>>>> -John >>>>>>> >>>>>>> >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote: >>>>>>>> Hello Leah, >>>>>>>> >>>>>>>> Thanks for the nice written KIP. A few thoughts: >>>>>>>> >>>>>>>> 1) I echo the other reviewer's comments regarding the typing: why >>>>>>> extending >>>>>>>> TimeWindow instead of just extending Window? >>>>>>>> >>>>>>>> 2) I also feel that emitting policy for this type of windowing >>>>>>> aggregation >>>>>>>> may be different from the existing ones. Existing emitting policy >> is >>>>>> very >>>>>>>> simple: emit every time when window get updates, and emit every >> time >>>> on >>>>>>>> out-of-ordering data within grace period, this is because for >>>>>>> time-windows >>>>>>>> the window close time is strictly depend on the window start time >>>> which >>>>>>> is >>>>>>>> fixed, while for session-windows although the window open/close >> time >>>> is >>>>>>>> also data-dependent it is relatively infrequent compared to the >>>>>>>> sliding-windows. For this KIP, since each new data would cause a >>>>>>>> new sliding-window, the num. 
windows maintained logically could be >>>> much >>>>>>>> larger and hence emitting on each update may be too aggressive. >>>>>>>> >>>>>>>> 3) Although KIP itself should be focusing on user face >> interfaces, I'd >>>>>>>> suggest we create a children page of KIP-450 discussing about its >>>>>>>> implementations as well, since some of that may drive the >> interface >>>>>>> design. >>>>>>>> E.g. personally I think having a combiner interface in addition to >>>>>>>> aggregator would be useful but that's based on my 2cents about the >>>>>>>> implementation design (I once created a child page describing it: >>>>>>>> >>>>>>> >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation >>>>>>>> ). >>>>>>>> >>>>>>>> >>>>>>>> Guozhang >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io >>> >>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Leah, >>>>>>>>> >>>>>>>>> Thank you for the KIP! >>>>>>>>> >>>>>>>>> Here is my feedback: >>>>>>>>> >>>>>>>>> 1. The KIP would benefit from some code examples that show how >> to use >>>>>>>>> sliding windows in aggregations. >>>>>>>>> >>>>>>>>> 2. The different sliding windows in Figure 1 and 2 are really >> hard to >>>>>>>>> distinguish. Could you please try to make them graphically better >>>>>>>>> distinguishable? You could try to draw the frames of consecutive >>>>>>>>> windows shifted to each other. >>>>>>>>> >>>>>>>>> 3. I agree with Matthias, that extending Windows<TimeWindow> >> does not >>>>>>>>> seem to be the best approach. What would be the result of >>>>>>>>> windowsFor()? >>>>>>>>> >>>>>>>>> 4. In the section "Public Interfaces" you should remove >>>>>> implementation >>>>>>>>> details like private constructors and private fields. >>>>>>>>> >>>>>>>>> 5. Do we need a new store interface or can we use WindowStore? >> Some >>>>>>>>> words about that would be informative. >>>>>>>>> >>>>>>>>> 6. 
@Matthias, if the subtrator is not strictly needed, I would >> skip >>>>>> it >>>>>>>>> for now and add it later. >>>>>>>>> >>>>>>>>> 7. I also agree that having a section that describes how to >> handle >>>>>>>>> out-of-order records would be good to understand what is still >>>>>> missing >>>>>>>>> and what we can reuse. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Bruno >>>>>>>>> >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax < >> mj...@apache.org> >>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Leah, >>>>>>>>>> >>>>>>>>>> thanks for your update. However, it does not completely answer >> my >>>>>>>>> question. >>>>>>>>>> >>>>>>>>>> In our current window implementations, we emit a window result >>>>>> update >>>>>>>>>> record (ie, early/partial result) for each input record. When an >>>>>>>>>> out-of-order record arrives, we just update to corresponding old >>>>>>> window >>>>>>>>>> and emit another update. >>>>>>>>>> >>>>>>>>>> It's unclear from the KIP if you propose the same emit >> strategy? -- >>>>>>> For >>>>>>>>>> sliding windows it might be worth to consider to use a different >>>>>> emit >>>>>>>>>> strategy and only support emitting the final result only (ie, >> after >>>>>>> the >>>>>>>>>> grace period passed)? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Boyang, also raises a good point that relates to my point from >>>>>> above >>>>>>>>>> about pre-aggregations and storage layout. Our current time >> windows >>>>>>> are >>>>>>>>>> all pre-aggregated and stored in parallel. We can also lookup >>>>>> windows >>>>>>>>>> efficiently, as we can compute the windowed-key given the input >>>>>>> record >>>>>>>>>> key and timestamp based on the window definition. >>>>>>>>>> >>>>>>>>>> However, for sliding windows, window boundaries are data >> dependent >>>>>>> and >>>>>>>>>> thus we cannot compute them upfront. Thus, how can we "find" >>>>>> existing >>>>>>>>>> window efficiently? 
Furthermore, out-of-order data would create >> new >>>>>>>>>> windows in the past and we need to be able to handle this case. >>>>>>>>>> >>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store >> all >>>>>> raw >>>>>>>>>> input events. Additionally, we could also store pre-aggregated >>>>>>> results >>>>>>>>>> if we thinks it's benfitial. -- If we apply "emit only final >>>>>> results" >>>>>>>>>> strategy, storing pre-aggregated result would not be necessary >>>>>>> though. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Btw: for sliding windows it might also be useful to consider >>>>>> allowing >>>>>>>>>> users to supply a `Subtractor` -- this subtractor could be >> applied >>>>>> on >>>>>>>>>> the current window result (in case we store it) if a record >> drops >>>>>>> out of >>>>>>>>>> the window. Of course, not all aggregation functions are >>>>>> subtractable >>>>>>>>>> and we can consider this as a follow up task, too, and not >> include >>>>>> in >>>>>>>>>> this KIP for now. Thoughts? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if >>>>>>> extending >>>>>>>>>> TimeWindow is the best approach? For TimeWindows, we can >>>>>> pre-compute >>>>>>>>>> window boundaries (cf `windowsFor()`) while for a sliding window >>>>>> the >>>>>>>>>> boundaries are data dependent. Session windows are also data >>>>>>> dependent >>>>>>>>>> and thus they don't inherit from TimeWindow (Maybe check out the >>>>>> KIP >>>>>>>>>> that added session windows? It could provides some good >> insights.) >>>>>>> -- I >>>>>>>>>> believe the same rational applies to sliding windows? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote: >>>>>>>>>>> Thanks Leah and Sophie for the KIP. >>>>>>>>>>> >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time. 
>> Could >>>>>> we >>>>>>>>>>> elaborate how the storage layer is structured? >>>>>>>>>>> >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching >>>>>> aggregation >>>>>>>>> results, >>>>>>>>>>> since we couldn't pre-aggregate until the user asks for it. >> Would >>>>>>> be >>>>>>>>> good >>>>>>>>>>> to also discuss it. >>>>>>>>>>> >>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding >>>>>>> windows >>>>>>>>>>> inherently. For a user who actually uses a hopping window, >>>>>> Streams >>>>>>>>> could >>>>>>>>>>> detect such an inefficiency doing a window_size/advance_time >>>>>> ratio >>>>>>> to >>>>>>>>> reach >>>>>>>>>>> a conclusion on whether the write amplification is too high >>>>>>> compared >>>>>>>>> with >>>>>>>>>>> some configured threshold. The benefit of doing so is that >>>>>> existing >>>>>>>>> Streams >>>>>>>>>>> users don't need to change their code, learn a new API, but >> only >>>>>> to >>>>>>>>> upgrade >>>>>>>>>>> Streams library to get benefits for their inefficient hopping >>>>>>> window >>>>>>>>>>> implementation. There might be some compatibility issues for >>>>>> sure, >>>>>>> but >>>>>>>>>>> worth listing them out for trade-off. >>>>>>>>>>> >>>>>>>>>>> Boyang >>>>>>>>>>> >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas < >>>>>> ltho...@confluent.io >>>>>>>> >>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hey Matthias, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for pointing that out. I added the following to the >>>>>> Propose >>>>>>>>> Changes >>>>>>>>>>>> section of the KIP: >>>>>>>>>>>> >>>>>>>>>>>> "Records that come out of order will be processed the same way >>>>>> as >>>>>>>>> in-order >>>>>>>>>>>> records, as long as they fall within the grace period. Any new >>>>>>> windows >>>>>>>>>>>> created by the late record will still be created, and the >>>>>> existing >>>>>>>>> windows >>>>>>>>>>>> that are changed by the late record will be updated. 
Any >> record >>>>>>> that >>>>>>>>> falls >>>>>>>>>>>> outside of the grace period (either user defined or default) >>>>>> will >>>>>>> be >>>>>>>>>>>> discarded. " >>>>>>>>>>>> >>>>>>>>>>>> All the best, >>>>>>>>>>>> Leah >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax < >>>>>> mj...@apache.org> >>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Leah, >>>>>>>>>>>>> >>>>>>>>>>>>> thanks a lot for the KIP. Very well written. >>>>>>>>>>>>> >>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data >>>>>>> though. >>>>>>>>>>>>> How do you propose to address this? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote: >>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>> I'd like to kick-off the discussion for KIP-450, adding >>>>>> sliding >>>>>>>>> window >>>>>>>>>>>>>> aggregation support to Kafka Streams. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>> >>>>>>> >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL >>>>>>>>>>>>>> >>>>>>>>>>>>>> Let me know what you think, >>>>>>>>>>>>>> Leah >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> -- Guozhang >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>> >>>> >>> >> >