Thanks for the KIP, Sophie. I have a couple of additional comments.
The current proposal only considers stream-time. While I support this, each time we introduce a new operation based on stream-time, users invariably request that the operation support wall-clock time as well. Would we want to proactively consider that option in the current KIP? Also, I think the concept of a 0 grace period is too restrictive. I may have missed your response, but can you elaborate on the reasoning?

Thanks,
Bill

On Fri, Apr 12, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

> On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
> > Thanks for the comments Guozhang! I've answered your questions below.
> >
> > On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the proposed KIP. I've made a pass over it and here are some thoughts:
> > >
> > > 1. "The window size is effectively the grace and retention period". The grace time is defined as "the time to admit late-arriving events after the end of the window," hence it is the additional time beyond the window size. I guess you were trying to say it should be zero?
> > >
> > > Also, the retention period is not a notion of the window spec any more, but only of the window store itself. So I'd suggest talking about window size here, and noting that store retention time cannot be controlled via the window spec at all.
> >
> > Yes, I meant to say the grace period is effectively zero -- the retention period will ultimately be the same as the window size, which is configurable, but it can't be configured independently, if that's what you mean?
>
> You can configure retention via Materialized (in the DSL), when specifying the store in KTable, or via WindowStoreBuilder#retentionPeriod (in the PAPI). The point here is that these are specified independently of the Windows spec. So a user cannot control how long a materialized store is retained from the window spec itself; they must do that via the methods mentioned above.
>
> > > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a bucket, so I'd assume you will expire one bucket as a whole when its end time is smaller than the current window's starting time, right?
> >
> > Since this design assumes we don't have a subtractor, each bucket would expire when its start time is outside the current window; the remaining values in that bucket are then aggregated with the "running aggregate" of the next bucket to get the total aggregate for the entire window. I'll try to come up with a diagram and/or a better way to explain what I have in mind here... (The values themselves in the buckets will expire automatically via the retention period of the underlying window store.)
> >
> > > 3. Also, how to choose "M" in your algorithm seems tricky -- would it be a configurable parameter exposed to users, or is it abstracted away and only selected internally?
> >
> > Good question. If we ignore the difference in cost between aggregation operations and writes to the underlying store, the optimal value of M is sqrt(N). But in reality the aggregation might be very simple while RocksDB writes are expensive -- conversely, the aggregation itself could be complicated/costly while the underlying store is cheap to write to (i.e. in-memory). I do feel it should be abstracted away from the user, however, and not be an additional parameter they need to consider and tune (e.g. segmentInterval)... some profiling would probably be needed to determine a reasonable choice.
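To make the bucketed design a bit more concrete, here is a minimal sketch of the no-subtractor variant discussed in points 2 and 3. All of the names below are invented for illustration and this is not the KIP's proposed implementation; in particular it glosses over the partial-expiry detail Sophie mentions (re-aggregating an expiring bucket's remaining values into the next bucket) and simply drops buckets once they fall entirely outside the window.

import java.util.TreeMap;
import java.util.function.BiFunction;

// Illustration only: one running aggregate per time bucket for a single key;
// a window query folds the surviving bucket aggregates together.
class BucketedSlidingAggregate<V, A> {
    private final long windowSizeMs;
    private final long bucketSizeMs;                          // roughly windowSize / M
    private final A initial;
    private final BiFunction<A, V, A> adder;                  // the user's aggregator
    private final TreeMap<Long, A> buckets = new TreeMap<>(); // bucket start -> running aggregate

    BucketedSlidingAggregate(long windowSizeMs, int m, A initial, BiFunction<A, V, A> adder) {
        this.windowSizeMs = windowSizeMs;
        this.bucketSizeMs = Math.max(1, windowSizeMs / m);
        this.initial = initial;
        this.adder = adder;
    }

    // Apply one record; with a zero grace period anything outside the window is dropped.
    void add(long timestamp, V value, long streamTime) {
        final long windowStart = streamTime - windowSizeMs;
        if (timestamp <= windowStart) {
            return;                                           // too late, no grace period
        }
        final long bucketStart = timestamp - (timestamp % bucketSizeMs);
        final A current = buckets.getOrDefault(bucketStart, initial);
        buckets.put(bucketStart, adder.apply(current, value));

        // Expire buckets whose entire range [start, start + bucketSize) now lies outside the window.
        while (!buckets.isEmpty() && buckets.firstKey() + bucketSizeMs <= windowStart) {
            buckets.pollFirstEntry();
        }
    }

    // Answer a window query by combining the (roughly M) surviving bucket aggregates.
    A currentAggregate(BiFunction<A, A, A> merger) {
        A result = initial;
        for (final A bucketAggregate : buckets.values()) {
            result = merger.apply(result, bucketAggregate);
        }
        return result;
    }
}

For example, a count over a one-minute window with M = 8 could be built as new BucketedSlidingAggregate<String, Long>(60_000L, 8, 0L, (agg, v) -> agg + 1) and queried with Long::sum as the merger.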
> > > 4. "There is some tradeoff between purely optimizing " seems to be an incomplete paragraph?
> >
> > Whoops
> >
> > > 5. Meta comment: many aggregations are commutative and associative, so we can require users to pass in a "subtract" function as well. Given these two functions I think we can propose two sets of APIs: 1) with the adder and subtractor, and 2) with the adder only (if the aggregation logic is not commutative and associative).
> > >
> > > We just maintain an aggregate value for each bucket (call it bucket_aggregate) plus one for the whole window (call it total_aggregate), i.e. at most M + 1 values per key. We use the total_aggregate for queries, and each update causes 2 writes (to the bucket and to the total aggregate).
> > >
> > > And with 1), when expiring the oldest bucket we simply call subtract(total_aggregate, bucket_aggregate); with 2), when expiring the oldest bucket we re-compute the total_aggregate by sum(bucket_aggregate) over the other buckets.
> >
> > This is a good point, i.e. we can definitely be much smarter in our design if we have a subtractor, in which case it's probably worth having separate sets of APIs/implementations based on what the user can provide. I'll work this into the KIP.
> >
> > > 6. Meta comment: it is reasonable to assume that in practice out-of-order data is not very common, hence most of the updates will fall into the latest bucket. So I'm wondering if it makes sense to always store the first bucket in memory while making the other buckets optionally reside on persistent storage. In practice, as long as M is large enough (we probably need it to be large enough to have sufficiently sensitive expiration anyway), each bucket's aggregate data is small enough to be kept in memory.
> >
> > This sounds reasonable to me (looking into the future, if we want to eventually support a way to "tune" the total memory usage of Streams, this could be turned on/off).
> >
> > > Guozhang
> > >
> > > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to kick off discussion of this KIP aimed at providing sliding window semantics to DSL aggregations.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > > >
> > > > Please take a look and share any thoughts you have regarding the API, semantics, design, etc!
> > > >
> > > > I also have a POC PR open with the "naive" implementation for your reference: https://github.com/apache/kafka/pull/6549
> > > >
> > > > Cheers,
> > > > Sophie
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
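Similarly, a minimal sketch of the bucket_aggregate / total_aggregate bookkeeping from point 5 above, again with invented names and not the KIP's proposed implementation: with a subtractor the total can be updated in O(1) when the oldest bucket expires, while with only an adder it has to be re-computed from the surviving buckets.

import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Illustration only: at most M + 1 stored values per key -- one aggregate per
// bucket plus a running total that is used to answer queries.
class WindowTotalSketch<V, A> {
    private final TreeMap<Long, A> bucketAggregates = new TreeMap<>(); // bucket start -> bucket_aggregate
    private final A initial;
    private final BiFunction<A, V, A> adder;
    private A total;

    WindowTotalSketch(A initial, BiFunction<A, V, A> adder) {
        this.initial = initial;
        this.adder = adder;
        this.total = initial;
    }

    // Each update is 2 writes: one to the record's bucket and one to the total.
    void add(long bucketStart, V value) {
        final A current = bucketAggregates.getOrDefault(bucketStart, initial);
        bucketAggregates.put(bucketStart, adder.apply(current, value));
        total = adder.apply(total, value);
    }

    // Variant 1: commutative and associative aggregation with a subtractor -> O(1) expiry.
    void expireOldestWithSubtractor(BinaryOperator<A> subtractor) {
        final Map.Entry<Long, A> oldest = bucketAggregates.pollFirstEntry();
        if (oldest != null) {
            total = subtractor.apply(total, oldest.getValue());
        }
    }

    // Variant 2: adder only -> re-compute the total from the surviving bucket aggregates.
    void expireOldestByRescan(BinaryOperator<A> merger) {
        bucketAggregates.pollFirstEntry();
        A recomputed = initial;
        for (final A bucketAggregate : bucketAggregates.values()) {
            recomputed = merger.apply(recomputed, bucketAggregate);
        }
        total = recomputed;
    }

    // The total_aggregate used to serve window queries.
    A totalAggregate() {
        return total;
    }
}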