Thanks for the update Leah -- I think that all makes sense.

Cheers,
Sophie

On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:

> Another minor tweak: instead of defining the window by the *size*, it will
> be defined by *timeDifference*, which is the maximum time difference
> between two events. This is a more precise way to define a window due to
> its inclusive ends, while allowing the user to create the window they
> expect. This definition fits with the current examples, where a record at
> *10* with a time difference of *5* would fall into the window *[5,10]*.
> This window contains any records at 5, 6, 7, 8, 9, and 10, which is 6
> instances instead of 5. This semantic difference is why I've shifted
> *size* to *timeDifference*.
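A minimal Java sketch of the inclusive-window arithmetic above, assuming integer timestamps and a window that ends at the record's own timestamp. `InclusiveWindowMath` and its methods are hypothetical names for illustration, not part of the Kafka Streams API.

```java
// Hypothetical helper illustrating inclusive sliding-window bounds.
final class InclusiveWindowMath {
    private InclusiveWindowMath() {}

    // Start of the window that ends at recordTs, with both ends inclusive.
    static long windowStart(long recordTs, long timeDifference) {
        return recordTs - timeDifference;
    }

    // True if ts lies in [recordTs - timeDifference, recordTs].
    static boolean contains(long recordTs, long timeDifference, long ts) {
        return ts >= windowStart(recordTs, timeDifference) && ts <= recordTs;
    }

    // Number of distinct integer timestamps covered by an inclusive window.
    static long distinctTimestamps(long timeDifference) {
        return timeDifference + 1;
    }
}
```

With a record at 10 and a time difference of 5, `windowStart` gives 5 and `distinctTimestamps` gives 6, matching the [5,10] example.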
>
> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
> conventions.
>
> Let me know if there are any concerns! The vote thread is open as well
> here:
> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
>
> Best,
> Leah
>
> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
>
> > A small tweak: to make it clearer to users that grace is required, and
> > to clean up some of the confusing grammar semantics of windows, the
> > main builder method for *SlidingWindows* will be *withSizeAndGrace*
> > instead of *of*. This looks like it'll be the last change (for now) on
> > the public API. If anyone has any comments or thoughts, let me know.
> > Otherwise, I'll take this to vote shortly.
> >
> > Best,
> > Leah
> >
> > On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
> >
> >> To accommodate the change to a final class, I've added another
> >> *windowedBy()* function in *CogroupedKStream.java* to handle
> >> SlidingWindows.
> >>
> >> As far as the discussion goes, I think this is the last change we've
> >> talked about. If anyone has other comments or concerns, please let me
> >> know!
> >>
> >> Cheers,
> >> Leah
> >>
> >> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>
> >>> Thanks for the discussion about extending TimeWindows. I agree that
> >>> making it future-proof is important, and the implementation of
> >>> SlidingWindows is unique enough that it seems logical to make it its
> >>> own final class.
> >>>
> >>> On that note, I've updated the KIP to make SlidingWindows a standalone
> >>> final class, and added the *windowedBy()* API in *KGroupedStream* to
> >>> handle SlidingWindows. It seems that SlidingWindows would still be able
> >>> to leverage *TimeWindowedKStream* by creating a SlidingWindows version
> >>> of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
> >>> anyone sees issues with this implementation, please let me know.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>>> Thanks for the reply, Sophie.
> >>>>
> >>>> That all sounds about right to me.
> >>>>
> >>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
> >>>> for it to be extensible. Different implementations can (and do)
> >>>> enumerate different windows to suit different use cases.
> >>>>
> >>>> On the other hand, I can’t think of any way to extend SessionWindows to
> >>>> do something different using the same algorithm, so it makes sense for
> >>>> it to stay final.
> >>>>
> >>>> If we think SlidingWindows is similarly not usefully extensible, then
> >>>> we can make it final. It’s easy to remove final later, but adding it
> >>>> later is not possible. Or we could go the other route and just make it
> >>>> an interface, on general principle. Both of these choices are safe API
> >>>> design.
> >>>>
> >>>> Thanks again,
> >>>> John
> >>>>
> >>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >>>> > >
> >>>> > > Users could pass in a custom `SessionWindows` as
> >>>> > > long as the session algorithm works correctly for it.
> >>>> >
> >>>> >
> >>>> > Well, not really: SessionWindows is a final class. TimeWindows is also
> >>>> > a final class, so neither of these can be extended/customized. For a
> >>>> > given Windows, there would then be only three (non-overlapping)
> >>>> > possibilities: it's either TimeWindows, SlidingWindows, or a custom
> >>>> > Windows. I don't think there's any problem with determining what the
> >>>> > user wants in this case -- we would just check if it's a
> >>>> > SlidingWindows and connect the new processor, or else connect the
> >>>> > existing hopping/tumbling window processor.
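To make the runtime-dispatch idea concrete, here is a hypothetical Java sketch using stand-in classes (`WindowsSpec`, `TimeWindowsSpec`, `SlidingWindowsSpec`, `ProcessorChooser`) rather than the real Kafka Streams types. It assumes SlidingWindows extends Windows and shows the limitation Matthias raises: an `instanceof` check can only recognize the built-in final class, so any custom subclass falls back to the hopping/tumbling processor regardless of the user's intent.

```java
// Hypothetical stand-ins for the DSL types discussed in the thread.
abstract class WindowsSpec {}                          // plays the role of Windows
final class TimeWindowsSpec extends WindowsSpec {}     // built-in hopping/tumbling
final class SlidingWindowsSpec extends WindowsSpec {}  // built-in sliding, if it extended Windows

final class ProcessorChooser {
    private ProcessorChooser() {}

    // Runtime dispatch on the concrete class: only the built-in final
    // SlidingWindowsSpec is recognized; every other subclass, including a
    // user's custom "sliding" implementation, gets the hopping/tumbling processor.
    static String choose(WindowsSpec windows) {
        if (windows instanceof SlidingWindowsSpec) {
            return "sliding";
        }
        return "hopping/tumbling";
    }
}
```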
> >>>> >
> >>>> > I'll admit that last sentence does leave a bad taste in my mouth. Part
> >>>> > of that is probably the "leaking" API Matthias pointed out; we just
> >>>> > assume the hopping/tumbling window implementation fits all custom
> >>>> > windows. But I guess if you really needed to customize the algorithm
> >>>> > any further, you may as well stick in a transformer and do it all
> >>>> > yourself.
> >>>> >
> >>>> > Anyway, given what we have, it does seem weird to apply one algorithm
> >>>> > for most Windows types and then swap in a different one for one
> >>>> > specific extension of Windows. So adding a new
> >>>> > #windowedBy(SlidingWindows) sounds reasonable to me.
> >>>> >
> >>>> > I'm still not convinced that we need a whole new TimeWindowedKStream
> >>>> > equivalent class for sliding windows, though. It seems like the
> >>>> > hopping/tumbling window implementation could benefit just as much from
> >>>> > a subtractor as the sliding windows, so the only future-proofing we
> >>>> > get is the ability to be lazy and add the subtractor to one but not
> >>>> > the other. Of course, it would only be an optimization, so we could
> >>>> > just not apply it to one type and nothing would break.
> >>>> > It does make me nervous to go against the "future-proof" direction,
> >>>> > though. Are there any other examples of things we might want to add
> >>>> > to one window type but not to another?
> >>>> >
> >>>> > On another note, this actually brings up a new question: should
> >>>> > SlidingWindows also be final? My take is "yes", since there's no
> >>>> > reasonable customization of sliding windows, at least not that I can
> >>>> > think of. Thoughts?
> >>>> >
> >>>> >
> >>>> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> wrote:
> >>>> >
> >>>> > > Thanks, all,
> >>>> > >
> >>>> > > I can see how my conclusion was kind of a leap.
> >>>> > >
> >>>> > > What Matthias said is indeed what I was thinking. When you provide a
> >>>> > > window definition to the windowedBy() method, you are selecting an
> >>>> > > algorithm that will be used to compute the windows from the input
> >>>> > > data.
> >>>> > >
> >>>> > > I didn’t mean the code implementation “algorithm”, but the
> >>>> > > high-level algorithm that describes how the input stream will be
> >>>> > > transformed into a sequence of windows.
> >>>> > >
> >>>> > > For example, the algorithm for Windows is something like “given the
> >>>> > > record timestamp, include the record in each of the enumerated
> >>>> > > windows”. Note that there can be a lot of variation in how the
> >>>> > > windows are enumerated, which is why there are at least a couple of
> >>>> > > implementations of Windows, and we are open to adding more (like for
> >>>> > > natural calendar boundaries).
> >>>> > >
> >>>> > > For SessionWindows, it’s more like “if any window is within the gap,
> >>>> > > extend its boundaries to include this record, and if two windows
> >>>> > > touch, then merge them”.
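The session algorithm John describes can be sketched as follows. This is a simplified, hypothetical single-key model (an in-memory list and an inclusive gap check), not how Kafka Streams actually stores or merges sessions; `SessionMerger` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of session windowing for a single key.
final class SessionMerger {
    private SessionMerger() {}

    // A session covers [start, end] (inclusive) and counts its records.
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    // Adds a record at ts: every existing session within `gap` of ts is merged
    // with the record into one session; all other sessions are kept unchanged.
    static List<Session> add(List<Session> sessions, long ts, long gap) {
        long start = ts;
        long end = ts;
        long count = 1;
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = ts >= s.start - gap && ts <= s.end + gap;
            if (withinGap) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, count));
        result.sort((a, b) -> Long.compare(a.start, b.start));
        return result;
    }
}
```

A record that lands within the gap of two sessions bridges them, which is exactly why the session DSL needs a merger and not just an adder.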
> >>>> > >
> >>>> > > Clearly, the algorithm for SlidingWindows doesn’t fall into either
> >>>> > > category, so it seems inappropriate to claim that it does in the API
> >>>> > > (by implementing Windows with stubbed methods) and then cast
> >>>> > > internally to execute a completely different algorithm.
> >>>> > >
> >>>> > > To your other observation, that the DSL object resulting from
> >>>> > > windowedBy would look the same for Windows and SlidingWindows, maybe
> >>>> > > it makes sense for windowedBy(SlidingWindows) also to return a
> >>>> > > TimeWindowedKStream.
> >>>> > >
> >>>> > > i.e.:
> >>>> > > =======================
> >>>> > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> >>>> > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> >>>> > > =======================
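With SlidingWindows as a standalone final class, overloads like the two above let the compiler pick the algorithm, so no runtime cast is needed. A hypothetical Java sketch with stand-in types (`WindowDefinition`, `HoppingDefinition`, `SlidingDefinition`, `GroupedStreamSketch`), not the real Kafka Streams classes:

```java
// Hypothetical stand-ins; not the real Kafka Streams classes.
abstract class WindowDefinition {}                    // plays the role of Windows<W>
final class HoppingDefinition extends WindowDefinition {}
final class SlidingDefinition {}                      // standalone final class, like the proposed SlidingWindows

final class GroupedStreamSketch {
    // Overload for every Windows-style definition: the enumerate-and-update algorithm.
    String windowedBy(WindowDefinition windows) {
        return "enumerating processor";
    }

    // Separate overload for the standalone sliding definition: the scan-based algorithm.
    String windowedBy(SlidingDefinition windows) {
        return "sliding processor";
    }
}
```

Custom subclasses of the Windows-style type still resolve to the enumerating overload, which preserves the property Matthias describes: the algorithm is determined by which type the user extends.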
> >>>> > >
> >>>> > > I can’t think of a reason this wouldn’t work. But then again, it
> >>>> > > would be more future-proof to go ahead and specify a different
> >>>> > > return type now, if we think we'll want to add subtractors and stuff
> >>>> > > later. I don't have a strong feeling about that part of the API. It
> >>>> > > seems to be independent of whether SlidingWindows extends Windows or
> >>>> > > not.
> >>>> > >
> >>>> > > Thanks,
> >>>> > > -John
> >>>> > >
> >>>> > > On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> >>>> > > > I think what John is trying to say is the following:
> >>>> > > >
> >>>> > > > We have `windowedBy(Windows)`, which accepts hopping/tumbling
> >>>> > > > windows but also custom windows, and we use a specific algorithm.
> >>>> > > > Note that custom windows must "work" based on the algorithm used.
> >>>> > > >
> >>>> > > > For session windows we have `windowedBy(SessionWindows)` and apply
> >>>> > > > a different algorithm. Users could pass in a custom
> >>>> > > > `SessionWindows` as long as the session algorithm works correctly
> >>>> > > > for it.
> >>>> > > >
> >>>> > > > For the new sliding windows, we want to use a different algorithm
> >>>> > > > compared to hopping/tumbling windows. If we let sliding windows
> >>>> > > > extend `Windows`, we can decide at runtime if we need to use the
> >>>> > > > hopping/tumbling window algorithm for hopping/tumbling windows or
> >>>> > > > the new sliding window algorithm for sliding windows. However, if
> >>>> > > > we get a custom window, which algorithm do we pick now? The
> >>>> > > > existing tumbling/hopping window algorithm or the new sliding
> >>>> > > > window algorithm? Both a custom "time-window" and a custom "sliding
> >>>> > > > window" implement the generic `Windows` class, and thus we cannot
> >>>> > > > make a decision, as we don't know the user's intent.
> >>>> > > >
> >>>> > > > As a matter of fact, even if the user might not be aware of it, the
> >>>> > > > algorithm we use already leaks into the API (if a user extends
> >>>> > > > `Windows`, it must work with our hopping/tumbling window algorithm,
> >>>> > > > and if a user extends `SessionWindows`, it must work with our
> >>>> > > > session algorithm), and it seems we need to preserve this property
> >>>> > > > for sliding windows.
> >>>> > > >
> >>>> > > >
> >>>> > > > -Matthias
> >>>> > > >
> >>>> > > > On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> >>>> > > > > Hey John,
> >>>> > > > >
> >>>> > > > > Just a few follow-up questions/comments about the whole Windows
> >>>> > > > > thing:
> >>>> > > > >
> >>>> > > > > That's a good way of looking at things; in particular, the point
> >>>> > > > > about SessionWindows, for example, requiring a Merger while other
> >>>> > > > > "statically enumerable" windows require only an adder seems to
> >>>> > > > > touch on the heart of the matter.
> >>>> > > > >
> >>>> > > > >> It seems like what Time and Universal (and any other Windows
> >>>> > > > >> implementation) have in common is that the windows are
> >>>> > > > >> statically enumerable.
> >>>> > > > >> As a consequence, they can all rely on an aggregation maintenance
> >>>> > > > >> algorithm that involves enumerating each of the windows and
> >>>> > > > >> updating it. That also means that their DSL object
> >>>> > > > >> (TimeWindowedKStream) doesn't need "subtractors" or "mergers",
> >>>> > > > >> but only "adders"; again, this is a consequence of the fact that
> >>>> > > > >> the windows are enumerable.
> >>>> > > > >
> >>>> > > > >
> >>>> > > > > Given that, I'm a bit confused why you conclude that sliding
> >>>> > > > > windows are fundamentally different from the "statically
> >>>> > > > > enumerable" windows -- sliding windows require only an adder too.
> >>>> > > > > I'm not sure it's a consequence of being enumerable, or that being
> >>>> > > > > enumerable is the fundamental property that unites all Windows
> >>>> > > > > (ignoring JoinWindows here). Yes, it currently does apply to all
> >>>> > > > > Windows implementations, but we shouldn't assume that it *has* to
> >>>> > > > > be that way on the basis that it currently happens to be.
> >>>> > > > >
> >>>> > > > > Also, the fact that they can all rely on the same aggregation
> >>>> > > > > algorithm seems like an implementation detail, and it would be
> >>>> > > > > weird to force a separate/new DSL API just because under the
> >>>> > > > > covers we swap in a different processor.
> >>>> > > > >
> >>>> > > > > To be fair, I don't think there's a strong reason *against* not
> >>>> > > > > extending Windows -- in the end it will just mean adding a new
> >>>> > > > > #windowedBy method and copy/pasting everything from
> >>>> > > > > TimeWindowedKStream pretty much word for word. But anytime you
> >>>> > > > > find yourself copying over code almost exactly, there should
> >>>> > > > > probably be a good reason why :)
> >>>> > > > >
> >>>> > > > >
> >>>> > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
> >>>> > > > >
> >>>> > > > >> Thanks Leah!
> >>>> > > > >>
> >>>> > > > >> 5) Regarding the empty windows, I'm wondering if we should simply
> >>>> > > > >> propose that the windows should not be emitted downstream of the
> >>>> > > > >> operator or visible in IQ. Then, it'll be up to the
> >>>> > > > >> implementation to make it happen. I'm personally not concerned
> >>>> > > > >> about it, since it seems like there are multiple ways to
> >>>> > > > >> accomplish this.
> >>>> > > > >>
> >>>> > > > >> Note, the discrepancy Matthias pointed out is actually a design
> >>>> > > > >> bug. The windowed aggregation (like every operation in Streams)
> >>>> > > > >> produces a "view", which then forms the basis of downstream
> >>>> > > > >> operations. When we pass the Materialized option to the
> >>>> > > > >> operation, all we're doing is saying to "materialize" the view
> >>>> > > > >> (aka, actually store the computed view) and also make it
> >>>> > > > >> queryable. It would be illegal for the "queryable, materialized
> >>>> > > > >> view" to differ in any way from the "view". So, it seems we must
> >>>> > > > >> either propose to emit the empty windows AND make them visible in
> >>>> > > > >> IQ, or propose NOT to emit the empty windows AND NOT make them
> >>>> > > > >> visible in IQ.
> >>>> > > > >>
> >>>> > > > >> 7) Regarding whether we can extend TimeWindows (or Windows):
> >>>> > > > >> I've been mulling this over more. I think it's worth asking the
> >>>> > > > >> question of what these classes even mean. For example, why is
> >>>> > > > >> SessionWindows a different thing from TimeWindows and
> >>>> > > > >> UniversalWindows (which are both Windows)?
> >>>> > > > >>
> >>>> > > > >> This conversation is extra complicated because of the incomplete
> >>>> > > > >> and mismatched class hierarchy, but we can try to look past it
> >>>> > > > >> for now.
> >>>> > > > >>
> >>>> > > > >> It seems like what Time and Universal (and any other Windows
> >>>> > > > >> implementation) have in common is that the windows are
> >>>> > > > >> statically enumerable.
> >>>> > > > >> As a consequence, they can all rely on an aggregation maintenance
> >>>> > > > >> algorithm that involves enumerating each of the windows and
> >>>> > > > >> updating it. That also means that their DSL object
> >>>> > > > >> (TimeWindowedKStream) doesn't need "subtractors" or "mergers",
> >>>> > > > >> but only "adders"; again, this is a consequence of the fact that
> >>>> > > > >> the windows are enumerable.
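A minimal sketch of what "statically enumerable" means for fixed-size hopping windows: the windows containing a timestamp can be computed from the timestamp alone, and each one is updated with an adder. The class name is hypothetical; timestamps are assumed non-negative, windows are assumed half-open `[start, start + size)` and aligned to multiples of the advance interval, with starts clamped at zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of adder-only maintenance for enumerable hopping windows.
final class HoppingEnumerator {
    private HoppingEnumerator() {}

    // Start of every window [start, start + size) that contains ts, where starts
    // are multiples of `advance`. Assumes ts >= 0; starts are clamped at 0.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start strictly greater than ts - size.
        long start = (Math.max(0, ts - size + advance) / advance) * advance;
        while (start <= ts) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    // Each enumerated window's aggregate is updated in place; no scan, merger, or subtractor.
    static void add(Map<Long, Long> sumsByStart, long ts, long value, long size, long advance) {
        for (long start : windowStartsFor(ts, size, advance)) {
            sumsByStart.merge(start, value, Long::sum);
        }
    }
}
```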
> >>>> > > > >>
> >>>> > > > >> In contrast, session windows are data-driven, so they are not
> >>>> > > > >> statically enumerable. Their algorithm has to rely on scans, and
> >>>> > > > >> to do the scans, it needs to know the "inactivity gap", which
> >>>> > > > >> needs to be part of the window definition. Likewise, session
> >>>> > > > >> windows have the property that they need to be merged, so their
> >>>> > > > >> DSL object also requires mergers.
> >>>> > > > >>
> >>>> > > > >> It really seems like your new window definition doesn't fit into
> >>>> > > > >> either category. It uses a different algorithm, which relies on
> >>>> > > > >> scans, but it is also fixed in size, so it doesn't need mergers.
> >>>> > > > >> In this situation, it seems like the safe bet is to just create
> >>>> > > > >> SlidingWindows with no interface and add a separate set of DSL
> >>>> > > > >> operations and objects. It's a little extra code, but it seems to
> >>>> > > > >> keep everything tidier and more comprehensible, both for us and
> >>>> > > > >> for users.
> >>>> > > > >>
> >>>> > > > >> What do you think?
> >>>> > > > >> -John
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> >>>> > > > >>> Hi Matthias,
> >>>> > > > >>>
> >>>> > > > >>> Thanks for the suggestions, I've updated the KIP and child page
> >>>> > > > >>> accordingly and addressed some below.
> >>>> > > > >>>
> >>>> > > > >>>> 1) For the mandatory grace period, we should use a static
> >>>> > > > >>>> builder method that takes two parameters.
> >>>> > > > >>>>
> >>>> > > > >>>
> >>>> > > > >>> That makes sense, I've changed that in the public API.
> >>>> > > > >>>
> >>>> > > > >>>> Btw: this implementation actually raises an issue for IQ:
> >>>> > > > >>>> those empty windows would be returned.
> >>>> > > > >>>
> >>>> > > > >>>
> >>>> > > > >>> This is a great point; with the current implementation plan,
> >>>> > > > >>> empty windows would be returned. I think creating a second window
> >>>> > > > >>> store would definitely work, but there would be more overhead in
> >>>> > > > >>> having two stores and switching windows between the stores, as
> >>>> > > > >>> well as doing scans in both stores to find existing windows.
> >>>> > > > >>> There might be a way to avoid emitting empty windows without
> >>>> > > > >>> creating a second window store; I'll look more into it.
> >>>> > > > >>>
> >>>> > > > >>> Cheers,
> >>>> > > > >>> Leah
> >>>> > > > >>>
> >>>> > > > >>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>
> >>>> > > > >>>> Thanks for updating the KIP.
> >>>> > > > >>>>
> >>>> > > > >>>> Couple of follow up comments:
> >>>> > > > >>>>
> >>>> > > > >>>> 1) For the mandatory grace period, we should use a static
> >>>> > > > >>>> builder method that takes two parameters. This provides a better
> >>>> > > > >>>> API, as users cannot forget to set the grace period. Throwing a
> >>>> > > > >>>> runtime exception seems not to be the best way to handle this
> >>>> > > > >>>> case.
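A hypothetical Java sketch of the two-parameter static factory suggested in point 1. The class name `SlidingWindowsSketch` is illustrative; the factory name mirrors the `withTimeDifferenceAndGrace` builder named later in this thread, but the validation rules shown here are assumptions, not the real Kafka Streams implementation.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical value class; the real Kafka Streams API may differ.
final class SlidingWindowsSketch {
    final Duration timeDifference;
    final Duration grace;

    // Private constructor: the only way in is the two-argument factory,
    // so a caller cannot "forget" the grace period.
    private SlidingWindowsSketch(Duration timeDifference, Duration grace) {
        this.timeDifference = timeDifference;
        this.grace = grace;
    }

    static SlidingWindowsSketch withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("durations must be non-negative");
        }
        return new SlidingWindowsSketch(timeDifference, grace);
    }
}
```

Because the class is final and the constructor private, a missing grace period becomes a compile-time error rather than a runtime exception thrown from the processor.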
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 2) In Fig. 2 you list 10 hopping windows. I believe it should
> >>>> > > > >>>> actually be more? The first hopping window would be [-6,-4[ and
> >>>> > > > >>>> the last one would be from [19,29[ -- hence, the cost savings are
> >>>> > > > >>>> actually much higher.
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 3a) IQ: you are saying that the user needs to compute the start
> >>>> > > > >>>> time as
> >>>> > > > >>>>
> >>>> > > > >>>>> windowSize+the time they're looking at
> >>>> > > > >>>>
> >>>> > > > >>>> Should this be "targetTime - windowSize" instead?
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 3b) IQ: in your example you say "window size of 10 minutes" with
> >>>> > > > >>>> an incident at 9:15.
> >>>> > > > >>>>
> >>>> > > > >>>>> they're looking for a window with the start time of 8:15.
> >>>> > > > >>>>
> >>>> > > > >>>> The example does not seem to add up?
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 4) For "Processing Windows": you describe a three-step approach.
> >>>> > > > >>>> I just want to point out that step (1) is not necessary for each
> >>>> > > > >>>> input record, because timestamps are not guaranteed to be unique,
> >>>> > > > >>>> and thus a previous record with the same key and timestamp might
> >>>> > > > >>>> have created the windows already.
> >>>> > > > >>>>
> >>>> > > > >>>> Nit: I am also not exactly sure what you mean by step (3), as you
> >>>> > > > >>>> use the word "send". I guess you mean "put"?
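Point 4 amounts to making window creation conditional on the window not already existing. A hypothetical sketch, with an in-memory map (window start to aggregate) standing in for the window store:

```java
import java.util.Map;

// Hypothetical per-key window index: window start -> aggregate.
final class WindowCreation {
    private WindowCreation() {}

    // Creates the window only if no earlier record (same key, same timestamp)
    // already did; returns true if a new window was actually created.
    static boolean createIfAbsent(Map<Long, Long> windowsByStart, long windowStart) {
        return windowsByStart.putIfAbsent(windowStart, 0L) == null;
    }
}
```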
> >>>> > > > >>>>
> >>>> > > > >>>> It seems there are actually more details in the sub-page:
> >>>> > > > >>>>
> >>>> > > > >>>>> A new record for SlidingWindows will always create two new
> >>>> > > > >>>>> windows. If either of those windows already exists in the
> >>>> > > > >>>>> windows store, their aggregation will simply be updated to
> >>>> > > > >>>>> include the new record, but no duplicate window will be added
> >>>> > > > >>>>> to the WindowStore.
> >>>> > > > >>>>
> >>>> > > > >>>> However, the first and second sentences contradict each other a
> >>>> > > > >>>> little bit. I think the first sentence is not correct.
> >>>> > > > >>>>
> >>>> > > > >>>> Nit:
> >>>> > > > >>>>
> >>>> > > > >>>>> For in-order records, the left window will always be empty.
> >>>> > > > >>>>
> >>>> > > > >>>> This should be "right window"?
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 5) "Emitting Results": it might be worth pointing out that a
> >>>> > > > >>>> second/future window of a new record is created with no records,
> >>>> > > > >>>> and thus, even if it's initialized, it won't be emitted. Only if
> >>>> > > > >>>> a consecutive record falls into the window would the window be
> >>>> > > > >>>> updated and the window result (for a window content of one
> >>>> > > > >>>> record) be sent downstream.
> >>>> > > > >>>>
> >>>> > > > >>>> Again, the sub-page contains these details. Might still be worth
> >>>> > > > >>>> adding to the top-level page, too.
> >>>> > > > >>>>
> >>>> > > > >>>> Btw: this implementation actually raises an issue for IQ: those
> >>>> > > > >>>> empty windows would be returned. Thus I am wondering if we need
> >>>> > > > >>>> to use two stores internally? One store for actual windows and
> >>>> > > > >>>> one store for empty windows? If an empty window is updated, it's
> >>>> > > > >>>> moved to the other store? For IQ, we only allow querying the
> >>>> > > > >>>> non-empty-window store?
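A hypothetical in-memory sketch of the two-store idea, with `TreeMap`s standing in for window stores; the class and method names are illustrative only.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical two-store layout: empty windows live in one map, non-empty
// windows in another, and only the latter is exposed to queries.
final class TwoStoreWindows {
    private final Map<Long, Long> emptyWindows = new TreeMap<>();    // start -> 0
    private final Map<Long, Long> nonEmptyWindows = new TreeMap<>(); // start -> aggregate

    // Registers a window created without any records (e.g. a record's future window).
    void createEmpty(long start) {
        if (!nonEmptyWindows.containsKey(start)) {
            emptyWindows.putIfAbsent(start, 0L);
        }
    }

    // Adds a value; an empty window moves to the non-empty store on its first update.
    void add(long start, long value) {
        emptyWindows.remove(start);
        nonEmptyWindows.merge(start, value, Long::sum);
    }

    // IQ only ever sees non-empty windows.
    Map<Long, Long> query() {
        return Collections.unmodifiableMap(nonEmptyWindows);
    }

    int emptyCount() {
        return emptyWindows.size();
    }
}
```

The overhead Leah mentions in her reply shows up here: checking whether a window already exists means consulting both maps.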
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 6) On the sub-page:
> >>>> > > > >>>>
> >>>> > > > >>>>> The left window of in-order records and both windows for
> >>>> > > > >>>>> out-of-order records need to be updated with the values of
> >>>> > > > >>>>> records that have already been processed.
> >>>> > > > >>>>
> >>>> > > > >>>> Why "both windows for out-of-order records"? IMHO, we don't know
> >>>> > > > >>>> how many existing windows need to be updated when processing an
> >>>> > > > >>>> out-of-order record. Of course, an out-of-order record could
> >>>> > > > >>>> also fall into no existing window and just create two new
> >>>> > > > >>>> windows.
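To illustrate why the number of windows an out-of-order record touches is data-dependent, a hypothetical Java sketch, assuming inclusive windows `[start, start + timeDifference]` keyed by start time in a `NavigableMap`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical helper: which existing windows must absorb an out-of-order record?
final class OutOfOrderUpdate {
    private OutOfOrderUpdate() {}

    // Windows are [start, start + timeDifference], both ends inclusive, keyed by start.
    // A record at ts belongs to every window whose start lies in [ts - timeDifference, ts].
    static List<Long> windowsToUpdate(NavigableMap<Long, Long> windowsByStart, long ts, long timeDifference) {
        return new ArrayList<>(windowsByStart.subMap(ts - timeDifference, true, ts, true).keySet());
    }
}
```

With window starts at 0, 3, 8, and 12 and a time difference of 5, a record at 7 updates one window, a record at 12 updates two, and a record at 20 updates none.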
> >>>> > > > >>>>
> >>>> > > > >>>>> Because each record creates one new window that includes itself
> >>>> > > > >>>>> and one window that does not
> >>>> > > > >>>>
> >>>> > > > >>>> As stated above, this does not seem to hold. I understand what
> >>>> > > > >>>> you mean, but it would be good to be exact.
> >>>> > > > >>>>
> >>>> > > > >>>> Figure 2: You use the term "late" but you mean "out-of-order", I
> >>>> > > > >>>> guess -- a record is _late_ if it's not processed any longer
> >>>> > > > >>>> because the grace period has already passed.
> >>>> > > > >>>>
> >>>> > > > >>>> Figure 2: "Late" should be "out-of-order". The example text says
> >>>> > > > >>>> a window [16,26] should be created, but the figure shows the
> >>>> > > > >>>> green window as [15,20].
> >>>> > > > >>>>
> >>>> > > > >>>> About the blue window: maybe add a note that the blue window
> >>>> > > > >>>> contains the aggregate we need for the green window, _before_
> >>>> > > > >>>> the new record `a` is added to the blue window.
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> 7) I am not really happy about extending TimeWindows, and I think
> >>>> > > > >>>> the argument about JoinWindows is not the best (IMHO, JoinWindows
> >>>> > > > >>>> already do it wrong and we would just repeat the same mistake).
> >>>> > > > >>>> However, it seems our window hierarchy is "broken" already, and
> >>>> > > > >>>> it might be out of scope for this KIP to fix it. Hence, I am OK
> >>>> > > > >>>> with biting the bullet for now and cleaning it up later.
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> -Matthias
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
> >>>> > > > >>>>> Hi Leah,
> >>>> > > > >>>>>
> >>>> > > > >>>>> Thanks for the updated KIP. I agree that extending SlidingWindows
> >>>> > > > >>>>> from Windows is fine for the sake of not introducing more public
> >>>> > > > >>>>> APIs (and their internal xxxImpl classes), and its downside is
> >>>> > > > >>>>> small enough for me to tolerate.
> >>>> > > > >>>>>
> >>>> > > > >>>>>
> >>>> > > > >>>>> Guozhang
> >>>> > > > >>>>>
> >>>> > > > >>>>>
> >>>> > > > >>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>> > > > >>>>>
> >>>> > > > >>>>>> Hi all,
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
> >>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >>>> > > > >>>>>> to address these points and have created a child page
> >>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
> >>>> > > > >>>>>> to go more in depth on certain implementation details.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Grace Period:*
> >>>> > > > >>>>>> I think Sophie raises a good point that the default grace period
> >>>> > > > >>>>>> of 24 hours is often too long and was chosen when retention time
> >>>> > > > >>>>>> and grace period were the same. For SlidingWindows, I propose we
> >>>> > > > >>>>>> make the grace period mandatory. To keep formatting consistent
> >>>> > > > >>>>>> with other types of windows, the grace period won't be an
> >>>> > > > >>>>>> additional parameter in the #of method, but will still look like
> >>>> > > > >>>>>> it does in other use cases:
> >>>> > > > >>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)).
> >>>> > > > >>>>>> If the grace period isn't properly initialized, an error will be
> >>>> > > > >>>>>> thrown through the process method.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Storage Layer + Aggregation:*
> >>>> > > > >>>>>> SlidingWindows will use a WindowStore because computation can be
> >>>> > > > >>>>>> done with the information stored in a WindowStore (window
> >>>> > > > >>>>>> timestamp and value). Using the WindowStore also simplifies
> >>>> > > > >>>>>> computation, as SlidingWindows can leverage existing processes.
> >>>> > > > >>>>>> Because we are using a WindowStore, the aggregation process will
> >>>> > > > >>>>>> be similar to that of a hopping window. As records come in, their
> >>>> > > > >>>>>> value is added to the aggregation that already exists, following
> >>>> > > > >>>>>> the same procedure as hopping windows. The aggregation difference
> >>>> > > > >>>>>> between SlidingWindows and HoppingWindows comes in creating new
> >>>> > > > >>>>>> windows for a SlidingWindow, where you need to find the existing
> >>>> > > > >>>>>> records that belong to the new window. This computation is
> >>>> > > > >>>>>> similar to the aggregation in SessionWindows and requires a scan
> >>>> > > > >>>>>> of the WindowStore to find the window with the aggregation
> >>>> > > > >>>>>> needed, which will always be pre-computed. The scan requires
> >>>> > > > >>>>>> creating an iterator, but should have minimal performance
> >>>> > > > >>>>>> effects, as this strategy is already implemented in
> >>>> > > > >>>>>> SessionWindows. More details on finding the aggregation that
> >>>> > > > >>>>>> needs to be put in a new window can be found on the
> >>>> > > > >>>>>> implementation page
> >>>> > > > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
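A simplified Java sketch of the range-scan step, assuming per-timestamp values for a single key in a `NavigableMap`. Note that it scans raw record values, whereas the proposal above scans the WindowStore for an already-computed window aggregate; the class name is hypothetical.

```java
import java.util.NavigableMap;

// Hypothetical stand-in for the scan used when a new sliding window is created.
final class RangeScanAggregate {
    private RangeScanAggregate() {}

    // Sums the values of all records with timestamps in [windowStart, windowEnd],
    // using an ordered range scan in the spirit of a WindowStore fetch.
    static long aggregate(NavigableMap<Long, Long> valuesByTimestamp, long windowStart, long windowEnd) {
        long sum = 0;
        for (long v : valuesByTimestamp.subMap(windowStart, true, windowEnd, true).values()) {
            sum += v;
        }
        return sum;
    }
}
```

For a new record at 10 with a time difference of 5, the scan covers `[5,10]`.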
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
> >>>> > > > >>>>>> Because SlidingWindows are still defined by a windowSize (whereas
> >>>> > > > >>>>>> SessionWindows are defined purely by data), I think it makes sense
> >>>> > > > >>>>>> to leverage the existing Window processes instead of creating a new
> >>>> > > > >>>>>> store type that would be very similar to the WindowStore. While it's
> >>>> > > > >>>>>> true that the #windowsFor method isn't necessary for SlidingWindows,
> >>>> > > > >>>>>> JoinWindows also extends Windows<Window> and throws an
> >>>> > > > >>>>>> UnsupportedOperationException in the #windowsFor method, which is
> >>>> > > > >>>>>> what SlidingWindows can do. The difference between extending
> >>>> > > > >>>>>> Windows<TimeWindow> or Windows<Window> is minimal, as both are ways
> >>>> > > > >>>>>> to pass window parameters. Extending Windows<TimeWindow> will give
> >>>> > > > >>>>>> us more leverage in utilizing existing processes.
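The option of extending the abstract windows class and throwing from `windowsFor`, as JoinWindows does, can be sketched in plain Java. `WindowsSketch` and `SlidingWindowsSketch` are hypothetical, heavily simplified stand-ins, not the real Kafka Streams classes; the point is only that a subclass whose boundaries depend on the data has no meaningful answer for `windowsFor`.

```java
import java.util.Map;

// Hypothetical, heavily simplified stand-in for the abstract Windows<W> class:
// fixed windows can enumerate the windows (keyed by start time) containing a timestamp.
abstract class WindowsSketch<W> {
    abstract Map<Long, W> windowsFor(long timestamp);
}

// Sliding window boundaries depend on the data, not on the timestamp alone,
// so this subclass follows the JoinWindows precedent and refuses the call.
final class SlidingWindowsSketch extends WindowsSketch<long[]> {
    final long sizeMs;
    final long graceMs;

    SlidingWindowsSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    @Override
    Map<Long, long[]> windowsFor(long timestamp) {
        throw new UnsupportedOperationException(
            "windowsFor(long) is not supported by sliding windows");
    }
}
```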
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Emit Strategy*
> >>>> > > > >>>>>> I would argue that emitting for every update is still the best way
> >>>> > > > >>>>>> to go for SlidingWindows because it mimics the other types of
> >>>> > > > >>>>>> windows, and suppression can be leveraged to limit what
> >>>> > > > >>>>>> SlidingWindows emits. While some users may only want to see the last
> >>>> > > > >>>>>> value, others may want to see more, and leaving the emit strategy to
> >>>> > > > >>>>>> emit partial results allows both kinds of users to access what they
> >>>> > > > >>>>>> want.
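The two emit strategies can be compared on a single window with a small plain-Java simulation. It is a sketch under assumed semantics (one window [5, 10] with inclusive ends, a grace period of 2, and a window that closes once observed stream time passes end + grace), not Kafka Streams code; in the DSL, the "final result only" behavior corresponds to applying suppression, e.g. `Suppressed.untilWindowCloses(...)`, to the windowed result. `EmitStrategySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window counter that records what each emit strategy
// would send downstream: every update (eager) or one final result (suppressed).
class EmitStrategySketch {
    private final long windowStart;
    private final long windowEnd;   // inclusive
    private final long graceMs;
    private long count = 0L;
    private long streamTime = Long.MIN_VALUE;
    private boolean finalEmitted = false;
    final List<Long> eagerUpdates = new ArrayList<>();
    final List<Long> finalResults = new ArrayList<>();

    EmitStrategySketch(long windowStart, long windowEnd, long graceMs) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.graceMs = graceMs;
    }

    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        boolean closed = streamTime > windowEnd + graceMs;
        boolean inWindow = timestamp >= windowStart && timestamp <= windowEnd;
        if (inWindow && !closed) {
            count++;
            eagerUpdates.add(count);   // eager: emit on every update
        }
        if (closed && !finalEmitted) {
            finalResults.add(count);   // suppressed: emit once, after the grace period
            finalEmitted = true;
        }
    }
}
```

Feeding timestamps 6, 9, 11, 7, 13, 8 produces eager updates 1, 2, 3 but a single final result of 3; the record at 8 arrives after the window has closed and is dropped.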
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> *Additional Features*
> >>>> > > > >>>>>> Supporting sliding windows inherently, and shifting inefficient
> >>>> > > > >>>>>> hopping windows to sliding windows, is an interesting idea and could
> >>>> > > > >>>>>> be built on top of SlidingWindows when they are finished, but right
> >>>> > > > >>>>>> now it seems out of scope for the needs of this KIP. Similarly,
> >>>> > > > >>>>>> including a `subtraction` feature could improve performance, but
> >>>> > > > >>>>>> doesn't seem necessary for the implementation of this KIP.
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> Let me know what you think of the updates,
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> Leah
> >>>> > > > >>>>>>
> >>>> > > > >>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:
> >>>> > > > >>>>>>
> >>>> > > > >>>>>>> Hello all,
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Thanks for the KIP, Leah!
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Regarding (1): I'd go further, actually. Making Windows an abstract
> >>>> > > > >>>>>>> class was a mistake from the beginning that led to us not being
> >>>> > > > >>>>>>> able to fix a very confusing situation for users around retention
> >>>> > > > >>>>>>> times, final results emitting, etc. Thus, I would not suggest
> >>>> > > > >>>>>>> extending TimeWindows for sure, but would also not suggest
> >>>> > > > >>>>>>> extending Windows.
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> The very simplest thing to do is follow the example of
> >>>> > > > >>>>>>> SessionWindows, which is just a completely self-contained class. If
> >>>> > > > >>>>>>> we don't mess with class inheritance, we won't ever have any of the
> >>>> > > > >>>>>>> problems related to class inheritance. This is my preferred
> >>>> > > > >>>>>>> solution.
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Still, sliding windows have a lot in common with TimeWindows and
> >>>> > > > >>>>>>> other fixed-size windows, namely that the windows are fixed in size.
> >>>> > > > >>>>>>> If we want to preserve the current two-part windowing API in which
> >>>> > > > >>>>>>> you can window by either "fixed" or "data driven" modes, I'd suggest
> >>>> > > > >>>>>>> we avoid increasing the blast radius of Windows by taking the
> >>>> > > > >>>>>>> opportunity to replace it with a proper interface and implement that
> >>>> > > > >>>>>>> interface instead.
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> For example:
> >>>> > > > >>>>>>> https://github.com/apache/kafka/pull/9031
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition.
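The interface-based alternative can be sketched as below. The name and methods of `FixedSizeWindowDefinitionSketch` are assumptions for illustration and are not taken from the linked pull request; the factory borrows the `withSizeAndGrace` builder name discussed elsewhere in this thread, and computing a minimum retention as size plus grace is a simplifying assumption. The sketch shows a self-contained final class with a private constructor, plus code written only against the interface.

```java
// Hypothetical interface; the methods are assumptions, not the PR's definition.
interface FixedSizeWindowDefinitionSketch {
    long sizeMs();
    long gracePeriodMs();
}

// A self-contained final class that implements the interface instead of
// extending an abstract base class.
final class SlidingWindowsDefinition implements FixedSizeWindowDefinitionSketch {
    private final long sizeMs;
    private final long graceMs;

    private SlidingWindowsDefinition(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    static SlidingWindowsDefinition withSizeAndGrace(long sizeMs, long graceMs) {
        if (sizeMs < 0 || graceMs < 0) {
            throw new IllegalArgumentException("size and grace must be non-negative");
        }
        return new SlidingWindowsDefinition(sizeMs, graceMs);
    }

    @Override public long sizeMs() { return sizeMs; }
    @Override public long gracePeriodMs() { return graceMs; }
}

// Callers depend only on the interface, so any fixed-size definition works here.
final class WindowDefinitions {
    // Simplifying assumption: keep a window until it can no longer be updated.
    static long minRetentionMs(FixedSizeWindowDefinitionSketch def) {
        return def.sizeMs() + def.gracePeriodMs();
    }
}
```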
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> ======
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Regarding (2), it seems more straightforward as a user of Streams
> >>>> > > > >>>>>>> to just have one mental model. _All_ of our aggregation operations
> >>>> > > > >>>>>>> follow an eager emission model, in which we just emit an update
> >>>> > > > >>>>>>> whenever an update is available. We already provided Suppression to
> >>>> > > > >>>>>>> explicitly apply different update semantics in the case it's
> >>>> > > > >>>>>>> required. Why should we define a snowflake operation with completely
> >>>> > > > >>>>>>> different semantics from everything else? I.e., systems are
> >>>> > > > >>>>>>> generally easier to use when they follow a few simple, composable
> >>>> > > > >>>>>>> rules than when they have a lot of different, specific rules.
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> ======
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> New point: (4):
> >>>> > > > >>>>>>> It would be nice to include some examples of user code that would
> >>>> > > > >>>>>>> use the new API, which should include:
> >>>> > > > >>>>>>> 1. using the DSL with the sliding window definition
> >>>> > > > >>>>>>> 2. accessing the stored results of a sliding window aggregation via IQ
> >>>> > > > >>>>>>> 3. defining a custom processor to access sliding windows in a store
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> It generally helps reviewers wrap their heads around the proposal,
> >>>> > > > >>>>>>> as well as shaking out any design issues that would otherwise only
> >>>> > > > >>>>>>> come up during implementation/testing/review.
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> Thanks again for the awesome proposal!
> >>>> > > > >>>>>>> -John
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
> >>>> > > > >>>>>>>> Hello Leah,
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> Thanks for the nicely written KIP. A few thoughts:
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> 1) I echo the other reviewers' comments regarding the typing: why
> >>>> > > > >>>>>>>> extend TimeWindow instead of just extending Window?
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> 2) I also feel that the emitting policy for this type of windowing
> >>>> > > > >>>>>>>> aggregation may be different from the existing ones. The existing
> >>>> > > > >>>>>>>> emitting policy is very simple: emit every time a window gets
> >>>> > > > >>>>>>>> updated, and emit every time on out-of-order data within the grace
> >>>> > > > >>>>>>>> period. This is because for time-windows the window close time
> >>>> > > > >>>>>>>> strictly depends on the window start time, which is fixed, while for
> >>>> > > > >>>>>>>> session-windows, although the window open/close time is also
> >>>> > > > >>>>>>>> data-dependent, new windows are relatively infrequent compared to
> >>>> > > > >>>>>>>> sliding-windows. For this KIP, since each new record would cause a
> >>>> > > > >>>>>>>> new sliding-window, the number of windows maintained logically could
> >>>> > > > >>>>>>>> be much larger, and hence emitting on each update may be too
> >>>> > > > >>>>>>>> aggressive.
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> 3) Although the KIP itself should focus on user-facing interfaces,
> >>>> > > > >>>>>>>> I'd suggest we create a child page of KIP-450 discussing its
> >>>> > > > >>>>>>>> implementation as well, since some of that may drive the interface
> >>>> > > > >>>>>>>> design. E.g., personally I think having a combiner interface in
> >>>> > > > >>>>>>>> addition to the aggregator would be useful, but that's based on my
> >>>> > > > >>>>>>>> 2 cents about the implementation design (I once created a child page
> >>>> > > > >>>>>>>> describing it:
> >>>> > > > >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation>).
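A rough sketch of the aggregator/combiner split mentioned above: an aggregator folds raw values into a partial aggregate for one slice of time, and a combiner merges partial aggregates into a window result. Both interfaces are hypothetical and do not exist in Kafka Streams; the sketch only shows why a combiner would let overlapping windows reuse pre-aggregated slices.

```java
import java.util.List;

// Hypothetical interfaces for illustration; neither is an existing Kafka Streams type.
interface AggregatorSketch<V, A> {
    A apply(V value, A aggregate);   // fold one raw value into an aggregate
}

interface CombinerSketch<A> {
    A combine(A left, A right);      // merge two partial aggregates
}

final class CombinerDemo {
    // Folds raw values (e.g. one time slice) into a partial aggregate.
    static <V, A> A aggregate(List<V> values, A init, AggregatorSketch<V, A> aggregator) {
        A result = init;
        for (V value : values) {
            result = aggregator.apply(value, result);
        }
        return result;
    }

    // Combines pre-aggregated slices into the result for a window spanning them.
    static <A> A combineSlices(List<A> slices, A identity, CombinerSketch<A> combiner) {
        A result = identity;
        for (A slice : slices) {
            result = combiner.combine(result, slice);
        }
        return result;
    }
}
```

For a sum, slices [1, 2] and [3, 4] aggregate to 3 and 7, and combining them gives 10, the same as aggregating all four values at once. This reuse is only valid for associative aggregations.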
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> Guozhang
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>> Hi Leah,
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> Thank you for the KIP!
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> Here is my feedback:
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 1. The KIP would benefit from some code examples that show how to
> >>>> > > > >>>>>>>>> use sliding windows in aggregations.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 2. The different sliding windows in Figures 1 and 2 are really hard
> >>>> > > > >>>>>>>>> to distinguish. Could you please try to make them graphically easier
> >>>> > > > >>>>>>>>> to distinguish? You could try drawing the frames of consecutive
> >>>> > > > >>>>>>>>> windows shifted relative to each other.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 3. I agree with Matthias that extending Windows<TimeWindow> does
> >>>> > > > >>>>>>>>> not seem to be the best approach. What would be the result of
> >>>> > > > >>>>>>>>> windowsFor()?
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 4. In the section "Public Interfaces" you should remove
> >>>> > > > >>>>>>>>> implementation details like private constructors and private fields.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 5. Do we need a new store interface or can we use WindowStore? Some
> >>>> > > > >>>>>>>>> words about that would be informative.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, I would skip
> >>>> > > > >>>>>>>>> it for now and add it later.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> 7. I also agree that having a section that describes how to handle
> >>>> > > > >>>>>>>>> out-of-order records would be good to understand what is still
> >>>> > > > >>>>>>>>> missing and what we can reuse.
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> Best,
> >>>> > > > >>>>>>>>> Bruno
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> Leah,
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> thanks for your update. However, it does not completely answer my
> >>>> > > > >>>>>>>>>> question.
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> In our current window implementations, we emit a window result
> >>>> > > > >>>>>>>>>> update record (ie, early/partial result) for each input record. When
> >>>> > > > >>>>>>>>>> an out-of-order record arrives, we just update the corresponding old
> >>>> > > > >>>>>>>>>> window and emit another update.
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> It's unclear from the KIP whether you propose the same emit
> >>>> > > > >>>>>>>>>> strategy. -- For sliding windows it might be worth considering a
> >>>> > > > >>>>>>>>>> different emit strategy that only supports emitting the final result
> >>>> > > > >>>>>>>>>> (ie, after the grace period has passed)?
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> Boyang also raises a good point that relates to my point from above
> >>>> > > > >>>>>>>>>> about pre-aggregations and storage layout. Our current time windows
> >>>> > > > >>>>>>>>>> are all pre-aggregated and stored in parallel. We can also look up
> >>>> > > > >>>>>>>>>> windows efficiently, as we can compute the windowed-key given the
> >>>> > > > >>>>>>>>>> input record key and timestamp based on the window definition.
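For fixed time windows, the windows containing a record can indeed be computed from its timestamp alone. The sketch below enumerates hopping window starts in the spirit of `TimeWindows#windowsFor`, assuming windows [start, start + size) aligned to multiples of the advance; the `key@start` windowed-key format and the class name are hypothetical illustrations, not the store's real key layout.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowKeys {
    // Start times of every hopping window [start, start + sizeMs) that contains
    // timestamp, with window starts aligned to multiples of advanceMs.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start strictly greater than timestamp - sizeMs (never negative).
        long start = (Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    // Hypothetical windowed-key format: record key plus window start.
    static List<String> windowedKeysFor(String key, long timestamp, long sizeMs, long advanceMs) {
        List<String> keys = new ArrayList<>();
        for (long start : windowStartsFor(timestamp, sizeMs, advanceMs)) {
            keys.add(key + "@" + start);
        }
        return keys;
    }
}
```

With size 10 and advance 5, a record at 12 belongs to the windows starting at 5 and 10, so its windowed keys are `user-1@5` and `user-1@10`. No equivalent computation exists for sliding windows, whose boundaries depend on the other records.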
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> However, for sliding windows, window boundaries are data dependent
> >>>> > > > >>>>>>>>>> and thus we cannot compute them upfront. Thus, how can we "find"
> >>>> > > > >>>>>>>>>> existing windows efficiently? Furthermore, out-of-order data would
> >>>> > > > >>>>>>>>>> create new windows in the past and we need to be able to handle this
> >>>> > > > >>>>>>>>>> case.
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store all
> >>>> > > > >>>>>>>>>> raw input events. Additionally, we could also store pre-aggregated
> >>>> > > > >>>>>>>>>> results if we think it's beneficial. -- If we apply the "emit only
> >>>> > > > >>>>>>>>>> final results" strategy, storing pre-aggregated results would not be
> >>>> > > > >>>>>>>>>> necessary though.
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> Btw: for sliding windows it might also be useful to consider
> >>>> > > > >>>>>>>>>> allowing users to supply a `Subtractor` -- this subtractor could be
> >>>> > > > >>>>>>>>>> applied on the current window result (in case we store it) if a
> >>>> > > > >>>>>>>>>> record drops out of the window. Of course, not all aggregation
> >>>> > > > >>>>>>>>>> functions are subtractable and we can consider this as a follow-up
> >>>> > > > >>>>>>>>>> task, too, and not include it in this KIP for now. Thoughts?
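A minimal sketch of the `Subtractor` idea for a sum: the running aggregate is updated with an adder when a record enters and with a subtractor when a record drops out, instead of re-aggregating the whole window. It assumes in-order records and a window of [t - timeDifferenceMs, t]; `SubtractorSlidingSum` is a hypothetical name, and aggregations such as max cannot be maintained this way because they are not subtractable.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding sum maintained with an adder and a subtractor.
// Assumes records arrive in timestamp order.
final class SubtractorSlidingSum {
    private final long timeDifferenceMs;
    // Records currently inside the window, oldest first: {timestamp, value}.
    private final Deque<long[]> inWindow = new ArrayDeque<>();
    private long sum = 0L;

    SubtractorSlidingSum(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    // Adds a record and returns the sum for [timestamp - timeDifferenceMs, timestamp].
    long add(long timestamp, long value) {
        inWindow.addLast(new long[] {timestamp, value});
        sum += value; // adder
        while (!inWindow.isEmpty() && inWindow.peekFirst()[0] < timestamp - timeDifferenceMs) {
            sum -= inWindow.pollFirst()[1]; // subtractor: record dropped out of the window
        }
        return sum;
    }
}
```

With a time difference of 5 and records (5, 1), (8, 2), (10, 4), (12, 8), the sums are 1, 3, 7 and 14; the last step subtracts the record at 5 instead of re-reading the window.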
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if
> >>>> > > > >>>>>>>>>> extending TimeWindow is the best approach? For TimeWindows, we can
> >>>> > > > >>>>>>>>>> pre-compute window boundaries (cf `windowsFor()`) while for a
> >>>> > > > >>>>>>>>>> sliding window the boundaries are data dependent. Session windows
> >>>> > > > >>>>>>>>>> are also data dependent and thus they don't inherit from TimeWindow
> >>>> > > > >>>>>>>>>> (maybe check out the KIP that added session windows? It could
> >>>> > > > >>>>>>>>>> provide some good insights.) -- I believe the same rationale applies
> >>>> > > > >>>>>>>>>> to sliding windows?
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> -Matthias
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
> >>>> > > > >>>>>>>>>>> Thanks Leah and Sophie for the KIP.
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time. Could we
> >>>> > > > >>>>>>>>>>> elaborate on how the storage layer is structured?
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching aggregation
> >>>> > > > >>>>>>>>>>> results, since we couldn't pre-aggregate until the user asks for
> >>>> > > > >>>>>>>>>>> it. It would be good to discuss this as well.
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding
> >>>> > > > >>>>>>>>>>> windows inherently. For a user who actually uses a hopping window,
> >>>> > > > >>>>>>>>>>> Streams could detect such an inefficiency by computing the
> >>>> > > > >>>>>>>>>>> window_size/advance_time ratio and checking whether the write
> >>>> > > > >>>>>>>>>>> amplification is too high compared with some configured threshold.
> >>>> > > > >>>>>>>>>>> The benefit of doing so is that existing Streams users don't need
> >>>> > > > >>>>>>>>>>> to change their code or learn a new API; they only need to upgrade
> >>>> > > > >>>>>>>>>>> the Streams library to get the benefits for their inefficient
> >>>> > > > >>>>>>>>>>> hopping window implementation. There might be some compatibility
> >>>> > > > >>>>>>>>>>> issues for sure, but it's worth listing them out for the trade-off.
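The heuristic above can be written down directly. This is a hypothetical sketch, not an existing Streams feature or config: the ratio size / advance approximates how many hopping windows each record updates (it is exact when the size is a multiple of the advance), and the threshold is an assumed, user-configured value.

```java
final class HoppingAmplificationCheck {
    // Approximate number of hopping windows each record is written to.
    static double windowsPerRecord(long sizeMs, long advanceMs) {
        return (double) sizeMs / advanceMs;
    }

    // Flags a hopping window definition whose write amplification exceeds
    // the (hypothetical) configured threshold.
    static boolean writeAmplificationTooHigh(long sizeMs, long advanceMs, double threshold) {
        return windowsPerRecord(sizeMs, advanceMs) > threshold;
    }
}
```

For example, a one-minute window advancing every second writes each record to about 60 windows and would be flagged by a threshold of 10, while a 10-second window advancing every 5 seconds (2 windows per record) would not.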
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>> Boyang
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>> Hey Matthias,
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>> Thanks for pointing that out. I added the following to the Proposed
> >>>> > > > >>>>>>>>>>>> Changes section of the KIP:
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>> "Records that come out of order will be processed the same way as
> >>>> > > > >>>>>>>>>>>> in-order records, as long as they fall within the grace period. Any
> >>>> > > > >>>>>>>>>>>> new windows created by the late record will still be created, and
> >>>> > > > >>>>>>>>>>>> the existing windows that are changed by the late record will be
> >>>> > > > >>>>>>>>>>>> updated. Any record that falls outside of the grace period (either
> >>>> > > > >>>>>>>>>>>> user defined or default) will be discarded."
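The rule quoted above can be sketched as a filter on observed stream time. This is a simplified assumption, not Kafka Streams' actual lateness check: a record is accepted while the window ending at its own timestamp is still open, i.e. while observed stream time <= timestamp + grace, and discarded otherwise. `GracePeriodFilter` is a hypothetical name.

```java
// Hypothetical grace-period filter under a simplified lateness rule.
final class GracePeriodFilter {
    private final long graceMs;
    private long observedStreamTime = Long.MIN_VALUE;

    GracePeriodFilter(long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed, false if it is discarded.
    boolean accept(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        return timestamp + graceMs >= observedStreamTime;
    }
}
```

With a grace period of 3, after records at 10 and 15 a record at 11 is discarded (11 + 3 < 15), while one at 12 is still accepted.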
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>> All the best,
> >>>> > > > >>>>>>>>>>>> Leah
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>> Leah,
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data
> >>>> > > > >>>>>>>>>>>>> though. How do you propose to address this?
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>> -Matthias
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
> >>>> > > > >>>>>>>>>>>>>> Hi all,
> >>>> > > > >>>>>>>>>>>>>> I'd like to kick off the discussion for KIP-450, adding sliding
> >>>> > > > >>>>>>>>>>>>>> window aggregation support to Kafka Streams.
> >>>> > > > >>>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>> > > > >>>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>>> Let me know what you think,
> >>>> > > > >>>>>>>>>>>>>> Leah
> >>>> > > > >>>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>>
> >>>> > > > >>>>>>>>>>>
> >>>> > > > >>>>>>>>>>
> >>>> > > > >>>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>> --
> >>>> > > > >>>>>>>> -- Guozhang
> >>>> > > > >>>>>>>>
> >>>> > > > >>>>>>>
> >>>> > > > >>>>>>
> >>>> > > > >>>>>
> >>>> > > > >>>>>
> >>>> > > > >>>>
> >>>> > > > >>>>
> >>>> > > > >>>
> >>>> > > > >>
> >>>> > > > >
> >>>> > > >
> >>>> > > >
> >>>> > >
> >>>> >
> >>>>
> >>>
>
