Thanks for the KIP, Sophie.

I have a couple of additional comments.

The current proposal only considers stream-time.

While I support this, each time we introduce a new operation based on
stream-time, users invariably request that the operation support wall-clock
time as well.  Would we want to proactively consider that option in the
current KIP?

I also think a grace period of 0 is too restrictive.  I may have missed
your response, but can you elaborate on the reasoning?

Thanks,
Bill


On Fri, Apr 12, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

> On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Thanks for the comments Guozhang! I've answered your questions below
> >
> > On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the proposed KIP. I've made a pass over it and here are some
> > > thoughts:
> > >
> > > 1. "The window size is effectively the grace and retention period". The
> > > grace time is defined as "the time to admit late-arriving events after
> > > the end of the window," hence it is the additional time beyond the
> > > window size. I guess you were trying to say it should be zero?
> > >
> > > Also for retention period, it is not a notion of the window spec any
> > > more, but only for the window store itself. So I'd suggest talking
> > > about window size here, and note that store retention time cannot be
> > > controlled via the window spec at all.
> > >
> >
> > Yes, I meant to say the grace period is effectively zero -- the retention
> > period will ultimately be the same as the window size, which is
> > configurable, but it can't be configured independently if that's what you
> > mean?
> >
> >
> You can configure retention via Materialized (in the DSL) when specifying
> the store in a KTable, or via WindowStoreBuilder#retentionPeriod (in the
> PAPI). The point here is that they are specified independently from the
> Windows spec. So a user cannot control how long a materialized store is
> retained from the window spec itself; she must do that via the methods
> mentioned above.
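For readers following along, that configuration split looks roughly like this. This is a sketch against the 2.x Streams API, not from the KIP; the store name and durations are made up, and the fragment assumes the usual kafka-streams imports:

```java
// DSL: retention is set on the Materialized, independently of the
// Windows spec (store name and durations are examples only):
stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
      .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                         .withRetention(Duration.ofHours(1)));

// PAPI: retention is a parameter of the store supplier itself:
StoreBuilder<WindowStore<String, Long>> builder = Stores.windowStoreBuilder(
    Stores.persistentWindowStore("counts-store",
                                 Duration.ofHours(1),   // retention
                                 Duration.ofMinutes(5), // window size
                                 false),                // retain duplicates
    Serdes.String(), Serdes.Long());
```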
>
>
> >
> > > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
> > > a bucket, so I'd assume you will expire one bucket as a whole when its
> > > end time is smaller than the current window's starting time, right?
> > >
> >
> > Since this design assumes we don't have a subtracter, each bucket would
> > expire when its start time is outside the current window; the remaining
> > values in that bucket are then aggregated with the "running aggregate" of
> > the next bucket to get the total aggregate for the entire window. I'll
> > try to come up with a diagram and/or a better way to explain what I have
> > in mind here...
> > (The values themselves in the buckets will expire automatically by
> > setting the retention period of the underlying window store.)
> >
> >
> > > 3. Also in your algorithm how to choose "M" seems tricky; would it be
> > > a configurable parameter exposed to users, or is it abstracted away
> > > and only selected internally?
> > >
> >
> > Good question. If we ignore the difference in cost between aggregation
> > operations and writes to the underlying store, the optimal value of M is
> > sqrt(N). But in reality the aggregation might be very simple vs.
> > expensive RocksDB writes -- conversely, the aggregation itself could be
> > complicated/costly while the underlying store is cheap to write (i.e.
> > in-memory). I do feel it should be abstracted away from the user,
> > however, and not be an additional parameter they need to consider and
> > tune (e.g. segmentInterval)... some profiling would probably be needed
> > to determine a reasonable choice.
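To make that tradeoff concrete, here is a toy cost model with purely illustrative weights (nothing here is measured or from the KIP): suppose each update pays roughly aggCost * M to combine the M bucket aggregates, plus writeCost * N / M of amortized per-bucket store maintenance. With equal weights the minimum lands at M = sqrt(N), and a relatively expensive store pushes the optimum toward more, smaller buckets:

```java
// Toy cost model (illustrative weights, not measurements) for choosing M.
class BucketCountModel {
    static long bestM(long n, double aggCost, double writeCost) {
        long best = 1;
        double bestCost = Double.MAX_VALUE;
        // Brute-force the minimum of cost(m) = aggCost*m + writeCost*n/m.
        for (long m = 1; m <= n; m++) {
            double cost = aggCost * m + writeCost * n / (double) m;
            if (cost < bestCost) {
                bestCost = cost;
                best = m;
            }
        }
        return best;
    }
}
```

With equal weights, bestM(10000, 1, 1) comes out to sqrt(10000) = 100; making writes 4x more expensive shifts it to 200.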
> >
> >
> > > 4. "There is some tradeoff between purely optimizing " seems to be an
> > > incomplete paragraph?
> > >
> >
> > Whoops
> >
> >
> > > 5. Meta comment: many aggregations are commutative and associative, so
> > > we can require users to pass in a "subtract" function as well. Given
> > > these two functions I think we can propose two sets of APIs: 1) with
> > > the adder and subtractor, and 2) with the adder only (if the aggregate
> > > logic is not comm. and assoc.).
> > >
> > > We just maintain an aggregate value for each bucket (call it
> > > bucket_aggregate) plus one for the whole window (call it
> > > total_aggregate), i.e. at most M + 1 values per key. We use the
> > > total_aggregate for queries, and each update will cause 2 writes (to
> > > the bucket and to the total aggregate).
> > >
> > > And with 1), when expiring the oldest bucket we simply call
> > > subtract(total_aggregate, bucket_aggregate); with 2), when expiring the
> > > oldest bucket we can re-compute the total_aggregate by
> > > sum(bucket_aggregate) over the other buckets again.
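The two expiry strategies above could be sketched like this. It is a toy illustration, not KIP code: long sums stand in for a generic adder/subtractor, and the in-memory map stands in for whatever store backs the buckets:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of point 5: one bucket_aggregate per bucket plus a single
// total_aggregate, with the two expiry strategies for the oldest bucket.
class WindowTotals {
    // bucketStart -> bucket_aggregate; insertion order == time order.
    final Map<Long, Long> bucketAggregates = new LinkedHashMap<>();
    long totalAggregate = 0;

    // Each update causes 2 writes: to the bucket and to the total.
    void update(long bucketStart, long value) {
        bucketAggregates.merge(bucketStart, value, Long::sum);
        totalAggregate += value;
    }

    // 1) With a subtractor: expiring the oldest bucket is O(1).
    void expireOldestWithSubtractor() {
        long oldest = bucketAggregates.keySet().iterator().next();
        totalAggregate -= bucketAggregates.remove(oldest);
    }

    // 2) Adder only: re-compute total_aggregate by summing the
    //    surviving buckets, O(M).
    void expireOldestByRecompute() {
        long oldest = bucketAggregates.keySet().iterator().next();
        bucketAggregates.remove(oldest);
        long total = 0;
        for (long v : bucketAggregates.values()) total += v;
        totalAggregate = total;
    }
}
```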
> > >
> >
> > This is a good point, i.e. we can definitely be much smarter in our
> > design if we have a subtractor, in which case it's probably worth
> > separate sets of APIs/implementations based on what the user can
> > provide. I'll work this into the KIP.
> >
> >
> > > 6. Meta comment: it is reasonable to assume that in practice
> > > out-of-order data is not very common, hence most of the updates will
> > > fall into the latest bucket. So I'm wondering if it makes sense to
> > > always store the first bucket in memory while keeping other buckets
> > > optionally on persistent storage. In practice, as long as M is large
> > > enough (we probably need it to be large enough to have sufficiently
> > > sensitive expiration anyway), each bucket's aggregate data is small
> > > enough to fit in memory.
> > >
> >
> > This sounds reasonable to me (looking into the future, if we want to
> > eventually support a way to "tune" the total memory usage by Streams this
> > could be turned on/off)
> >
> >
> > > Guozhang
> > >
> > >
> > >
> > > On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> > > wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to kick off discussion of this KIP aimed at providing
> > > > sliding window semantics to DSL aggregations.
> > > >
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > > >
> > > > Please take a look and share any thoughts you have regarding the API,
> > > > semantics, design, etc!
> > > >
> > > > I also have a POC PR open with the "naive" implementation for your
> > > > reference: https://github.com/apache/kafka/pull/6549
> > > >
> > > > Cheers,
> > > > Sophie
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
