Thanks for this idea, Guozhang. It does seem to be a nice way to solve the problem.
I'm a _little_ concerned about the interface, though. It might be better to just add a new method overload with an extra argument, like `(initializer, aggregator, merger/combinator/whatever)`. Two reasons come to mind:

1) CombineAggregator is no longer a functional interface, so users have to switch to anonymous classes.
2) There's a discoverability problem, because the API doesn't advertise CombineAggregator anywhere; it's just a magic parameter you can pass to get more efficient execution.

On the other hand, adding an argument (initializer, aggregator, merger/combinator/whatever) lets you supply lambdas for all the args, and also makes it clear that you're getting different (more efficient) execution behavior.

WDYT?

Thanks again,
-John
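To make the comparison concrete, here is a rough sketch of the two shapes being weighed (illustration only: `CombineAggregator` is a guess at the interface from the proposal under discussion, `SlidingWindowedKStreamSketch` is a made-up holder for the hypothetical overload, and only `Aggregator`, `Initializer`, `Merger`, `KTable`, and `Windowed` are existing Kafka Streams types):

```java
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Windowed;

// Option A -- roughly the shape under discussion: a sub-interface that adds a
// combine() method. With two abstract methods it is no longer a functional
// interface, so callers must use an anonymous class rather than a lambda.
interface CombineAggregator<K, V, VAgg> extends Aggregator<K, V, VAgg> {
    VAgg combine(K key, VAgg leftAgg, VAgg rightAgg);
}

// Option B -- the alternative suggested above: a dedicated overload that takes the
// extra function as its own argument, so all three arguments can be lambdas and the
// more efficient execution path is advertised directly by the method signature.
// (Merger is reused here purely as an example of a "combine two aggregates" function.)
interface SlidingWindowedKStreamSketch<K, V> {
    <VAgg> KTable<Windowed<K>, VAgg> aggregate(Initializer<VAgg> initializer,
                                               Aggregator<? super K, ? super V, VAgg> aggregator,
                                               Merger<? super K, VAgg> combiner);
}
```

The trade-off in option B is one more overload on the windowed-stream surface, in exchange for keeping every argument lambda-friendly and making the more efficient path explicit.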
On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi folks,
>
> I've been thinking more about this KIP, and my understanding is that we want to introduce a new SlidingWindow notion for aggregation since our current TimeWindow aggregation is not very efficient with very small steps. So I'm wondering: rather than introducing a new implementation mechanism, what if we just optimize the TimeWindowed aggregations to allow a very small advance step compared to the window length itself (which would in practice sufficiently mimic the sliding window behavior), e.g. a window length of 10 minutes with a 1-second advance.
>
> I've quickly written up an alternative proposal for KIP-450 here: https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation Please let me know your thoughts.
>
> Guozhang
>
> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Thanks Sophie!
> >
> > Regarding (4), I am in favor of supporting both. I am not sure whether we can reuse the existing window store (with duplicates enabled) for this case, though, or if we need to design a new store to keep all raw records.
> >
> > Btw: for holistic aggregations, like median, we would need to support a different store layout for existing aggregations (time-window, session-window), too. Thus, if we add support for this, we might be able to kill two birds with one stone. Of course, we would still need new APIs for existing aggregations to allow users to pick between both cases.
> >
> > I only bring this up because it might make sense to design the store in a way such that we can use it for all cases.
> >
> > About (3): atm we support wall-clock time via the corresponding `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit more on what he has in mind exactly, and why using this extractor would not meet the requirements for processing-time sliding windows?
> >
> > -Matthias
> >
> > On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > > Regarding 4): yes, I agree with you that invertibility is not a common property for agg-functions. Just to be clear about our current APIs: for stream.aggregate we only require a single Adder function, whereas for table.aggregate we require both an Adder and a Subtractor, but these are not used to leverage any algebraic properties; it is just that the incoming table changelog stream may contain "tombstones", and hence we need to negate the effect of the previous record that has been deleted by this tombstone.
> > >
> > > What I'm proposing is exactly having two APIs, one for the Adder only (like other Streams aggregations) and one for Subtractor + Adder (for agg functions users think are invertible), for efficiency. Some other frameworks (e.g. Spark) have similar options for users and recommend using the latter so that some optimizations can be made in the implementation.
> > >
> > > Guozhang
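As a tiny standalone illustration of the invertibility distinction this adder/subtractor split hinges on (sum vs. max; purely an example, not tied to any Streams API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InvertibilityExample {
    public static void main(String[] args) {
        // Invertible aggregation (sum): window holds {4, 7, 2}, then 4 expires.
        long sum = 4 + 7 + 2;   // 13
        sum -= 4;               // a subtractor is enough: 9

        // Non-invertible aggregation (max): window holds {9, 7, 2}, then 9 expires.
        List<Long> window = new ArrayList<>(Arrays.asList(9L, 7L, 2L));
        window.remove(0);                    // the current max expires...
        long max = Collections.max(window);  // ...and must be recomputed from the
                                             // remaining raw values (7), since it
                                             // cannot be derived from the old aggregate.

        System.out.println(sum + " " + max); // prints "9 7"
    }
}
```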
> > > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > >
> >> Thanks for the feedback, Matthias and Bill. After discussing offline we realized the type of windows I originally had in mind was quite different, and I agree now that the semantics outlined by Matthias are the direction to go in here. I will update the KIP accordingly with the new semantics (and corresponding design) and restart the discussion from there.
> >>
> >> In the meantime, to respond to some other points:
> >>
> >> 1) API:
> >>
> >> I propose adding only the one class -- public class SlidingWindows extends Windows<TimeWindow> {} -- so I do not believe we need any new Serdes? It will still be a fixed-size TimeWindow, but handled a bit differently. I've updated the KIP to state explicitly all of the classes/methods being added.
> >>
> >> 2) Zero grace period
> >>
> >> The "zero grace period" was essentially just a consequence of my original definition for sliding windows; with the new semantics we can (and should) allow for a nonzero grace period.
> >>
> >> 3) Wall-clock time
> >>
> >> Hm, I had not considered this yet, but it may be a good idea to keep in mind while rethinking the design. To clarify, we don't support wall-clock-based aggregations with hopping or tumbling windows though (yet?).
> >>
> >> 4) Commutative vs associative vs invertible aggregations
> >>
> >> I agree that it's reasonable to assume commutativity and associativity, but that's not the same as being subtractable -- that requires invertibility, which is broken by a lot of very simple functions and is not, I think, ok to assume. However, we could consider adding a separate API which also takes a subtractor and corresponds to a completely different implementation. We could also consider an API that takes a function that aggregates two aggregates together, in addition to the existing aggregator (which aggregates a single value with an existing aggregate). WDYT?
> >>
> >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >>> Thanks for the KIP, Sophie. Couple of comments:
> >>>
> >>> It's a little unclear to me what public API you propose. It seems you want to add
> >>>
> >>>> public class SlidingWindow extends TimeWindow {}
> >>>
> >>> and
> >>>
> >>>> public class SlidingWindows extends TimeWindows {} // or maybe `extends Windows`
> >>>
> >>> If yes, should we add corresponding public Serdes classes?
> >>>
> >>> Also, can you list all newly added classes/methods explicitly in the wiki?
> >>>
> >>> About the semantics of the operator:
> >>>
> >>>> "Only one single window is defined at a time,"
> >>>
> >>> Should this be "one window per key" instead?
> >>>
> >>> I agree that both window boundaries should be inclusive. However, I am not sure about:
> >>>
> >>>> At most one record is forwarded when new data arrives
> >>>
> >>> (1) In what case would no output be produced?
> >>>
> >>> (2) I think, if we advance in time, it can also happen that we emit multiple records. If a window "slides" (not "hops"), we cannot just advance it to the current record's stream time, but would need to emit more results if records expire before the current input record is added. For example, consider a window with size 5ms and the following ts (all records have the same key):
> >>>
> >>> 1 2 3 10 11
> >>>
> >>> This should result in windows:
> >>>
> >>> [1]
> >>> [1,2]
> >>> [1,2,3]
> >>> [2,3]
> >>> [3]
> >>> [10]
> >>> [10,11]
> >>>
> >>> I.e., when the record with ts=10 is processed, it will trigger the computation of [2,3], [3], and [10].
> >>>
> >>> About out-of-order handling: I am wondering if the current design that does not allow any grace period is too restrictive. Can you elaborate more on the motivation for this suggestion?
> >>>
> >>> Can you give more details about the "simple design"? Atm, it's not clear to me how it works. I thought we always need to store all raw values. If we only store the current aggregate, would we end up with the same inefficient solution as using a hopping window with advance 1ms?
> >>>
> >>> For the O(sqrt(N)) proposal: can you maybe add an example with concrete bucket sizes, window size, etc.? The current proposal is a little unclear to me, atm.
> >>>
> >>> How are windows advanced? Do you propose to advance all windows over all keys at the same time, or would each window (per key) advance independently of all other windows? What would be the pros/cons of both approaches?
> >>>
> >>> To add to Guozhang's comment: atm, DSL operators assume that aggregate functions are commutative and associative. Hence, it seems ok to make the same assumption for sliding windows. Holistic and non-subtractable aggregations should be supported out of the box at some point, too, but that would be a different KIP adding this to all existing aggregations.
> >>>
> >>> -Matthias
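For reference, the 1 2 3 10 11 example above can be reproduced with a small standalone simulation (illustration only: it assumes both window boundaries are inclusive, so a 5 ms window covers [t-4, t], and it simply enumerates window contents as stream time advances rather than modeling the real implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowContents {
    public static void main(String[] args) {
        final long sizeMs = 5L;
        final long[] timestamps = {1, 2, 3, 10, 11};   // all records share the same key

        List<Long> previous = new ArrayList<>();
        // Let stream time advance from the first to the last record and print every
        // distinct, non-empty window content along the way.
        for (long t = timestamps[0]; t <= timestamps[timestamps.length - 1]; t++) {
            final List<Long> contents = new ArrayList<>();
            for (final long ts : timestamps) {
                if (ts >= t - sizeMs + 1 && ts <= t) {   // inclusive boundaries: [t-4, t]
                    contents.add(ts);
                }
            }
            if (!contents.isEmpty() && !contents.equals(previous)) {
                System.out.println(contents);
                previous = contents;
            }
        }
    }
}
```

The output matches the window list above: [1], [1, 2], [1, 2, 3], [2, 3], [3], [10], [10, 11].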
> >>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> >>>> Hi Sophie,
> >>>>
> >>>> Thanks for the proposed KIP. I've made a pass over it and here are some thoughts:
> >>>>
> >>>> 1. "The window size is effectively the grace and retention period". The grace time is defined as "the time to admit late-arriving events after the end of the window," hence it is the additional time beyond the window size. I guess you were trying to say it should be zero?
> >>>>
> >>>> Also, the retention period is not a notion of the window spec any more, but only of the window store itself. So I'd suggest talking about window size here, and noting that store retention time cannot be controlled via the window spec at all.
> >>>>
> >>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a bucket, so I'd assume you will expire one bucket as a whole when its end time is smaller than the current window's starting time, right?
> >>>>
> >>>> 3. Also, in your algorithm, how to choose "M" seems tricky; would it be a configurable parameter exposed to users, or is it abstracted away and only selected internally?
> >>>>
> >>>> 4. "There is some tradeoff between purely optimizing " seems to be an incomplete paragraph?
> >>>>
> >>>> 5. Meta comment: many aggregations are commutative and associative, so we can require users to pass in a "subtract" function as well. Given these two functions, I think we can propose two sets of APIs: 1) with the adder and subtractor, and 2) with the adder only (if the aggregate logic is not comm. and assoc.).
> >>>>
> >>>> We just maintain an aggregate value for each bucket (call it bucket_aggregate) plus one for the whole window (call it total_aggregate), i.e. at most M + 1 values per key. We use the total_aggregate for queries, and each update will cause 2 writes (to the bucket and to the total aggregate).
> >>>>
> >>>> And with 1), when expiring the oldest bucket we simply call subtract(total_aggregate, bucket_aggregate); with 2), when expiring the oldest bucket we can re-compute the total_aggregate by sum(bucket_aggregate) over the other buckets again.
> >>>>
> >>>> 6. Meta comment: it is reasonable to assume that in practice out-of-order data is not very common, hence most of the updates will fall into the latest bucket. So I'm wondering if it makes sense to always store the first bucket in memory while keeping the other buckets optionally on persistent storage. In practice, as long as M is large enough (we probably need it to be large enough to have sufficiently sensitive expiration anyway), each bucket's aggregate data is small enough to be kept in memory.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>
> >>>>> Hello all,
> >>>>>
> >>>>> I would like to kick off discussion of this KIP, aimed at providing sliding window semantics to DSL aggregations.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>>>
> >>>>> Please take a look and share any thoughts you have regarding the API, semantics, design, etc!
> >>>>>
> >>>>> I also have a POC PR open with the "naive" implementation for your reference: https://github.com/apache/kafka/pull/6549
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie
>
> --
> -- Guozhang
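To make point 5 of Guozhang's April 9 comments above a bit more concrete, here is a minimal standalone sketch of the bucketed-aggregate idea for an invertible aggregation (sum); all names are made up for illustration, it assumes in-order data, and it is not the KIP's actual design:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: the window is split into M fixed-size buckets. Each key keeps at most
// M bucket aggregates plus one total aggregate; queries read only the total.
public class BucketedSlidingSum {
    private final long bucketSizeMs;
    private final int numBuckets;                              // "M" from the discussion
    private final Deque<long[]> buckets = new ArrayDeque<>();  // {bucketStart, bucketAggregate}
    private long totalAggregate = 0L;

    public BucketedSlidingSum(final long bucketSizeMs, final int numBuckets) {
        this.bucketSizeMs = bucketSizeMs;
        this.numBuckets = numBuckets;
    }

    // Assumes in-order records; out-of-order data would have to be routed to an
    // older bucket instead of the latest one.
    public void add(final long timestamp, final long value) {
        final long bucketStart = timestamp - (timestamp % bucketSizeMs);
        if (buckets.isEmpty() || buckets.peekLast()[0] != bucketStart) {
            buckets.addLast(new long[] {bucketStart, 0L});
        }
        // Each update causes two "writes": the bucket aggregate and the total.
        buckets.peekLast()[1] += value;
        totalAggregate += value;

        // Expire whole buckets that have fallen out of the window, using the
        // subtractor on the bucket aggregate rather than touching raw records.
        final long oldestAllowedStart = bucketStart - (numBuckets - 1) * bucketSizeMs;
        while (!buckets.isEmpty() && buckets.peekFirst()[0] < oldestAllowedStart) {
            totalAggregate -= buckets.removeFirst()[1];
        }
    }

    public long query() {
        return totalAggregate;   // queries only ever read the single total aggregate
    }
}
```

For a non-invertible aggregation, the expiration step would instead re-compute the total by re-aggregating the remaining bucket aggregates, which is still O(M) work per expiry rather than a pass over every raw record.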