For

stream
      .groupBy(..)
      .windowedBy(..)
      .aggregate(..)
      .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
      .mapValues(..)

I think after `aggregate` the result is already a table, so the emit strategy
comes too late to control how the windowed stream is output to the table.
This is the concern Guozhang raised about having this in the existing
`suppress` operator as well.

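To make the difference between the two emit strategies concrete, here is a minimal, self-contained Java sketch. It deliberately does not use the Kafka Streams API: `EmitDemo`, `Mode`, and `run` are hypothetical names, and the close rule (a window closes once stream time reaches window end plus grace) is an assumption that only approximates the real windowed-aggregation operator. With `ON_EACH_UPDATE` every intermediate count leaves the aggregation, so a step placed after it can only recover the final values by buffering them again.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a tumbling-window count (not the Kafka Streams API).
class EmitDemo {
    enum Mode { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // Counts records per tumbling window [start, start + size). A window is treated as
    // closed once stream time (the max timestamp seen so far) reaches start + size + grace;
    // records that fall into an already-closed window are dropped.
    static List<String> run(long[] timestamps, long size, long grace, Mode mode) {
        Map<Long, Long> counts = new TreeMap<>();
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - Math.floorMod(ts, size);
            if (start + size + grace > streamTime) {
                long count = counts.merge(start, 1L, Long::sum);
                if (mode == Mode.ON_EACH_UPDATE) {
                    emitted.add(start + "=" + count); // every intermediate result goes downstream
                }
            }
            if (mode == Mode.ON_WINDOW_CLOSE) {
                emitClosed(counts, size, grace, streamTime, emitted);
            }
        }
        return emitted;
    }

    // Emits (and forgets) the final count of every window that has closed.
    private static void emitClosed(Map<Long, Long> counts, long size, long grace,
                                   long streamTime, List<String> emitted) {
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + size + grace <= streamTime) {
                emitted.add(e.getKey() + "=" + e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 12, 5, 25};
        System.out.println(run(ts, 10, 5, Mode.ON_EACH_UPDATE));  // [0=1, 0=2, 10=1, 0=3, 20=1]
        System.out.println(run(ts, 10, 5, Mode.ON_WINDOW_CLOSE)); // [0=3, 10=1]
    }
}
```

With `ON_WINDOW_CLOSE`, the out-of-order record at timestamp 5 is still folded into window 0 because it arrives within the grace period, and window 20 is never emitted because stream time never reaches 35.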
Thanks,
Hao

On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi,
>
> Thank you for your answers to my questions!
>
> I see the argument about conciseness of configuring a stream with
> methods instead of config objects. I just miss the descriptive
> aspect a bit.
>
> What about
>
> stream
>       .groupBy(..)
>       .windowedBy(..)
>       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>       .aggregate(..)
>       .mapValues(..)
>
> I have another question. Why should the emitting of results be
> controlled by the window-level API? If I want to emit results for each
> input record, the emit strategy is quite independent of the window. So
> I share Matthias' and Guozhang's concern that the emit strategy
> seems misplaced there.
>
> What are the arguments against the following?
>
> stream
>       .groupBy(..)
>       .windowedBy(..)
>       .aggregate(..)
>       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>       .mapValues(..)
>
>
> A final administrative request: Hao, could you please add the rejected
> alternatives to the KIP so that future us will know why we rejected them?
>
> Best,
> Bruno
>
> On 23.03.22 19:38, John Roesler wrote:
> > Hi all,
> >
> > I can see both sides of this.
> >
> > On one hand, when we say
> > "stream.groupBy().windowBy().count()", it seems like we're
> > telling KS to take the raw stream, group it based on key,
> > then window it based on time, and then compute an
> > aggregation on the windows. In that model, "trigger()" would
> > have to mean something like "trigger it", which doesn't
> > really make sense, since we aren't "triggering" the
> > aggregation (then again, to an outside observer, it would
> > appear that way... food for thought).
> >
> > Another way to look at it is that all we're really doing is
> > configuring a windowed aggregation on the stream, and we're
> > doing it with a progressive builder interface. In other
> > words, the above is just a progressive builder for
> > configuring an operation like
> > "stream.aggregate(groupingConfig, windowingConfig,
> > countFn)". Under the latter interpretation of the DSL, it
> > makes perfect sense to add more optional progressive builder
> > methods like trigger() to the WindowedKStream interfaces.
> >
> > Since part of the motivation for choosing the word "trigger"
> > here is to stay close to what Flink defines, I'll also point
> > out that Flink's syntax is also
> > "stream.keyBy().window().trigger().aggregate()". Not that
> > their API is the holy grail or anything, but it's at least
> > an indication that this API isn't a horrible mistake.
> >
> > All other things being equal, I also prefer to leave tie-
> > breakers in the hands of the contributor. So, if we've all
> > said our piece and Hao still prefers option 1, then (as long
> > as we don't think it's a horrible mistake), I think we
> > should just let him go for it.
> >
> > Speaking of which, after reviewing the responses regarding
> > deprecating `Suppressed#onWindowClose`, I still think we
> > should just go ahead and deprecate it. Although it's not
> > expressed exactly the same way, it still does exactly the
> > same thing, or so close that it seems confusing to keep
> > both. But again, if Hao really prefers to keep both, I won't
> > insist on it :)
> >
> > Thanks all,
> > -John
> >
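John's second reading (the chain as a progressive builder for one windowed aggregation) can be sketched in plain Java. Everything here is hypothetical: `BuilderDemo`, `GroupedBuilder`, `WindowedBuilder`, `AggregationSpec`, the `Emit` enum, and the default of emitting on each update are assumptions for illustration, not Kafka Streams classes. The point is that `trigger()` only records a setting and returns a builder of the same type; nothing is defined until the terminal `count()` assembles the single operation.

```java
import java.util.Objects;

// Hypothetical progressive builder; none of these types exist in Kafka Streams.
class BuilderDemo {
    enum Emit { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // The fully specified operation, i.e. aggregate(groupingConfig, windowingConfig, emit, fn).
    static final class AggregationSpec {
        final String groupingKey;
        final long windowSizeMs;
        final Emit emit;
        final String function;

        AggregationSpec(String groupingKey, long windowSizeMs, Emit emit, String function) {
            this.groupingKey = groupingKey;
            this.windowSizeMs = windowSizeMs;
            this.emit = emit;
            this.function = function;
        }

        @Override
        public String toString() {
            return "aggregate(" + groupingKey + ", window=" + windowSizeMs + "ms, emit=" + emit + ", " + function + ")";
        }
    }

    static final class GroupedBuilder {
        private final String groupingKey;

        GroupedBuilder(String groupingKey) { this.groupingKey = groupingKey; }

        // Assumed default: emit on each update unless trigger() is called.
        WindowedBuilder windowedBy(long sizeMs) { return new WindowedBuilder(groupingKey, sizeMs, Emit.ON_EACH_UPDATE); }
    }

    static final class WindowedBuilder {
        private final String groupingKey;
        private final long sizeMs;
        private final Emit emit;

        WindowedBuilder(String groupingKey, long sizeMs, Emit emit) {
            this.groupingKey = groupingKey;
            this.sizeMs = sizeMs;
            this.emit = emit;
        }

        // Optional configuration step: returns a new builder of the same type with the policy set.
        WindowedBuilder trigger(Emit emit) { return new WindowedBuilder(groupingKey, sizeMs, Objects.requireNonNull(emit)); }

        // Terminal step: only here is the single windowed aggregation actually defined.
        AggregationSpec count() { return new AggregationSpec(groupingKey, sizeMs, emit, "count"); }
    }

    static GroupedBuilder groupBy(String key) { return new GroupedBuilder(key); }

    public static void main(String[] args) {
        System.out.println(groupBy("userId").windowedBy(60_000).trigger(Emit.ON_WINDOW_CLOSE).count());
        System.out.println(groupBy("userId").windowedBy(60_000).count());
    }
}
```

Under this reading `trigger()` sits naturally on the windowed builder, because it configures the same pending operation that `count()` finalizes.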
> > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> >> Thanks Bruno!
> >>
> >> Argument for option 1 is:
> >> 1. Concise and descriptive. It avoids overloading existing functions and
> >> it's very clear what it's doing. Imagine if there's an autocomplete
> >> feature in IntelliJ or another IDE for our DSL in the future; it's not
> >> favorable to show 6 `windowedBy` functions.
> >> 2. Option 1 operates on `windowedStream` to configure how it should be
> >> output. Option 2 operates on `KGroupedStream` to produce
> >> `windowedStream` as well as configure how `windowedStream` should be
> >> output. I feel it's better to have a `windowedStream` and then
> >> configure how it is output. Somehow I feel option 2 breaks the
> >> builder pattern.
> >> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> >> kinds of different parameters into it to avoid future overloading, it's
> >> too bloated and not very user friendly.
> >>
> >> I agree option 1's `trigger` function is configuring the stream, which
> >> feels different from existing `count` or `aggregate` etc. Configuring
> >> might also be a kind of action on the stream :) I'm not sure if it breaks
> >> the DSL principle, and if it does, can we relax the principle given the
> >> benefits compared to option 2? Maybe John can chime in as the DSL
> >> grammar author.
> >>
> >> Thanks,
> >> Hao
> >>
> >> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >>> Hi Hao,
> >>>
> >>> I agree with Guozhang: Great summary! Thank you!
> >>>
> >>> Regarding "aligned with other config class names", there is this DSL
> >>> grammar John once specified
> >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>> and we have already used it in the code. I found the grammar quite
> >>> useful.
> >>> I am undecided if option 1 is really worth it. What are actually the
> >>> arguments in favor of it? Is it only that we do not need to overload
> >>> other methods? This does not seem worth breaking DSL principles. An
> >>> alternative proposal would be to go with option 2 and conform with the
> >>> grammar above:
> >>>
> >>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, WindowedByParameters parameters);
> >>>
> >>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, WindowedByParameters parameters);
> >>>
> >>> SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, WindowedByParameters parameters);
> >>>
> >>> This is similar to option 2 in the KIP, but it ensures that we put all
> >>> future needed configs in the parameters object and we do not need to
> >>> overload the methods anymore.
> >>>
> >>> Then if we also get KAFKA-10298 done, we could even collapse all
> >>> `windowedBy()` methods into one.
> >>>
> >>> Best,
> >>> Bruno
> >>>
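A rough sketch of what Bruno's parameter-object variant could look like, assuming a hypothetical immutable `WindowedByParameters` with copy-on-write `with...` methods. The class shape, the `Emit` enum, and the `name` field are illustrative assumptions, not a proposed or existing API. A new option becomes one more field and one more `with...` method, while the `windowedBy(windows, parameters)` signatures stay unchanged.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical parameter object; not part of Kafka Streams.
final class WindowedByParameters {
    enum Emit { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    private final Emit emit;
    private final String name; // illustrative second option; null means "not set"

    private WindowedByParameters(Emit emit, String name) {
        this.emit = emit;
        this.name = name;
    }

    // Assumed defaults: emit on each update, no explicit name.
    static WindowedByParameters defaults() {
        return new WindowedByParameters(Emit.ON_EACH_UPDATE, null);
    }

    // Each option gets its own "with" method; adding an option does not
    // require a new windowedBy(...) overload.
    WindowedByParameters withEmit(Emit emit) {
        return new WindowedByParameters(Objects.requireNonNull(emit), name);
    }

    WindowedByParameters withName(String name) {
        return new WindowedByParameters(emit, Objects.requireNonNull(name));
    }

    Emit emit() { return emit; }

    Optional<String> name() { return Optional.ofNullable(name); }

    @Override
    public String toString() {
        return "WindowedByParameters(emit=" + emit + ", name=" + name().orElse("<none>") + ")";
    }
}
```

The trade-off raised elsewhere in this thread is that such a class can grow into a grab bag of unrelated settings.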
> >>> On 22.03.22 22:31, Guozhang Wang wrote:
> >>>> Thanks for the great summary Hao. I'm still leaning towards option 2)
> >>>> here, and I'm in favor of `trigger` as the function name and `Triggered`
> >>>> as the config class name (mainly to be aligned with other config class
> >>>> names). I also want to see others' preferences between the options, as
> >>>> well as the namings.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>
> >>>>> `windowedStream.onWindowClose()` was the original option 1
> >>>>> (`windowedStream.emitFinal()`) but was rejected
> >>>>> because we could add more emit types and this would result in adding
> >>>>> more functions. I still prefer the
> >>>>> "windowedStream.someFunc(Controlled.onWindowClose)"
> >>>>> model since it's flexible and it's clear that it's configuring the
> >>>>> emit policy.
> >>>>> Let me summarize all the naming options we have and compare:
> >>>>>
> >>>>> *API function name:*
> >>>>>
> >>>>> *1. `windowedStream.trigger()`*
> >>>>>       Pros:
> >>>>>          i. Simple
> >>>>>          ii. Similar to Flink's trigger function (is this a con actually?)
> >>>>>       Cons:
> >>>>>          i. `trigger()` can be confused with Flink trigger (raised by John)
> >>>>>          ii. `trigger()` feels like an operation instead of a configure
> >>>>>          function (raised by Bruno)?
> >>>>>
> >>>>> *2. `windowedStream.emitTrigger()`*
> >>>>>       Pros:
> >>>>>          i. Avoids confusion with Flink's trigger API
> >>>>>          ii. `emitTrigger` feels like configuring the trigger because
> >>>>>          "trigger" here is a noun instead of a verb as in `trigger()`
> >>>>>       Cons:
> >>>>>          i. Verbose?
> >>>>>          ii. Not consistent with `Suppressed.untilWindowClose`?
> >>>>>
> >>>>>
> >>>>> *Config class/object name:*
> >>>>>
> >>>>> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> >>>>>        Cons:
> >>>>>        i. Doesn't go along with `trigger` (raised by Bruno)
> >>>>>
> >>>>> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> >>>>>
> >>>>> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> >>>>>
> >>>>> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> >>>>> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> >>>>>        This is a combination of different names like: `EmitConfig`,
> >>>>> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> >>>>>
> >>>>>
> >>>>> If we settle on option 1), we can add new options to these names and
> >>>>> comment on their pros and cons.
> >>>>>
> >>>>> Hao
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> I see what you mean now, and I think it's a fair point that composing
> >>>>>> `trigger` and `emitted` seems awkward.
> >>>>>>
> >>>>>> Re: data process operator v.s. control operator, I shared your concern
> >>>>>> as well, and here's my train of thought: having only data processing
> >>>>>> operators was my primary motivation for how we added the suppress
> >>>>>> operator, since it indeed "suppresses" data. But in hindsight its
> >>>>>> disadvantage is that, for example, Suppressed.onWindowClose() should
> >>>>>> only be related to an earlier windowedBy operator, which is possibly
> >>>>>> very far from it in the resulting DSL code. It's not only a bit awkward
> >>>>>> for users to write such code, but in such cases the DSL builder also
> >>>>>> needs to maintain and propagate this information to the suppress
> >>>>>> operator further down. So we are now thinking about "putting the
> >>>>>> control object as close as possible to where the related processing
> >>>>>> really happens". In that world my original preference was somewhere
> >>>>>> in option 2), i.e. just put the control as a param of the related
> >>>>>> "windowedBy" operator, but the trade-off is that we keep adding
> >>>>>> overloaded functions to these operators. So after some back-and-forth
> >>>>>> I'm leaning towards relaxing our principle of having only processing
> >>>>>> operators and no flow-control operators. That being said, if you
> >>>>>> have any ideas for getting the benefits of both worlds, I'm all ears.
> >>>>>>
> >>>>>> Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> >>>>>> "windowedStream.someFunc(Controlled.onWindowClose)", again my
> >>>>>> motivation for the latter is extensibility without adding more
> >>>>>> functions in the future. If people feel this is not worth it, we can
> >>>>>> do the first option as well. If we just feel that `trigger` and
> >>>>>> `emitted` are not composable together, maybe we can consider something
> >>>>>> like `windowedStream.trigger(Triggered.onWindowClose())`?
> >>>>>>
> >>>>>> Re: windowedBy v.s. windowBy, yeah I do not really have a good reason
> >>>>>> why we should use the past tense either :P But if it's not bothering
> >>>>>> people much, I'd say we just keep it rather than deprecate it and add
> >>>>>> renamed APIs.
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> There is no semantic difference. It is a cosmetic difference.
> >>>>>>> Conceptually, I relate `Emitted` with the aggregation and not with
> >>>>>>> `trigger()` in the API flow, because the aggregation emits the result,
> >>>>>>> not `trigger()`. Therefore, I proposed not to use `Emitted` as the
> >>>>>>> name of the config object passed to `trigger()`.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 22.03.22 17:24, Guozhang Wang wrote:
> >>>>>>>> Hi Bruno,
> >>>>>>>>
> >>>>>>>> Could you elaborate a bit more here: what's the semantic difference
> >>>>>>>> between "the aggregation is triggered on window close and all
> >>>>>>>> aggregation results are emitted." for
> >>>>>>>> trigger(TriggerParameters.onWindowClose()), and "the aggregation is
> >>>>>>>> configured to only emit final results." for
> >>>>>>>> trigger(Emitted.onWindowClose())?
> >>>>>>>>
> >>>>>>>> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Hao,
> >>>>>>>>>
> >>>>>>>>> Thank you for the KIP!
> >>>>>>>>>
> >>>>>>>>> Regarding option 1, I would not use `Emitted.onWindowClose()` since
> >>>>>>>>> that does not seem compatible with the proposed flow. Conceptually,
> >>>>>>>>> the flow now states that the aggregation is triggered on window close
> >>>>>>>>> and all aggregation results are emitted. `Emitted` suggests that the
> >>>>>>>>> aggregation is configured to only emit final results.
> >>>>>>>>>
> >>>>>>>>> Thus, I propose the following:
> >>>>>>>>>
> >>>>>>>>> stream
> >>>>>>>>>         .groupBy(..)
> >>>>>>>>>         .windowedBy(..)
> >>>>>>>>>         .trigger(TriggerParameters.onWindowClose())
> >>>>>>>>>         .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>         .mapValues(..)
> >>>>>>>>>
> >>>>>>>>> An alternative to `trigger()` could be `schedule()`, but I do not
> >>>>>>>>> really like it.
> >>>>>>>>>
> >>>>>>>>> One thing I noticed with option 1 is that all other methods in the
> >>>>>>>>> example above are operations on data. `groupBy()` groups,
> >>>>>>>>> `windowedBy()` partitions, `aggregate()` computes the aggregate,
> >>>>>>>>> `mapValues()` maps values, even `suppress()` suppresses intermediate
> >>>>>>>>> results. But what does `trigger()` do? `trigger()` seems like a config
> >>>>>>>>> lost among operations.
> >>>>>>>>>
> >>>>>>>>> However, if we do not want to restrict ourselves to only using methods
> >>>>>>>>> to specify operations on data, I have the following proposal:
> >>>>>>>>>
> >>>>>>>>> stream
> >>>>>>>>>         .groupBy(..)
> >>>>>>>>>         .windowedBy(..)
> >>>>>>>>>         .onWindowClose()
> >>>>>>>>>         .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>         .mapValues(..)
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >>>>>>>>> operations also use present tense.
> >>>>>>>>>
> >>>>>>>>> On 22.03.22 06:36, Hao Li wrote:
> >>>>>>>>>> Hi John,
> >>>>>>>>>>
> >>>>>>>>>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has
> >>>>>>>>>> a different meaning in our case. `emit` sounds like an action to
> >>>>>>>>>> emit? How about `emitTrigger`? I'm open to suggestions for the
> >>>>>>>>>> naming.
> >>>>>>>>>>
> >>>>>>>>>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang
> >>>>>>>>>> that we can deprecate the `Suppressed` config as a whole later. Or we
> >>>>>>>>>> can deprecate `Suppressed.untilWindowClose` in a later KIP after the
> >>>>>>>>>> implementation of emit final is done.
> >>>>>>>>>>
> >>>>>>>>>> BTW, isn't
> >>>>>>>>>>
> >>>>>>>>>> stream
> >>>>>>>>>>       .groupBy(..)
> >>>>>>>>>>       .windowBy(..)
> >>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>       .mapValues(..)
> >>>>>>>>>>       .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> >>>>>>>>>>
> >>>>>>>>>> same as
> >>>>>>>>>>
> >>>>>>>>>> stream
> >>>>>>>>>>       .groupBy(..)
> >>>>>>>>>>       .windowBy(..)
> >>>>>>>>>>       .trigger(Emitted.onWindowClose)
> >>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>       .mapValues(..)
> >>>>>>>>>> ?
> >>>>>>>>>>
> >>>>>>>>>> Hao
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I think the following case is only doable via `suppress`:
> >>>>>>>>>>>
> >>>>>>>>>>> stream
> >>>>>>>>>>>       .groupBy(..)
> >>>>>>>>>>>       .windowBy(..)
> >>>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>>>>       .mapValues(..)
> >>>>>>>>>>>       .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
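The comment that `suppress` can "trace back to parent node, to find a window definition" can be illustrated with a toy topology model. This is a hypothetical sketch, not how Kafka Streams builds its internal graph: `TopologyNode`, `then`, `findWindowSize`, and `suppressUntilWindowClose` are made-up names. The builder walks parent links from the `suppress` node toward the source and fails if no windowed step is found, which is the kind of bookkeeping needed when the control is placed far from `windowedBy`.

```java
import java.util.Optional;

// Hypothetical topology model; not the Kafka Streams internal graph.
final class TopologyNode {
    final String op;
    final TopologyNode parent; // null for the source node
    final Long windowSizeMs;   // non-null only for a windowing node

    TopologyNode(String op, TopologyNode parent, Long windowSizeMs) {
        this.op = op;
        this.parent = parent;
        this.windowSizeMs = windowSizeMs;
    }

    TopologyNode then(String op) { return new TopologyNode(op, this, null); }

    TopologyNode windowedBy(long sizeMs) { return new TopologyNode("windowedBy", this, sizeMs); }

    // Walk from this node back toward the source until a window definition is found.
    Optional<Long> findWindowSize() {
        for (TopologyNode n = this; n != null; n = n.parent) {
            if (n.windowSizeMs != null) {
                return Optional.of(n.windowSizeMs);
            }
        }
        return Optional.empty();
    }

    // Emulates validating untilWindowClose: it only makes sense downstream of a windowed step.
    TopologyNode suppressUntilWindowClose() {
        long size = findWindowSize().orElseThrow(() ->
                new IllegalStateException("untilWindowClose requires an upstream window definition"));
        return new TopologyNode("suppress(untilWindowClose, window=" + size + "ms)", this, null);
    }
}
```

For example, `source.then("groupBy").windowedBy(10_000).then("aggregate").then("mapValues").suppressUntilWindowClose()` finds the 10-second window three hops upstream, while calling `suppressUntilWindowClose()` on a chain without `windowedBy` is rejected.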
> >>>>>>>>>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks, Guozhang!
> >>>>>>>>>>>>
> >>>>>>>>>>>> To clarify, I was asking specifically about deprecating just the
> >>>>>>>>>>>> method ‘untilWindowClose’. I might not be thinking clearly about
> >>>>>>>>>>>> it, though. What does untilWindowClose do that this KIP doesn’t
> >>>>>>>>>>>> cover?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> John
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> >>>>>>>>>>>>> Just my 2c: Suppressed is used in `suppress`, whose application
> >>>>>>>>>>>>> scope is much larger and hence more flexible. I.e. it can be used
> >>>>>>>>>>>>> anywhere on a `KTable` (but internally we would check whether
> >>>>>>>>>>>>> certain emit policies like `untilWindowClose` are valid or not),
> >>>>>>>>>>>>> whereas `trigger` for now is only applicable in XXWindowedKStream.
> >>>>>>>>>>>>> So I think it would not completely replace
> >>>>>>>>>>>>> Suppressed.untilWindowClose.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In the future, personally I'd still want to keep one control
> >>>>>>>>>>>>> object for all emit policies, and if we have extended Emitted for
> >>>>>>>>>>>>> other emitting policies covered by Suppressed today, we can discuss
> >>>>>>>>>>>>> whether we could have `KTable.suppress(Emitted..)` replacing
> >>>>>>>>>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I
> >>>>>>>>>>>>> think it's too early.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the KIP, Hao!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For what it’s worth, I’m also in favor of your latest framing of
> >>>>>>>>>>>>>> the API.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s
> >>>>>>>>>>>>>> not identical to the concept of a trigger in Flink, which
> >>>>>>>>>>>>>> specifies when to evaluate the window, so it might be confusing to
> >>>>>>>>>>>>>> some people who have deep experience with Flink. Then again, it
> >>>>>>>>>>>>>> seems close enough that it should be clear to casual Flink users.
> >>>>>>>>>>>>>> For people with no other stream processing experience, it might
> >>>>>>>>>>>>>> seem a bit esoteric compared to something self-documenting like
> >>>>>>>>>>>>>> ‘emit()’, but the docs should make it clear.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> One small question: it seems like this proposal is identical to
> >>>>>>>>>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is
> >>>>>>>>>>>>>> superior. In that case, should we deprecate
> >>>>>>>>>>>>>> Suppressed.untilWindowClose?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >>>>>>>>>>>>>>> Hi Hao,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For 2), I think it's a good idea in general to use a separate
> >>>>>>>>>>>>>>> function on the Time/SessionWindowedKStream itself, rather than
> >>>>>>>>>>>>>>> overloading existing functions, which also reflects that, for now,
> >>>>>>>>>>>>>>> the emit control is only for windowed aggregations as in this KIP.
> >>>>>>>>>>>>>>> We can discuss the actual function names further, e.g. whether
> >>>>>>>>>>>>>>> others like the name `trigger` or not. For myself, I feel
> >>>>>>>>>>>>>>> `trigger` is a good one, but I'd like to see if others have
> >>>>>>>>>>>>>>> opinions as well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the feedback.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1. I agree to have an `Emitted` control class and two static
> >>>>>>>>>>>>>>>> constructors named `onWindowClose` and `onEachUpdate`.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2. For the API function changes, I'm thinking of adding a new
> >>>>>>>>>>>>>>>> function called `trigger` to `TimeWindowedKStream` and
> >>>>>>>>>>>>>>>> `SessionWindowedKStream`. It takes an `Emitted` config and
> >>>>>>>>>>>>>>>> returns the same stream. Example:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>       .groupBy(...)
> >>>>>>>>>>>>>>>>       .windowedBy(...)
> >>>>>>>>>>>>>>>>       .trigger(Emitted.onWindowClose) // N
> >>>>>>>>>>>>>>>>       .count()
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The benefits are:
> >>>>>>>>>>>>>>>>       1. It's simple and avoids overloading existing functions
> >>>>>>>>>>>>>>>> like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to
> >>>>>>>>>>>>>>>> add it to the `aggregate` functions, we would need to add it to
> >>>>>>>>>>>>>>>> all existing `count` and `aggregate` overloads, which is a lot.
> >>>>>>>>>>>>>>>>       2. It operates directly on the windowed KStream and tells
> >>>>>>>>>>>>>>>> how its output should be configured. If we later need to add
> >>>>>>>>>>>>>>>> this to other types of streams, we can reuse the same `trigger`
> >>>>>>>>>>>>>>>> API, whereas other types of streams/tables may not have
> >>>>>>>>>>>>>>>> `aggregate` or `windowedBy` APIs to make it consistent.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I prefer option 2 over the other options mainly because the
> >>>>>>>>>>>>>>>>> added config object could potentially be used in other operators
> >>>>>>>>>>>>>>>>> as well (it does not necessarily have to be a windowed operator and
> >>>>>>>>>>>>>>>>> hence does not have to be piggy-backed on `windowedBy`; that's also
> >>>>>>>>>>>>>>>>> why I suggested not naming it `WindowConfig` but just `EmitConfig`).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As for Matthias' question, I think the difference between the
> >>>>>>>>>>>>>>>>> windowed aggregate operator and the stream-stream join operator is
> >>>>>>>>>>>>>>>>> that, for the latter, we think emit-final should be the only right
> >>>>>>>>>>>>>>>>> emitting policy and hence we should not let users configure it. If
> >>>>>>>>>>>>>>>>> users configured it to e.g. emit eagerly, they might get the old
> >>>>>>>>>>>>>>>>> spurious emitting behavior, which violates the semantics.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> >>>>>>>>>>>>>>>>> towards adding the new param to the overloaded `aggregate` rather
> >>>>>>>>>>>>>>>>> than the overloaded `windowBy` function. The reason is that the
> >>>>>>>>>>>>>>>>> emitting logic could be either window based or non-window based in
> >>>>>>>>>>>>>>>>> the long run. Though for this KIP we could just add it to
> >>>>>>>>>>>>>>>>> `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> >>>>>>>>>>>>>>>>> non-windowed operators in the future.
> >>>>>>>>>>>>>>>>> 2. To be consistent with other control class names, I feel maybe
> >>>>>>>>>>>>>>>>> we can name it "Emitted", not "EmitConfig".
> >>>>>>>>>>>>>>>>> 3. Following the first comment, I think we can have the static
> >>>>>>>>>>>>>>>>> constructor names as "onWindowClose" and "onEachUpdate".
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The resulting code pattern would look like this:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        stream
> >>>>>>>>>>>>>>>>>          .groupBy(..)
> >>>>>>>>>>>>>>>>>          .windowBy(TimeWindow..)
> >>>>>>>>>>>>>>>>>          .count(Emitted.onWindowClose)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>>>>>>>>>> Maybe it's more flexible to be used as a config in properties, as we
> >>>>>>>>>>>>>>>>>>>> don't need to recompile the DSL to change it.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even sure
> >>>>>>>>>>>>>>>>>> if we need a config at all or if we can just hard code it? For the
> >>>>>>>>>>>>>>>>>> stream-stream left/outer join fix, there is only an internal config
> >>>>>>>>>>>>>>>>>> but no public config either.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Option 1: Is your proposal this?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>        stream
> >>>>>>>>>>>>>>>>>>          .groupByKey()
> >>>>>>>>>>>>>>>>>>          .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>>>>>>          .configure(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>>          .count()
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This does not change my argument that it seems to be misplaced from
> >>>>>>>>>>>>>>>>>> an API flow POV.
> >>>>>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It might
> >>>>>>>>>>>>>>>>>> be good if others could chime in, too. I think I slightly prefer
> >>>>>>>>>>>>>>>>>> option 2 over option 3.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>>> Thanks for the feedback Matthias.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>>>>>>>>> Maybe it's more flexible to be used as a config in properties, as we
> >>>>>>>>>>>>>>>>>>> don't need to recompile the DSL to change it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` should be output to `KTable` after
> >>>>>>>>>>>>>>>>>>> aggregation. But `emitFinal` is not an action on the
> >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` interface. Maybe adding
> >>>>>>>>>>>>>>>>>>> `configure(EmitConfig config)` makes more sense?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For option 2, the config can be created using
> >>>>>>>>>>>>>>>>>>> `WindowConfig.emitFinal()` or `EmitConfig.emitFinal`.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For option 3, it will be something like `TimeWindows(...,
> >>>>>>>>>>>>>>>>>>> EmitConfig emitConfig)`.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For putting `EmitConfig` in the aggregation operator, I think it
> >>>>>>>>>>>>>>>>>>> doesn't control how we do the aggregation but how we output to
> >>>>>>>>>>>>>>>>>>> `KTable`. That's why I feel option 1 makes more sense, as it applies
> >>>>>>>>>>>>>>>>>>> to `TimeWindowedKStream`. But I'm also OK with option 2.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> A general comment: it seems that we won't need any new
> >>>>>>>>>>>>>>>>>>>> `allowedLateness` parameter because the grace period is already
> >>>>>>>>>>>>>>>>>>>> defined on the window itself?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> >>>>>>>>>>>>>>>>>>>> `grace-period` is actually not a property of the window but a
> >>>>>>>>>>>>>>>>>>>> property of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable
> >>>>>>>>>>>>>>>>>>>> IMHO:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>         stream
> >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> >>>>>>>>>>>>>>>>>>>>           .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>>>>>>>>           .emitFinal()
> >>>>>>>>>>>>>>>>>>>>           .count()
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place for
> >>>>>>>>>>>>>>>>>>>> this case?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the
> >>>>>>>>>>>>>>>>>>>> API though):
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>         stream
> >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> >>>>>>>>>>>>>>>>>>>>           .windowBy(
> >>>>>>>>>>>>>>>>>>>>             TimeWindow.ofSizeNoGrace(...),
> >>>>>>>>>>>>>>>>>>>>             EmitConfig.emitFinal() -- just made this
> up;
> >>>>> it's
> >>>>>>>>>>> not
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>           )
> >>>>>>>>>>>>>>>>>>>>           .count()
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I made up the `EmitConfig.emitFinal()` call; from the KIP
> >>>>>>>>>>>>>>>>>>>> it's unclear what API you have in mind. `EmitFinalConfig` has
> >>>>>>>>>>>>>>>>>>>> no public constructor nor any builder method.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can
> >>>>>>>>>>>>>>>>>>>> you give a concrete example (similar to above) of how users
> >>>>>>>>>>>>>>>>>>>> would write their code?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> >>>>>>>>>>>>>>>>>>>> aggregation operator? In the end, it seems not to be a property
> >>>>>>>>>>>>>>>>>>>> of the window definition or windowing step, but a property of
> >>>>>>>>>>>>>>>>>>>> the actual operator:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>         stream
> >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> >>>>>>>>>>>>>>>>>>>>           .windowedBy(
> >>>>>>>>>>>>>>>>>>>>             TimeWindows.ofSizeWithNoGrace(...)
> >>>>>>>>>>>>>>>>>>>>           )
> >>>>>>>>>>>>>>>>>>>>           .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The API surface area that needs to be updated might be larger
> >>>>>>>>>>>>>>>>>>>> for this case, though...
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>
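Matthias's remark that `EmitFinalConfig` has no public constructor nor builder method can be made concrete. Below is a minimal sketch, assuming a config object modeled on the static-factory style of `Grouped` and `Suppressed`. The class name `EmitConfig`, its `Strategy` enum, and the factories `emitFinal()` and `emitEager()` are hypothetical and are not part of the Kafka Streams API. The sketch has no Kafka dependency.

```java
import java.util.Objects;

// Hypothetical sketch only, NOT the Kafka Streams API: an emit-policy config
// object in the style of Grouped/Suppressed (private constructor + static factories).
final class EmitConfig {

    // Hypothetical: when a windowed aggregation forwards results downstream.
    public enum Strategy {
        ON_WINDOW_UPDATE, // emit an updated result for every input record
        ON_WINDOW_CLOSE   // emit a single final result per window once it closes
    }

    private final Strategy strategy;

    // Private so callers must use the named factories below; new settings can be
    // added later without breaking existing call sites.
    private EmitConfig(final Strategy strategy) {
        this.strategy = Objects.requireNonNull(strategy, "strategy");
    }

    // Hypothetical factory equivalent to "emitFinal == true".
    public static EmitConfig emitFinal() {
        return new EmitConfig(Strategy.ON_WINDOW_CLOSE);
    }

    // Hypothetical factory for the emit-on-every-update behaviour.
    public static EmitConfig emitEager() {
        return new EmitConfig(Strategy.ON_WINDOW_UPDATE);
    }

    public Strategy strategy() {
        return strategy;
    }

    @Override
    public String toString() {
        return "EmitConfig{strategy=" + strategy + "}";
    }
}
```

With such a class, option 2 would pass `EmitConfig.emitFinal()` as a second argument to `windowedBy`, and the operator-level variant would pass it to `count`. Both overloads are equally hypothetical. A private constructor behind named factories is what gives a config object the extensibility that a plain boolean flag lacks.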
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and
> >>>>>>>>>>>>>>>>>>>>> option 2 modifies fewer places. What do you think of option 1,
> >>>>>>>>>>>>>>>>>>>>> which doesn't change the current `windowedBy` API but
> >>>>>>>>>>>>>>>>>>>>> configures `EmitConfig` separately? The benefit of option 1 is
> >>>>>>>>>>>>>>>>>>>>> that if we need to configure something else later, we don't
> >>>>>>>>>>>>>>>>>>>>> need to pile it onto `windowedBy` but can add separate APIs.
> >>>>>>>>>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> >>>>>>>>>>>>>>>>>>>>> <https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231>.
> >>>>>>>>>>>>>>>>>>>>> But we can also create an internal API to do that without
> >>>>>>>>>>>>>>>>>>>>> modifying `Stores`.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal. I have some preferences among the
> >>>>>>>>>>>>>>>>>>>>>> options here, so I will copy them here:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I'm now wondering if it's better not to add this new config
> >>>>>>>>>>>>>>>>>>>>>> to each of the Window interfaces, but instead add it at the
> >>>>>>>>>>>>>>>>>>>>>> KGroupedStream#windowedBy function. Also, instead of adding
> >>>>>>>>>>>>>>>>>>>>>> just a boolean flag, maybe we can add a config class like
> >>>>>>>>>>>>>>>>>>>>>> Grouped, Suppressed, etc. Let's call it Emitted; for now it
> >>>>>>>>>>>>>>>>>>>>>> would have just a single construct, Emitted.atWindowClose,
> >>>>>>>>>>>>>>>>>>>>>> whose semantics are the same as emitFinal == true. I think
> >>>>>>>>>>>>>>>>>>>>>> the benefits are:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but
> >>>>>>>>>>>>>>>>>>>>>> just overload one windowedBy function with a second param.
> >>>>>>>>>>>>>>>>>>>>>> This is a smaller scope for now, and also more extensible
> >>>>>>>>>>>>>>>>>>>>>> for any future changes.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility and
> >>>>>>>>>>>>>>>>>>>>>> are able to reuse this Emitted interface for other operators
> >>>>>>>>>>>>>>>>>>>>>> if we want to expand to them.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some
> >>>>>>>>>>>>>>>>>>>>>> more detailed comments:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) If we want to reuse that config object for other
> >>>>>>>>>>>>>>>>>>>>>> non-window stateful operations, I think naming it
> >>>>>>>>>>>>>>>>>>>>>> `EmitConfig` is probably better than `WindowConfig`.
> >>>>>>>>>>>>>>>>>>>>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892)
> >>>>>>>>>>>>>>>>>>>>>> that you are also proposing to add new stores to the public
> >>>>>>>>>>>>>>>>>>>>>> factory Stores, but this is not included in the KIP. Is that
> >>>>>>>>>>>>>>>>>>>>>> intentional? Personally, I think that although we may
> >>>>>>>>>>>>>>>>>>>>>> eventually want to add a new store type to the public APIs,
> >>>>>>>>>>>>>>>>>>>>>> for this KIP we do not have to add them but can delay that
> >>>>>>>>>>>>>>>>>>>>>> until after we've learned the best layout. LMK what you think.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final
> >>>>>>>>>>>>>>>>>>>>>>> aggregated results for windowed aggregations. I listed several
> >>>>>>>>>>>>>>>>>>>>>>> options there, and I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>>>
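The difference between emitting on every update and emitting only final results can be illustrated without Kafka. Below is a minimal, self-contained model, assuming tumbling windows, non-negative timestamps, and a window that closes once stream time reaches window end plus grace period. The class `WindowedCountModel` is hypothetical; it is not how Kafka Streams implements either mode.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (no Kafka dependency): a single-key tumbling-window count that
// either emits on every update or emits once per window after the window closes.
final class WindowedCountModel {

    private final long windowSizeMs;
    private final long graceMs;
    private final boolean emitFinal;
    // Window start -> running count for windows that are still open, ordered by start.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    WindowedCountModel(long windowSizeMs, long graceMs, boolean emitFinal) {
        if (windowSizeMs <= 0 || graceMs < 0) {
            throw new IllegalArgumentException("window size must be > 0 and grace >= 0");
        }
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
        this.emitFinal = emitFinal;
    }

    // Processes one record; assumes timestampMs >= 0.
    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Late record: its window already closed (end + grace <= stream time), so drop it.
        if (windowStart + windowSizeMs + graceMs <= streamTime) {
            return;
        }
        long count = openWindows.merge(windowStart, 1L, Long::sum);
        if (!emitFinal) {
            emitted.add(format(windowStart, count)); // eager mode: emit every update
        }
        closeExpiredWindows();
    }

    // Drops closed windows; in emit-final mode this is the only place results are emitted.
    private void closeExpiredWindows() {
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + windowSizeMs + graceMs > streamTime) {
                break; // oldest window is still open, so every later window is too
            }
            openWindows.pollFirstEntry();
            if (emitFinal) {
                emitted.add(format(oldest.getKey(), oldest.getValue()));
            }
        }
    }

    private String format(long windowStart, long count) {
        return "[" + windowStart + "," + (windowStart + windowSizeMs) + ")=" + count;
    }

    List<String> emitted() {
        return Collections.unmodifiableList(emitted);
    }
}
```

Feeding the timestamps 1, 5, 12, 3, 25 into both modes with a 10 ms window and no grace, the eager model emits `[0,10)=1`, `[0,10)=2`, `[10,20)=1`, `[20,30)=1`, while the final model emits only `[0,10)=2` and `[10,20)=1`. In both modes the record at 3 is dropped as late, and `[20,30)` is still open at the end.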
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Thanks,
> >>>>> Hao
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>


-- 
Thanks,
Hao
