Thanks for this idea, Guozhang, it does seem to be a nice way to solve
the problem.

I'm a _little_ concerned about the interface, though. It might be
better to just add a new argument to a new method overload like
`(initializer, aggregator, merger/combinator/whatever)`.

Two reasons come to mind for this:
1) CombineAggregator is no longer a functional interface, so users
have to switch to anonymous classes
2) there's a discoverability problem, because the API doesn't
advertise CombineAggregator anywhere, it's just a magic parameter you
can pass to get more efficient executions

On the other hand, adding an argument (initializer, aggregator,
merger/combinator/whatever) lets you supply lambdas for all the args,
and also makes it clear that you're getting different (more efficient)
execution behavior.

WDYT?

Thanks again,
-John


On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hi folks,
>
> I've been thinking more about this KIP and my understanding is that we want
> to introduce a new SlidingWindow notion for aggregation since our current
> TimeWindow aggregation is not very efficient with very small steps. So I'm
> wondering that rather than introducing a new implementation mechanism, what
> if we just optimize the TimeWindowed aggregations where we can allow a very
> small advance step (which would in practice sufficient mimic the sliding
> window behavior) compared to the window length itself, e.g. a window length
> of 10 minutes with 1 second advance.
>
> I've quickly write up an alternative proposal for KIP-450 here:
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> Please
> let me know your thoughts.
>
>
> Guozhang
>
> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks Sophie!
> >
> >
> > Regarding (4), I am in favor to support both. Not sure if we can reuse
> > existing window store (with enabling to store duplicates) for this case
> > or not though, or if we need to design a new store to keep all raw records?
> >
> > Btw: for holistic aggregations, like media, we would need to support a
> > different store layout for existing aggregations (time-window,
> > session-window), too. Thus, if we add support for this, we might be able
> > to kill two birds with one stone. Of course, we would still need new
> > APIs for existing aggregations to allow users to pick between both cases.
> >
> > I only bring this up, because it might make sense to design the store in
> > a way such that we can use it for all cases.
> >
> >
> > About (3): atm we support wall-clock time via the corresponding
> > `WallclockTimestampeExtractor`. Maybe Bill can elaborate a little bit
> > more what he has in mind exactly, and why using this extractor would not
> > meet the requirements for processing-time sliding windows?
> >
> >
> > -Matthias
> >
> >
> > On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > > Regarding 4): yes I agree with you that invertibility is not a common
> > > property for agg-functions. Just to be clear about our current APIs: for
> > > stream.aggregate we only require a single Adder function, whereas for
> > > table.aggregate we require both Adder and Subtractor, but these are not
> > > used to leverage any properties just that the incoming table changelog
> > > stream may contain "tombstones" and hence we need to negate the effect of
> > > the previous record that has been deleted by this tombstone.
> > >
> > > What I'm proposing is exactly having two APIs, one for Adder only (like
> > > other Streams aggregations) and one for Subtractor + Adder (for agg
> > > functions users think are invertible) for efficiency. Some other
> > frameworks
> > > (e.g. Spark) have similar options for users and will recommend using the
> > > latter so that some optimization in implementation can be done.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > wrote:
> > >
> > >> Thanks for the feedback Matthias and Bill. After discussing offline we
> > >> realized the type of windows I originally had in mind were quite
> > different,
> > >> and I agree now that the semantics outlined by Matthias are the
> > direction
> > >> to go in here. I will update the KIP accordingly with the new semantics
> > >> (and corresponding design) and restart the discussion from there.
> > >>
> > >> In the meantime, to respond to some other points:
> > >>
> > >> 1) API:
> > >>
> > >> I propose adding only the one class -- public class SlidingWindows
> > extends
> > >> Windows<TimeWindow> {} --  so I do not believe we need any new Serdes?
> > It
> > >> will still be a fixed size TimeWindow, but handled a bit differently.
> > I've
> > >> updated the KIP to state explicitly all of the classes/methods being
> > added
> > >>
> > >> 2) Zero grace period
> > >>
> > >> The "zero grace period" was essentially just consequence of my original
> > >> definition for sliding windows; with the new semantics we can (and
> > should)
> > >> allow for a nonzero grace period
> > >>
> > >> 3) Wall-clock time
> > >>
> > >> Hm, I had not considered this yet but it may be a good idea to keep in
> > mind
> > >> while rethinking the design. To clarify, we don't support wall-clock
> > based
> > >> aggregations with hopping or tumbling windows though (yet?)
> > >>
> > >> 4) Commutative vs associative vs invertible aggregations
> > >>
> > >> I agree that it's reasonable to assume commutativity and associativity,
> > but
> > >> that's not the same as being subtractable -- that requires
> > invertibility,
> > >> which is broken by a lot of very simple functions and is not, I think,
> > ok
> > >> to assume. However we could consider adding a separate API which also
> > takes
> > >> a subtractor and corresponds to a completely different implementation.
> > We
> > >> could also consider an API that takes a function that aggregates two
> > >> aggregates together in addition to the existing aggregator (which
> > >> aggregates a single value with an existing aggregate) WDYT?
> > >>
> > >>
> > >>
> > >>
> > >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for the KIP Sophie. Couple of comments:
> > >>>
> > >>> It's a little unclear to me, what public API you propose. It seems you
> > >>> want to add
> > >>>
> > >>>> public class SlidingWindow extends TimeWindow {}
> > >>>
> > >>> and
> > >>>
> > >>>> public class SlidingWindows extends TimeWindows {} // or maybe
> > `extends
> > >>> Windows`
> > >>>
> > >>> If yes, should we add corresponding public Serdes classes?
> > >>>
> > >>> Also, can you list all newly added classes/methods explicitly in the
> > >> wiki?
> > >>>
> > >>>
> > >>> About the semantics of the operator.
> > >>>
> > >>>> "Only one single window is defined at a time,"
> > >>>
> > >>> Should this be "one window per key" instead?
> > >>>
> > >>> I agree that both window boundaries should be inclusive. However, I am
> > >>> not sure about:
> > >>>
> > >>>> At most one record is forwarded when new data arrives
> > >>>
> > >>> (1) For what case, no output would be produced?
> > >>>
> > >>> (2) I think, if we advance in time, it can also happen that we emit
> > >>> multiple records. If a window "slides" (not "hops"), we cannot just
> > >>> advance it to the current record stream time but would need to emit
> > more
> > >>> result if records expire before the current input record is added. For
> > >>> example, consider a window with size 5ms, and the following ts (all
> > >>> records have the same key):
> > >>>
> > >>> 1 2 3 10 11
> > >>>
> > >>> This should result in windows:
> > >>>
> > >>> [1]
> > >>> [1,2]
> > >>> [1,2,3]
> > >>> [2,3]
> > >>> [3]
> > >>> [10]
> > >>> [10,11]
> > >>>
> > >>> Ie, when the record with ts=10 is processed, it will trigger the
> > >>> computation of [2,3], [3] and [10].
> > >>>
> > >>>
> > >>> About out-of-order handling: I am wondering, if the current design that
> > >>> does not allow any grace period is too restrictive. Can you elaborate
> > >>> more on the motivation for this suggestions?
> > >>>
> > >>>
> > >>> Can you give more details about the "simple design"? Atm, it's not
> > clear
> > >>> to me how it works. I though we always need to store all raw values. If
> > >>> we only store the current aggregate, would we end up with the same
> > >>> inefficient solution as using a hopping window with advance 1ms?
> > >>>
> > >>>
> > >>> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> > >>> bucket sizes, window size etc. The current proposal is a little unclear
> > >>> to me, atm.
> > >>>
> > >>>
> > >>> How are windows advance? Do you propose to advance all windows over all
> > >>> keys at the same time, or would each window (per key) advance
> > >>> independent from all other windows? What would be the pros/cons for
> > both
> > >>> approaches?
> > >>>
> > >>>
> > >>> To add to Guozhang's comment: atm, DSL operators assume that aggregate
> > >>> functions are commutative and associative. Hence, it seems ok to make
> > >>> the same assumption for sliding window. Addressing holistic and
> > >>> non-subtractable aggregations should be supported out of the box at
> > some
> > >>> point, too, but this would be a different KIP adding this to all
> > >>> existing aggregations.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> > >>>> Hi Sophie,
> > >>>>
> > >>>> Thanks for the proposed KIP. I've made a pass over it and here are
> > some
> > >>>> thoughts:
> > >>>>
> > >>>> 1. "The window size is effectively the grace and retention period".
> > The
> > >>>> grace time is defined as "the time to admit late-arriving events after
> > >>> the
> > >>>> end of the window." hence it is the additional time beyond the window
> > >>> size.
> > >>>> I guess your were trying to say it should be zero?
> > >>>>
> > >>>> Also for retention period, it is not a notion of the window spec any
> > >>> more,
> > >>>> but only for the window store itself. So I'd suggest talking about
> > >> window
> > >>>> size here, and note that store retention time cannot be controlled via
> > >>>> window spec at all.
> > >>>>
> > >>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
> > >> a
> > >>>> bucket, so I'd assume you will expire one bucket as a whole when its
> > >> end
> > >>>> time is smaller than the current window's starting time, right?
> > >>>>
> > >>>> 3. Also in your algorithm how to choose "M" seems tricky, would it be
> > a
> > >>>> configurable parameter exposed to users or is it abstracted away and
> > >> only
> > >>>> being selected internally?
> > >>>>
> > >>>> 4. "There is some tradeoff between purely optimizing " seems
> > incomplete
> > >>>> paragraph?
> > >>>>
> > >>>> 5. Meta comment: for many aggregations it is commutative and
> > >> associative
> > >>> so
> > >>>> we can require users to pass in a "substract" function as well. Given
> > >>> these
> > >>>> two function I think we can propose two set of APIs, 1) with the adder
> > >>> and
> > >>>> subtractor and 2) with the added only (if the aggregate logic is not
> > >>> comm.
> > >>>> and assoc.).
> > >>>>
> > >>>> We just maintain an aggregate value for each bucket (call it
> > >>>> bucket_aggregate) plus for the whole window (call it total_aggregate),
> > >>> i.e.
> > >>>> at most M + 1 values per key. We use the total_aggregate for queries,
> > >> and
> > >>>> each update will cause 2 writes (to the bucket and to the total
> > >>> aggregate).
> > >>>>
> > >>>> And with 1) when expiring the oldest bucket we simply call
> > >>>> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > >>>> oldest bucket we can re-compute the total_aggregate by
> > >>>> sum(bucket_aggregate) over other buckets again.
> > >>>>
> > >>>> 6. Meta comment: it is reasonable to assume in practice
> > out-of-ordering
> > >>>> data is not very common, hence most of the updates will be falling
> > into
> > >>> the
> > >>>> latest bucket. So I'm wondering if it makes sense to always store the
> > >>> first
> > >>>> bucket in memory while making other buckets optionally on persistent
> > >>>> storage. In practice, as long as M is large enough (we probably need
> > it
> > >>> to
> > >>>> be large enough to have sufficiently sensitive expiration anyways)
> > then
> > >>>> each bucket's aggregate data is small enough to be in memory.
> > >>>>
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <
> > >> sop...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> Hello all,
> > >>>>>
> > >>>>> I would like to kick off discussion of this KIP aimed at providing
> > >>> sliding
> > >>>>> window semantics to DSL aggregations.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > >>>>>
> > >>>>> Please take a look and share any thoughts you have regarding the API,
> > >>>>> semantics, design, etc!
> > >>>>>
> > >>>>> I also have a POC PR open with the "naive" implementation for your
> > >>>>> reference: https://github.com/apache/kafka/pull/6549
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Sophie
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> > >
> >
> >
>
> --
> -- Guozhang

Reply via email to