On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> Thanks for the comments Guozhang! I've answered your questions below
>
> On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after the
> > end of the window," hence it is the additional time beyond the window
> > size. I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any more,
> > but only of the window store itself. So I'd suggest talking about window
> > size here, and noting that store retention time cannot be controlled via
> > the window spec at all.
> >
> Yes, I meant to say the grace period is effectively zero -- the retention
> period will ultimately be the same as the window size, which is
> configurable, but it can't be configured independently, if that's what you
> mean?

You can configure retention via Materialized (in the DSL), when specifying the
store in KTable, or via WindowStoreBuilder#retentionPeriod (in the PAPI). The
point here is that these are specified independently of the Windows spec. So a
user cannot control how long a materialized store is retained from the window
spec itself; she must do that via the methods mentioned above.

> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
> Since this design assumes we don't have a subtractor, each bucket would
> expire when its start time is outside the current window; the remaining
> values in that bucket are then aggregated with the "running aggregate" of
> the next bucket to get the total aggregate for the entire window.
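[For reference, a rough sketch of the bucket idea discussed above -- one running
aggregate per bucket, with the window total rebuilt from the surviving buckets
when the oldest bucket expires, since there is no subtractor. All class and
method names are illustrative, not KIP-450 code; the window here slides in
whole-bucket steps for simplicity:]

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongBinaryOperator;

// Illustrative sketch only (names made up, not KIP-450 code): the window is M
// consecutive buckets, each holding a running per-bucket aggregate. With no
// subtractor, expiring the oldest bucket means re-combining the M-1 surviving
// bucket aggregates (O(M)) instead of subtracting in O(1); with M ~ sqrt(N)
// this amortizes to O(sqrt(N)) work per record.
class BucketedSlidingAggregate {
    private final int numBuckets;           // M
    private final int bucketCapacity;       // records per bucket
    private final long identity;            // identity element of the adder
    private final LongBinaryOperator adder;
    private final Deque<Long> bucketAggs = new ArrayDeque<>();
    private int newestCount = 0;            // records in the newest bucket
    private long total;                     // running total over all buckets

    BucketedSlidingAggregate(int numBuckets, int bucketCapacity,
                             long identity, LongBinaryOperator adder) {
        this.numBuckets = numBuckets;
        this.bucketCapacity = bucketCapacity;
        this.identity = identity;
        this.adder = adder;
        this.total = identity;
        bucketAggs.addLast(identity);       // start with one empty bucket
    }

    void add(long value) {
        if (newestCount == bucketCapacity) {        // roll over to a new bucket
            bucketAggs.addLast(identity);
            newestCount = 0;
            if (bucketAggs.size() > numBuckets) {   // oldest bucket expired
                bucketAggs.removeFirst();
                // no subtractor: rebuild the window total from the survivors
                total = identity;
                for (long agg : bucketAggs) {
                    total = adder.applyAsLong(total, agg);
                }
            }
        }
        // two "writes" per update: the newest bucket's aggregate and the total
        bucketAggs.addLast(adder.applyAsLong(bucketAggs.removeLast(), value));
        newestCount++;
        total = adder.applyAsLong(total, value);
    }

    long total() { return total; }
}
```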
> I'll try to come up with a diagram and/or a better way to explain what I
> have in mind here...
> (The values themselves in the buckets will expire automatically by setting
> the retention period of the underlying window store)
>
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > selected internally?
> >
> Good question. If we ignore the difference in cost between aggregation
> operations and writes to the underlying store, the optimal value of M is
> sqrt(N). But the reality is the aggregation might be very simple vs.
> expensive RocksDB writes -- conversely, the aggregation itself could be
> complicated/costly while the underlying store is cheap to write to (i.e.
> in-memory). I do feel it should be abstracted away from the user, however,
> and not be an additional parameter they need to consider and tune (e.g.
> segmentInterval)... some profiling would probably be needed to determine a
> reasonable choice.
>
> > 4. "There is some tradeoff between purely optimizing" seems to be an
> > incomplete paragraph?
> >
> Whoops
>
> > 5. Meta comment: many aggregations are commutative and associative, so
> > we can require users to pass in a "subtract" function as well. Given
> > these two functions I think we can propose two sets of APIs: 1) with the
> > adder and subtractor, and 2) with the adder only (if the aggregation
> > logic is not commutative and associative).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus one for the whole window (call it total_aggregate),
> > i.e. at most M + 1 values per key. We use the total_aggregate for queries,
> > and each update will cause 2 writes (to the bucket and to the total
> > aggregate).
> > And with 1), when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2), when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over the other buckets again.
> >
> This is a good point, i.e. we can definitely be much smarter in our design
> if we have a subtractor, in which case it's probably worth having separate
> sets of APIs/implementations based on what the user can provide. I'll work
> this into the KIP.
>
> > 6. Meta comment: it is reasonable to assume that in practice
> > out-of-order data is not very common, hence most of the updates will fall
> > into the latest bucket. So I'm wondering if it makes sense to always
> > store the first bucket in memory while making the other buckets
> > optionally reside on persistent storage. In practice, as long as M is
> > large enough (we probably need it to be large enough to have sufficiently
> > sensitive expiration anyway) then each bucket's aggregate data is small
> > enough to be in memory.
> >
> This sounds reasonable to me (looking into the future, if we want to
> eventually support a way to "tune" the total memory usage of Streams, this
> could be turned on/off).
>
> > Guozhang
> >
> > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > Hello all,
> > >
> > > I would like to kick off discussion of this KIP aimed at providing
> > > sliding window semantics to DSL aggregations.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > >
> > > Please take a look and share any thoughts you have regarding the API,
> > > semantics, design, etc!
> > >
> > > I also have a POC PR open with the "naive" implementation for your
> > > reference: https://github.com/apache/kafka/pull/6549
> > >
> > > Cheers,
> > > Sophie
> > >
> >
> > --
> > -- Guozhang
> >

--
-- Guozhang
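[For reference, a minimal sketch of expiry option 1) from point 5 above: with a
commutative and associative aggregation, a subtractor lets the oldest bucket be
retired in O(1) via subtract(total_aggregate, bucket_aggregate) rather than
re-summing the surviving buckets. Class and method names are illustrative, not
KIP-450 code; the window slides in whole-bucket steps for simplicity:]

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongBinaryOperator;

// Illustrative sketch only (names made up, not KIP-450 code) of expiry
// option 1): because the aggregation is commutative and associative, the
// subtractor removes the expired bucket's contribution from total_aggregate
// in O(1), instead of re-combining the surviving buckets as in option 2).
class SubtractingSlidingAggregate {
    private final int numBuckets;           // M
    private final int bucketCapacity;       // records per bucket
    private final LongBinaryOperator adder;
    private final LongBinaryOperator subtractor;
    private final Deque<Long> bucketAggs = new ArrayDeque<>();
    private int newestCount = 0;            // records in the newest bucket
    private long total = 0L;                // total_aggregate (identity 0 for sums)

    SubtractingSlidingAggregate(int numBuckets, int bucketCapacity,
                                LongBinaryOperator adder,
                                LongBinaryOperator subtractor) {
        this.numBuckets = numBuckets;
        this.bucketCapacity = bucketCapacity;
        this.adder = adder;
        this.subtractor = subtractor;
        bucketAggs.addLast(0L);             // start with one empty bucket
    }

    void add(long value) {
        if (newestCount == bucketCapacity) {        // roll over to a new bucket
            bucketAggs.addLast(0L);
            newestCount = 0;
            if (bucketAggs.size() > numBuckets) {
                // O(1) expiry: subtract the expired bucket's aggregate
                total = subtractor.applyAsLong(total, bucketAggs.removeFirst());
            }
        }
        // two writes per update: the newest bucket's aggregate and the total
        bucketAggs.addLast(adder.applyAsLong(bucketAggs.removeLast(), value));
        newestCount++;
        total = adder.applyAsLong(total, value);
    }

    long total() { return total; }
}
```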