Thanks for the feedback, Matthias and Bill. After discussing offline, we
realized that the type of window I originally had in mind was quite
different, and I now agree that the semantics outlined by Matthias are the
direction to go in here. I will update the KIP with the new semantics (and
the corresponding design) and restart the discussion from there.
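
To make sure we mean the same thing by those semantics, here is a rough,
standalone sketch (plain Java, no Kafka classes) that reproduces Matthias'
example from the quoted mail below for a single key. The class and method
names are made up for illustration, and I am assuming in-order input and
that a record with timestamp t stays in the window as long as
t >= currentTs - size; the exact boundary rule is still to be pinned down in
the KIP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of the KIP or of Kafka Streams.
class SlidingWindowSketch {

    // Returns every distinct non-empty window content, in emission order,
    // for in-order timestamps that all belong to the same key.
    static List<List<Long>> windowsFor(long sizeMs, long[] timestamps) {
        Deque<Long> live = new ArrayDeque<>();      // records currently in the window
        List<List<Long>> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            // Assumed expiry rule: a record drops out once it is older than ts - sizeMs.
            while (!live.isEmpty() && live.peekFirst() < ts - sizeMs) {
                long expired = live.peekFirst();
                // Records sharing a timestamp expire together.
                while (!live.isEmpty() && live.peekFirst() == expired) {
                    live.pollFirst();
                }
                // Each expiration yields an intermediate window unless nothing is left.
                if (!live.isEmpty()) {
                    emitted.add(new ArrayList<>(live));
                }
            }
            live.addLast(ts);
            emitted.add(new ArrayList<>(live));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prints [[1], [1, 2], [1, 2, 3], [2, 3], [3], [10], [10, 11]]
        System.out.println(windowsFor(5L, new long[] {1, 2, 3, 10, 11}));
    }
}
```

Note that the record with ts=10 produces three results here ([2,3], [3]
and [10]), which is exactly the "more than one record forwarded" case.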

In the meantime, to respond to some other points:

1) API:

I propose adding only one class -- public class SlidingWindows extends
Windows<TimeWindow> {} -- so I do not believe we need any new Serdes. It
will still be a fixed-size TimeWindow, just handled a bit differently. I've
updated the KIP to state explicitly all of the classes/methods being added.

2) Zero grace period

The "zero grace period" was essentially just a consequence of my original
definition of sliding windows; with the new semantics we can (and should)
allow a nonzero grace period.

3) Wall-clock time

Hm, I had not considered this yet, but it may be a good idea to keep in mind
while rethinking the design. To clarify, though: we don't support wall-clock
based aggregations with hopping or tumbling windows either (yet?).

4) Commutative vs associative vs invertible aggregations

I agree that it's reasonable to assume commutativity and associativity, but
that's not the same as being subtractable -- that requires invertibility,
which is broken by a lot of very simple functions (min and max, for
example) and is not, I think, ok to assume. However, we could consider
adding a separate API that also takes a subtractor and corresponds to a
completely different implementation. We could also consider an API that
takes a function for combining two aggregates, in addition to the existing
aggregator (which folds a single value into an existing aggregate). WDYT?
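
To illustrate the difference between the two options, here is a small
standalone sketch (plain Java, not a proposed API; BucketedAggregate and
all of its members are hypothetical names). It follows Guozhang's idea of
keeping one aggregate per bucket plus a running total. If a subtractor is
supplied, expiring the oldest bucket is a single subtraction; if only a
merge function is available, the total has to be rebuilt from the remaining
buckets. I am also assuming an identity (initial) value for the aggregate.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; not part of the KIP or of Kafka Streams.
class BucketedAggregate<A> {
    private final Deque<A> buckets = new ArrayDeque<>();
    private final A identity;                    // assumed initial value
    private final BinaryOperator<A> merger;      // combines two aggregates
    private final BinaryOperator<A> subtractor;  // null if not invertible
    private A total;

    BucketedAggregate(A identity, BinaryOperator<A> merger, BinaryOperator<A> subtractor) {
        this.identity = identity;
        this.merger = merger;
        this.subtractor = subtractor;
        this.total = identity;
    }

    // Appends a finished bucket and folds it into the running total.
    void addBucket(A bucketAggregate) {
        buckets.addLast(bucketAggregate);
        total = merger.apply(total, bucketAggregate);
    }

    // Drops the oldest bucket and brings the total back in sync.
    void expireOldest() {
        A oldest = buckets.pollFirst();
        if (oldest == null) {
            return;
        }
        if (subtractor != null) {
            // Invertible case: one call, independent of the number of buckets.
            total = subtractor.apply(total, oldest);
        } else {
            // Non-invertible case: re-merge all remaining buckets.
            total = identity;
            for (A bucket : buckets) {
                total = merger.apply(total, bucket);
            }
        }
    }

    A total() {
        return total;
    }

    public static void main(String[] args) {
        BucketedAggregate<Integer> sum =
            new BucketedAggregate<Integer>(0, Integer::sum, (t, b) -> t - b);
        sum.addBucket(3);
        sum.addBucket(4);
        sum.addBucket(5);
        sum.expireOldest();
        System.out.println(sum.total()); // 9

        BucketedAggregate<Integer> max =
            new BucketedAggregate<Integer>(Integer.MIN_VALUE, Integer::max, null);
        max.addBucket(7);
        max.addBucket(2);
        max.addBucket(5);
        max.expireOldest();
        System.out.println(max.total()); // 5
    }
}
```

The max case is the one that cannot be handled with a subtractor: once the
bucket holding 7 expires, there is no way to recover 5 from the old total
alone.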




On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the KIP Sophie. Couple of comments:
>
> It's a little unclear to me, what public API you propose. It seems you
> want to add
>
> > public class SlidingWindow extends TimeWindow {}
>
> and
>
> > public class SlidingWindows extends TimeWindows {} // or maybe `extends Windows`
>
> If yes, should we add corresponding public Serdes classes?
>
> Also, can you list all newly added classes/methods explicitly in the wiki?
>
>
> About the semantics of the operator.
>
> > "Only one single window is defined at a time,"
>
> Should this be "one window per key" instead?
>
> I agree that both window boundaries should be inclusive. However, I am
> not sure about:
>
> > At most one record is forwarded when new data arrives
>
> (1) For what case, no output would be produced?
>
> (2) I think, if we advance in time, it can also happen that we emit
> multiple records. If a window "slides" (not "hops"), we cannot just
> advance it to the current record stream time but would need to emit more
> results if records expire before the current input record is added. For
> example, consider a window with size 5ms, and the following ts (all
> records have the same key):
>
> 1 2 3 10 11
>
> This should result in windows:
>
> [1]
> [1,2]
> [1,2,3]
> [2,3]
> [3]
> [10]
> [10,11]
>
> I.e., when the record with ts=10 is processed, it will trigger the
> computation of [2,3], [3] and [10].
>
>
> About out-of-order handling: I am wondering if the current design that
> does not allow any grace period is too restrictive. Can you elaborate
> more on the motivation for this suggestion?
>
>
> Can you give more details about the "simple design"? Atm, it's not clear
> to me how it works. I thought we always need to store all raw values. If
> we only store the current aggregate, would we end up with the same
> inefficient solution as using a hopping window with advance 1ms?
>
>
> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> bucket sizes, window size etc. The current proposal is a little unclear
> to me, atm.
>
>
> How are windows advanced? Do you propose to advance all windows over all
> keys at the same time, or would each window (per key) advance
> independently from all other windows? What would be the pros/cons for both
> approaches?
>
>
> To add to Guozhang's comment: atm, DSL operators assume that aggregate
> functions are commutative and associative. Hence, it seems ok to make
> the same assumption for sliding window. Addressing holistic and
> non-subtractable aggregations should be supported out of the box at some
> point, too, but this would be a different KIP adding this to all
> existing aggregations.
>
>
> -Matthias
>
>
>
> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after
> > the end of the window", hence it is additional time beyond the window
> > size. I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any
> > more, but only for the window store itself. So I'd suggest talking about
> > window size here, and noting that store retention time cannot be
> > controlled via the window spec at all.
> >
> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > being selected internally?
> >
> > 4. "There is some tradeoff between purely optimizing " seems to be an
> > incomplete paragraph?
> >
> > 5. Meta comment: many aggregations are commutative and associative, so
> > we can require users to pass in a "subtract" function as well. Given
> > these two functions, I think we can propose two sets of APIs: 1) with the
> > adder and subtractor, and 2) with the adder only (if the aggregate logic
> > is not comm. and assoc.).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus one for the whole window (call it
> > total_aggregate), i.e. at most M + 1 values per key. We use the
> > total_aggregate for queries, and each update will cause 2 writes (to the
> > bucket and to the total aggregate).
> >
> > And with 1) when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over other buckets again.
> >
> > 6. Meta comment: it is reasonable to assume that in practice out-of-order
> > data is not very common, hence most of the updates will fall into the
> > latest bucket. So I'm wondering if it makes sense to always store the
> > first bucket in memory while keeping the other buckets optionally on
> > persistent storage. In practice, as long as M is large enough (we
> > probably need it to be large enough to have sufficiently sensitive
> > expiration anyways) then each bucket's aggregate data is small enough to
> > be in memory.
> >
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> >> Hello all,
> >>
> > >> I would like to kick off discussion of this KIP aimed at providing
> > >> sliding window semantics to DSL aggregations.
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>
> >> Please take a look and share any thoughts you have regarding the API,
> >> semantics, design, etc!
> >>
> >> I also have a POC PR open with the "naive" implementation for your
> >> reference: https://github.com/apache/kafka/pull/6549
> >>
> >> Cheers,
> >> Sophie
> >>
> >
> >
>
>
