On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Thanks for the comments Guozhang! I've answered your questions below
>
> On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after the
> > end of the window", hence it is the additional time beyond the window size.
> > I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any more,
> > but only for the window store itself. So I'd suggest talking about window
> > size here, and note that store retention time cannot be controlled via
> > window spec at all.
> >
>
> Yes, I meant to say the grace period is effectively zero -- the retention
> period will ultimately be the same as the window size, which is
> configurable, but it can't be configured independently if that's what you
> mean?
>
>
You can configure retention via Materialized (in DSL), when specifying the
store in KTable, or via WindowStoreBuilder#retentionPeriod (in PAPI). The
point here is that they are specified independently from the Windows spec.
So a user cannot control how long a materialized store is retained from
the window spec itself; she must do that via the methods mentioned above.


>
> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
>
> Since this design assumes we don't have a subtracter, each bucket would
> expire when its start time is outside the current window; the remaining
> values in that bucket are then aggregated with the "running aggregate" of
> the next bucket to get the total aggregate for the entire window. I'll try
> to come up with a diagram and/or better way to explain what I have in mind
> here...
> (The values themselves in the buckets will expire automatically by setting
> the retention period of the underlying window store)
>
>
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > being selected internally?
> >
>
> Good question. If we ignore the difference in cost between aggregation
> operations and writes to the underlying store, the optimal value of M is
> sqrt(N). But in reality the aggregation might be very simple compared to
> expensive RocksDB writes; conversely, the aggregation itself could be
> complicated/costly while the underlying store is cheap to write (i.e.
> in-memory). I do feel it should be abstracted away from the user, however,
> and not be an additional parameter they need to consider and tune (e.g.
> segmentInterval) ... some profiling would probably be needed to determine a
> reasonable choice.
>
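To make the sqrt(N) claim above concrete, here is a rough sketch under an
assumed cost model that is not spelled out in the KIP: each update pays for
combining M bucket aggregates plus re-aggregating the roughly N/M records of
one bucket, with aggregation and store writes weighted equally. The class and
method names are hypothetical.

```java
/** Hypothetical helper illustrating why M = sqrt(N) minimizes M + N/M. */
final class BucketCountSketch {

    /** Assumed per-update cost: M bucket combines plus ceil(N/M) record re-aggregations. */
    static long cost(long n, long m) {
        return m + (n + m - 1) / m;
    }

    /** Brute-force search for the bucket count with the lowest assumed cost. */
    static long bestM(long n) {
        long best = 1;
        for (long m = 2; m <= n; m++) {
            if (cost(n, m) < cost(n, best)) {
                best = m;
            }
        }
        return best;
    }
}
```

Under this model the minimum sits at sqrt(N) whenever N is a perfect square.
If aggregation and store writes have different costs, the two terms would
carry different weights and the optimum shifts, which is why profiling would
be needed in practice.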
>
> > 4. "There is some tradeoff between purely optimizing " seems to be an
> > incomplete paragraph?
> >
>
> Whoops
>
>
> > 5. Meta comment: many aggregations are commutative and associative, so
> > we can require users to pass in a "subtract" function as well. Given these
> > two functions I think we can propose two sets of APIs: 1) with the adder and
> > subtractor and 2) with the adder only (if the aggregate logic is not comm.
> > and assoc.).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus one for the whole window (call it total_aggregate),
> > i.e. at most M + 1 values per key. We use the total_aggregate for queries,
> > and each update will cause 2 writes (to the bucket and to the total
> > aggregate).
> >
> > And with 1) when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over other buckets again.
> >
>
> This is a good point, ie we can definitely be much smarter in our design if
> we have a subtracter, in which case it's probably worth separate sets of
> APIs/implementations based on what the user can provide. I'll work this
> into the KIP
>
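A minimal sketch of the two variants discussed in point 5, assuming in-order
records only (every update lands in the latest bucket) and, for brevity, that
values and aggregates share one type. The class and method names are
hypothetical and not part of the KIP or the Streams API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

/** Hypothetical per-key state: at most M bucket aggregates plus one total aggregate. */
final class BucketedAggregate<A> {
    private final Deque<A> buckets = new ArrayDeque<>(); // oldest bucket first
    private final A identity;                            // must be non-null
    private final BinaryOperator<A> adder;
    private final BinaryOperator<A> subtractor;          // null means adder-only variant
    private A total;

    BucketedAggregate(A identity, BinaryOperator<A> adder, BinaryOperator<A> subtractor) {
        this.identity = identity;
        this.adder = adder;
        this.subtractor = subtractor;
        this.total = identity;
    }

    /** Starts a new latest bucket. */
    void openBucket() {
        buckets.addLast(identity);
    }

    /** Two writes per update: one to the latest bucket, one to the total. */
    void add(A value) {
        if (buckets.isEmpty()) {
            openBucket();
        }
        buckets.addLast(adder.apply(buckets.removeLast(), value));
        total = adder.apply(total, value);
    }

    /** Expires the oldest bucket and repairs the total. */
    void expireOldest() {
        A expired = buckets.pollFirst();
        if (expired == null) {
            return; // nothing to expire
        }
        if (subtractor != null) {
            // Variant 1: a single subtract call.
            total = subtractor.apply(total, expired);
        } else {
            // Variant 2: recompute the total from the remaining buckets.
            total = identity;
            for (A bucket : buckets) {
                total = adder.apply(total, bucket);
            }
        }
    }

    A total() {
        return total;
    }
}
```

For a sum, one would pass `Long::sum` and `(a, b) -> a - b`; for something
like max, which has no inverse, one would pass `null` as the subtractor and
accept the O(M) recomputation on each expiry.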
>
> > 6. Meta comment: it is reasonable to assume in practice out-of-order
> > data is not very common, hence most of the updates will be falling into the
> > latest bucket. So I'm wondering if it makes sense to always store the first
> > bucket in memory while making other buckets optionally on persistent
> > storage. In practice, as long as M is large enough (we probably need it to
> > be large enough to have sufficiently sensitive expiration anyways) then
> > each bucket's aggregate data is small enough to be in memory.
> >
>
> This sounds reasonable to me (looking into the future, if we want to
> eventually support a way to "tune" the total memory usage by Streams this
> could be turned on/off)
>
>
> > Guozhang
> >
> >
> >
> > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > Hello all,
> > >
> > > I would like to kick off discussion of this KIP aimed at providing sliding
> > > window semantics to DSL aggregations.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > >
> > > Please take a look and share any thoughts you have regarding the API,
> > > semantics, design, etc!
> > >
> > > I also have a POC PR open with the "naive" implementation for your
> > > reference: https://github.com/apache/kafka/pull/6549
> > >
> > > Cheers,
> > > Sophie
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
