Hi Sophie,

Thanks for the proposed KIP. I've made a pass over it and here are some
thoughts:

1. "The window size is effectively the grace and retention period". The
grace time is defined as "the time to admit late-arriving events after the
end of the window." hence it is the additional time beyond the window size.
I guess your were trying to say it should be zero?

Also, the retention period is no longer a notion of the window spec, but
only of the window store itself. So I'd suggest talking about the window
size here, and noting that the store retention time cannot be controlled via
the window spec at all.
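
Just to make that distinction concrete, here is how the two are configured
today with the existing time-window DSL (not the proposed sliding-window
API; the topic and store names are made up for the example):

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class GraceVsRetentionExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // grace is extra time *beyond* the window size; retention is a
        // property of the store, configured via Materialized, not the window spec
        builder.<String, String>stream("input-topic")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5))        // window size
                                   .grace(Duration.ofSeconds(30)))   // extra time after window end for late records
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                               .withRetention(Duration.ofHours(1))); // retention lives on the store
    }
}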

2. In the "O(sqrt(N)) Design" you did not mention when or how to expire a
bucket, so I'd assume you would expire a bucket as a whole once its end
time is smaller than the current window's start time, right?
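
In other words, something along these lines (just a sketch of the rule I
have in mind; the Bucket class and method names are made up):

import java.util.Deque;

public class BucketExpirationSketch {

    // Each bucket covers a contiguous time range ending at endTime.
    static class Bucket {
        final long startTime;
        final long endTime;

        Bucket(final long startTime, final long endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    // Expire buckets as a whole once their end time falls before the current
    // window's start time (the exact boundary depends on whether bucket end
    // times are inclusive or exclusive).
    static void expireOldBuckets(final Deque<Bucket> buckets, final long currentWindowStart) {
        while (!buckets.isEmpty() && buckets.peekFirst().endTime < currentWindowStart) {
            buckets.pollFirst();
        }
    }
}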

3. Also, how to choose "M" in your algorithm seems tricky. Would it be a
configurable parameter exposed to users, or would it be abstracted away and
selected internally?

4. "There is some tradeoff between purely optimizing " seems incomplete
paragraph?

5. Meta comment: for many aggregations the logic is commutative and
associative, so we can require users to pass in a "subtract" function as
well. Given these two functions I think we can propose two sets of APIs:
1) with the adder and the subtractor, and 2) with the adder only (if the
aggregate logic is not commutative and associative).

We just maintain an aggregate value for each bucket (call it
bucket_aggregate) plus one for the whole window (call it total_aggregate),
i.e. at most M + 1 values per key. We use total_aggregate for queries, and
each update causes 2 writes (to the bucket and to the total aggregate).

With 1), when expiring the oldest bucket we simply call
subtract(total_aggregate, bucket_aggregate); with 2), when expiring the
oldest bucket we can re-compute total_aggregate by summing bucket_aggregate
over the remaining buckets.
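
To make this concrete, here is a rough sketch of that bookkeeping (all
class and method names are made up, the in-memory map stands in for the
window store, and the "merger" is just the "sum over bucket_aggregates"
mentioned above):

import java.util.LinkedHashMap;
import java.util.function.BiFunction;

public class BucketedAggregateSketch<V, A> {

    private final A initial;
    private final BiFunction<A, V, A> adder;        // fold one record into an aggregate
    private final BiFunction<A, A, A> subtractor;   // remove a bucket_aggregate from total_aggregate;
                                                    // null if the logic is not commutative/associative
    private final BiFunction<A, A, A> merger;       // combine two aggregates, for the re-compute path

    // bucketStartTime -> bucket_aggregate, oldest first (stand-in for the window store)
    private final LinkedHashMap<Long, A> bucketAggregates = new LinkedHashMap<>();
    private A totalAggregate;

    BucketedAggregateSketch(final A initial,
                            final BiFunction<A, V, A> adder,
                            final BiFunction<A, A, A> subtractor,
                            final BiFunction<A, A, A> merger) {
        this.initial = initial;
        this.adder = adder;
        this.subtractor = subtractor;
        this.merger = merger;
        this.totalAggregate = initial;
    }

    // Each update causes 2 writes: one to the record's bucket, one to the total.
    void update(final long bucketStartTime, final V value) {
        final A bucketAgg = bucketAggregates.getOrDefault(bucketStartTime, initial);
        bucketAggregates.put(bucketStartTime, adder.apply(bucketAgg, value));
        totalAggregate = adder.apply(totalAggregate, value);
    }

    // Queries only read total_aggregate.
    A query() {
        return totalAggregate;
    }

    // Expire the oldest bucket as a whole.
    void expireOldestBucket() {
        if (bucketAggregates.isEmpty()) {
            return;
        }
        final Long oldest = bucketAggregates.keySet().iterator().next();
        final A expired = bucketAggregates.remove(oldest);
        if (subtractor != null) {
            // 1) adder + subtractor: remove the expired bucket_aggregate from the total
            totalAggregate = subtractor.apply(totalAggregate, expired);
        } else {
            // 2) adder only: re-compute total_aggregate from the remaining bucket_aggregates
            A recomputed = initial;
            for (final A bucketAgg : bucketAggregates.values()) {
                recomputed = merger.apply(recomputed, bucketAgg);
            }
            totalAggregate = recomputed;
        }
    }
}

For counting, say, the adder would be (agg, v) -> agg + 1, the subtractor
(total, bucket) -> total - bucket, and the merger (a, b) -> a + b.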

6. Meta comment: it is reasonable to assume that in practice out-of-order
data is not very common, hence most updates will fall into the latest
bucket. So I'm wondering if it makes sense to always keep the first bucket
in memory while keeping the other buckets optionally on persistent storage.
In practice, as long as M is large enough (we probably need it to be large
enough to get sufficiently fine-grained expiration anyway), each bucket's
aggregate data should be small enough to hold in memory.
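
Just to illustrate what I mean (I'm reading "the first bucket" as the hot,
latest bucket that in-order records land in; all names here are made up,
and the persistent interface is only a stand-in for e.g. a RocksDB-backed
store):

public class HybridBucketStoreSketch<A> {

    // Stand-in for a persistent bucket store.
    interface PersistentBuckets<T> {
        void put(long bucketStartTime, T aggregate);
        T get(long bucketStartTime);
    }

    private final PersistentBuckets<A> persistent;
    private long latestBucketStart = Long.MIN_VALUE;
    private A latestBucketAggregate;                // the hot, in-memory bucket

    HybridBucketStoreSketch(final PersistentBuckets<A> persistent) {
        this.persistent = persistent;
    }

    // In-order records hit the in-memory bucket; out-of-order records and
    // bucket roll-overs fall through to the persistent store.
    void put(final long bucketStartTime, final A aggregate) {
        if (bucketStartTime > latestBucketStart) {
            // roll over: flush the previously hot bucket before replacing it
            if (latestBucketAggregate != null) {
                persistent.put(latestBucketStart, latestBucketAggregate);
            }
            latestBucketStart = bucketStartTime;
            latestBucketAggregate = aggregate;
        } else if (bucketStartTime == latestBucketStart) {
            latestBucketAggregate = aggregate;
        } else {
            persistent.put(bucketStartTime, aggregate);
        }
    }

    A get(final long bucketStartTime) {
        return bucketStartTime == latestBucketStart ? latestBucketAggregate : persistent.get(bucketStartTime);
    }
}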



Guozhang



On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Hello all,
>
> I would like to kick off discussion of this KIP aimed at providing sliding
> window semantics to DSL aggregations.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>
> Please take a look and share any thoughts you have regarding the API,
> semantics, design, etc!
>
> I also have a POC PR open with the "naive" implementation for your
> reference: https://github.com/apache/kafka/pull/6549
>
> Cheers,
> Sophie
>


-- 
-- Guozhang
