Another minor tweak: instead of defining the window by its *size*, it will
be defined by *timeDifference*, the maximum time difference between two
events in the same window. Because both ends of the window are inclusive,
this is a more precise way to define a window, and it still lets users
create the window they expect. This definition fits the current examples:
a record at *10* falls into the window *[5,10]* when the time difference is
*5*. That window contains any records at 5, 6, 7, 8, 9, and 10, which is 6
distinct timestamps rather than 5. This semantic difference is why I've
renamed *size* to *timeDifference*.

The new builder will be *withTimeDifferenceAndGrace*, in keeping with the
other naming conventions.
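To illustrate why a single two-argument builder makes the grace period impossible to forget, here is a minimal sketch. `SlidingWindowsSketch` is a hypothetical stand-in that only borrows the name of the proposed *withTimeDifferenceAndGrace* builder; it is not the actual SlidingWindows class:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for the proposed SlidingWindows class. Its only
// public entry point takes both the time difference and the grace period,
// so a caller cannot build an instance without choosing a grace period.
final class SlidingWindowsSketch {
    private final long timeDifferenceMs;
    private final long graceMs;

    // Private constructor: the static factory is the only way in.
    private SlidingWindowsSketch(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    static SlidingWindowsSketch withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("durations must not be negative");
        }
        return new SlidingWindowsSketch(timeDifference.toMillis(), grace.toMillis());
    }

    long timeDifferenceMs() {
        return timeDifferenceMs;
    }

    long graceMs() {
        return graceMs;
    }
}
```

Compared with `of(size).grace(...)`, there is no intermediate object lacking a grace period, so forgetting it is a compile error rather than a runtime exception in the processor.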

Let me know if there are any concerns! The vote thread is also open
here: http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser

Best,
Leah

On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:

> A small tweak: to make it clearer to users that grace is required, and to
> clean up some of the confusing grammar semantics of windows, the main
> builder method for *SlidingWindows* will be *withSizeAndGrace* instead
> of *of*. This looks like it'll be the last change (for now) to the
> public API. If anyone has any comments or thoughts, let me know. Otherwise,
> I'll take this to a vote shortly.
>
> Best,
> Leah
>
> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
>
>> To accommodate the change to a final class, I've added another
>> *windowedBy()* method in *CogroupedKStream.java* to handle
>> SlidingWindows.
>>
>> As far as the discussion goes, I think this is the last change we've
>> talked about. If anyone has other comments or concerns, please let me know!
>>
>> Cheers,
>> Leah
>>
>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
>>
>>> Thanks for the discussion about extending TimeWindows. I agree that
>>> making it future proof is important, and the implementation of
>>> SlidingWindows is unique enough that it seems logical to make it its own
>>> final class.
>>>
>>> On that note, I've updated the KIP to make SlidingWindows a standalone
>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
>>> handle SlidingWindows. It seems that SlidingWindows would still be able to
>>> leverage *TimeWindowedKStream* by creating a SlidingWindows version of
>>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
>>> anyone sees issues with this implementation, please let me know.
>>>
>>> Best,
>>> Leah
>>>
>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org>
>>> wrote:
>>>
>>>> Thanks for the reply, Sophie.
>>>>
>>>> That all sounds about right to me.
>>>>
>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
>>>> for it to be extensible. Different implementations can (and do) enumerate
>>>> different windows to suit different use cases.
>>>>
>>>> On the other hand, I can’t think of any way to extend SessionWindows to
>>>> do something different using the same algorithm, so it makes sense for it
>>>> to stay final.
>>>>
>>>> If we think SlidingWindows is similarly not usefully extensible, then
>>>> we can make it final. It’s easy to remove final later, but adding it is not
>>>> possible. Or we could go the other route and just make it an interface, on
>>>> general principle. Both of these choices are safe API design.
>>>>
>>>> Thanks again,
>>>> John
>>>>
>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
>>>> > >
>>>> > > Users could pass in a custom `SessionWindows` as
>>>> > > long as the session algorithm works correctly for it.
>>>> >
>>>> >
>>>> > Well, not really: SessionWindows is a final class. TimeWindows is also a
>>>> > final class, so neither of these can be extended/customized. For a given
>>>> > Windows, then, there would only be three (non-overlapping) possibilities:
>>>> > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
>>>> > think there's any problem with determining what the user wants in this
>>>> > case -- we would just check if it's a SlidingWindows and connect the new
>>>> > processor, or else connect the existing hopping/tumbling window processor.
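The type check described above might look roughly like this. All types here are hypothetical simplifications that only mirror the shape of the discussion (`WindowSpec` for Windows, `SlidingSpec` for SlidingWindows, and so on), not the real Kafka Streams classes:

```java
// Hypothetical, simplified types that mirror the discussion only.
abstract class WindowSpec { }                     // role of Windows
final class HoppingSpec extends WindowSpec { }    // role of TimeWindows
final class SlidingSpec extends WindowSpec { }    // role of SlidingWindows
class CustomSpec extends WindowSpec { }           // a user-defined Windows subclass

final class ProcessorChooser {
    // Picks the aggregation algorithm from the runtime type of the spec.
    static String processorFor(WindowSpec spec) {
        if (spec instanceof SlidingSpec) {
            return "sliding-window processor";
        }
        // Everything else, including any custom subclass, is assumed to fit
        // the hopping/tumbling enumeration algorithm.
        return "hopping/tumbling window processor";
    }
}
```

The fallback branch is where the algorithm "leaks" into the API: a custom subclass is silently routed to the hopping/tumbling algorithm, whatever the user intended.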
>>>> >
>>>> > I'll admit that last sentence does leave a bad taste in my mouth. Part of
>>>> > that is probably the "leaking" API Matthias pointed out; we just assume
>>>> > the hopping/tumbling window implementation fits all custom windows. But I
>>>> > guess if you really needed to customize the algorithm any further, you
>>>> > may as well stick in a transformer and do it all yourself.
>>>> >
>>>> > Anyway, given what we have, it does seem weird to apply one algorithm
>>>> > for most Windows types and then swap in a different one for one specific
>>>> > extension of Windows. So adding a new #windowedBy(SlidingWindows)
>>>> > sounds reasonable to me.
>>>> >
>>>> > I'm still not convinced that we need a whole new TimeWindowedKStream
>>>> > equivalent class for sliding windows, though. It seems like the
>>>> > hopping/tumbling window implementation could benefit just as much from a
>>>> > subtractor as the sliding windows, so the only future-proofing we get is
>>>> > the ability to be lazy and add the subtractor to one but not the other.
>>>> > Of course, it would only be an optimization, so we could just not apply
>>>> > it to one type and nothing would break.
>>>> > It does make me nervous to go against the "future-proof" direction,
>>>> > though. Are there any other examples of things we might want to add to
>>>> > one window type but not to another?
>>>> >
>>>> > On another note, this actually brings up a new question: should
>>>> > SlidingWindows also be final? My take is "yes", since there's no
>>>> > reasonable customization of sliding windows, at least not that I can
>>>> > think of. Thoughts?
>>>> >
>>>> >
>>>> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> wrote:
>>>> >
>>>> > > Thanks, all,
>>>> > >
>>>> > > I can see how my conclusion was kind of a leap.
>>>> > >
>>>> > > What Matthias said is indeed what I was thinking. When you provide a
>>>> > > window definition to the windowedBy() method, you are selecting an
>>>> > > algorithm that will be used to compute the windows from the input data.
>>>> > >
>>>> > > I didn’t mean the code implementation “algorithm”, but the high-level
>>>> > > algorithm that describes how the input stream will be transformed into
>>>> > > a sequence of windows.
>>>> > >
>>>> > > For example, the algorithm for Windows is something like “given the
>>>> > > record timestamp, include the record in each of the enumerated
>>>> > > windows”. Note that there can be a lot of variation in how the windows
>>>> > > are enumerated, which is why there are at least a couple of
>>>> > > implementations of Windows, and we are open to adding more (like for
>>>> > > natural calendar boundaries).
>>>> > >
>>>> > > For SessionWindows, it’s more like “if any window is within the gap,
>>>> > > extend its boundaries to include this record, and if two windows touch,
>>>> > > then merge them”.
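As a rough illustration of “statically enumerable” windows, the sketch below lists every hopping window `[start, start + size)` that contains a timestamp. Windows are aligned to multiples of `advance` and none starts before 0. It is written in the spirit of `TimeWindows#windowsFor`, but `HoppingEnumeration` is a standalone, hypothetical helper, not the library code:

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingEnumeration {
    // Start times of all hopping windows [start, start + size) containing
    // `timestamp`, aligned to multiples of `advance`, never starting before 0.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        if (size <= 0 || advance <= 0 || advance > size) {
            throw new IllegalArgumentException("require 0 < advance <= size");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still reaches past `timestamp`.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        for (; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a timestamp of 12 with size 10 and advance 5, this yields starts 5 and 10, i.e. windows [5,15) and [10,20). Every affected window is known from the timestamp alone, which is why an "adder" is enough for this family.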
>>>> > >
>>>> > > Clearly, the algorithm for SlidingWindows doesn’t fall into either
>>>> > > category, so it seems inappropriate to claim in the API that it does
>>>> > > (by implementing Windows with stubbed methods) and then cast internally
>>>> > > to execute a completely different algorithm.
>>>> > >
>>>> > > To your other observation, that the DSL object resulting from windowedBy
>>>> > > would look the same for Windows and SlidingWindows, maybe it makes sense
>>>> > > for windowedBy(SlidingWindows) also to return a TimeWindowedKStream.
>>>> > >
>>>> > > i.e.:
>>>> > > =======================
>>>> > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
>>>> > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
>>>> > > =======================
>>>> > >
>>>> > > I can’t think of a reason this wouldn’t work. But then again, it would
>>>> > > be more future-proof to go ahead and specify a different return type
>>>> > > now, if we think we'll want to add subtractors and stuff later. I don't
>>>> > > have a strong feeling about that part of the API. It seems to be
>>>> > > independent of whether SlidingWindows extends Windows or not.
>>>> > >
>>>> > > Thanks,
>>>> > > -John
>>>> > >
>>>> > > On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
>>>> > > > I think what John is trying to say is the following:
>>>> > > >
>>>> > > > We have `windowedBy(Windows)`, which accepts hopping/tumbling windows
>>>> > > > but also custom windows, and we use a specific algorithm. Note that
>>>> > > > custom windows must "work" with the algorithm used.
>>>> > > >
>>>> > > > For session windows we have `windowedBy(SessionWindows)` and apply a
>>>> > > > different algorithm. Users could pass in a custom `SessionWindows` as
>>>> > > > long as the session algorithm works correctly for it.
>>>> > > >
>>>> > > > For the new sliding windows, we want to use a different algorithm
>>>> > > > compared to hopping/tumbling windows. If we let sliding windows extend
>>>> > > > `Windows`, we can decide at runtime if we need to use the
>>>> > > > hopping/tumbling window algorithm for hopping/tumbling windows or the
>>>> > > > new sliding window algorithm for sliding windows. However, if we get a
>>>> > > > custom window, which algorithm do we pick? The existing
>>>> > > > tumbling/hopping window algorithm or the new sliding window algorithm?
>>>> > > > Both a custom "time window" and a custom "sliding window" implement the
>>>> > > > generic `Windows` class, and thus we cannot make a decision, as we
>>>> > > > don't know the user's intent.
>>>> > > >
>>>> > > > As a matter of fact, even if the user might not be aware of it, the
>>>> > > > algorithm we use already leaks into the API (if a user extends
>>>> > > > `Windows`, it must work with our hopping/tumbling window algorithm, and
>>>> > > > if a user extends `SessionWindows`, it must work with our session
>>>> > > > algorithm), and it seems we need to preserve this property for sliding
>>>> > > > windows.
>>>> > > >
>>>> > > >
>>>> > > > -Matthias
>>>> > > >
>>>> > > > On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
>>>> > > > > Hey John,
>>>> > > > >
>>>> > > > > Just a few follow-up questions/comments about the whole Windows thing:
>>>> > > > >
>>>> > > > > That's a good way of looking at things; in particular, the point about
>>>> > > > > SessionWindows, for example, requiring a Merger while other "statically
>>>> > > > > enumerable" windows require only an adder seems to touch on the heart
>>>> > > > > of the matter.
>>>> > > > >
>>>> > > > >> It seems like what Time and Universal (and any other Windows
>>>> > > > >> implementation) have in common is that the windows are statically
>>>> > > > >> enumerable.
>>>> > > > >> As a consequence, they can all rely on an aggregation maintenance
>>>> > > > >> algorithm that involves enumerating each of the windows and updating
>>>> > > > >> it. That also means that their DSL object (TimeWindowedKStream)
>>>> > > > >> doesn't need "subtractors" or "mergers", but only "adders"; again,
>>>> > > > >> this is a consequence of the fact that the windows are enumerable.
>>>> > > > >
>>>> > > > >
>>>> > > > > Given that, I'm a bit confused why you conclude that sliding windows
>>>> > > > > are fundamentally different from the "statically enumerable" windows
>>>> > > > > -- sliding windows require only an adder too. I'm not sure it's a
>>>> > > > > consequence of being enumerable, or that being enumerable is the
>>>> > > > > fundamental property that unites all Windows (ignoring JoinWindows
>>>> > > > > here). Yes, it currently does apply to all Windows implementations,
>>>> > > > > but we shouldn't assume that it *has* to be that way on the basis that
>>>> > > > > it currently happens to be.
>>>> > > > >
>>>> > > > > Also, the fact that they can all rely on the same aggregation algorithm
>>>> > > > > seems like an implementation detail, and it would be weird to force a
>>>> > > > > separate/new DSL API just because under the covers we swap in a
>>>> > > > > different processor.
>>>> > > > >
>>>> > > > > To be fair, I don't think there's a strong reason *against* not
>>>> > > > > extending Windows -- in the end it will just mean adding a new
>>>> > > > > #windowedBy method and copy/pasting everything from
>>>> > > > > TimeWindowedKStream pretty much word for word. But anytime you find
>>>> > > > > yourself copying over code almost exactly, there should probably be a
>>>> > > > > good reason why :)
>>>> > > > >
>>>> > > > >
>>>> > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
>>>> > > > >
>>>> > > > >> Thanks Leah!
>>>> > > > >>
>>>> > > > >> 5) Regarding the empty windows, I'm wondering if we should simply
>>>> > > > >> propose that the windows should not be emitted downstream of the
>>>> > > > >> operator or visible in IQ. Then, it'll be up to the implementation to
>>>> > > > >> make it happen. I'm personally not concerned about it, since it seems
>>>> > > > >> like there are multiple ways to accomplish this.
>>>> > > > >>
>>>> > > > >> Note, the discrepancy Matthias pointed out is actually a design bug.
>>>> > > > >> The windowed aggregation (like every operation in Streams) produces a
>>>> > > > >> "view", which then forms the basis of downstream operations. When we
>>>> > > > >> pass the Materialized option to the operation, all we're doing is
>>>> > > > >> saying to "materialize" the view (aka, actually store the computed
>>>> > > > >> view) and also make it queryable. It would be illegal for the
>>>> > > > >> "queryable, materialized view" to differ in any way from the "view".
>>>> > > > >> So, it seems we must either propose to emit the empty windows AND make
>>>> > > > >> them visible in IQ, or propose NOT to emit the empty windows AND NOT
>>>> > > > >> make them visible in IQ.
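One way to honor the "emit and IQ must match" requirement is to route both decisions through the same predicate. This is a hypothetical in-memory sketch: a `TreeMap` stands in for the window store, a per-window record count stands in for the aggregate, and it is not a proposal for the actual store layout:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sketch: a single predicate decides both what would be
// forwarded downstream and what an interactive query would return, so the two
// views cannot diverge. Windows are keyed by start time; the value is a
// per-window record count standing in for the real aggregate.
final class EmptyWindowFilter {
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();

    // The one rule shared by emitting and querying: hide empty windows.
    private static boolean isVisible(long count) {
        return count > 0;
    }

    // Creates a window (e.g. a new right window) if absent; returns whether
    // the resulting state should be emitted downstream.
    boolean createWindow(long start) {
        countsByWindowStart.putIfAbsent(start, 0L);
        return isVisible(countsByWindowStart.get(start));
    }

    // Adds one record to a window; returns whether the update should be emitted.
    boolean addRecord(long start) {
        long updated = countsByWindowStart.merge(start, 1L, Long::sum);
        return isVisible(updated);
    }

    // What an interactive query would see: only windows passing the same rule.
    List<Long> queryableWindowStarts() {
        List<Long> visible = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : countsByWindowStart.entrySet()) {
            if (isVisible(entry.getValue())) {
                visible.add(entry.getKey());
            }
        }
        return visible;
    }
}
```

Because `isVisible` is the only rule, a window that is never emitted can never show up in a query either, which corresponds to the "NOT emit AND NOT visible in IQ" option.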
>>>> > > > >>
>>>> > > > >> 7) Regarding whether we can extend TimeWindows (or Windows):
>>>> > > > >> I've been mulling this over more. I think it's worth asking what these
>>>> > > > >> classes even mean. For example, why is SessionWindows a different
>>>> > > > >> thing from TimeWindows and UniversalWindows (which are both Windows)?
>>>> > > > >>
>>>> > > > >> This conversation is extra complicated because of the incomplete and
>>>> > > > >> mismatched class hierarchy, but we can try to look past it for now.
>>>> > > > >>
>>>> > > > >> It seems like what Time and Universal (and any other Windows
>>>> > > > >> implementation) have in common is that the windows are statically
>>>> > > > >> enumerable.
>>>> > > > >> As a consequence, they can all rely on an aggregation maintenance
>>>> > > > >> algorithm that involves enumerating each of the windows and updating
>>>> > > > >> it. That also means that their DSL object (TimeWindowedKStream)
>>>> > > > >> doesn't need "subtractors" or "mergers", but only "adders"; again,
>>>> > > > >> this is a consequence of the fact that the windows are enumerable.
>>>> > > > >>
>>>> > > > >> In contrast, session windows are data driven, so they are not
>>>> > > > >> statically enumerable. Their algorithm has to rely on scans, and to do
>>>> > > > >> the scans, it needs to know the "inactivity gap", which needs to be
>>>> > > > >> part of the window definition. Likewise, session windows have the
>>>> > > > >> property that they need to be merged, so their DSL object also
>>>> > > > >> requires mergers.
>>>> > > > >>
>>>> > > > >> It really seems like your new window definition doesn't fit into
>>>> > > > >> either category. It uses a different algorithm, which relies on
>>>> > > > >> scans, but it is also fixed in size, so it doesn't need mergers. In
>>>> > > > >> this situation, it seems like the safe bet is to just create
>>>> > > > >> SlidingWindows with no interface and add a separate set of DSL
>>>> > > > >> operations and objects. It's a little extra code, but it seems to keep
>>>> > > > >> everything tidier and more comprehensible, both for us and for users.
>>>> > > > >>
>>>> > > > >> What do you think?
>>>> > > > >> -John
>>>> > > > >>
>>>> > > > >>
>>>> > > > >>
>>>> > > > >> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
>>>> > > > >>> Hi Matthias,
>>>> > > > >>>
>>>> > > > >>> Thanks for the suggestions. I've updated the KIP and child page
>>>> > > > >>> accordingly and addressed some of them below.
>>>> > > > >>>
>>>> > > > >>>> 1) For the mandatory grace period, we should use a static builder
>>>> > > > >>>> method that takes two parameters.
>>>> > > > >>>>
>>>> > > > >>>
>>>> > > > >>> That makes sense; I've changed that in the public API.
>>>> > > > >>>
>>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: those
>>>> > > > >>>> empty windows would be returned.
>>>> > > > >>>
>>>> > > > >>>
>>>> > > > >>> This is a great point: with the current implementation plan, empty
>>>> > > > >>> windows would be returned. I think creating a second window store
>>>> > > > >>> would definitely work, but there would be more overhead in having two
>>>> > > > >>> stores and switching windows between the stores, as well as doing
>>>> > > > >>> scans in both stores to find existing windows. There might be a way
>>>> > > > >>> to avoid emitting empty windows without creating a second window
>>>> > > > >>> store; I'll look more into it.
>>>> > > > >>>
>>>> > > > >>> Cheers,
>>>> > > > >>> Leah
>>>> > > > >>>
>>>> > > > >>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>> > > > >>>
>>>> > > > >>>> Thanks for updating the KIP.
>>>> > > > >>>>
>>>> > > > >>>> Couple of follow up comments:
>>>> > > > >>>>
>>>> > > > >>>> 1) For the mandatory grace period, we should use a static builder
>>>> > > > >>>> method that takes two parameters. This provides a better API, as
>>>> > > > >>>> users cannot forget to set the grace period. Throwing a runtime
>>>> > > > >>>> exception seems not to be the best way to handle this case.
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 2) In Fig. 2 you list 10 hopping windows. I believe there should
>>>> > > > >>>> actually be more? The first hopping window would be [-6,-4[ and the
>>>> > > > >>>> last one would be [19,29[ -- hence, the cost savings are actually
>>>> > > > >>>> much higher.
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 3a) IQ: you are saying that the user needs to compute the start time as
>>>> > > > >>>>
>>>> > > > >>>>> windowSize+the time they're looking at
>>>> > > > >>>>
>>>> > > > >>>> Should this be "targetTime - windowSize" instead?
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 3b) IQ: in your example you say "window size of 10 minutes" with an
>>>> > > > >>>> incident at 9:15.
>>>> > > > >>>>
>>>> > > > >>>>> they're looking for a window with the start time of 8:15.
>>>> > > > >>>>
>>>> > > > >>>> The example does not seem to add up?
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 4) For "Processing Windows": you describe a three-step approach. I
>>>> > > > >>>> just want to point out that step (1) is not necessary for each input
>>>> > > > >>>> record, because timestamps are not guaranteed to be unique, and thus
>>>> > > > >>>> a previous record with the same key and timestamp might have created
>>>> > > > >>>> the windows already.
>>>> > > > >>>>
>>>> > > > >>>> Nit: I am also not exactly sure what you mean by step (3), as you use
>>>> > > > >>>> the word "send". I guess you mean "put"?
>>>> > > > >>>>
>>>> > > > >>>> It seems there are actually more details in the sub-page:
>>>> > > > >>>>
>>>> > > > >>>>> A new record for SlidingWindows will always create two new windows.
>>>> > > > >>>>> If either of those windows already exist in the windows store, their
>>>> > > > >>>>> aggregation will simply be updated to include the new record, but no
>>>> > > > >>>>> duplicate window will be added to the WindowStore.
>>>> > > > >>>>
>>>> > > > >>>> However, the first and second sentences contradict each other a
>>>> > > > >>>> little bit. I think the first sentence is not correct.
>>>> > > > >>>>
>>>> > > > >>>> Nit:
>>>> > > > >>>>
>>>> > > > >>>>> For in-order records, the left window will always be empty.
>>>> > > > >>>>
>>>> > > > >>>> This should be "right window"?
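Here is a minimal sketch of the two windows associated with a new record. It assumes inclusive bounds, millisecond granularity, and a right window that starts 1 ms after the record, so the right window never contains the record and starts out empty for an in-order record. `RecordWindows` is a hypothetical helper. As noted above, whether these windows are actually *created* depends on whether they already exist in the store.

```java
// Hypothetical helper: the two windows associated with a new record at
// `timestamp`, as inclusive {start, end} pairs in milliseconds.
final class RecordWindows {
    // Left window: ends at the record, so it always contains the record.
    static long[] leftWindow(long timestamp, long timeDifference) {
        return new long[] {timestamp - timeDifference, timestamp};
    }

    // Right window: starts 1 ms after the record, so it never contains it;
    // for an in-order record nothing later exists yet, so it starts out empty.
    static long[] rightWindow(long timestamp, long timeDifference) {
        return new long[] {timestamp + 1, timestamp + 1 + timeDifference};
    }
}
```

For a record at 10 with a time difference of 5, this gives a left window [5,10] and a right window [11,16]. That agrees with the nit above: for in-order records it is the right window, not the left, that is empty.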
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 5) "Emitting Results": it might be worth pointing out that the
>>>> > > > >>>> second/future window of a new record is created with no records,
>>>> > > > >>>> and thus, even if it's initialized, it won't be emitted. Only if a
>>>> > > > >>>> subsequent record falls into the window would the window be updated
>>>> > > > >>>> and the window result (for a window containing one record) be sent
>>>> > > > >>>> downstream.
>>>> > > > >>>>
>>>> > > > >>>> Again, the sub-page contains these details. It might still be worth
>>>> > > > >>>> adding them to the top-level page, too.
>>>> > > > >>>>
>>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: those
>>>> > > > >>>> empty windows would be returned. Thus, I am wondering if we need to
>>>> > > > >>>> use two stores internally? One store for actual windows and one
>>>> > > > >>>> store for empty windows? If an empty window is updated, it's moved
>>>> > > > >>>> to the other store? For IQ, we only allow querying the
>>>> > > > >>>> non-empty-window store?
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 6) On the sub-page:
>>>> > > > >>>>
>>>> > > > >>>>> The left window of in-order records and both windows for
>>>> > > > >>>>> out-of-order records need to be updated with the values of records
>>>> > > > >>>>> that have already been processed.
>>>> > > > >>>>
>>>> > > > >>>> Why "both windows for out-of-order records"? IMHO, we don't know how
>>>> > > > >>>> many existing windows need to be updated when processing an
>>>> > > > >>>> out-of-order record. Of course, an out-of-order record could also
>>>> > > > >>>> fall into no existing window and create two new windows.
>>>> > > > >>>>
>>>> > > > >>>>> Because each record creates one new window that includes itself and
>>>> > > > >>>>> one window that does not
>>>> > > > >>>>
>>>> > > > >>>> As stated above, this does not seem to hold. I understand what you
>>>> > > > >>>> mean, but it would be good to be exact.
>>>> > > > >>>>
>>>> > > > >>>> Figure 2: You use the term "late", but I guess you mean
>>>> > > > >>>> "out-of-order" -- a record is _late_ if it's no longer processed
>>>> > > > >>>> because the grace period has already passed.
>>>> > > > >>>>
>>>> > > > >>>> Figure 2: "Late" should be "out-of-order". The example text says a
>>>> > > > >>>> window [16,26] should be created, but the figure shows the green
>>>> > > > >>>> window as [15,20].
>>>> > > > >>>>
>>>> > > > >>>> About the blue window: maybe add a note that the blue window contains
>>>> > > > >>>> the aggregate we need for the green window, _before_ the new record
>>>> > > > >>>> `a` is added to the blue window.
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> 7) I am not really happy about extending TimeWindows, and I think the
>>>> > > > >>>> argument about JoinWindows is not the best (IMHO, JoinWindows already
>>>> > > > >>>> does it wrong and we would just repeat the same mistake). However, it
>>>> > > > >>>> seems our window hierarchy is "broken" already, and it might be out
>>>> > > > >>>> of scope for this KIP to fix it. Hence, I am OK with biting the
>>>> > > > >>>> bullet for now and cleaning it up later.
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> -Matthias
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
>>>> > > > >>>>> Hi Leah,
>>>> > > > >>>>>
>>>> > > > >>>>> Thanks for the updated KIP. I agree that extending SlidingWindows
>>>> > > > >>>>> from Windows is fine for the sake of not introducing more public APIs
>>>> > > > >>>>> (and their internal xxxImpl classes), and to me its downside is small
>>>> > > > >>>>> enough to tolerate.
>>>> > > > >>>>>
>>>> > > > >>>>>
>>>> > > > >>>>> Guozhang
>>>> > > > >>>>>
>>>> > > > >>>>>
>>>> > > > >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> wrote:
>>>> > > > >>>>>
>>>> > > > >>>>>> Hi all,
>>>> > > > >>>>>>
>>>> > > > >>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
>>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
>>>> > > > >>>>>> to address these points and have created a child page
>>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
>>>> > > > >>>>>> to go more in depth on certain implementation details.
>>>> > > > >>>>>>
>>>> > > > >>>>>> *Grace Period:*
>>>> > > > >>>>>> I think Sophie raises a good point that the default grace period of
>>>> > > > >>>>>> 24 hours is often too long and was chosen when retention time and
>>>> > > > >>>>>> grace period were the same. For SlidingWindows, I propose we make the
>>>> > > > >>>>>> grace period mandatory. To keep formatting consistent with other
>>>> > > > >>>>>> types of windows, the grace period won't be an additional parameter
>>>> > > > >>>>>> in the #of method, but will still look like it does in other use
>>>> > > > >>>>>> cases:
>>>> > > > >>>>>>
>>>> > > > >>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds))
>>>> > > > >>>>>>
>>>> > > > >>>>>> If the grace period isn't properly initialized, an error will be
>>>> > > > >>>>>> thrown through the process method.
>>>> > > > >>>>>>
>>>> > > > >>>>>> *Storage Layer + Aggregation:*
>>>> > > > >>>>>> SlidingWindows will use a WindowStore because the computation can be
>>>> > > > >>>>>> done with the information stored in a WindowStore (window timestamp
>>>> > > > >>>>>> and value). Using the WindowStore also simplifies computation, as
>>>> > > > >>>>>> SlidingWindows can leverage existing processes. Because we are using
>>>> > > > >>>>>> a WindowStore, the aggregation process will be similar to that of a
>>>> > > > >>>>>> hopping window. As records come in, their value is added to the
>>>> > > > >>>>>> aggregation that already exists, following the same procedure as
>>>> > > > >>>>>> hopping windows. The aggregation difference between SlidingWindows
>>>> > > > >>>>>> and hopping windows comes in creating new windows for a
>>>> > > > >>>>>> SlidingWindow, where you need to find the existing records that
>>>> > > > >>>>>> belong to the new window. This computation is similar to the
>>>> > > > >>>>>> aggregation in SessionWindows and requires a scan of the WindowStore
>>>> > > > >>>>>> to find the window with the aggregation needed, which will always be
>>>> > > > >>>>>> pre-computed. The scan requires creating an iterator, but should have
>>>> > > > >>>>>> minimal performance effects, as this strategy is already implemented
>>>> > > > >>>>>> in SessionWindows. More details on finding the aggregation that needs
>>>> > > > >>>>>> to be put in a new window can be found on the implementation page
>>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
>>>> > > > >>>>>>
>>>> > > > >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
>>>> > > > >>>>>> Because SlidingWindows are still defined by a windowSize (whereas
>>>> > > > >>>>>> SessionWindows are defined purely by data), I think it makes sense to
>>>> > > > >>>>>> leverage the existing Window processes instead of creating a new
>>>> > > > >>>>>> store type that would be very similar to the WindowStore. While it's
>>>> > > > >>>>>> true that the #windowsFor method isn't necessary for SlidingWindows,
>>>> > > > >>>>>> JoinWindows also extends Windows<Window> and throws an
>>>> > > > >>>>>> UnsupportedOperationException in the #windowsFor method, which is
>>>> > > > >>>>>> what SlidingWindows can do. The difference between extending
>>>> > > > >>>>>> Windows<TimeWindow> or Windows<Window> is minimal, as both are ways
>>>> > > > >>>>>> to pass window parameters. Extending Windows<TimeWindow> will give
>>>> > > > >>>>>> us more leverage in utilizing existing processes.
>>>> > > > >>>>>>
>>>> > > > >>>>>> *Emit Strategy*
>>>> > > > >>>>>> I would argue that emitting for every update is still the best way to
>>>> > > > >>>>>> go for SlidingWindows, because it mimics the other types of windows,
>>>> > > > >>>>>> and suppression can be leveraged to limit what SlidingWindows emits.
>>>> > > > >>>>>> While some users may only want to see the last value, others may
>>>> > > > >>>>>> want to see more, and leaving the emit strategy to emit partial
>>>> > > > >>>>>> results allows both groups of users to access what they want.
>>>> > > > >>>>>>
>>>> > > > >>>>>> *Additional Features*
>>>> > > > >>>>>> Supporting sliding windows inherently, and shifting inefficient
>>>> > > > >>>>>> hopping windows to sliding windows, is an interesting idea and could
>>>> > > > >>>>>> be built on top of SlidingWindows when they are finished, but right
>>>> > > > >>>>>> now it seems out of scope for the needs of this KIP. Similarly,
>>>> > > > >>>>>> including a `subtraction` feature could improve performance, but
>>>> > > > >>>>>> doesn't seem necessary for the implementation of this KIP.
>>>> > > > >>>>>>
>>>> > > > >>>>>> Let me know what you think of the updates,
>>>> > > > >>>>>>
>>>> > > > >>>>>> Leah
>>>> > > > >>>>>>
>>>> > > > >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:
>>>> > > > >>>>>>
>>>> > > > >>>>>>> Hello all,
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Thanks for the KIP, Leah!
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Regarding (1): I'd go further, actually. Making Windows an abstract
>>>> > > > >>>>>>> class was a mistake from the beginning that led to us not being
>>>> > > > >>>>>>> able to fix a very confusing situation for users around retention times,
>>>> > > > >>>>>>> final results emitting, etc. Thus, I would definitely not suggest
>>>> > > > >>>>>>> extending TimeWindows, but would also not suggest extending Windows.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> The very simplest thing to do is follow the example of SessionWindows,
>>>> > > > >>>>>>> which is just a completely self-contained class. If we don't mess with
>>>> > > > >>>>>>> class inheritance, we won't ever have any of the problems related to
>>>> > > > >>>>>>> class inheritance. This is my preferred solution.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Still, sliding windows have a lot in common with TimeWindows and other
>>>> > > > >>>>>>> fixed-size windows, namely that the windows are fixed in size. If we want
>>>> > > > >>>>>>> to preserve the current two-part windowing API in which you can window
>>>> > > > >>>>>>> by either "fixed" or "data driven" modes, I'd suggest we avoid increasing
>>>> > > > >>>>>>> the blast radius of Windows by taking the opportunity to replace it with
>>>> > > > >>>>>>> a proper interface and implement that interface instead.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> For example:
>>>> > > > >>>>>>> https://github.com/apache/kafka/pull/9031
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> ======
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Regarding (2), it seems more straightforward as a user of Streams
>>>> > > > >>>>>>> to just have one mental model. _All_ of our aggregation operations
>>>> > > > >>>>>>> follow an eager emission model, in which we just emit an update whenever
>>>> > > > >>>>>>> an update is available. We already provided Suppression to explicitly
>>>> > > > >>>>>>> apply different update semantics when it's required. Why should we
>>>> > > > >>>>>>> define a snowflake operation with completely different semantics from
>>>> > > > >>>>>>> everything else? I.e., systems are generally easier to use when they
>>>> > > > >>>>>>> follow a few simple, composable rules than when they have a lot of
>>>> > > > >>>>>>> different, specific rules.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> ======
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> New point: (4):
>>>> > > > >>>>>>> It would be nice to include some examples of user code that would use the
>>>> > > > >>>>>>> new API, which should include:
>>>> > > > >>>>>>> 1. using the DSL with the sliding window definition
>>>> > > > >>>>>>> 2. accessing the stored results of a sliding window aggregation via IQ
>>>> > > > >>>>>>> 3. defining a custom processor to access sliding windows in a store
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> It generally helps reviewers wrap their heads around the proposal, as well
>>>> > > > >>>>>>> as shake out any design issues that would otherwise only come up during
>>>> > > > >>>>>>> implementation/testing/review.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> Thanks again for the awesome proposal!
>>>> > > > >>>>>>> -John
>>>> > > > >>>>>>>
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
>>>> > > > >>>>>>>> Hello Leah,
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> Thanks for the nicely written KIP. A few thoughts:
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> 1) I echo the other reviewers' comments regarding the typing: why
>>>> > > > >>>>>>>> extend TimeWindow instead of just extending Window?
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> 2) I also feel that the emitting policy for this type of windowing
>>>> > > > >>>>>>>> aggregation may need to differ from the existing ones. The existing
>>>> > > > >>>>>>>> emitting policy is very simple: emit every time a window gets updated,
>>>> > > > >>>>>>>> and emit every time on out-of-order data within the grace period. This is
>>>> > > > >>>>>>>> because for time-windows the window close time strictly depends on the
>>>> > > > >>>>>>>> window start time, which is fixed, while for session-windows, although
>>>> > > > >>>>>>>> the window open/close time is also data-dependent, it changes relatively
>>>> > > > >>>>>>>> infrequently compared to sliding-windows. For this KIP, since each new
>>>> > > > >>>>>>>> record would cause a new sliding-window, the number of windows maintained
>>>> > > > >>>>>>>> logically could be much larger, and hence emitting on each update may be
>>>> > > > >>>>>>>> too aggressive.
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> 3) Although the KIP itself should focus on user-facing interfaces, I'd
>>>> > > > >>>>>>>> suggest we create a child page of KIP-450 discussing its implementation
>>>> > > > >>>>>>>> as well, since some of that may drive the interface design. E.g.,
>>>> > > > >>>>>>>> personally I think having a combiner interface in addition to the
>>>> > > > >>>>>>>> aggregator would be useful, but that's based on my two cents about the
>>>> > > > >>>>>>>> implementation design (I once created a child page describing it:
>>>> > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>>> > > > >>>>>>>> ).
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> Guozhang
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>> Hi Leah,
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> Thank you for the KIP!
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> Here is my feedback:
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 1. The KIP would benefit from some code examples that show how to use
>>>> > > > >>>>>>>>> sliding windows in aggregations.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 2. The different sliding windows in Figures 1 and 2 are really hard to
>>>> > > > >>>>>>>>> distinguish. Could you please try to make them easier to tell apart
>>>> > > > >>>>>>>>> graphically? You could try to draw the frames of consecutive windows
>>>> > > > >>>>>>>>> shifted relative to each other.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 3. I agree with Matthias that extending Windows<TimeWindow> does not
>>>> > > > >>>>>>>>> seem to be the best approach. What would be the result of windowsFor()?
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 4. In the section "Public Interfaces" you should remove implementation
>>>> > > > >>>>>>>>> details like private constructors and private fields.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 5. Do we need a new store interface or can we use WindowStore? Some
>>>> > > > >>>>>>>>> words about that would be informative.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, I would skip it
>>>> > > > >>>>>>>>> for now and add it later.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> 7. I also agree that having a section that describes how to handle
>>>> > > > >>>>>>>>> out-of-order records would help us understand what is still missing
>>>> > > > >>>>>>>>> and what we can reuse.
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> Best,
>>>> > > > >>>>>>>>> Bruno
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> Leah,
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> thanks for your update. However, it does not completely answer my
>>>> > > > >>>>>>>>>> question.
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> In our current window implementations, we emit a window result update
>>>> > > > >>>>>>>>>> record (ie, early/partial result) for each input record. When an
>>>> > > > >>>>>>>>>> out-of-order record arrives, we just update the corresponding old window
>>>> > > > >>>>>>>>>> and emit another update.
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> It's unclear from the KIP whether you propose the same emit strategy. --
>>>> > > > >>>>>>>>>> For sliding windows, it might be worth considering a different emit
>>>> > > > >>>>>>>>>> strategy that only emits the final result (ie, after the grace period
>>>> > > > >>>>>>>>>> has passed)?
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> Boyang also raises a good point that relates to my point from above
>>>> > > > >>>>>>>>>> about pre-aggregations and storage layout. Our current time windows are
>>>> > > > >>>>>>>>>> all pre-aggregated and stored in parallel. We can also look up windows
>>>> > > > >>>>>>>>>> efficiently, as we can compute the windowed-key given the input record
>>>> > > > >>>>>>>>>> key and timestamp based on the window definition.
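The lookup Matthias describes for time windows can be sketched as a pure function of the timestamp and the window definition. This mirrors the idea behind `TimeWindows#windowsFor` for hopping windows with `[start, start + size)` bounds, but it is a standalone illustration; `HoppingWindowsSketch` is a hypothetical name. No equivalent function exists for sliding windows, since their boundaries depend on which other records have arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: compute, from a timestamp alone, the start of every hopping window
// [start, start + size) that contains it. Standalone illustration, not Kafka's code.
final class HoppingWindowsSketch {
    static List<Long> windowStartsFor(final long timestamp, final long sizeMs, final long advanceMs) {
        // Earliest advance-aligned start whose window still reaches the timestamp.
        long start = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, with size 10 and advance 5, timestamp 12 belongs to the windows starting at 5 and 10, while timestamp 3 belongs only to the window starting at 0.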
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> However, for sliding windows, window boundaries are data dependent and
>>>> > > > >>>>>>>>>> thus we cannot compute them upfront. So how can we "find" existing
>>>> > > > >>>>>>>>>> windows efficiently? Furthermore, out-of-order data would create new
>>>> > > > >>>>>>>>>> windows in the past and we need to be able to handle this case.
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store all raw
>>>> > > > >>>>>>>>>> input events. Additionally, we could also store pre-aggregated results
>>>> > > > >>>>>>>>>> if we think it's beneficial. -- If we apply the "emit only final results"
>>>> > > > >>>>>>>>>> strategy, storing pre-aggregated results would not be necessary, though.
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> Btw: for sliding windows it might also be useful to consider allowing
>>>> > > > >>>>>>>>>> users to supply a `Subtractor` -- this subtractor could be applied to
>>>> > > > >>>>>>>>>> the current window result (in case we store it) if a record drops out
>>>> > > > >>>>>>>>>> of the window. Of course, not all aggregation functions are subtractable,
>>>> > > > >>>>>>>>>> and we can consider this as a follow-up task, too, and not include it in
>>>> > > > >>>>>>>>>> this KIP for now. Thoughts?
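A minimal sketch of the `Subtractor` idea for a sum over the window `[ts - timeDifference, ts]`, assuming in-order input and `long` values. `SubtractorSketch` and its constructor parameters are hypothetical; this is not a proposed Streams API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongBinaryOperator;

// Hypothetical sketch of an aggregator paired with a user-supplied subtractor.
// Assumes records arrive in timestamp order; out-of-order input is not handled here.
final class SubtractorSketch {
    private final long timeDifferenceMs;
    private final LongBinaryOperator adder;      // (aggregate, value) -> new aggregate
    private final LongBinaryOperator subtractor; // must undo adder for the same value
    private final Deque<long[]> inWindow = new ArrayDeque<>(); // {timestamp, value} pairs
    private long aggregate;

    SubtractorSketch(final long timeDifferenceMs, final long initial,
                     final LongBinaryOperator adder, final LongBinaryOperator subtractor) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.aggregate = initial;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Adds a record and returns the aggregate of the window [ts - timeDifference, ts].
    long add(final long ts, final long value) {
        inWindow.addLast(new long[] {ts, value});
        aggregate = adder.applyAsLong(aggregate, value);
        // Records that dropped out of the window are removed via the subtractor,
        // instead of re-aggregating everything still inside the window.
        while (!inWindow.isEmpty() && inWindow.peekFirst()[0] < ts - timeDifferenceMs) {
            aggregate = subtractor.applyAsLong(aggregate, inWindow.pollFirst()[1]);
        }
        return aggregate;
    }
}
```

A sum has an exact inverse, so the subtractor works here; an aggregation such as max has no inverse, which is why not every aggregation is subtractable.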
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if extending
>>>> > > > >>>>>>>>>> TimeWindow is the best approach. For TimeWindows, we can pre-compute
>>>> > > > >>>>>>>>>> window boundaries (cf `windowsFor()`) while for a sliding window the
>>>> > > > >>>>>>>>>> boundaries are data dependent. Session windows are also data dependent
>>>> > > > >>>>>>>>>> and thus they don't inherit from TimeWindow. (Maybe check out the KIP
>>>> > > > >>>>>>>>>> that added session windows? It could provide some good insights.) -- I
>>>> > > > >>>>>>>>>> believe the same rationale applies to sliding windows?
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> -Matthias
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
>>>> > > > >>>>>>>>>>> Thanks Leah and Sophie for the KIP.
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time. Could we
>>>> > > > >>>>>>>>>>> elaborate on how the storage layer is structured?
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching aggregation results,
>>>> > > > >>>>>>>>>>> since we couldn't pre-aggregate until the user asks for it. It would be
>>>> > > > >>>>>>>>>>> good to discuss this as well.
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding windows
>>>> > > > >>>>>>>>>>> inherently. For a user who actually uses a hopping window, Streams could
>>>> > > > >>>>>>>>>>> detect such an inefficiency by computing the window_size/advance_time
>>>> > > > >>>>>>>>>>> ratio and checking whether the write amplification is too high compared
>>>> > > > >>>>>>>>>>> with some configured threshold. The benefit of doing so is that existing
>>>> > > > >>>>>>>>>>> Streams users don't need to change their code or learn a new API; they
>>>> > > > >>>>>>>>>>> only need to upgrade the Streams library to get the benefit for their
>>>> > > > >>>>>>>>>>> inefficient hopping window implementation. There might be some
>>>> > > > >>>>>>>>>>> compatibility issues, for sure, but it's worth listing them out to weigh
>>>> > > > >>>>>>>>>>> the trade-off.
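Boyang's ratio check could look roughly like this. `HoppingAmplificationSketch` and the `threshold` parameter are hypothetical; Streams has no such config, and a real detector would also have to weigh the compatibility issues mentioned above.

```java
// Hypothetical sketch of the window_size/advance_time check. The threshold is an
// illustrative parameter, not an existing Kafka Streams config.
final class HoppingAmplificationSketch {
    // A record falls into at most ceil(size / advance) overlapping hopping windows,
    // so each input record causes up to that many window-store writes.
    static long maxWindowsPerRecord(final long sizeMs, final long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advance <= size");
        }
        return (sizeMs + advanceMs - 1) / advanceMs;
    }

    // Flags a hopping window whose write amplification exceeds the threshold.
    static boolean amplificationTooHigh(final long sizeMs, final long advanceMs, final long threshold) {
        return maxWindowsPerRecord(sizeMs, advanceMs) > threshold;
    }
}
```

A 60-second window advancing every second writes each record to 60 windows, which would exceed a threshold of, say, 10.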
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>> Boyang
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>>> Hey Matthias,
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>> Thanks for pointing that out. I added the following to the Proposed
>>>> > > > >>>>>>>>>>>> Changes section of the KIP:
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>> "Records that come out of order will be processed the same way as
>>>> > > > >>>>>>>>>>>> in-order records, as long as they fall within the grace period. Any new
>>>> > > > >>>>>>>>>>>> windows created by the late record will still be created, and the
>>>> > > > >>>>>>>>>>>> existing windows that are changed by the late record will be updated.
>>>> > > > >>>>>>>>>>>> Any record that falls outside of the grace period (either user defined
>>>> > > > >>>>>>>>>>>> or default) will be discarded."
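A hedged sketch combining two points from this thread: the suggestion to store raw input events so out-of-order records can be folded into the right windows, and the grace-period rule quoted above. The drop condition (timestamp older than stream time minus grace) and the inclusive `[end - timeDifference, end]` window are assumptions of this sketch, and `RawEventStoreSketch` is a hypothetical name.

```java
import java.util.TreeMap;

// Hypothetical sketch: keep raw input values keyed by timestamp so that any window touched
// by an out-of-order record can be recomputed, and discard records outside the grace period.
final class RawEventStoreSketch {
    private final long timeDifferenceMs;
    private final long graceMs;
    // Raw input values, combined per timestamp for brevity.
    private final TreeMap<Long, Long> valueByTimestamp = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE; // largest timestamp seen so far

    RawEventStoreSketch(final long timeDifferenceMs, final long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // Returns false if the record was discarded for being outside the grace period.
    boolean put(final long ts, final long value) {
        streamTime = Math.max(streamTime, ts);
        if (ts < streamTime - graceMs) {
            return false; // assumed drop rule for this sketch
        }
        valueByTimestamp.merge(ts, value, Long::sum);
        return true;
    }

    // Recomputes the sum of the inclusive window [end - timeDifference, end] from stored events.
    long windowSum(final long end) {
        long sum = 0;
        for (final long v : valueByTimestamp.subMap(end - timeDifferenceMs, true, end, true).values()) {
            sum += v;
        }
        return sum;
    }
}
```

With a time difference of 5 and a grace period of 3, a record at 9 arriving after one at 12 is still accepted and counted, while a record at 8 is discarded.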
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>> All the best,
>>>> > > > >>>>>>>>>>>> Leah
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>> Leah,
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data, though.
>>>> > > > >>>>>>>>>>>>> How do you propose to address this?
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>> -Matthias
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
>>>> > > > >>>>>>>>>>>>>> Hi all,
>>>> > > > >>>>>>>>>>>>>> I'd like to kick off the discussion for KIP-450, adding sliding window
>>>> > > > >>>>>>>>>>>>>> aggregation support to Kafka Streams.
>>>> > > > >>>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>> > > > >>>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>>> Let me know what you think,
>>>> > > > >>>>>>>>>>>>>> Leah
>>>> > > > >>>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>>
>>>> > > > >>>>>>>>>>>
>>>> > > > >>>>>>>>>>
>>>> > > > >>>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>> --
>>>> > > > >>>>>>>> -- Guozhang
>>>> > > > >>>>>>>>
>>>> > > > >>>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>
>>>> > > > >>>>>
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>
>>>> > > > >>
>>>> > > > >
>>>> > > >
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
