Thanks for the nits, Matthias. I've updated the examples and language
accordingly.

Leah

On Tue, Jul 28, 2020 at 6:43 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks Leah. Overall LGTM.
>
> A few nits:
>
> - the first figure shows window [9,19] but the window is not aligned
> properly (it should be 1ms to the right; right now, it's aligned to
> window [8,18])
>
> - in the second figure, a hopping window would create more windows, i.e.,
> the first window would be [-6,14) and the last window would be [19,29),
> thus it's not just 10 windows but 26 windows (if I did not miscount)
>
> - "Two new windows will be created by the late record"
>
> late -> out-of-order
>
>
> -Matthias
>
>
>
> On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:
> > Thanks for the update Leah -- I think that all makes sense.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:
> >
> >> Another minor tweak: instead of defining the window by the *size*, it
> >> will be defined by *timeDifference*, which is the maximum time difference
> >> between two events. This is a more precise way to define a window due to
> >> its inclusive ends, while still allowing the user to create the window
> >> they expect. This definition fits with the current examples, where a
> >> record at *10* would fall into a window of time difference *5* from
> >> *[5,10]*. This window contains any records at 5, 6, 7, 8, 9, and 10,
> >> which is 6 instances instead of 5. This semantic difference is why I've
> >> shifted *size* to *timeDifference*.
> >>
> >> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
> >> conventions.
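Here is a minimal Java sketch of the inclusive-ends arithmetic described above, assuming millisecond timestamps. `InclusiveWindow` and `endingAt` are hypothetical names used only for illustration. They are not part of the Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical model of a sliding window defined by a maximum time difference.
// Both ends are inclusive, so [start, end] spans timeDifference + 1 milliseconds.
final class InclusiveWindow {
    final long startMs;
    final long endMs;

    InclusiveWindow(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // The window that ends at a record's timestamp: [t - timeDifference, t].
    static InclusiveWindow endingAt(long recordTimeMs, Duration timeDifference) {
        return new InclusiveWindow(recordTimeMs - timeDifference.toMillis(), recordTimeMs);
    }

    // Inclusive on both ends.
    boolean contains(long timestampMs) {
        return timestampMs >= startMs && timestampMs <= endMs;
    }

    // Number of distinct millisecond timestamps the window can hold.
    long instantCount() {
        return endMs - startMs + 1;
    }
}
```

For a record at 10 with a time difference of 5, `endingAt(10, Duration.ofMillis(5))` yields [5,10], and `instantCount()` returns 6. This is the off-by-one that motivates calling the parameter *timeDifference* rather than *size*.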
> >>
> >> Let me know if there are any concerns! The vote thread is open as well
> >> here:
> >> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
> >>
> >> Best,
> >> Leah
> >>
> >> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>
> >>> A small tweak: to make it clearer to users that grace is required, and
> >>> to clean up some of the confusing grammatical semantics of windows, the
> >>> main builder method for *SlidingWindows* will be *withSizeAndGrace*
> >>> instead of *of*. This looks like it'll be the last change (for now) on
> >>> the public API. If anyone has any comments or thoughts, let me know.
> >>> Otherwise, I'll take this to a vote shortly.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>
> >>>> To accommodate the change to a final class, I've added another
> >>>> *windowedBy()* method in *CogroupedKStream.java* to handle
> >>>> SlidingWindows.
> >>>>
> >>>> As far as the discussion goes, I think this is the last change we've
> >>>> talked about. If anyone has other comments or concerns, please let me
> >>>> know!
> >>>>
> >>>> Cheers,
> >>>> Leah
> >>>>
> >>>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>>
> >>>>> Thanks for the discussion about extending TimeWindows. I agree that
> >>>>> making it future-proof is important, and the implementation of
> >>>>> SlidingWindows is unique enough that it seems logical to make it its
> >>>>> own final class.
> >>>>>
> >>>>> On that note, I've updated the KIP to make SlidingWindows a standalone
> >>>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
> >>>>> handle SlidingWindows. It seems that SlidingWindows would still be able
> >>>>> to leverage *TimeWindowedKStream* by creating a SlidingWindows version
> >>>>> of *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
> >>>>> anyone sees issues with this implementation, please let me know.
> >>>>>
> >>>>> Best,
> >>>>> Leah
> >>>>>
> >>>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the reply, Sophie.
> >>>>>>
> >>>>>> That all sounds about right to me.
> >>>>>>
> >>>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
> >>>>>> for it to be extensible. Different implementations can (and do)
> >>>>>> enumerate different windows to suit different use cases.
> >>>>>>
> >>>>>> On the other hand, I can’t think of any way to extend SessionWindows
> >>>>>> to do something different using the same algorithm, so it makes sense
> >>>>>> for it to stay final.
> >>>>>>
> >>>>>> If we think SlidingWindows is similarly not usefully extensible, then
> >>>>>> we can make it final. It’s easy to remove final later, but adding it
> >>>>>> is not possible. Or we could go the other route and just make it an
> >>>>>> interface, on general principle. Both of these choices are safe API
> >>>>>> design.
> >>>>>>
> >>>>>> Thanks again,
> >>>>>> John
> >>>>>>
> >>>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >>>>>>>>
> >>>>>>>> Users could pass in a custom `SessionWindows` as
> >>>>>>>> long as the session algorithm works correctly for it.
> >>>>>>>
> >>>>>>>
> >>>>>>> Well, not really: SessionWindows is a final class. TimeWindows is
> >>>>>>> also a final class, so neither of these can be extended/customized.
> >>>>>>> For a given Windows, then, there would only be three (non-overlapping)
> >>>>>>> possibilities: either it's TimeWindows, SlidingWindows, or a custom
> >>>>>>> Windows. I don't think there's any problem with determining what the
> >>>>>>> user wants in this case -- we would just check if it's a
> >>>>>>> SlidingWindows and connect the new processor, or else connect the
> >>>>>>> existing hopping/tumbling window processor.
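The runtime check described above could look roughly like this in Java. All classes are hypothetical, simplified stand-ins, not the real Kafka Streams types. The returned strings only label which processor would be wired in. The sketch also exposes the "leaking" API concern: any custom `Windows` subclass silently falls through to the hopping/tumbling algorithm.

```java
// Hypothetical, simplified stand-ins for the DSL window types (not Kafka Streams classes).
abstract class Windows { }
final class TimeWindows extends Windows { }
final class SlidingWindows extends Windows { }   // assumes SlidingWindows extends Windows
class CustomWindows extends Windows { }          // a user-defined window definition

final class ProcessorChooser {
    // Picks a processor by inspecting the concrete type at runtime.
    static String processorFor(Windows windows) {
        if (windows instanceof SlidingWindows) {
            return "sliding-window processor";
        }
        // TimeWindows and every custom Windows subclass end up here, so custom
        // windows implicitly inherit the hopping/tumbling algorithm.
        return "hopping/tumbling processor";
    }
}
```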
> >>>>>>>
> >>>>>>> I'll admit that last sentence does leave a bad taste in my mouth.
> >>>>>>> Part of that is probably the "leaking" API Matthias pointed out; we
> >>>>>>> just assume the hopping/tumbling window implementation fits all
> >>>>>>> custom windows. But I guess if you really needed to customize the
> >>>>>>> algorithm any further, you may as well stick in a transformer and do
> >>>>>>> it all yourself.
> >>>>>>>
> >>>>>>> Anyway, given what we have, it does seem weird to apply one
> >>>>>>> algorithm for most Windows types and then swap in a different one for
> >>>>>>> one specific extension of Windows. So adding a new
> >>>>>>> #windowedBy(SlidingWindows) sounds reasonable to me.
> >>>>>>>
> >>>>>>> I'm still not convinced that we need a whole new TimeWindowedKStream
> >>>>>>> equivalent class for sliding windows, though. It seems like the
> >>>>>>> hopping/tumbling window implementation could benefit just as much
> >>>>>>> from a subtractor as the sliding windows, so the only future-proofing
> >>>>>>> we get is the ability to be lazy and add the subtractor to one but not
> >>>>>>> the other. Of course, it would only be an optimization, so we could
> >>>>>>> just not apply it to one type and nothing would break. It does make
> >>>>>>> me nervous to go against the "future-proof" direction, though. Are
> >>>>>>> there any other examples of things we might want to add to one window
> >>>>>>> type but not to another?
> >>>>>>>
> >>>>>>> On another note, this actually brings up a new question: should
> >>>>>>> SlidingWindows also be final? My take is "yes", since there's no
> >>>>>>> reasonable customization of sliding windows, at least not that I can
> >>>>>>> think of. Thoughts?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Thanks, all,
> >>>>>>>>
> >>>>>>>> I can see how my conclusion was kind of a leap.
> >>>>>>>>
> >>>>>>>> What Matthias said is indeed what I was thinking. When you provide
> >>>>>>>> a window definition to the windowedBy() method, you are selecting an
> >>>>>>>> algorithm that will be used to compute the windows from the input
> >>>>>>>> data.
> >>>>>>>>
> >>>>>>>> I didn’t mean the code implementation “algorithm”, but the
> >>>>>>>> high-level algorithm that describes how the input stream will be
> >>>>>>>> transformed into a sequence of windows.
> >>>>>>>>
> >>>>>>>> For example, the algorithm for Windows is something like “given the
> >>>>>>>> record timestamp, include the record in each of the enumerated
> >>>>>>>> windows”. Note that there can be a lot of variation in how the
> >>>>>>>> windows are enumerated, which is why there are at least a couple of
> >>>>>>>> implementations of Windows, and we are open to adding more (like for
> >>>>>>>> natural calendar boundaries).
> >>>>>>>>
> >>>>>>>> For SessionWindows, it’s more like “if any window is within the gap,
> >>>>>>>> extend its boundaries to include this record, and if two windows
> >>>>>>>> touch, then merge them”.
> >>>>>>>>
> >>>>>>>> Clearly, the algorithm for SlidingWindows doesn’t fall into either
> >>>>>>>> category, so it seems inappropriate to claim that it does in the API
> >>>>>>>> (by implementing Windows with stubbed methods) and then cast
> >>>>>>>> internally to execute a completely different algorithm.
> >>>>>>>>
> >>>>>>>> To your other observation, that the DSL object resulting from
> >>>>>>>> windowedBy would look the same for Windows and SlidingWindows, maybe
> >>>>>>>> it makes sense for windowedBy(SlidingWindows) also to return a
> >>>>>>>> TimeWindowedKStream.
> >>>>>>>>
> >>>>>>>> i.e.:
> >>>>>>>> =======================
> >>>>>>>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> >>>>>>>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> >>>>>>>> =======================
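Here is a stripped-down Java sketch of the two overloads above. The compiler, not a runtime type check, selects the code path. The types are hypothetical stand-ins: generics and the `K, V` parameters are omitted, and SlidingWindows is assumed not to extend Windows.

```java
// Hypothetical, simplified stand-ins; not the real Kafka Streams types.
abstract class Windows { }
final class TimeWindows extends Windows { }
// Standalone final class: deliberately NOT a subclass of Windows.
final class SlidingWindows { }

final class GroupedStreamSketch {
    // Chosen for TimeWindows and any custom Windows subclass.
    String windowedBy(final Windows windows) {
        return "enumerating-window aggregation";
    }

    // Chosen for SlidingWindows; overload resolution happens at compile time.
    String windowedBy(final SlidingWindows windows) {
        return "sliding-window aggregation";
    }
}
```

SlidingWindows is final and unrelated to Windows. A custom `Windows` subclass therefore can never be routed to the sliding-window algorithm by accident, which sidesteps the "which algorithm for a custom window?" ambiguity.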
> >>>>>>>>
> >>>>>>>> I can’t think of a reason this wouldn’t work. But then again, it
> >>>>>>>> would be more future-proof to go ahead and specify a different return
> >>>>>>>> type now, if we think we'll want to add subtractors and stuff later.
> >>>>>>>> I don't have a strong feeling about that part of the API. It seems to
> >>>>>>>> be independent of whether SlidingWindows extends Windows or not.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> >>>>>>>>> I think what John tries to say is the following:
> >>>>>>>>>
> >>>>>>>>> We have `windowedBy(Windows)`, which accepts hopping/tumbling windows
> >>>>>>>>> but also custom windows, and we use a specific algorithm. Note that
> >>>>>>>>> custom windows must "work" based on the algorithm used.
> >>>>>>>>>
> >>>>>>>>> For session windows we have `windowedBy(SessionWindows)` and apply
> >>>>>>>>> a different algorithm. Users could pass in a custom `SessionWindows`
> >>>>>>>>> as long as the session algorithm works correctly for it.
> >>>>>>>>>
> >>>>>>>>> For the new sliding windows, we want to use a different algorithm
> >>>>>>>>> compared to hopping/tumbling windows. If we let sliding windows
> >>>>>>>>> extend `Windows`, we can decide at runtime whether to use the
> >>>>>>>>> hopping/tumbling window algorithm for hopping/tumbling windows or the
> >>>>>>>>> new sliding window algorithm for sliding windows. However, if we get
> >>>>>>>>> a custom window, which algorithm do we pick? The existing
> >>>>>>>>> tumbling/hopping window algorithm or the new sliding window
> >>>>>>>>> algorithm? Both a custom "time window" and a custom "sliding window"
> >>>>>>>>> implement the generic `Windows` class, and thus we cannot make a
> >>>>>>>>> decision, as we don't know the user's intent.
> >>>>>>>>>
> >>>>>>>>> As a matter of fact, even if the user might not be aware of it, the
> >>>>>>>>> algorithm we use already leaks into the API (if a user extends
> >>>>>>>>> `Windows`, it must work with our hopping/tumbling window algorithm,
> >>>>>>>>> and if a user extends `SessionWindows`, it must work with our session
> >>>>>>>>> algorithm), and it seems we need to preserve this property for
> >>>>>>>>> sliding windows.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> >>>>>>>>>> Hey John,
> >>>>>>>>>>
> >>>>>>>>>> Just a few follow-up questions/comments about the whole Windows
> >>>>>>>>>> thing:
> >>>>>>>>>>
> >>>>>>>>>> That's a good way of looking at things; in particular, the point
> >>>>>>>>>> about SessionWindows, for example, requiring a Merger while other
> >>>>>>>>>> "statically enumerable" windows require only an adder seems to touch
> >>>>>>>>>> on the heart of the matter.
> >>>>>>>>>>
> >>>>>>>>>>> It seems like what Time and Universal (and any other Windows
> >>>>>>>>>>> implementation) have in common is that the windows are statically
> >>>>>>>>>>> enumerable. As a consequence, they can all rely on an aggregation
> >>>>>>>>>>> maintenance algorithm that involves enumerating each of the
> >>>>>>>>>>> windows and updating it. That also means that their DSL object
> >>>>>>>>>>> (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but
> >>>>>>>>>>> only "adders"; again, this is a consequence of the fact that the
> >>>>>>>>>>> windows are enumerable.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Given that, I'm a bit confused why you conclude that sliding
> >>>>>>>>>> windows are fundamentally different from the "statically
> >>>>>>>>>> enumerable" windows -- sliding windows require only an adder too.
> >>>>>>>>>> I'm not sure it's a consequence of being enumerable, or that being
> >>>>>>>>>> enumerable is the fundamental property that unites all Windows
> >>>>>>>>>> (ignoring JoinWindows here). Yes, it currently does apply to all
> >>>>>>>>>> Windows implementations, but we shouldn't assume that it *has* to be
> >>>>>>>>>> that way just because it currently happens to be.
> >>>>>>>>>>
> >>>>>>>>>> Also, the fact that they can all rely on the same aggregation
> >>>>>>>>>> algorithm seems like an implementation detail, and it would be weird
> >>>>>>>>>> to force a separate/new DSL API just because under the covers we swap
> >>>>>>>>>> in a different processor.
> >>>>>>>>>>
> >>>>>>>>>> To be fair, I don't think there's a strong reason *against* not
> >>>>>>>>>> extending Windows -- in the end it will just mean adding a new
> >>>>>>>>>> #windowedBy method and copy/pasting everything from
> >>>>>>>>>> TimeWindowedKStream pretty much word for word. But anytime you find
> >>>>>>>>>> yourself copying over code almost exactly, there should probably be
> >>>>>>>>>> a good reason why :)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jul 22, 2020 at 3:48 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Leah!
> >>>>>>>>>>>
> >>>>>>>>>>> 5) Regarding the empty windows, I'm wondering if we should simply
> >>>>>>>>>>> propose that the windows should not be emitted downstream of the
> >>>>>>>>>>> operator or visible in IQ. Then, it'll be up to the implementation
> >>>>>>>>>>> to make it happen. I'm personally not concerned about it, since it
> >>>>>>>>>>> seems like there are multiple ways to accomplish this.
> >>>>>>>>>>>
> >>>>>>>>>>> Note, the discrepancy Matthias pointed out is actually a design
> >>>>>>>>>>> bug. The windowed aggregation (like every operation in Streams)
> >>>>>>>>>>> produces a "view", which then forms the basis of downstream
> >>>>>>>>>>> operations. When we pass the Materialized option to the operation,
> >>>>>>>>>>> all we're doing is saying to "materialize" the view (aka, actually
> >>>>>>>>>>> store the computed view) and also make it queryable. It would be
> >>>>>>>>>>> illegal for the "queryable, materialized view" to differ in any way
> >>>>>>>>>>> from the "view". So, it seems we must either propose to emit the
> >>>>>>>>>>> empty windows AND make them visible in IQ, or propose NOT to emit
> >>>>>>>>>>> the empty windows AND NOT make them visible in IQ.
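One way to keep the emitted view and the queryable view identical is to route both through a single visibility predicate. The minimal Java sketch below uses assumed names (`WindowAgg`, `ViewFilter`, `isVisible`) and an assumed per-window record count. It is not how the KIP implements this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical window aggregate that also tracks how many records it holds.
final class WindowAgg {
    final long startMs;
    final long recordCount;
    final long sum;

    WindowAgg(long startMs, long recordCount, long sum) {
        this.startMs = startMs;
        this.recordCount = recordCount;
        this.sum = sum;
    }
}

final class ViewFilter {
    // Single rule shared by the downstream-emit path and the IQ path,
    // so the materialized view can never differ from the emitted view.
    static boolean isVisible(WindowAgg agg) {
        return agg.recordCount > 0;
    }

    // Returns only the windows that both paths would expose.
    static List<WindowAgg> visible(List<WindowAgg> windows) {
        List<WindowAgg> result = new ArrayList<>();
        for (WindowAgg agg : windows) {
            if (isVisible(agg)) {
                result.add(agg);
            }
        }
        return result;
    }
}
```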
> >>>>>>>>>>>
> >>>>>>>>>>> 7) Regarding whether we can extend TimeWindows (or Windows):
> >>>>>>>>>>> I've been mulling this over more. I think it's worth asking the
> >>>>>>>>>>> question of what these classes even mean. For example, why is
> >>>>>>>>>>> SessionWindows a different thing from TimeWindows and
> >>>>>>>>>>> UniversalWindows (which are both Windows)?
> >>>>>>>>>>>
> >>>>>>>>>>> This conversation is extra complicated because of the incomplete
> >>>>>>>>>>> and mismatched class hierarchy, but we can try to look past it for
> >>>>>>>>>>> now.
> >>>>>>>>>>>
> >>>>>>>>>>> It seems like what Time and Universal (and any other Windows
> >>>>>>>>>>> implementation) have in common is that the windows are statically
> >>>>>>>>>>> enumerable. As a consequence, they can all rely on an aggregation
> >>>>>>>>>>> maintenance algorithm that involves enumerating each of the
> >>>>>>>>>>> windows and updating it. That also means that their DSL object
> >>>>>>>>>>> (TimeWindowedKStream) doesn't need "subtractors" or "mergers", but
> >>>>>>>>>>> only "adders"; again, this is a consequence of the fact that the
> >>>>>>>>>>> windows are enumerable.
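To make "statically enumerable" concrete: for hopping/tumbling windows, the set of windows a record belongs to is a pure function of its timestamp, the window size, and the advance. That is why an adder is enough. The minimal Java sketch below assumes epoch-aligned windows [start, start + size) with non-negative starts. `HoppingEnumeration` and `windowStartsFor` are illustrative names, not the Kafka method.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingEnumeration {
    // Start times of every window [start, start + sizeMs) that contains timestampMs,
    // for windows aligned to multiples of advanceMs. Negative starts are skipped.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start that is still greater than timestampMs - sizeMs.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }
}
```

For a record at 12 with size 10 and advance 5, this yields the starts 5 and 10, i.e., the windows [5,15) and [10,20). Each of those aggregates is then updated with the adder, and no scan over existing state is needed.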
> >>>>>>>>>>>
> >>>>>>>>>>> In contrast, session windows are data driven, so they are not
> >>>>>>>>>>> statically enumerable. Their algorithm has to rely on scans, and to
> >>>>>>>>>>> do the scans, it needs to know the "inactivity gap", which needs to
> >>>>>>>>>>> be part of the window definition. Likewise, session windows have
> >>>>>>>>>>> the property that they need to be merged, so their DSL object also
> >>>>>>>>>>> requires mergers.
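By contrast, session windows can only be determined from the data already seen. The simplified Java sketch below shows why a scan and a merger are needed: a single record can bridge two existing sessions. It keeps state in memory, counts records per session, and uses hypothetical names. It is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical session: inclusive [start, end] plus a record count.
final class Session {
    final long start;
    final long end;
    final long count;

    Session(long start, long end, long count) {
        this.start = start;
        this.end = end;
        this.count = count;
    }
}

final class SessionSketch {
    private final long gapMs;
    private List<Session> sessions = new ArrayList<>();

    SessionSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    void add(long timestampMs) {
        Session merged = new Session(timestampMs, timestampMs, 1);
        List<Session> untouched = new ArrayList<>();
        // "Scan": every existing session within the inactivity gap of the record
        // is merged with it; all other sessions are kept as they are.
        for (Session s : sessions) {
            if (timestampMs >= s.start - gapMs && timestampMs <= s.end + gapMs) {
                merged = new Session(Math.min(merged.start, s.start),
                        Math.max(merged.end, s.end), merged.count + s.count);
            } else {
                untouched.add(s);
            }
        }
        untouched.add(merged);
        sessions = untouched;
    }

    List<Session> sessions() {
        return sessions;
    }
}
```

With a 5 ms gap, records at 1 and 10 form two sessions. A record at 6 lies within the gap of both, so they merge into [1,10]. That merge step is exactly what the statically enumerable windows never need.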
> >>>>>>>>>>>
> >>>>>>>>>>> It really seems like your new window definition doesn't fit into
> >>>>>>>>>>> either category. It uses a different algorithm, which relies on
> >>>>>>>>>>> scans, but it is also fixed in size, so it doesn't need mergers. In
> >>>>>>>>>>> this situation, it seems like the safe bet is to just create
> >>>>>>>>>>> SlidingWindows with no interface and add a separate set of DSL
> >>>>>>>>>>> operations and objects. It's a little extra code, but it seems to
> >>>>>>>>>>> keep everything tidier and more comprehensible, both for us and for
> >>>>>>>>>>> users.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think?
> >>>>>>>>>>> -John
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> >>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the suggestions, I've updated the KIP and child page
> >>>>>>>>>>>> accordingly and addressed some below.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder
> >>>>>>>>>>>>> method that takes two parameters.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> That makes sense, I've changed that in the public API.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
> >>>>>>>>>>>>> empty windows would be returned.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is a great point; with the current implementation plan, empty
> >>>>>>>>>>>> windows would be returned. I think creating a second window store
> >>>>>>>>>>>> would definitely work, but there would be more overhead in having
> >>>>>>>>>>>> two stores and switching windows between the stores, as well as
> >>>>>>>>>>>> doing scans in both stores to find existing windows. There might be
> >>>>>>>>>>>> a way to avoid emitting empty windows without creating a second
> >>>>>>>>>>>> window store; I'll look more into it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Leah
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Couple of follow up comments:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder
> >>>>>>>>>>>>> method that takes two parameters. This provides a better API, as
> >>>>>>>>>>>>> the user cannot forget to set the grace period. Throwing a runtime
> >>>>>>>>>>>>> exception does not seem to be the best way to handle this case.
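Here is a minimal Java sketch of this builder pattern. The only way to obtain an instance is a factory that takes both values, so omitting the grace period fails at compile time rather than at runtime. `SlidingWindowSpec` is a hypothetical class. The factory name borrows *withTimeDifferenceAndGrace*, the builder name proposed elsewhere in this thread, and the validation rules are assumptions.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical window spec: the private constructor forces callers through the
// two-argument factory, so a missing grace period cannot compile.
final class SlidingWindowSpec {
    private final Duration timeDifference;
    private final Duration grace;

    private SlidingWindowSpec(Duration timeDifference, Duration grace) {
        this.timeDifference = timeDifference;
        this.grace = grace;
    }

    static SlidingWindowSpec withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        // Assumed validation: negative durations are rejected up front.
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("timeDifference and grace must not be negative");
        }
        return new SlidingWindowSpec(timeDifference, grace);
    }

    Duration timeDifference() {
        return timeDifference;
    }

    Duration grace() {
        return grace;
    }
}
```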
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) In Fig. 2 you list 10 hopping windows. I believe there should
> >>>>>>>>>>>>> actually be more? The first hopping window would be [-6,-4[ and
> >>>>>>>>>>>>> the last one would be [19,29[ -- hence, the cost savings are
> >>>>>>>>>>>>> actually much higher.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3a) IQ: you are saying that the user needs to compute the start
> >>>>>>>>>>>>> time as
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> windowSize+the time they're looking at
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Should this be "targetTime - windowSize" instead?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3b) IQ: in your example you say "window size of 10 minutes" with
> >>>>>>>>>>>>> an incident at 9:15.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> they're looking for a window with the start time of 8:15.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The example does not seem to add up?
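Here is the arithmetic behind 3a and 3b as a small Java sketch using `java.time`. The window that ends at the target time starts at `targetTime - windowSize`. `WindowLookup.windowStartFor` is a hypothetical helper for illustration only.

```java
import java.time.Duration;
import java.time.LocalTime;

final class WindowLookup {
    // Start of the window that ends at targetTime: targetTime - windowSize
    // (not windowSize + targetTime).
    static LocalTime windowStartFor(LocalTime targetTime, Duration windowSize) {
        return targetTime.minus(windowSize);
    }
}
```

With a 10-minute window and an incident at 9:15, this gives a start time of 9:05, not 8:15.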
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4) For "Processing Windows": you describe a three-step approach.
> >>>>>>>>>>>>> I just want to point out that step (1) is not necessary for each
> >>>>>>>>>>>>> input record, because timestamps are not guaranteed to be unique,
> >>>>>>>>>>>>> and thus a previous record with the same key and timestamp might
> >>>>>>>>>>>>> have created the windows already.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nit: I am also not exactly sure what you mean by step (3) as you
> >>>>>>>>>>>>> use the word "send". I guess you mean "put"?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems there are actually more details in the sub-page:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> A new record for SlidingWindows will always create two new
> >>>>>>>>>>>>>> windows. If either of those windows already exists in the window
> >>>>>>>>>>>>>> store, its aggregation will simply be updated to include the new
> >>>>>>>>>>>>>> record, but no duplicate window will be added to the WindowStore.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, the first and second sentences contradict each other a
> >>>>>>>>>>>>> little bit. I think the first sentence is not correct.
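Here is a deliberately simplified in-memory Java sketch of a sliding-window sum, illustrating point (4) and the window-creation rule quoted above. Assumptions not taken from the KIP:

- Windows are [start, start + timeDifference] with inclusive ends.
- Only each record's left window [t - timeDifference, t] is created; right windows are omitted.
- `TreeMap`s stand in for the WindowStore.

The class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class SlidingSumSketch {
    private final long timeDifferenceMs;
    private final TreeMap<Long, Long> records = new TreeMap<>(); // timestamp -> summed value
    private final TreeMap<Long, Long> windows = new TreeMap<>(); // window start -> sum

    SlidingSumSketch(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    void process(long timestampMs, long value) {
        records.merge(timestampMs, value, Long::sum);
        long leftStart = timestampMs - timeDifferenceMs;

        // Add the value to every existing window [s, s + timeDifference] that
        // contains the record, i.e. every start s in [t - timeDifference, t].
        for (Map.Entry<Long, Long> e : windows.subMap(leftStart, true, timestampMs, true).entrySet()) {
            e.setValue(e.getValue() + value);
        }

        // Create the left window only if it is missing: an earlier record with the
        // same timestamp may already have created (and just updated) it.
        if (!windows.containsKey(leftStart)) {
            long sum = 0;
            for (long v : records.subMap(leftStart, true, timestampMs, true).values()) {
                sum += v;
            }
            windows.put(leftStart, sum);
        }
    }

    Map<Long, Long> windows() {
        return windows;
    }
}
```

With a time difference of 5, process the records (10, 1), (12, 2) and (10, 4). The third record shares timestamp 10 with the first, so it updates the existing windows [5,10] and [7,12] (sums 5 and 7) instead of creating a duplicate.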
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nit:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> For in-order records, the left window will always be empty.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This should be "right window" ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5) "Emitting Results": it might be worth pointing out that a
> >>>>>>>>>>>>> second/future window of a new record is created with no records,
> >>>>>>>>>>>>> and thus, even if it's initialized, it won't be emitted. Only if a
> >>>>>>>>>>>>> subsequent record falls into the window would the window be updated
> >>>>>>>>>>>>> and the window result (for a window content of one record) be sent
> >>>>>>>>>>>>> downstream.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Again, the sub-page contains these details. It might still be worth
> >>>>>>>>>>>>> adding them to the top-level page, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
> >>>>>>>>>>>>> empty windows would be returned. Thus I am wondering if we need to
> >>>>>>>>>>>>> use two stores internally? One store for actual windows and one
> >>>>>>>>>>>>> store for empty windows? If an empty window is updated, it's moved
> >>>>>>>>>>>>> to the other store? For IQ, we would only allow querying the
> >>>>>>>>>>>>> non-empty-window store?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 6) On the sub-page:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> The left window of in-order records and both windows for
> >>>>>>>>>>>>>> out-of-order records need to be updated with the values of
> >>>>>>>>>>>>>> records that have already been processed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Why "both windows for out-of-order records"? IMHO, we don't know
> >>>>>>>>>>>>> how many existing windows need to be updated when processing an
> >>>>>>>>>>>>> out-of-order record. Of course, an out-of-order record might also
> >>>>>>>>>>>>> fall into no existing window and instead create two new windows.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Because each record creates one new window that includes itself
> >>>>>>>>>>>>>> and one window that does not
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As stated above, this does not seem to hold. I understand what you
> >>>>>>>>>>>>> mean, but it would be good to be exact.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Figure 2: You use the term "late" but you mean "out-of-order" I
> >>>>>>>>>>>>> guess -- a record is _late_ if it's not processed any longer as the
> >>>>>>>>>>>>> grace period passed already.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Figure 2: "Late" should be "out-of-order". The example text says a
> >>>>>>>>>>>>> window [16,26] should be created, but the figure shows the green
> >>>>>>>>>>>>> window as [15,20].
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> About the blue window: maybe add a note that the blue window
> >>>>>>>>>>>>> contains the aggregate we need for the green window, _before_ the
> >>>>>>>>>>>>> new record `a` is added to the blue window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 7) I am not really happy to extend TimeWindows, and I think the
> >>>>>>>>>>>>> argument about JoinWindows is not the best (IMHO, JoinWindows
> >>>>>>>>>>>>> already do it wrong, and we would just repeat the same mistake).
> >>>>>>>>>>>>> However, it seems our window hierarchy is "broken" already, and it
> >>>>>>>>>>>>> might be out of scope for this KIP to fix it. Hence, I am ok with
> >>>>>>>>>>>>> biting the bullet for now and cleaning it up later.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>> Hi Leah,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the updated KIP. I agree that extending SlidingWindows
> >>>>>>>>>>>>>> from Windows is fine for the sake of not introducing more public
> >>>>>>>>>>>>>> APIs (and their internal xxxImpl classes), and its downside is small
> >>>>>>>>>>>>>> enough for me to tolerate.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >>>>>>>>>>>>>>> to address these points and have created a child page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
> >>>>>>>>>>>>>>> to go into more depth on certain implementation details.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Grace Period:*
> >>>>>>>>>>>>>>> I think Sophie raises a good point that the default grace period
> >>>>>>>>>>>>>>> of 24 hours is often too long and was chosen when retention time
> >>>>>>>>>>>>>>> and grace period were the same. For SlidingWindows, I propose we
> >>>>>>>>>>>>>>> make the grace period mandatory. To keep formatting consistent
> >>>>>>>>>>>>>>> with other types of windows, the grace period won't be an
> >>>>>>>>>>>>>>> additional parameter in the #of method, but will still look like
> >>>>>>>>>>>>>>> it does in other use cases:
> >>>>>>>>>>>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)).
> >>>>>>>>>>>>>>> If the grace period isn't properly initialized, an error will be
> >>>>>>>>>>>>>>> thrown through the process method.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Storage Layer + Aggregation:*
> >>>>>>>>>>>>>>> SlidingWindows will use a WindowStore because the computation can be done
> >>>>>>>>>>>>>>> with the information stored in a WindowStore (window timestamp and value).
> >>>>>>>>>>>>>>> Using the WindowStore also simplifies computation, as SlidingWindows can
> >>>>>>>>>>>>>>> leverage existing processes. Because we are using a WindowStore, the
> >>>>>>>>>>>>>>> aggregation process will be similar to that of a hopping window: as records
> >>>>>>>>>>>>>>> come in, their value is added to the aggregation that already exists,
> >>>>>>>>>>>>>>> following the same procedure as hopping windows. The aggregation difference
> >>>>>>>>>>>>>>> between SlidingWindows and HoppingWindows comes in creating new windows for
> >>>>>>>>>>>>>>> a SlidingWindow, where you need to find the existing records that belong to
> >>>>>>>>>>>>>>> the new window. This computation is similar to the aggregation in
> >>>>>>>>>>>>>>> SessionWindows and requires a scan of the WindowStore to find the window
> >>>>>>>>>>>>>>> with the aggregation needed, which will always be pre-computed. The scan
> >>>>>>>>>>>>>>> requires creating an iterator, but should have minimal performance impact,
> >>>>>>>>>>>>>>> as this strategy is already implemented in SessionWindows. More details on
> >>>>>>>>>>>>>>> finding the aggregation that needs to be put in a new window can be found
> >>>>>>>>>>>>>>> on the implementation page
> >>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
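
A minimal sketch of the scan described above, assuming a toy in-memory store. `SlidingScanSketch`, its methods and the `timeDifferenceMs` parameter are hypothetical; the store here is a `TreeMap` of raw values keyed by timestamp rather than Kafka's WindowStore of pre-computed aggregates, and both window ends are treated as inclusive. It only illustrates how one ordered range scan collects the records that belong to a newly created window.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model, not the Kafka Streams WindowStore API.
final class SlidingScanSketch {
    // Raw values keyed by record timestamp (ms); stands in for a time-ordered store.
    private final NavigableMap<Long, Long> valuesByTimestamp = new TreeMap<>();
    // Assumed meaning: maximum time difference between two records in the same window.
    private final long timeDifferenceMs;

    SlidingScanSketch(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    // Store a record; values that share a timestamp are summed.
    void put(long timestampMs, long value) {
        valuesByTimestamp.merge(timestampMs, value, Long::sum);
    }

    // Sum of the window [endMs - timeDifferenceMs, endMs], both ends inclusive,
    // found with a single ordered range scan (analogous to iterating a store fetch).
    long windowSum(long endMs) {
        long sum = 0L;
        for (Map.Entry<Long, Long> entry
                : valuesByTimestamp.subMap(endMs - timeDifferenceMs, true, endMs, true).entrySet()) {
            sum += entry.getValue();
        }
        return sum;
    }
}
```

With records at 5, 8, 10 and 12 and a time difference of 5, `windowSum(10)` covers [5,10]; an out-of-order record at 6 changes that window's result but leaves [7,12] untouched.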
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
> >>>>>>>>>>>>>>> Because SlidingWindows are still defined by a windowSize (whereas
> >>>>>>>>>>>>>>> SessionWindows are defined purely by data), I think it makes sense to
> >>>>>>>>>>>>>>> leverage the existing Window processes instead of creating a new store type
> >>>>>>>>>>>>>>> that would be very similar to the WindowStore. While it's true that the
> >>>>>>>>>>>>>>> #windowsFor method isn't necessary for SlidingWindows, JoinWindows also
> >>>>>>>>>>>>>>> extends Windows<Window> and throws an UnsupportedOperationException in the
> >>>>>>>>>>>>>>> #windowsFor method, which is what SlidingWindows can do. The difference
> >>>>>>>>>>>>>>> between extending Windows<TimeWindow> and Windows<Window> is minimal, as
> >>>>>>>>>>>>>>> both are ways to pass window parameters. Extending Windows<TimeWindow> will
> >>>>>>>>>>>>>>> give us more leverage in utilizing existing processes.
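
A minimal sketch of the "extend nothing" option, loosely modeled on the JoinWindows behavior mentioned above: a self-contained, hypothetical class (not Kafka's `Windows` or `SlidingWindows`) that only carries the window parameters and rejects `windowsFor`, because sliding-window boundaries cannot be derived from a single timestamp. The factory name `of` and the `Map<Long, Object>` return type are assumptions for illustration.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical, self-contained window definition; extends no Kafka class.
final class SlidingWindowDefinitionSketch {
    private final long timeDifferenceMs;
    private final long graceMs;

    private SlidingWindowDefinitionSketch(long timeDifferenceMs, long graceMs) {
        if (timeDifferenceMs < 0 || graceMs < 0) {
            throw new IllegalArgumentException("timeDifference and grace must not be negative");
        }
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // Hypothetical factory; both parameters are required, so no default grace is implied.
    static SlidingWindowDefinitionSketch of(Duration timeDifference, Duration grace) {
        return new SlidingWindowDefinitionSketch(timeDifference.toMillis(), grace.toMillis());
    }

    long timeDifferenceMs() {
        return timeDifferenceMs;
    }

    long gracePeriodMs() {
        return graceMs;
    }

    // Boundaries depend on the data, so they cannot be pre-computed from one
    // timestamp; like JoinWindows#windowsFor, refuse the call.
    Map<Long, Object> windowsFor(long timestampMs) {
        throw new UnsupportedOperationException("windowsFor() is not supported by sliding windows");
    }
}
```

Keeping the class free of inheritance means there is no inherited `windowsFor` contract to break; the cost is that existing code written against the abstract base class cannot reuse it directly.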
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Emit Strategy*
> >>>>>>>>>>>>>>> I would argue that emitting on every update is still the best way to go for
> >>>>>>>>>>>>>>> SlidingWindows, because it mimics the other types of windows, and
> >>>>>>>>>>>>>>> suppression can be leveraged to limit what SlidingWindows emits. While some
> >>>>>>>>>>>>>>> users may only want to see the last value, others may want to see more, and
> >>>>>>>>>>>>>>> keeping an emit strategy that emits partial results lets both kinds of users
> >>>>>>>>>>>>>>> access what they want.
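
To make the trade-off concrete, here is a toy comparison of the two emit strategies for a single window, assuming inclusive window ends and a stream time that is simply the largest timestamp seen so far. `EmitStrategySketch` is hypothetical; in the DSL the final-only behavior would come from applying suppression downstream (for example `suppress(Suppressed.untilWindowCloses(...))`), not from code like this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of eager vs. final-only emission for ONE window.
final class EmitStrategySketch {
    // Eager: every record that lands in [startMs, endMs] emits the updated count.
    static List<Long> eagerUpdates(long startMs, long endMs, long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        long count = 0L;
        for (long t : timestamps) {
            if (t >= startMs && t <= endMs) {
                count++;
                emitted.add(count);
            }
        }
        return emitted;
    }

    // Final only: a single result once stream time moves past endMs + graceMs.
    // Records for the window that arrive after that point are ignored.
    static List<Long> finalOnly(long startMs, long endMs, long graceMs, long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        long count = 0L;
        long streamTimeMs = Long.MIN_VALUE;
        boolean closed = false;
        for (long t : timestamps) {
            streamTimeMs = Math.max(streamTimeMs, t);
            if (closed) {
                continue;
            }
            if (t >= startMs && t <= endMs) {
                count++;
            }
            if (streamTimeMs > endMs + graceMs) {
                emitted.add(count);
                closed = true;
            }
        }
        return emitted;
    }
}
```

For window [5,10] with a grace period of 2 and timestamps 6, 8, 12, 9, 13, eager emission produces the updates 1, 2, 3 (the out-of-order 9 triggers another update), while final-only emission produces the single result 3 once stream time reaches 13.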
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> *Additional Features*
> >>>>>>>>>>>>>>> Supporting sliding windows inherently, and shifting inefficient hopping
> >>>>>>>>>>>>>>> windows to sliding windows, is an interesting idea and could be built on top
> >>>>>>>>>>>>>>> of SlidingWindows when they are finished, but right now it seems out of scope
> >>>>>>>>>>>>>>> for the needs of this KIP. Similarly, including a `subtraction` feature could
> >>>>>>>>>>>>>>> improve performance, but doesn't seem necessary for the implementation of
> >>>>>>>>>>>>>>> this KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know what you think of the updates,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Leah
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP, Leah!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding (1): I'd go further, actually. Making Windows an abstract class
> >>>>>>>>>>>>>>>> was a mistake from the beginning that led to us not being able to fix a very
> >>>>>>>>>>>>>>>> confusing situation for users around retention times, final results
> >>>>>>>>>>>>>>>> emitting, etc. Thus, I would not suggest extending TimeWindows for sure,
> >>>>>>>>>>>>>>>> but would also not suggest extending Windows.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The very simplest thing to do is follow the example of SessionWindows,
> >>>>>>>>>>>>>>>> which is just a completely self-contained class. If we don't mess with
> >>>>>>>>>>>>>>>> class inheritance, we won't ever have any of the problems related to class
> >>>>>>>>>>>>>>>> inheritance. This is my preferred solution.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Still, sliding windows have a lot in common with TimeWindows and other
> >>>>>>>>>>>>>>>> fixed-size windows, namely that the windows are fixed in size. If we want
> >>>>>>>>>>>>>>>> to preserve the current two-part windowing API in which you can window by
> >>>>>>>>>>>>>>>> either "fixed" or "data driven" modes, I'd suggest we avoid increasing the
> >>>>>>>>>>>>>>>> blast radius of Windows by taking the opportunity to replace it with a
> >>>>>>>>>>>>>>>> proper interface and implement that interface instead.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For example:
> >>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/9031
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> ======
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding (2), it seems more straightforward, as a user of Streams, to
> >>>>>>>>>>>>>>>> just have one mental model. _All_ of our aggregation operations follow an
> >>>>>>>>>>>>>>>> eager emission model, in which we just emit an update whenever an update is
> >>>>>>>>>>>>>>>> available. We already provided Suppression to explicitly apply different
> >>>>>>>>>>>>>>>> update semantics in the case it's required. Why should we define a
> >>>>>>>>>>>>>>>> snowflake operation with completely different semantics from everything
> >>>>>>>>>>>>>>>> else? I.e., systems are generally easier to use when they follow a few
> >>>>>>>>>>>>>>>> simple, composable rules than when they have a lot of different, specific
> >>>>>>>>>>>>>>>> rules.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> ======
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> New point (4):
> >>>>>>>>>>>>>>>> It would be nice to include some examples of user code that would use the
> >>>>>>>>>>>>>>>> new API, which should include:
> >>>>>>>>>>>>>>>> 1. using the DSL with the sliding window definition
> >>>>>>>>>>>>>>>> 2. accessing the stored results of a sliding window aggregation via IQ
> >>>>>>>>>>>>>>>> 3. defining a custom processor to access sliding windows in a store
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It generally helps reviewers wrap their heads around the proposal, as well
> >>>>>>>>>>>>>>>> as shake out any design issues that would otherwise only come up during
> >>>>>>>>>>>>>>>> implementation/testing/review.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks again for the awesome proposal!
> >>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>>> Hello Leah,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the nicely written KIP. A few thoughts:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1) I echo the other reviewers' comments regarding the typing: why extend
> >>>>>>>>>>>>>>>>> TimeWindow instead of just extending Window?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2) I also feel that the emitting policy for this type of windowing
> >>>>>>>>>>>>>>>>> aggregation may be different from the existing ones. The existing emitting
> >>>>>>>>>>>>>>>>> policy is very simple: emit every time a window gets updated, and emit
> >>>>>>>>>>>>>>>>> every time on out-of-order data within the grace period. This is because
> >>>>>>>>>>>>>>>>> for time-windows the window close time strictly depends on the window
> >>>>>>>>>>>>>>>>> start time, which is fixed, while for session-windows, although the window
> >>>>>>>>>>>>>>>>> open/close time is also data-dependent, it changes relatively infrequently
> >>>>>>>>>>>>>>>>> compared to sliding-windows. For this KIP, since each new record would
> >>>>>>>>>>>>>>>>> cause a new sliding-window, the number of windows maintained logically
> >>>>>>>>>>>>>>>>> could be much larger, and hence emitting on each update may be too
> >>>>>>>>>>>>>>>>> aggressive.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3) Although the KIP itself should focus on user-facing interfaces, I'd
> >>>>>>>>>>>>>>>>> suggest we create a child page of KIP-450 discussing its implementation
> >>>>>>>>>>>>>>>>> as well, since some of that may drive the interface design. E.g.,
> >>>>>>>>>>>>>>>>> personally I think having a combiner interface in addition to the
> >>>>>>>>>>>>>>>>> aggregator would be useful, but that's based on my 2 cents about the
> >>>>>>>>>>>>>>>>> implementation design (I once created a child page describing it:
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> >>>>>>>>>>>>>>>>> ).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Leah,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thank you for the KIP!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Here is my feedback:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. The KIP would benefit from some code examples that show how to use
> >>>>>>>>>>>>>>>>>> sliding windows in aggregations.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. The different sliding windows in Figures 1 and 2 are really hard to
> >>>>>>>>>>>>>>>>>> distinguish. Could you please try to make them graphically easier to
> >>>>>>>>>>>>>>>>>> tell apart? You could try to draw the frames of consecutive windows
> >>>>>>>>>>>>>>>>>> shifted relative to each other.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. I agree with Matthias that extending Windows<TimeWindow> does not
> >>>>>>>>>>>>>>>>>> seem to be the best approach. What would be the result of windowsFor()?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 4. In the section "Public Interfaces" you should remove implementation
> >>>>>>>>>>>>>>>>>> details like private constructors and private fields.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 5. Do we need a new store interface or can we use WindowStore? Some words
> >>>>>>>>>>>>>>>>>> about that would be informative.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, I would skip it
> >>>>>>>>>>>>>>>>>> for now and add it later.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 7. I also agree that having a section that describes how to handle
> >>>>>>>>>>>>>>>>>> out-of-order records would be good to understand what is still missing
> >>>>>>>>>>>>>>>>>> and what we can reuse.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Leah,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> thanks for your update. However, it does not completely answer my
> >>>>>>>>>>>>>>>>>>> question.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> In our current window implementations, we emit a window result update
> >>>>>>>>>>>>>>>>>>> record (ie, early/partial result) for each input record. When an
> >>>>>>>>>>>>>>>>>>> out-of-order record arrives, we just update the corresponding old window
> >>>>>>>>>>>>>>>>>>> and emit another update.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It's unclear from the KIP whether you propose the same emit strategy. For
> >>>>>>>>>>>>>>>>>>> sliding windows it might be worth considering a different emit strategy
> >>>>>>>>>>>>>>>>>>> that only supports emitting the final result (ie, after the grace period
> >>>>>>>>>>>>>>>>>>> has passed)?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Boyang also raises a good point that relates to my point from above about
> >>>>>>>>>>>>>>>>>>> pre-aggregations and storage layout. Our current time windows are all
> >>>>>>>>>>>>>>>>>>> pre-aggregated and stored in parallel. We can also look up windows
> >>>>>>>>>>>>>>>>>>> efficiently, as we can compute the windowed key given the input record
> >>>>>>>>>>>>>>>>>>> key and timestamp based on the window definition.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> However, for sliding windows, window boundaries are data dependent and
> >>>>>>>>>>>>>>>>>>> thus we cannot compute them upfront. So how can we "find" existing
> >>>>>>>>>>>>>>>>>>> windows efficiently? Furthermore, out-of-order data would create new
> >>>>>>>>>>>>>>>>>>> windows in the past, and we need to be able to handle this case.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store all raw
> >>>>>>>>>>>>>>>>>>> input events. Additionally, we could also store pre-aggregated results if
> >>>>>>>>>>>>>>>>>>> we think it's beneficial. -- If we apply an "emit only final results"
> >>>>>>>>>>>>>>>>>>> strategy, storing pre-aggregated results would not be necessary though.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Btw: for sliding windows it might also be useful to consider allowing
> >>>>>>>>>>>>>>>>>>> users to supply a `Subtractor` -- this subtractor could be applied to the
> >>>>>>>>>>>>>>>>>>> current window result (in case we store it) if a record drops out of the
> >>>>>>>>>>>>>>>>>>> window. Of course, not all aggregation functions are subtractable, and we
> >>>>>>>>>>>>>>>>>>> can consider this as a follow-up task, too, and not include it in this
> >>>>>>>>>>>>>>>>>>> KIP for now. Thoughts?
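
A minimal sketch of the `Subtractor` idea, assuming in-order records and an invertible aggregation such as a sum. `SubtractorSketch` and its adder/subtractor parameters are hypothetical and not a Kafka Streams interface. When a record drops out of the window [t - timeDifference, t], the subtractor removes its contribution instead of recomputing the aggregate from all remaining records; aggregations such as max or min have no such inverse, matching the caveat that not all aggregation functions are subtractable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongBinaryOperator;

// Hypothetical sketch; assumes timestamps arrive in order and timeDifferenceMs >= 0.
final class SubtractorSketch {
    private final long timeDifferenceMs;
    private final LongBinaryOperator adder;       // (aggregate, value) -> new aggregate
    private final LongBinaryOperator subtractor;  // (aggregate, value) -> aggregate without value
    private final Deque<long[]> inWindow = new ArrayDeque<>(); // {timestampMs, value}, oldest first
    private long aggregate;

    SubtractorSketch(long timeDifferenceMs, long initial,
                     LongBinaryOperator adder, LongBinaryOperator subtractor) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.aggregate = initial;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Adds a record and returns the aggregate of [timestampMs - timeDifferenceMs, timestampMs].
    long add(long timestampMs, long value) {
        aggregate = adder.applyAsLong(aggregate, value);
        inWindow.addLast(new long[] {timestampMs, value});
        // The record just added always stays in its own window, so peekFirst() is never null.
        while (inWindow.peekFirst()[0] < timestampMs - timeDifferenceMs) {
            long[] expired = inWindow.pollFirst();
            aggregate = subtractor.applyAsLong(aggregate, expired[1]);
        }
        return aggregate;
    }
}
```

With a time difference of 5 and a sum, adding (1,1), (3,2), (6,4), (8,8), (20,1) yields 1, 3, 7, 14, 1: the record at 1 leaves once the window reaches [3,8], and everything before 15 leaves when the record at 20 arrives.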
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if extending
> >>>>>>>>>>>>>>>>>>> TimeWindow is the best approach? For TimeWindows, we can pre-compute
> >>>>>>>>>>>>>>>>>>> window boundaries (cf `windowsFor()`) while for a sliding window the
> >>>>>>>>>>>>>>>>>>> boundaries are data dependent. Session windows are also data dependent
> >>>>>>>>>>>>>>>>>>> and thus they don't inherit from TimeWindow. (Maybe check out the KIP
> >>>>>>>>>>>>>>>>>>> that added session windows? It could provide some good insights.) -- I
> >>>>>>>>>>>>>>>>>>> believe the same rationale applies to sliding windows?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
> >>>>>>>>>>>>>>>>>>>> Thanks Leah and Sophie for the KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time. Could we
> >>>>>>>>>>>>>>>>>>>> elaborate on how the storage layer is structured?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching aggregation
> >>>>>>>>>>>>>>>>>>>> results, since we couldn't pre-aggregate until the user asks for it. It
> >>>>>>>>>>>>>>>>>>>> would be good to also discuss it.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding windows
> >>>>>>>>>>>>>>>>>>>> inherently. For a user who actually uses a hopping window, Streams could
> >>>>>>>>>>>>>>>>>>>> detect such an inefficiency by using the window_size/advance_time ratio
> >>>>>>>>>>>>>>>>>>>> to decide whether the write amplification is too high compared with some
> >>>>>>>>>>>>>>>>>>>> configured threshold. The benefit of doing so is that existing Streams
> >>>>>>>>>>>>>>>>>>>> users don't need to change their code or learn a new API, but only need
> >>>>>>>>>>>>>>>>>>>> to upgrade the Streams library to get benefits for their inefficient
> >>>>>>>>>>>>>>>>>>>> hopping window implementation. There might be some compatibility issues
> >>>>>>>>>>>>>>>>>>>> for sure, but it's worth listing them out for the trade-off.
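
A small sketch of the ratio Boyang describes, assuming hopping windows of the form [k * advance, k * advance + size) and, unlike Kafka's TimeWindows, also counting windows that would start before 0. `HoppingAmplificationSketch` and the threshold parameter are hypothetical; the code only computes how many hopping windows a single record updates, which is the write amplification under discussion.

```java
// Hypothetical helper, not part of Kafka Streams.
final class HoppingAmplificationSketch {
    // Number of windows [k * advanceMs, k * advanceMs + sizeMs) containing timestampMs:
    // the count of integers k with (timestampMs - sizeMs) / advanceMs < k <= timestampMs / advanceMs.
    static long windowsContaining(long timestampMs, long sizeMs, long advanceMs) {
        return Math.floorDiv(timestampMs, advanceMs) - Math.floorDiv(timestampMs - sizeMs, advanceMs);
    }

    // The size/advance ratio: roughly how many windows each record is written to.
    static double amplificationRatio(long sizeMs, long advanceMs) {
        return (double) sizeMs / advanceMs;
    }

    // Hypothetical check against a configured threshold.
    static boolean exceedsThreshold(long sizeMs, long advanceMs, double threshold) {
        return amplificationRatio(sizeMs, advanceMs) > threshold;
    }
}
```

For a 10 ms window advancing by 1 ms, a record at 12 updates 10 windows; with an advance of 5 ms it updates only 2 ([5,15) and [10,20)).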
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Boyang
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for pointing that out. I added the following to the Proposed
> >>>>>>>>>>>>>>>>>>>>> Changes section of the KIP:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> "Records that come out of order will be processed the same way as
> >>>>>>>>>>>>>>>>>>>>> in-order records, as long as they fall within the grace period. Any new
> >>>>>>>>>>>>>>>>>>>>> windows created by the late record will still be created, and the
> >>>>>>>>>>>>>>>>>>>>> existing windows that are changed by the late record will be updated. Any
> >>>>>>>>>>>>>>>>>>>>> record that falls outside of the grace period (either user defined or
> >>>>>>>>>>>>>>>>>>>>> default) will be discarded."
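
A simplified sketch of the accept/discard rule in the quoted text, assuming "within the grace period" means a record is at most `graceMs` behind the largest timestamp observed so far. This is an assumption for illustration only: an actual implementation would compare against the end times of the windows the record belongs to plus the grace period. `GracePeriodSketch` is hypothetical.

```java
// Hypothetical, simplified acceptance rule for out-of-order records.
final class GracePeriodSketch {
    private final long graceMs;
    // Largest record timestamp seen so far ("stream time" in this sketch).
    private long observedStreamTimeMs = Long.MIN_VALUE;

    GracePeriodSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns true if the record would be processed, false if it would be discarded.
    boolean accept(long timestampMs) {
        observedStreamTimeMs = Math.max(observedStreamTimeMs, timestampMs);
        return observedStreamTimeMs - timestampMs <= graceMs;
    }
}
```

With a grace period of 3, after a record at 10 an out-of-order record at 8 is still processed, while a record at 6 is discarded.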
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> All the best,
> >>>>>>>>>>>>>>>>>>>>> Leah
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Leah,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order data though.
> >>>>>>>>>>>>>>>>>>>>>> How do you propose to address this?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off the discussion for KIP-450, adding sliding window
> >>>>>>>>>>>>>>>>>>>>>>> aggregation support to Kafka Streams.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Let me know what you think,
> >>>>>>>>>>>>>>>>>>>>>>> Leah
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> >
>
>
