Thanks Leah. Overall LGTM.

A few nits:

- the first figure shows window [9,19] but the window is not aligned
properly (it should be 1ms to the right; right now, it's aligned to
window [8,18])

- in the second figure, a hopping window would create more windows, i.e.,
the first window would be [-6,4) and the last window would be [19,29),
thus it's not just 10 windows but 26 windows (if I did not miscount)

- "Two new windows will be created by the late record"

late -> out-of-order
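
For the window count in the second nit, here is a rough sketch of the
arithmetic. It assumes hopping windows that advance by 1ms, with window
start times running from -6 to 19. The class and method names are made up
for illustration and are not part of the KIP or of Kafka Streams.

```java
class HoppingWindowCount {
    // Counts the hopping windows whose start time lies in
    // [firstStart, lastStart], given the advance between window starts.
    // All values are in milliseconds.
    static long countWindows(long firstStart, long lastStart, long advanceMs) {
        if (advanceMs <= 0 || lastStart < firstStart) {
            throw new IllegalArgumentException(
                "need advanceMs > 0 and lastStart >= firstStart");
        }
        return (lastStart - firstStart) / advanceMs + 1;
    }

    public static void main(String[] args) {
        // Assumed values: first window starts at -6, last at 19, advance 1ms.
        System.out.println(countWindows(-6, 19, 1));
    }
}
```

Under those assumptions this prints 26.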


-Matthias



On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:
> Thanks for the update Leah -- I think that all makes sense.
> 
> Cheers,
> Sophie
> 
> On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas <ltho...@confluent.io> wrote:
> 
>> Another minor tweak, instead of defining the window by the *size*, it will
>> be defined by *timeDifference*, which is the maximum time difference
>> between two events. This is a more precise way to define a window due to
>> its inclusive ends, while allowing the user to create the window they
>> expect. This definition fits with the current examples, where a record at
>> *10* would fall into a window of time difference *5* from *[5,10]*. This
>> window contains any records at 5, 6, 7, 8, 9, and 10, which is 6 instances
>> instead of 5. This semantic difference is why I've shifted *size* to
>> *timeDifference*.
>>
>> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
>> conventions.
>>
>> Let me know if there are any concerns! The vote thread is open as well
>> here:
>> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
>>
>> Best,
>> Leah
>>
>> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas <ltho...@confluent.io> wrote:
>>
>>> A small tweak - to make it more clear to users that grace is required, as
>>> well as cleaning up some of the confusing grammar semantics of windows,
>>> the main builder method for *slidingWindows* will be *withSizeAndGrace*
>>> instead of *of*. This looks like it'll be the last change (for now) on the
>>> public API. If anyone has any comments or thoughts let me know.
>>> Otherwise, I'll take this to vote shortly.
>>>
>>> Best,
>>> Leah
>>>
>>> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas <ltho...@confluent.io>
>> wrote:
>>>
>>>> To accommodate the change to a final class, I've added another
>>>> *windowedBy()* function in *CogroupedKStream.java* to handle
>>>> SlidingWindows.
>>>>
>>>> As far as the discussion goes, I think this is the last change we've
>>>> talked about. If anyone has other comments or concerns, please let me
>>>> know!
>>>>
>>>> Cheers,
>>>> Leah
>>>>
>>>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas <ltho...@confluent.io>
>> wrote:
>>>>
>>>>> Thanks for the discussion about extending TimeWindows. I agree that
>>>>> making it future proof is important, and the implementation of
>>>>> SlidingWindows is unique enough that it seems logical to make it its
>>>>> own final class.
>>>>>
>>>>> On that note, I've updated the KIP to make SlidingWindows a standalone
>>>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
>>>>> handle SlidingWindows. It seems that SlidingWindows would still be able
>>>>> to leverage *TimeWindowedKStream* by creating a SlidingWindows version of
>>>>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
>>>>> anyone sees issues with this implementation, please let me know.
>>>>>
>>>>> Best,
>>>>> Leah
>>>>>
>>>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler <vvcep...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the reply, Sophie.
>>>>>>
>>>>>> That all sounds about right to me.
>>>>>>
>>>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
>>>>>> for it to be extensible. Different implementations can (and do)
>>>>>> enumerate different windows to suit different use cases.
>>>>>>
>>>>>> On the other hand, I can’t think of any way to extend SessionWindows to
>>>>>> do something different using the same algorithm, so it makes sense for
>>>>>> it to stay final.
>>>>>>
>>>>>> If we think SlidingWindows is similarly not usefully extensible, then
>>>>>> we can make it final. It’s easy to remove final later, but adding it is
>>>>>> not possible. Or we could go the other route and just make it an
>>>>>> interface, on general principle. Both of these choices are safe API
>>>>>> design.
>>>>>>
>>>>>> Thanks again,
>>>>>> John
>>>>>>
>>>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
>>>>>>>>
>>>>>>>> Users could pass in a custom `SessionWindows` as
>>>>>>>> long as the session algorithm works correctly for it.
>>>>>>>
>>>>>>>
>>>>>>> Well not really, SessionWindows is a final class. TimeWindows is also a
>>>>>>> final class, so neither of these can be extended/customized. For a given
>>>>>>> Windows then there would only be three (non-overlapping) possibilities:
>>>>>>> either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
>>>>>>> think there's any problem with determining what the user wants in this
>>>>>>> case -- we would just check if it's a SlidingWindows and connect the new
>>>>>>> processor, or else connect the existing hopping/tumbling window
>>>>>>> processor.
>>>>>>>
>>>>>>> I'll admit that last sentence does leave a bad taste in my mouth. Part of
>>>>>>> that is probably the "leaking" API Matthias pointed out; we just assume
>>>>>>> the hopping/tumbling window implementation fits all custom windows. But I
>>>>>>> guess if you really needed to customize the algorithm any further you may
>>>>>>> as well stick in a transformer and do it all yourself.
>>>>>>>
>>>>>>> Anyways, given what we have, it does seem weird to apply one algorithm
>>>>>>> for most Windows types and then swap in a different one for one specific
>>>>>>> extension of Windows. So adding a new #windowedBy(SlidingWindows)
>>>>>>> sounds reasonable to me.
>>>>>>>
>>>>>>> I'm still not convinced that we need a whole new TimeWindowedKStream
>>>>>>> equivalent class for sliding windows though. It seems like the
>>>>>>> hopping/tumbling window implementation could benefit just as much from a
>>>>>>> subtractor as the sliding windows so the only future-proofing we get is
>>>>>>> the ability to be lazy and add the subtractor to one but not the other.
>>>>>>> Of course it would only be an optimization so we could just not apply it
>>>>>>> to one type and nothing would break.
>>>>>>> It does make me nervous to go against the "future-proof" direction,
>>>>>>> though. Are there any other examples of things we might want to add to
>>>>>>> one window type but not to another?
>>>>>>>
>>>>>>> On another note, this actually brings up a new question: should
>>>>>>> SlidingWindows also be final? My take is "yes" since there's no
>>>>>>> reasonable customization of sliding windows, at least not that I can
>>>>>>> think of. Thoughts?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 22, 2020 at 7:15 PM John Roesler <vvcep...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, all,
>>>>>>>>
>>>>>>>> I can see how my conclusion was kind of a leap.
>>>>>>>>
>>>>>>>> What Matthias said is indeed what I was thinking. When you provide a
>>>>>>>> window definition to the windowedBy() method, you are selecting an
>>>>>>>> algorithm that will be used to compute the windows from the input data.
>>>>>>>>
>>>>>>>> I didn’t mean the code implementation “algorithm”, but the high-level
>>>>>>>> algorithm that describes how the input stream will be transformed into
>>>>>>>> a sequence of windows.
>>>>>>>>
>>>>>>>> For example, the algorithm for Windows is something like “given the
>>>>>>>> record timestamp, include the record in each of the enumerated
>>>>>>>> windows”. Note that there can be a lot of variation in how the windows
>>>>>>>> are enumerated, which is why there are at least a couple of
>>>>>>>> implementations of Windows, and we are open to adding more (like for
>>>>>>>> natural calendar boundaries).
>>>>>>>>
>>>>>>>> For SessionWindows, it’s more like “if any window is within the gap,
>>>>>>>> extend its boundaries to include this record and if two windows touch,
>>>>>>>> then merge them”.
>>>>>>>>
>>>>>>>> Clearly, the algorithm for SlidingWindows doesn’t fall into either
>>>>>>>> category, so it seems inappropriate to claim that it does in the API
>>>>>>>> (by implementing Windows with stubbed methods) and then cast internally
>>>>>>>> to execute a completely different algorithm.
>>>>>>>>
>>>>>>>> To your other observation, that the DSL object resulting from
>>>>>>>> windowedBy would look the same for Windows and SlidingWindows, maybe
>>>>>>>> it makes sense for windowedBy(SlidingWindows) also to return a
>>>>>>>> TimeWindowedKStream.
>>>>>>>>
>>>>>>>> i.e.:
>>>>>>>> =======================
>>>>>>>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
>>>>>>>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
>>>>>>>> =======================
>>>>>>>>
>>>>>>>> I can’t think of a reason this wouldn’t work. But then again, it would
>>>>>>>> be more future-proof to go ahead and specify a different return type
>>>>>>>> now, if we think we'll want to add subtractors and stuff later. I don't
>>>>>>>> have a strong feeling about that part of the API. It seems to be
>>>>>>>> independent of whether SlidingWindows extends Windows or not.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
>>>>>>>>> I think what John tries to say is the following:
>>>>>>>>>
>>>>>>>>> We have `windowedBy(Windows)` that accepts hopping/tumbling windows
>>>>>>>>> but also custom windows, and we use a specific algorithm. Note that
>>>>>>>>> custom windows must "work" based on the used algorithm.
>>>>>>>>>
>>>>>>>>> For session windows we have `windowedBy(SessionWindows)` and apply a
>>>>>>>>> different algorithm. Users could pass in a custom `SessionWindows` as
>>>>>>>>> long as the session algorithm works correctly for it.
>>>>>>>>>
>>>>>>>>> For the new sliding windows, we want to use a different algorithm
>>>>>>>>> compared to hopping/tumbling windows. If we let sliding windows extend
>>>>>>>>> `Windows`, we can decide at runtime if we need to use the
>>>>>>>>> hopping/tumbling window algorithm for hopping/tumbling windows or the
>>>>>>>>> new sliding window algorithm for sliding windows. However, if we get a
>>>>>>>>> custom window, which algorithm do we pick now? The existing
>>>>>>>>> tumbling/hopping window algorithm or the new sliding window algorithm?
>>>>>>>>> Both a custom "time-window" and custom "sliding window" implement the
>>>>>>>>> generic `Windows` class and thus we cannot make a decision as we don't
>>>>>>>>> know the user's intent.
>>>>>>>>>
>>>>>>>>> As a matter of fact, even if the user might not be aware of it, the
>>>>>>>>> algorithm we use does already leak into the API (if a user extends
>>>>>>>>> `Windows` it must work with our hopping/tumbling window algorithm and
>>>>>>>>> if a user extends `SessionWindows` it must work with our session
>>>>>>>>> algorithm) and it seems we need to preserve this property for sliding
>>>>>>>>> windows.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
>>>>>>>>>> Hey John,
>>>>>>>>>>
>>>>>>>>>> Just a few follow-up questions/comments about the whole Windows thing:
>>>>>>>>>>
>>>>>>>>>> That's a good way of looking at things; in particular the point about
>>>>>>>>>> SessionWindows for example requiring a Merger while other "statically
>>>>>>>>>> enumerable" windows require only an adder seems to touch on the heart
>>>>>>>>>> of the matter.
>>>>>>>>>>
>>>>>>>>>>> It seems like what Time and Universal (and any other Windows
>>>>>>>>>>> implementation) have in common is that the windows are statically
>>>>>>>>>>> enumerable.
>>>>>>>>>>> As a consequence, they can all rely on an aggregation maintenance
>>>>>>>>>>> algorithm that involves enumerating each of the windows and updating
>>>>>>>>>>> it. That also means that their DSL object (TimeWindowedKStream)
>>>>>>>>>>> doesn't need "subtractors" or "mergers", but only "adders"; again,
>>>>>>>>>>> this is a consequence of the fact that the windows are enumerable.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Given that, I'm a bit confused why you conclude that sliding windows
>>>>>>>>>> are fundamentally different from the "statically enumerable" windows
>>>>>>>>>> -- sliding windows require only an adder too. I'm not sure it's a
>>>>>>>>>> consequence of being enumerable, or that being enumerable is the
>>>>>>>>>> fundamental property that unites all Windows (ignoring JoinWindows
>>>>>>>>>> here). Yes, it currently does apply to all Windows implementations,
>>>>>>>>>> but we shouldn't assume that it *has* to be that way on the basis that
>>>>>>>>>> it currently happens to be.
>>>>>>>>>>
>>>>>>>>>> Also, the fact that they can all rely on the same aggregation
>>>>>>>>>> algorithm seems like an implementation detail and it would be weird to
>>>>>>>>>> force a separate/new DSL API just because under the covers we swap in
>>>>>>>>>> a different processor.
>>>>>>>>>>
>>>>>>>>>> To be fair, I don't think there's a strong reason *against* not
>>>>>>>>>> extending Windows -- in the end it will just mean adding a new
>>>>>>>>>> #windowedBy method and copy/pasting everything from
>>>>>>>>>> TimeWindowedKStream pretty much word for word. But anytime you find
>>>>>>>>>> yourself copying over code almost exactly, there should probably be a
>>>>>>>>>> good reason why :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 22, 2020 at 3:48 PM John Roesler <
>>>>>> vvcep...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Leah!
>>>>>>>>>>>
>>>>>>>>>>> 5) Regarding the empty windows, I'm wondering if we should simply
>>>>>>>>>>> propose that the windows should not be emitted downstream of the
>>>>>>>>>>> operator or visible in IQ. Then, it'll be up to the implementation to
>>>>>>>>>>> make it happen. I'm personally not concerned about it, since it seems
>>>>>>>>>>> like there are multiple ways to accomplish this.
>>>>>>>>>>>
>>>>>>>>>>> Note, the discrepancy Matthias pointed out is actually a design bug.
>>>>>>>>>>> The windowed aggregation (like every operation in Streams) produces a
>>>>>>>>>>> "view", which then forms the basis of downstream operations. When we
>>>>>>>>>>> pass the Materialized option to the operation, all we're doing is
>>>>>>>>>>> saying to "materialize" the view (aka, actually store the computed
>>>>>>>>>>> view) and also make it queryable.
>>>>>>>>>>> It would be illegal for the "queryable, materialized view" to differ
>>>>>>>>>>> in any way from the "view". So, it seems we must either propose to
>>>>>>>>>>> emit the empty windows AND make them visible in IQ, or propose NOT to
>>>>>>>>>>> emit the empty windows AND NOT make them visible in IQ.
>>>>>>>>>>>
>>>>>>>>>>> 7) Regarding whether we can extend TimeWindows (or Windows):
>>>>>>>>>>> I've been mulling this over more. I think it's worth asking the
>>>>>>>>>>> question of what these classes even mean. For example, why is
>>>>>>>>>>> SessionWindows a different thing from TimeWindows and UniversalWindows
>>>>>>>>>>> (which are both Windows)?
>>>>>>>>>>>
>>>>>>>>>>> This conversation is extra complicated because of the incomplete and
>>>>>>>>>>> mis-matched class hierarchy, but we can try to look past it for now.
>>>>>>>>>>>
>>>>>>>>>>> It seems like what Time and Universal (and any other Windows
>>>>>>>>>>> implementation) have in common is that the windows are statically
>>>>>>>>>>> enumerable.
>>>>>>>>>>> As a consequence, they can all rely on an aggregation maintenance
>>>>>>>>>>> algorithm that involves enumerating each of the windows and updating
>>>>>>>>>>> it. That also means that their DSL object (TimeWindowedKStream)
>>>>>>>>>>> doesn't need "subtractors" or "mergers", but only "adders"; again,
>>>>>>>>>>> this is a consequence of the fact that the windows are enumerable.
>>>>>>>>>>>
>>>>>>>>>>> In contrast, session windows are data driven, so they are not
>>>>>>>>>>> statically enumerable. Their algorithm has to rely on scans, and to do
>>>>>>>>>>> the scans, it needs to know the "inactivity gap", which needs to be
>>>>>>>>>>> part of the window definition. Likewise, session windows have the
>>>>>>>>>>> property that they need to be merged, so their DSL object also
>>>>>>>>>>> requires mergers.
>>>>>>>>>>>
>>>>>>>>>>> It really seems like your new window definition doesn't fit into
>>>>>>>>>>> either category. It uses a different algorithm, which relies on
>>>>>>>>>>> scans, but it is also fixed in size, so it doesn't need mergers. In
>>>>>>>>>>> this situation, it seems like the safe bet is to just create
>>>>>>>>>>> SlidingWindows with no interface and add a separate set of DSL
>>>>>>>>>>> operations and objects. It's a little extra code, but it seems to
>>>>>>>>>>> keep everything tidier and more comprehensible, both for us and for
>>>>>>>>>>> users.
>>>>>>>>>>>
>>>>>>>>>>> What do you think?
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the suggestions, I've updated the KIP and child page
>>>>>>>>>>>> accordingly and addressed some below.
>>>>>>>>>>>>
>>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder
>>>>>>>>>>>>> method that takes two parameters.
>>>>>>>>>>>>
>>>>>>>>>>>> That makes sense, I've changed that in the public API.
>>>>>>>>>>>>
>>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
>>>>>>>>>>>>> empty windows would be returned.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is a great point, with the current implementation plan empty
>>>>>>>>>>>> windows would be returned. I think creating a second window store
>>>>>>>>>>>> would definitely work, but there would be more overhead in having
>>>>>>>>>>>> two stores and switching windows between the stores, as well as
>>>>>>>>>>>> doing scans in both stores to find existing windows. There might be
>>>>>>>>>>>> a way to avoid emitting empty windows without creating a second
>>>>>>>>>>>> window store, I'll look more into it.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Leah
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax <
>>>>>> mj...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Couple of follow up comments:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) For the mandatory grace period, we should use a static builder
>>>>>>>>>>>>> method that takes two parameters. This provides a better API as
>>>>>>>>>>>>> users cannot forget to set the grace period. Throwing a runtime
>>>>>>>>>>>>> exception does not seem to be the best way to handle this case.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2) In Fig.2 you list 10 hopping windows. I believe it should
>>>>>>>>>>>>> actually be more? The first hopping window would be [-6,4[ and the
>>>>>>>>>>>>> last one would be [19,29[ -- hence, the cost savings are actually
>>>>>>>>>>>>> much higher.
>>>>>> higher.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3a) IQ: you are saying that the user needs to compute the start
>>>>>>>>>>>>> time as
>>>>>>>>>>>>>
>>>>>>>>>>>>>> windowSize+the time they're looking at
>>>>>>>>>>>>>
>>>>>>>>>>>>> Should this be "targetTime - windowSize" instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3b) IQ: in your example you say "window size of 10 minutes" with an
>>>>>>>>>>>>> incident at 9:15.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> they're looking for a window with the start time of 8:15.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The example does not seem to add up?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4) For "Processing Windows": you describe a three step approach. I
>>>>>>>>>>>>> just want to point out that step (1) is not necessary for each
>>>>>>>>>>>>> input record, because timestamps are not guaranteed to be unique
>>>>>>>>>>>>> and thus a previous record with the same key and timestamp might
>>>>>>>>>>>>> have created the windows already.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nit: I am also not exactly sure what you mean by step (3) as you
>>>>>>>>>>>>> use the word "send". I guess you mean "put"?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems there are actually more details in the sub-page:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> A new record for SlidingWindows will always create two new windows.
>>>>>>>>>>>>>> If either of those windows already exist in the windows store, their
>>>>>>>>>>>>>> aggregation will simply be updated to include the new record, but no
>>>>>>>>>>>>>> duplicate window will be added to the WindowStore.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, the first and second sentence contradict each other a little
>>>>>>>>>>>>> bit. I think the first sentence is not correct.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nit:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For in-order records, the left window will always be empty.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This should be "right window"?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5) "Emitting Results": it might be worth pointing out that a
>>>>>>>>>>>>> second/future window of a new record is created with no records,
>>>>>>>>>>>>> and thus, even if it's initialized it won't be emitted. Only if a
>>>>>>>>>>>>> consecutive record falls into the window would the window be
>>>>>>>>>>>>> updated and the window result (for a window content of one record)
>>>>>>>>>>>>> be sent downstream.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Again, the sub-page contains these details. It might still be worth
>>>>>>>>>>>>> adding to the top level page, too.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Btw: this implementation actually raises an issue for IQ: those
>>>>>>>>>>>>> empty windows would be returned. Thus I am wondering if we need to
>>>>>>>>>>>>> use two stores internally? One store for actual windows and one
>>>>>>>>>>>>> store for empty windows? If an empty window is updated, it's moved
>>>>>>>>>>>>> to the other store? For IQ, we only allow to query the
>>>>>>>>>>>>> non-empty-window store?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 6) On the sub-page:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The left window of in-order records and both windows for
>>>>>>>>>>>>>> out-of-order records need to be updated with the values of records
>>>>>>>>>>>>>> that have already been processed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Why "both windows for out-of-order records"? IMHO, we don't know how
>>>>>>>>>>>>> many existing windows need to be updated when processing an
>>>>>>>>>>>>> out-of-order record. Of course, an out-of-order record could also
>>>>>>>>>>>>> not fall into any existing window but create two new windows.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because each record creates one new window that includes itself
>>>>>>>>>>>>>> and one window that does not
>>>>>>>>>>>>>
>>>>>>>>>>>>> As stated above, this does not seem to hold. I understand what you
>>>>>>>>>>>>> mean, but it would be good to be exact.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Figure 2: You use the term "late" but you mean "out-of-order" I
>>>>>>>>>>>>> guess -- a record is _late_ if it's not processed any longer as the
>>>>>>>>>>>>> grace period passed already.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Figure 2: "Late" should be out-of-order. The example text says a
>>>>>>>>>>>>> window [16,26] should be created but the figure shows the green
>>>>>>>>>>>>> window as [15,20].
>>>>>>>>>>>>>
>>>>>>>>>>>>> About the blue window: maybe add a note that the blue window
>>>>>>>>>>>>> contains the aggregate we need for the green window, _before_ the
>>>>>>>>>>>>> new record `a` is added to the blue window.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 7) I am not really happy to extend TimeWindows and I think the
>>>>>>>>>>>>> argument about JoinWindows is not the best (IMHO, JoinWindows do it
>>>>>>>>>>>>> already wrong and we just repeat the same mistake). However, it
>>>>>>>>>>>>> seems our window hierarchy is "broken" already and it might be out
>>>>>>>>>>>>> of scope for this KIP to fix it. Hence, I am ok that we bite the
>>>>>>>>>>>>> bullet for now and clean it up later.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 7/20/20 5:18 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>> Hi Leah,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the updated KIP. I agree that extending SlidingWindows
>>>>>>>>>>>>>> from Windows is fine for the sake of not introducing more public
>>>>>>>>>>>>>> APIs (and their internal xxxImpl classes), and its cons are small
>>>>>>>>>>>>>> enough for me to tolerate.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas <
>>>>>> ltho...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the feedback on the KIP. I've updated the KIP page
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
>>>>>>>>>>>>>>> to address these points and have created a child page
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
>>>>>>>>>>>>>>> to go more in depth on certain implementation details.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Grace Period:*
>>>>>>>>>>>>>>> I think Sophie raises a good point that the default grace period
>>>>>>>>>>>>>>> of 24 hours is often too long and was chosen when retention time
>>>>>>>>>>>>>>> and grace period were the same. For SlidingWindows, I propose we
>>>>>>>>>>>>>>> make the grace period mandatory. To keep formatting consistent
>>>>>>>>>>>>>>> with other types of windows, grace period won't be an additional
>>>>>>>>>>>>>>> parameter in the #of method, but will still look like it does in
>>>>>>>>>>>>>>> other use cases:
>>>>>>>>>>>>>>> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)).
>>>>>>>>>>>>>>> If grace period isn't properly initialized, an error will be
>>>>>>>>>>>>>>> thrown through the process method.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Storage Layer + Aggregation:*
>>>>>>>>>>>>>>> SlidingWindows will use a WindowStore because computation can be
>>>>>>>>>>>>>>> done with the information stored in a WindowStore (window
>>>>>>>>>>>>>>> timestamp and value). Using the WindowStore also simplifies
>>>>>>>>>>>>>>> computation as SlidingWindows can leverage existing processes.
>>>>>>>>>>>>>>> Because we are using a WindowStore, the aggregation process will
>>>>>>>>>>>>>>> be similar to that of a hopping window. As records come in their
>>>>>>>>>>>>>>> value is added to the aggregation that already exists, following
>>>>>>>>>>>>>>> the same procedure as hopping windows. The aggregation difference
>>>>>>>>>>>>>>> between SlidingWindows and HoppingWindows comes in creating new
>>>>>>>>>>>>>>> windows for a SlidingWindow, where you need to find the existing
>>>>>>>>>>>>>>> records that belong to the new window. This computation is
>>>>>>>>>>>>>>> similar to the aggregation in SessionWindows and requires a scan
>>>>>>>>>>>>>>> to the WindowStore to find the window with the aggregation
>>>>>>>>>>>>>>> needed, which will always be pre-computed. The scan requires
>>>>>>>>>>>>>>> creating an iterator, but should have minimal performance effects
>>>>>>>>>>>>>>> as this strategy is already implemented in SessionWindows. More
>>>>>>>>>>>>>>> details on finding the aggregation that needs to be put in a new
>>>>>>>>>>>>>>> window can be found on the implementation page
>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Extending Windows<TimeWindow>, Windows<Window> or nothing*
>>>>>>>>>>>>>>> Because SlidingWindows are still defined by a windowSize (whereas
>>>>>>>>>>>>>>> SessionWindows are defined purely by data), I think it makes
>>>>>>>>>>>>>>> sense to leverage the existing Window processes instead of
>>>>>>>>>>>>>>> creating a new store type that would be very similar to the
>>>>>>>>>>>>>>> WindowStore. While it's true that the #windowsFor method isn't
>>>>>>>>>>>>>>> necessary for SlidingWindows, JoinWindows also extends
>>>>>>>>>>>>>>> Windows<Window> and throws an UnsupportedOperationException in
>>>>>>>>>>>>>>> the #windowsFor method, which is what SlidingWindows can do. The
>>>>>>>>>>>>>>> difference between extending Windows<TimeWindow> or
>>>>>>>>>>>>>>> Windows<Window> is minimal, as both are ways to pass window
>>>>>>>>>>>>>>> parameters. Extending Windows<TimeWindow> will give us more
>>>>>>>>>>>>>>> leverage in utilizing existing processes.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Emit Strategy*
>>>>>>>>>>>>>>> I would argue that emitting for every update is still the best
>>>>>>>>>>>>>>> way to go for SlidingWindows because it mimics the other types of
>>>>>>>>>>>>>>> windows, and suppression can be leveraged to limit what
>>>>>>>>>>>>>>> SlidingWindows emits. While some users may only want to see the
>>>>>>>>>>>>>>> last value, others may want to see more, and leaving the emit
>>>>>>>>>>>>>>> strategy to emit partial results allows both users to access what
>>>>>>>>>>>>>>> they want.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Additional Features*
>>>>>>>>>>>>>>> Supporting sliding windows inherently, and shifting inefficient
>>>>>>>>>>>>>>> hopping windows to sliding windows, is an interesting idea and could
>>>>>>>>>>>>>>> be built on top of SlidingWindows when they are finished, but right
>>>>>>>>>>>>>>> now seems out of scope for the needs of this KIP. Similarly,
>>>>>>>>>>>>>>> including a `subtraction` feature could have performance
>>>>>>>>>>>>>>> improvements, but doesn't seem necessary for the implementation of
>>>>>>>>>>>>>>> this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know what you think of the updates,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Leah
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jul 16, 2020 at 11:57 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP, Leah!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding (1): I'd go farther actually. Making Windows an abstract
>>>>>>>>>>>>>>>> class was a mistake from the beginning that led to us not being
>>>>>>>>>>>>>>>> able to fix a very confusing situation for users around retention
>>>>>>>>>>>>>>>> times, final results emitting, etc. Thus, I would not suggest
>>>>>>>>>>>>>>>> extending TimeWindows for sure, but would also not suggest
>>>>>>>>>>>>>>>> extending Windows.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The very simplest thing to do is follow the example of
>>>>>>>>>>>>>>>> SessionWindows, which is just a completely self-contained class. If
>>>>>>>>>>>>>>>> we don't mess with class inheritance, we won't ever have any of the
>>>>>>>>>>>>>>>> problems related to class inheritance. This is my preferred
>>>>>>>>>>>>>>>> solution.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Still, sliding windows have a lot in common with TimeWindows and
>>>>>>>>>>>>>>>> other fixed-size windows, namely that the windows are fixed in
>>>>>>>>>>>>>>>> size. If we want to preserve the current two-part windowing API in
>>>>>>>>>>>>>>>> which you can window by either "fixed" or "data driven" modes, I'd
>>>>>>>>>>>>>>>> suggest we avoid increasing the blast radius of Windows by taking
>>>>>>>>>>>>>>>> the opportunity to replace it with a proper interface and
>>>>>>>>>>>>>>>> implement that interface instead.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/9031
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Then, SlidingWindows would just implement FixedSizeWindowDefinition
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ======
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding (2), it seems more straightforward as a user of Streams
>>>>>>>>>>>>>>>> to just have one mental model. _All_ of our aggregation operations
>>>>>>>>>>>>>>>> follow an eager emission model, in which we just emit an update
>>>>>>>>>>>>>>>> whenever an update is available. We already provided Suppression
>>>>>>>>>>>>>>>> to explicitly apply different update semantics in the case it's
>>>>>>>>>>>>>>>> required. Why should we define a snowflake operation with
>>>>>>>>>>>>>>>> completely different semantics from everything else? I.e., systems
>>>>>>>>>>>>>>>> are generally easier to use when they follow a few simple,
>>>>>>>>>>>>>>>> composable rules than when they have a lot of different, specific
>>>>>>>>>>>>>>>> rules.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ======
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> New point: (4):
>>>>>>>>>>>>>>>> It would be nice to include some examples of user code that would
>>>>>>>>>>>>>>>> use the new API, which should include:
>>>>>>>>>>>>>>>> 1. using the DSL with the sliding window definition
>>>>>>>>>>>>>>>> 2. accessing the stored results of a sliding window aggregation
>>>>>>>>>>>>>>>>    via IQ
>>>>>>>>>>>>>>>> 3. defining a custom processor to access sliding windows in a
>>>>>>>>>>>>>>>>    store
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It generally helps reviewers wrap their heads around the proposal,
>>>>>>>>>>>>>>>> as well as shaking out any design issues that would otherwise only
>>>>>>>>>>>>>>>> come up during implementation/testing/review.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks again for the awesome proposal!
>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>> Hello Leah,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the nicely written KIP. A few thoughts:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) I echo the other reviewers' comments regarding the typing: why
>>>>>>>>>>>>>>>>> extend TimeWindow instead of just extending Window?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) I also feel that the emitting policy for this type of windowed
>>>>>>>>>>>>>>>>> aggregation may need to differ from the existing ones. The
>>>>>>>>>>>>>>>>> existing emitting policy is very simple: emit every time a window
>>>>>>>>>>>>>>>>> gets updated, and emit every time out-of-order data arrives
>>>>>>>>>>>>>>>>> within the grace period. This works because for time-windows the
>>>>>>>>>>>>>>>>> window close time strictly depends on the window start time,
>>>>>>>>>>>>>>>>> which is fixed, while for session-windows, although the window
>>>>>>>>>>>>>>>>> open/close time is also data-dependent, such changes are
>>>>>>>>>>>>>>>>> relatively infrequent compared to sliding-windows. For this KIP,
>>>>>>>>>>>>>>>>> since each new record would cause a new sliding-window, the
>>>>>>>>>>>>>>>>> number of windows maintained logically could be much larger, and
>>>>>>>>>>>>>>>>> hence emitting on each update may be too aggressive.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3) Although the KIP itself should focus on user-facing
>>>>>>>>>>>>>>>>> interfaces, I'd suggest we create a child page of KIP-450
>>>>>>>>>>>>>>>>> discussing its implementation as well, since some of that may
>>>>>>>>>>>>>>>>> drive the interface design. E.g., personally I think having a
>>>>>>>>>>>>>>>>> combiner interface in addition to the aggregator would be useful,
>>>>>>>>>>>>>>>>> but that's based on my 2 cents about the implementation design (I
>>>>>>>>>>>>>>>>> once created a child page describing it:
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>>>>>>>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Leah,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thank you for the KIP!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Here is my feedback:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. The KIP would benefit from some code examples that show how
>>>>>>>>>>>>>>>>>> to use sliding windows in aggregations.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2. The different sliding windows in Figures 1 and 2 are really
>>>>>>>>>>>>>>>>>> hard to distinguish. Could you please try to make them easier to
>>>>>>>>>>>>>>>>>> distinguish graphically? You could try to draw the frames of
>>>>>>>>>>>>>>>>>> consecutive windows shifted relative to each other.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. I agree with Matthias that extending Windows<TimeWindow> does
>>>>>>>>>>>>>>>>>> not seem to be the best approach. What would be the result of
>>>>>>>>>>>>>>>>>> windowsFor()?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 4. In the section "Public Interfaces" you should remove
>>>>>>>>>>>>>>>>>> implementation details like private constructors and private
>>>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 5. Do we need a new store interface or can we use WindowStore?
>>>>>>>>>>>>>>>>>> Some words about that would be informative.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 6. @Matthias, if the subtractor is not strictly needed, I would
>>>>>>>>>>>>>>>>>> skip it for now and add it later.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 7. I also agree that having a section that describes how to
>>>>>>>>>>>>>>>>>> handle out-of-order records would be good to understand what is
>>>>>>>>>>>>>>>>>> still missing and what we can reuse.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Leah,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> thanks for your update. However, it does not completely answer
>>>>>>>>>>>>>>>>>>> my question.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In our current window implementations, we emit a window result
>>>>>>>>>>>>>>>>>>> update record (ie, early/partial result) for each input record.
>>>>>>>>>>>>>>>>>>> When an out-of-order record arrives, we just update the
>>>>>>>>>>>>>>>>>>> corresponding old window and emit another update.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It's unclear from the KIP whether you propose the same emit
>>>>>>>>>>>>>>>>>>> strategy. -- For sliding windows it might be worth considering
>>>>>>>>>>>>>>>>>>> a different emit strategy and supporting only emitting the
>>>>>>>>>>>>>>>>>>> final result (ie, after the grace period has passed)?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Boyang also raises a good point that relates to my point from
>>>>>>>>>>>>>>>>>>> above about pre-aggregations and storage layout. Our current
>>>>>>>>>>>>>>>>>>> time windows are all pre-aggregated and stored in parallel. We
>>>>>>>>>>>>>>>>>>> can also look up windows efficiently, as we can compute the
>>>>>>>>>>>>>>>>>>> windowed-key given the input record key and timestamp based on
>>>>>>>>>>>>>>>>>>> the window definition.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, for sliding windows, window boundaries are data
>>>>>>>>>>>>>>>>>>> dependent and thus we cannot compute them upfront. Thus, how
>>>>>>>>>>>>>>>>>>> can we "find" existing windows efficiently? Furthermore,
>>>>>>>>>>>>>>>>>>> out-of-order data would create new windows in the past and we
>>>>>>>>>>>>>>>>>>> need to be able to handle this case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thus, to handle out-of-order data correctly, we need to store
>>>>>>>>>>>>>>>>>>> all raw input events. Additionally, we could also store
>>>>>>>>>>>>>>>>>>> pre-aggregated results if we think it's beneficial. -- If we
>>>>>>>>>>>>>>>>>>> apply an "emit only final results" strategy, storing
>>>>>>>>>>>>>>>>>>> pre-aggregated results would not be necessary though.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Btw: for sliding windows it might also be useful to consider
>>>>>>>>>>>>>>>>>>> allowing users to supply a `Subtractor` -- this subtractor
>>>>>>>>>>>>>>>>>>> could be applied on the current window result (in case we store
>>>>>>>>>>>>>>>>>>> it) if a record drops out of the window. Of course, not all
>>>>>>>>>>>>>>>>>>> aggregation functions are subtractable and we can consider this
>>>>>>>>>>>>>>>>>>> as a follow up task, too, and not include it in this KIP for
>>>>>>>>>>>>>>>>>>> now. Thoughts?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I was also thinking about the type hierarchy. I am not sure if
>>>>>>>>>>>>>>>>>>> extending TimeWindow is the best approach? For TimeWindows, we
>>>>>>>>>>>>>>>>>>> can pre-compute window boundaries (cf `windowsFor()`) while for
>>>>>>>>>>>>>>>>>>> a sliding window the boundaries are data dependent. Session
>>>>>>>>>>>>>>>>>>> windows are also data dependent and thus they don't inherit
>>>>>>>>>>>>>>>>>>> from TimeWindow (Maybe check out the KIP that added session
>>>>>>>>>>>>>>>>>>> windows? It could provide some good insights.) -- I believe the
>>>>>>>>>>>>>>>>>>> same rationale applies to sliding windows?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 7/10/20 12:47 PM, Boyang Chen wrote:
>>>>>>>>>>>>>>>>>>>> Thanks Leah and Sophie for the KIP.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 1. I'm a bit surprised that we don't have an advance time.
>>>>>>>>>>>>>>>>>>>> Could we elaborate on how the storage layer is structured?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2. IIUC, there will be extra cost in terms of fetching
>>>>>>>>>>>>>>>>>>>> aggregation results, since we couldn't pre-aggregate until the
>>>>>>>>>>>>>>>>>>>> user asks for it. Would be good to also discuss it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 3. We haven't discussed the possibility of supporting sliding
>>>>>>>>>>>>>>>>>>>> windows inherently. For a user who actually uses a hopping
>>>>>>>>>>>>>>>>>>>> window, Streams could detect such an inefficiency by computing
>>>>>>>>>>>>>>>>>>>> the window_size/advance_time ratio to decide whether the write
>>>>>>>>>>>>>>>>>>>> amplification is too high compared with some configured
>>>>>>>>>>>>>>>>>>>> threshold. The benefit of doing so is that existing Streams
>>>>>>>>>>>>>>>>>>>> users don't need to change their code or learn a new API; they
>>>>>>>>>>>>>>>>>>>> only need to upgrade the Streams library to get benefits for
>>>>>>>>>>>>>>>>>>>> their inefficient hopping window implementation. There might
>>>>>>>>>>>>>>>>>>>> be some compatibility issues for sure, but it is worth listing
>>>>>>>>>>>>>>>>>>>> them out for the trade-off.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Boyang
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for pointing that out. I added the following to the
>>>>>>>>>>>>>>>>>>>>> Proposed Changes section of the KIP:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> "Records that come out of order will be processed the same
>>>>>>>>>>>>>>>>>>>>> way as in-order records, as long as they fall within the
>>>>>>>>>>>>>>>>>>>>> grace period. Any new windows created by the late record will
>>>>>>>>>>>>>>>>>>>>> still be created, and the existing windows that are changed
>>>>>>>>>>>>>>>>>>>>> by the late record will be updated. Any record that falls
>>>>>>>>>>>>>>>>>>>>> outside of the grace period (either user defined or default)
>>>>>>>>>>>>>>>>>>>>> will be discarded."
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> All the best,
>>>>>>>>>>>>>>>>>>>>> Leah
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Leah,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Very well written.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The KIP does not talk about the handling of out-of-order
>>>>>>>>>>>>>>>>>>>>>> data though. How do you propose to address this?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off the discussion for KIP-450, adding
>>>>>>>>>>>>>>>>>>>>>>> sliding window aggregation support to Kafka Streams.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Let me know what you think,
>>>>>>>>>>>>>>>>>>>>>>> Leah
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Attachments:
>>>>>>>>> * signature.asc
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>
> 
