Thanks for the feedback, Matthias and Bill. After discussing offline we realized the type of windows I originally had in mind was quite different, and I agree now that the semantics outlined by Matthias are the direction to go in here. I will update the KIP accordingly with the new semantics (and corresponding design) and restart the discussion from there.
In the meantime, to respond to some other points:

1) API:

I propose adding only the one class -- public class SlidingWindows extends Windows<TimeWindow> {} -- so I do not believe we need any new Serdes? It will still be a fixed size TimeWindow, but handled a bit differently. I've updated the KIP to state explicitly all of the classes/methods being added.

2) Zero grace period

The "zero grace period" was essentially just a consequence of my original definition for sliding windows; with the new semantics we can (and should) allow for a nonzero grace period.

3) Wall-clock time

Hm, I had not considered this yet but it may be a good idea to keep in mind while rethinking the design. To clarify, we don't support wall-clock based aggregations with hopping or tumbling windows though (yet?)

4) Commutative vs associative vs invertible aggregations

I agree that it's reasonable to assume commutativity and associativity, but that's not the same as being subtractable -- that requires invertibility, which is broken by a lot of very simple functions and is not, I think, ok to assume. However we could consider adding a separate API which also takes a subtractor and corresponds to a completely different implementation. We could also consider an API that takes a function that aggregates two aggregates together in addition to the existing aggregator (which aggregates a single value with an existing aggregate). WDYT?

On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the KIP Sophie. Couple of comments:
>
> It's a little unclear to me what public API you propose. It seems you
> want to add
>
> > public class SlidingWindow extends TimeWindow {}
>
> and
>
> > public class SlidingWindows extends TimeWindows {} // or maybe `extends Windows`
>
> If yes, should we add corresponding public Serdes classes?
>
> Also, can you list all newly added classes/methods explicitly in the wiki?
>
>
> About the semantics of the operator.
>
> > "Only one single window is defined at a time,"
>
> Should this be "one window per key" instead?
>
> I agree that both window boundaries should be inclusive. However, I am
> not sure about:
>
> > At most one record is forwarded when new data arrives
>
> (1) In what case would no output be produced?
>
> (2) I think, if we advance in time, it can also happen that we emit
> multiple records. If a window "slides" (not "hops"), we cannot just
> advance it to the current record stream time but would need to emit more
> results if records expire before the current input record is added. For
> example, consider a window with size 5ms, and the following ts (all
> records have the same key):
>
> 1 2 3 10 11
>
> This should result in windows:
>
> [1]
> [1,2]
> [1,2,3]
> [2,3]
> [3]
> [10]
> [10,11]
>
> Ie, when the record with ts=10 is processed, it will trigger the
> computation of [2,3], [3] and [10].
>
>
> About out-of-order handling: I am wondering if the current design that
> does not allow any grace period is too restrictive. Can you elaborate
> more on the motivation for this suggestion?
>
>
> Can you give more details about the "simple design"? Atm, it's not clear
> to me how it works. I thought we always need to store all raw values. If
> we only store the current aggregate, would we end up with the same
> inefficient solution as using a hopping window with advance 1ms?
>
>
> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> bucket sizes, window size etc. The current proposal is a little unclear
> to me, atm.
>
>
> How are windows advanced? Do you propose to advance all windows over all
> keys at the same time, or would each window (per key) advance
> independently from all other windows? What would be the pros/cons for both
> approaches?
>
>
> To add to Guozhang's comment: atm, DSL operators assume that aggregate
> functions are commutative and associative. Hence, it seems ok to make
> the same assumption for sliding window.
> Addressing holistic and non-subtractable aggregations should be supported
> out of the box at some point, too, but this would be a different KIP adding
> this to all existing aggregations.
>
>
> -Matthias
>
>
>
> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after the
> > end of the window." hence it is the additional time beyond the window size.
> > I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any more,
> > but only for the window store itself. So I'd suggest talking about window
> > size here, and note that store retention time cannot be controlled via
> > window spec at all.
> >
> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > being selected internally?
> >
> > 4. "There is some tradeoff between purely optimizing " seems to be an
> > incomplete paragraph?
> >
> > 5. Meta comment: for many aggregations it is commutative and associative so
> > we can require users to pass in a "subtract" function as well. Given these
> > two functions I think we can propose two sets of APIs, 1) with the adder and
> > subtractor and 2) with the adder only (if the aggregate logic is not comm.
> > and assoc.).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> > at most M + 1 values per key.
> > We use the total_aggregate for queries, and
> > each update will cause 2 writes (to the bucket and to the total aggregate).
> >
> > And with 1) when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over other buckets again.
> >
> > 6. Meta comment: it is reasonable to assume in practice out-of-order
> > data is not very common, hence most of the updates will be falling into the
> > latest bucket. So I'm wondering if it makes sense to always store the first
> > bucket in memory while making other buckets optionally on persistent
> > storage. In practice, as long as M is large enough (we probably need it to
> > be large enough to have sufficiently sensitive expiration anyways) then
> > each bucket's aggregate data is small enough to be in memory.
> >
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> >> Hello all,
> >>
> >> I would like to kick off discussion of this KIP aimed at providing sliding
> >> window semantics to DSL aggregations.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>
> >> Please take a look and share any thoughts you have regarding the API,
> >> semantics, design, etc!
> >>
> >> I also have a POC PR open with the "naive" implementation for your
> >> reference: https://github.com/apache/kafka/pull/6549
> >>
> >> Cheers,
> >> Sophie
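
For reference, here is a minimal sketch that reproduces the window sequence from Matthias's example in the thread above (size 5ms, ts 1 2 3 10 11). It is only an illustration of those semantics, not the KIP's design or the POC implementation. It assumes a single key, in-order records with distinct timestamps, no grace period, and a window covering [ts - size, ts] with both bounds inclusive. The class and method names are made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
public class SlidingWindowSketch {

    // Returns every distinct non-empty window content, in the order it would be emitted.
    // Assumption: timestamps are strictly increasing and all belong to one key.
    public static List<List<Long>> windows(long sizeMs, long[] timestamps) {
        Deque<Long> inWindow = new ArrayDeque<>();
        List<List<Long>> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            // Expire records that fall outside [ts - sizeMs, ts].
            while (!inWindow.isEmpty() && inWindow.peekFirst() < ts - sizeMs) {
                long expired = inWindow.pollFirst();
                // The first window that excludes this record ends at expired + sizeMs + 1.
                // If that is strictly before ts, a window without the expired record
                // (and without the new record) existed, so emit it unless it is empty.
                boolean leftBeforeNewRecord = expired + sizeMs + 1 < ts;
                if (leftBeforeNewRecord && !inWindow.isEmpty()) {
                    emitted.add(new ArrayList<>(inWindow));
                }
            }
            // Add the new record and emit the window that ends at its timestamp.
            inWindow.addLast(ts);
            emitted.add(new ArrayList<>(inWindow));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [[1], [1, 2], [1, 2, 3], [2, 3], [3], [10], [10, 11]]
        System.out.println(windows(5, new long[] {1, 2, 3, 10, 11}));
    }
}
```

Processing ts=10 is what produces [2,3], [3] and [10] in one step, which is the "more than one record forwarded per input" case raised above. This sketch keeps all raw timestamps in the window; it says nothing about how aggregates would be stored or updated.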