Hi Sophie,

Thanks for the proposed KIP. I've made a pass over it and here are some thoughts:
1. "The window size is effectively the grace and retention period". The grace time is defined as "the time to admit late-arriving events after the end of the window." hence it is the additional time beyond the window size. I guess your were trying to say it should be zero? Also for retention period, it is not a notion of the window spec any more, but only for the window store itself. So I'd suggest talking about window size here, and note that store retention time cannot be controlled via window spec at all. 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a bucket, so I'd assume you will expire one bucket as a whole when its end time is smaller than the current window's starting time, right? 3. Also in your algorithm how to choose "M" seems tricky, would it be a configurable parameter exposed to users or is it abstracted away and only being selected internally? 4. "There is some tradeoff between purely optimizing " seems incomplete paragraph? 5. Meta comment: for many aggregations it is commutative and associative so we can require users to pass in a "substract" function as well. Given these two function I think we can propose two set of APIs, 1) with the adder and subtractor and 2) with the added only (if the aggregate logic is not comm. and assoc.). We just maintain an aggregate value for each bucket (call it bucket_aggregate) plus for the whole window (call it total_aggregate), i.e. at most M + 1 values per key. We use the total_aggregate for queries, and each update will cause 2 writes (to the bucket and to the total aggregate). And with 1) when expiring the oldest bucket we simply call subtract(total_aggregate, bucket_aggregate); with 2) when expiring the oldest bucket we can re-compute the total_aggregate by sum(bucket_aggregate) over other buckets again. 6. Meta comment: it is reasonable to assume in practice out-of-ordering data is not very common, hence most of the updates will be falling into the latest bucket. So I'm wondering if it makes sense to always store the first bucket in memory while making other buckets optionally on persistent storage. In practice, as long as M is large enough (we probably need it to be large enough to have sufficiently sensitive expiration anyways) then each bucket's aggregate data is small enough to be in memory. Guozhang On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote: > Hello all, > > I would like to kick off discussion of this KIP aimed at providing sliding > window semantics to DSL aggregations. > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL > > Please take a look and share any thoughts you have regarding the API, > semantics, design, etc! > > I also have a POC PR open with the "naive" implementation for your > reference: https://github.com/apache/kafka/pull/6549 > > Cheers, > Sophie > -- -- Guozhang