Thanks for the KIP Sophie. Couple of comments:

It's a little unclear to me what public API you propose. It seems you
want to add

> public class SlidingWindow extends TimeWindow {}

and

> public class SlidingWindows extends TimeWindows {} // or maybe `extends Windows`

If yes, should we add corresponding public Serdes classes?

Also, can you list all newly added classes/methods explicitly in the wiki?


About the semantics of the operator.

> "Only one single window is defined at a time,"

Should this be "one window per key" instead?

I agree that both window boundaries should be inclusive. However, I am
not sure about:

> At most one record is forwarded when new data arrives

(1) For what case would no output be produced?

(2) I think, if we advance in time, it can also happen that we emit
multiple records. If a window "slides" (not "hops"), we cannot just
advance it to the current record's stream time but would need to emit
more results if records expire before the current input record is added.
For example, consider a window of size 5ms, and the following timestamps
(all records have the same key):

1 2 3 10 11

This should result in windows:

[1]
[1,2]
[1,2,3]
[2,3]
[3]
[10]
[10,11]

I.e., when the record with ts=10 is processed, it triggers the
computation of [2,3], [3], and [10].
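To make this concrete, here is a small, self-contained simulation of the
emission pattern above (plain Java, not the actual Streams implementation;
it assumes inclusive boundaries on both sides and in-order input):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class SlidingEmissionSketch {

    // Simulate which window contents are emitted per input record for a
    // sliding window with inclusive boundaries. "timestamps" are the
    // (same-key) record timestamps in stream-time order; "size" is the
    // window size in ms.
    static List<List<Long>> simulate(long size, long[] timestamps) {
        ArrayDeque<Long> window = new ArrayDeque<>();
        List<List<Long>> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            // Before the new record is added, emit an intermediate result
            // every time an old record falls out of the window.
            while (!window.isEmpty() && window.peekFirst() < ts - size) {
                window.pollFirst();
                if (!window.isEmpty()) {
                    emitted.add(new ArrayList<>(window));
                }
            }
            window.addLast(ts);
            emitted.add(new ArrayList<>(window));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // size 5ms, ts = 1 2 3 10 11
        System.out.println(simulate(5L, new long[] {1, 2, 3, 10, 11}));
        // [[1], [1, 2], [1, 2, 3], [2, 3], [3], [10], [10, 11]]
    }
}
```

Note that no window is emitted when the last old record expires and the
window is momentarily empty, which matches the list of windows above.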


About out-of-order handling: I am wondering if the current design, which
does not allow any grace period, is too restrictive. Can you elaborate
more on the motivation for this suggestion?


Can you give more details about the "simple design"? Atm, it's not clear
to me how it works. I thought we always need to store all raw values. If
we only store the current aggregate, would we end up with the same
inefficient solution as using a hopping window with a 1ms advance?


For the O(sqrt(N)) proposal: can you maybe add an example with concrete
bucket sizes, window size, etc.? The current proposal is a little unclear
to me, atm.


How are windows advanced? Do you propose to advance all windows over all
keys at the same time, or would each window (per key) advance
independently of all other windows? What would be the pros/cons of both
approaches?


To add to Guozhang's comment: atm, DSL operators assume that aggregate
functions are commutative and associative. Hence, it seems ok to make
the same assumption for sliding windows. Holistic and non-subtractable
aggregations should be supported out of the box at some point, too, but
adding that to all existing aggregations would be a different KIP.
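As a hypothetical illustration of Guozhang's subtractor idea (his point 5
below), here is a sketch using sum as a stand-in aggregate: one value per
bucket plus a running total, with bucket expiry as a single subtract. All
names here are made up for the sketch and are not the proposed API:

```java
import java.util.ArrayDeque;

// Hypothetical sketch of option 1): one aggregate per bucket plus a
// running total_aggregate, with the subtractor applied when the oldest
// bucket expires. Sum is used as the aggregate for illustration only.
public class BucketedSumSketch {
    private final ArrayDeque<Long> bucketAggregates = new ArrayDeque<>();
    private long totalAggregate = 0L;

    // Each update causes two "writes": the latest bucket and the total.
    void update(long value, boolean startsNewBucket) {
        if (startsNewBucket || bucketAggregates.isEmpty()) {
            bucketAggregates.addLast(0L);
        }
        bucketAggregates.addLast(bucketAggregates.pollLast() + value); // adder
        totalAggregate += value;                                       // adder
    }

    // Expiring the oldest bucket is a single subtract on the total,
    // instead of re-summing all remaining buckets.
    void expireOldestBucket() {
        Long oldest = bucketAggregates.pollFirst();
        if (oldest != null) {
            totalAggregate -= oldest; // subtract(total_aggregate, bucket_aggregate)
        }
    }

    long query() {
        return totalAggregate; // queries read total_aggregate directly
    }

    public static void main(String[] args) {
        BucketedSumSketch s = new BucketedSumSketch();
        s.update(1L, false);
        s.update(2L, false);
        s.update(10L, true);    // start a new bucket; buckets hold [3, 10]
        s.expireOldestBucket(); // total drops from 13 to 10
        System.out.println(s.query()); // 10
    }
}
```

Without a subtractor (option 2), `expireOldestBucket` would instead
re-sum the remaining bucket aggregates, which is O(M) per expiry rather
than O(1).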


-Matthias



On 4/9/19 4:38 PM, Guozhang Wang wrote:
> Hi Sophie,
> 
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
> 
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess your were trying to say it should be zero?
> 
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
> 
> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
> 
> 3. Also, in your algorithm, how to choose "M" seems tricky: would it be a
> configurable parameter exposed to users, or is it abstracted away and only
> selected internally?
> 
> 4. "There is some tradeoff between purely optimizing " seems incomplete
> paragraph?
> 
> 5. Meta comment: many aggregations are commutative and associative, so we
> can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder
> and subtractor, and 2) with the adder only (if the aggregate logic is not
> comm. and assoc.).
> 
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
> 
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
> 
> 6. Meta comment: it is reasonable to assume that in practice out-of-order
> data is not very common, hence most of the updates will fall into the
> latest bucket. So I'm wondering if it makes sense to always store the
> latest bucket in memory while keeping the other buckets optionally on
> persistent storage. In practice, as long as M is large enough (we probably
> need it to be large enough to have sufficiently sensitive expiration
> anyways) then each bucket's aggregate data is small enough to fit in memory.
> 
> 
> 
> Guozhang
> 
> 
> 
> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> 
>> Hello all,
>>
>> I would like to kick off discussion of this KIP aimed at providing sliding
>> window semantics to DSL aggregations.
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>
>> Please take a look and share any thoughts you have regarding the API,
>> semantics, design, etc!
>>
>> I also have a POC PR open with the "naive" implementation for your
>> reference: https://github.com/apache/kafka/pull/6549
>>
>> Cheers,
>> Sophie
>>
> 
> 
