Thanks, Hao,

The KIP looks good to me overall, so I'll go ahead and vote.
I did notice a couple of typos, though:

> static EmitStrategy onWindowClose() {
>     return new WindowCloseTrigger();
> }

should return WindowCloseStrategy. The other strategy's factory method
also references WindowUpdateTrigger, which I'm guessing should be
WindowUpdateStrategy; that class should also be specified in the KIP.
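For reference, here is a minimal, self-contained sketch of the interface with the corrected return types. It follows the shape described in this thread (an interface exposing an enum plus static factory methods). The names `StrategyType`, `type()` and `onWindowUpdate()` are assumptions for illustration, not copied from the KIP text.

```java
// Hypothetical sketch of the EmitStrategy shape discussed in this thread,
// with the corrected return types. Not the actual Kafka Streams source.
interface EmitStrategy {

    // The enum the interface exposes; each concrete strategy reports one value.
    enum StrategyType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Lets the DSL builder inspect which strategy the user picked.
    StrategyType type();

    // Emit only the final result once a window closes.
    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy(); // was: new WindowCloseTrigger()
    }

    // Emit an updated result on every change to a window (assumed name).
    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy(); // was: WindowUpdateTrigger
    }
}

final class WindowCloseStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }
}

final class WindowUpdateStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_UPDATE;
    }
}
```

A caller writes `EmitStrategy.onWindowClose().type()` and gets `StrategyType.ON_WINDOW_CLOSE`. The indirection is visible here: each class exists only to report an enum value, which a bare enum could also do; the interface form presumably leaves room for strategies that carry parameters later.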

More generally, it seems a bit roundabout to define an
interface that specifies an enum along with static methods
that return classes whose only job is to return enum values
from that same interface. I'm guessing this is mainly a way
to make the API read nicely.

Both of those issues seem relatively minor, though, so I'm
comfortable casting my vote at this point.

Thanks for all the good work here,
-John

On Wed, 2022-03-23 at 20:51 -0700, Hao Li wrote:
> Hi all,
> 
> I just updated the KIP with option 1 as the design and moved options 2 and 3
> to rejected alternatives. Since Matthias is strongly against `trigger`,
> I adopted the proposed `EmitStrategy` and dropped the "with" from the
> method name. So it now looks like this:
> 
> stream
>       .groupBy(..)
>       .windowedBy(..)
>       .emitStrategy(EmitStrategy.onWindowClose())
>       .aggregate(..)
>       .mapValues(..)
> 
> I used `onWindowClose()` instead of the constant `ON_WINDOW_CLOSE` since
> `EmitStrategy` is meant to be an interface.
> 
> Hao
> 
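To make the semantics under discussion concrete, here is a small simulation (plain Java, no Kafka dependency) of a windowed count under the two strategies. It is a simplified model under stated assumptions: tumbling windows, a single partition, and a window [start, start + size) treated as closed once stream time (the largest timestamp seen so far) reaches its end plus the grace period. `WindowedCountModel` is a hypothetical name, not a Kafka Streams class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-partition model of a tumbling-window count, used only to
// contrast "emit on every update" with "emit on window close". It is NOT how
// Kafka Streams is implemented; caching, commits, and persistence are ignored.
final class WindowedCountModel {
    private final long sizeMs;         // window size
    private final long graceMs;        // grace period for out-of-order records
    private final boolean emitOnClose; // true = onWindowClose, false = on update
    // Open windows in creation order: "key@windowStart" -> running count.
    private final Map<String, Long> counts = new LinkedHashMap<>();
    private final Map<String, Long> windowEnds = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // max timestamp seen so far

    WindowedCountModel(long sizeMs, long graceMs, boolean emitOnClose) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.emitOnClose = emitOnClose;
    }

    void process(String key, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - Math.floorMod(timestamp, sizeMs);
        long end = start + sizeMs;
        if (end + graceMs <= streamTime) {
            return; // window already closed: the late record is dropped
        }
        String window = key + "@" + start;
        long count = counts.merge(window, 1L, Long::sum);
        windowEnds.put(window, end);
        if (emitOnClose) {
            emitClosedWindows(); // results appear only once windows close
        } else {
            emitted.add(window + "=" + count); // every update is forwarded
        }
    }

    // Emits and forgets every window whose end + grace has been reached.
    private void emitClosedWindows() {
        Iterator<Map.Entry<String, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (windowEnds.get(entry.getKey()) + graceMs <= streamTime) {
                emitted.add(entry.getKey() + "=" + entry.getValue());
                windowEnds.remove(entry.getKey());
                it.remove();
            }
        }
    }

    List<String> emitted() {
        return Collections.unmodifiableList(emitted);
    }
}
```

Feeding the records (a,1), (a,5), (a,12), (a,3), (b,25) into 10 ms windows with no grace, the update model emits `a@0=1, a@0=2, a@10=1, b@20=1`, while the close model emits only `a@0=2, a@10=1`; `b@20` stays open until stream time reaches 30. The record (a,3) arrives after window [0,10) has closed and is dropped in both models.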
> On Wed, Mar 23, 2022 at 6:35 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
> > Wow. Quite a thread... #namingIsHard :D
> > 
> > I won't repeat all the arguments, which are all very good ones. I'll just
> > state my personal favorite option:
> > 
> > stream
> >       .groupBy(..)
> >       .windowedBy(..)
> >       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >       .aggregate(..)
> >       .mapValues(..)
> > 
> > It seems to be the best compromise / trade-off across the board.
> > Personally, I would strongly advocate against using `trigger()`!
> > 
> > 
> > -Matthias
> > 
> > 
> > On 3/23/22 4:38 PM, Guozhang Wang wrote:
> > > Hao is right. I think that's the lesson we learned in hindsight from
> > > `suppress`: since it can be applied anywhere on a K(Windowed)Table, it
> > > allows an awkward degree of programming flexibility, and I feel it's
> > > better to constrain its application scope more.
> > > 
> > > And I also agree with John that, unless any of us feels strongly about
> > > any of the options, Hao can make the final call on the naming.
> > > 
> > > 
> > > Guozhang
> > > 
> > > On Wed, Mar 23, 2022 at 1:49 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > 
> > > > For
> > > > 
> > > > stream
> > > >        .groupBy(..)
> > > >        .windowedBy(..)
> > > >        .aggregate(..)
> > > >        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > >        .mapValues(..)
> > > > 
> > > > I think after `aggregate` it's already a table, so the emit strategy
> > > > is too late to control how the windowed stream is output to the table.
> > > > This is the same concern Guozhang raised about putting this in the
> > > > existing `suppress` operator.
> > > > 
> > > > Thanks,
> > > > Hao
> > > > 
> > > > On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:
> > > > 
> > > > > Hi,
> > > > > 
> > > > > Thank you for your answers to my questions!
> > > > > 
> > > > > I see the argument about the conciseness of configuring a stream with
> > > > > methods instead of config objects. I just miss the descriptive aspect
> > > > > a bit.
> > > > > 
> > > > > What about
> > > > > 
> > > > > stream
> > > > >        .groupBy(..)
> > > > >        .windowedBy(..)
> > > > >        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > > >        .aggregate(..)
> > > > >        .mapValues(..)
> > > > > 
> > > > > I also have another question. Why should the emitting of results be
> > > > > controlled by the window-level API? If I want to emit results for each
> > > > > input record, the emit strategy is quite independent of the window. So
> > > > > I somewhat share Matthias' and Guozhang's concern that the emit strategy
> > > > > seems misplaced there.
> > > > > 
> > > > > What are the arguments against?
> > > > > 
> > > > > stream
> > > > >        .groupBy(..)
> > > > >        .windowedBy(..)
> > > > >        .aggregate(..)
> > > > >        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > > >        .mapValues(..)
> > > > > 
> > > > > 
> > > > > A final administrative request: Hao, could you please add the rejected
> > > > > alternatives to the KIP so that future us will know why we rejected them?
> > > > > 
> > > > > Best,
> > > > > Bruno
> > > > > 
> > > > > On 23.03.22 19:38, John Roesler wrote:
> > > > > > Hi all,
> > > > > > 
> > > > > > I can see both sides of this.
> > > > > > 
> > > > > > On one hand, when we say
> > > > > > "stream.groupBy().windowBy().count()", it seems like we're
> > > > > > telling KS to take the raw stream, group it based on key,
> > > > > > then window it based on time, and then compute an
> > > > > > aggregation on the windows. In that model, "trigger()" would
> > > > > > have to mean something like "trigger it", which doesn't
> > > > > > really make sense, since we aren't "triggering" the
> > > > > > aggregation (then again, to an outside observer, it would
> > > > > > appear that way... food for thought).
> > > > > > 
> > > > > > Another way to look at it is that all we're really doing is
> > > > > > configuring a windowed aggregation on the stream, and we're
> > > > > > doing it with a progressive builder interface. In other
> > > > > > words, the above is just a progressive builder for
> > > > > > configuring an operation like
> > > > > > "stream.aggregate(groupingConfig, windowingConfig,
> > > > > > countFn)". Under the latter interpretation of the DSL, it
> > > > > > makes perfect sense to add more optional progressive builder
> > > > > > methods like trigger() to the WindowedKStream interfaces.
> > > > > > 
> > > > > > Since part of the motivation for choosing the word "trigger"
> > > > > > here is to stay close to what Flink defines, I'll also point
> > > > > > out that Flink's syntax is also
> > > > > > "stream.keyBy().window().trigger().aggregate()". Not that
> > > > > > their API is the holy grail or anything, but it's at least
> > > > > > an indication that this API isn't a horrible mistake.
> > > > > > 
> > > > > > All other things being equal, I also prefer to leave tie-
> > > > > > breakers in the hands of the contributor. So, if we've all
> > > > > > said our piece and Hao still prefers option 1, then (as long
> > > > > > as we don't think it's a horrible mistake), I think we
> > > > > > should just let him go for it.
> > > > > > 
> > > > > > Speaking of which, after reviewing the responses regarding
> > > > > > deprecating `Suppressed#onWindowClose`, I still think we
> > > > > > should just go ahead and deprecate it. Although it's not
> > > > > > expressed exactly the same way, it still does exactly the
> > > > > > same thing, or so close that it seems confusing to keep
> > > > > > both. But again, if Hao really prefers to keep both, I won't
> > > > > > insist on it :)
> > > > > > 
> > > > > > Thanks all,
> > > > > > -John
> > > > > > 
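John's second reading above, where the chain is just a progressive builder for a single `stream.aggregate(groupingConfig, windowingConfig, countFn)`-style operation, can be sketched with a few hypothetical classes (none of them are Kafka Streams types). In this sketch the builder chain and the direct constructor call produce equal descriptions, and the optional `emitStrategy(...)` step only records a setting.

```java
import java.util.Objects;

// Hypothetical types illustrating the "progressive builder" reading of the DSL.
// None of these are Kafka Streams classes.
enum Emit { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

// The single operation that the whole chain configures.
final class WindowedAggregation {
    final String groupBy;
    final long windowSizeMs;
    final Emit emit;
    final String aggregator;

    WindowedAggregation(String groupBy, long windowSizeMs, Emit emit, String aggregator) {
        this.groupBy = groupBy;
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
        this.aggregator = aggregator;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WindowedAggregation)) return false;
        WindowedAggregation other = (WindowedAggregation) o;
        return groupBy.equals(other.groupBy) && windowSizeMs == other.windowSizeMs
                && emit == other.emit && aggregator.equals(other.aggregator);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupBy, windowSizeMs, emit, aggregator);
    }
}

final class GroupedBuilder {
    private final String groupBy;

    GroupedBuilder(String groupBy) {
        this.groupBy = groupBy;
    }

    WindowedBuilder windowedBy(long windowSizeMs) {
        // Default strategy: emit on every update.
        return new WindowedBuilder(groupBy, windowSizeMs, Emit.ON_WINDOW_UPDATE);
    }
}

final class WindowedBuilder {
    private final String groupBy;
    private final long windowSizeMs;
    private final Emit emit;

    WindowedBuilder(String groupBy, long windowSizeMs, Emit emit) {
        this.groupBy = groupBy;
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
    }

    // Optional configuration step: touches no data, just records a setting.
    WindowedBuilder emitStrategy(Emit emit) {
        return new WindowedBuilder(groupBy, windowSizeMs, emit);
    }

    // Terminal step: freezes everything collected so far into one operation.
    // The result type has no emitStrategy(), so the strategy cannot be set
    // after aggregation.
    WindowedAggregation aggregate(String aggregator) {
        return new WindowedAggregation(groupBy, windowSizeMs, emit, aggregator);
    }
}
```

Because `WindowedAggregation` has no `emitStrategy()` method, this shape also reflects Hao's point in this thread that after `aggregate` it is too late to choose how results are emitted.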
> > > > > > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> > > > > > > Thanks Bruno!
> > > > > > > 
> > > > > > > The arguments for option 1 are:
> > > > > > > 1. Concise and descriptive. It avoids overloading existing functions,
> > > > > > > and it's very clear what it's doing. Imagine an autocomplete feature
> > > > > > > for our DSL in IntelliJ or another IDE in the future: it's not
> > > > > > > favorable to show 6 `windowedBy` functions.
> > > > > > > 2. Option 1 operates on `windowedStream` to configure how it should be
> > > > > > > output. Option 2 operates on `KGroupedStream` to produce a
> > > > > > > `windowedStream` as well as configure how that `windowedStream` should
> > > > > > > be output. I feel it's better to have a `windowedStream` first and then
> > > > > > > configure how it is output. Somehow I feel option 2 breaks the builder
> > > > > > > pattern.
> > > > > > > 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> > > > > > > kinds of different parameters into it to avoid future overloading, it
> > > > > > > becomes too bloated and not very user friendly.
> > > > > > >
> > > > > > > I agree that option 1's `trigger` function configures the stream, which
> > > > > > > feels different from the existing `count`, `aggregate`, etc. Then again,
> > > > > > > configuring might also be a kind of action on a stream :) I'm not sure
> > > > > > > whether it breaks the DSL principles, and if it does, can we relax the
> > > > > > > principle given its benefits compared to option 2? Maybe John can chime
> > > > > > > in as the DSL grammar author.
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > Hao
> > > > > > > 
> > > > > > > On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> > > > > > > 
> > > > > > > > Hi Hao,
> > > > > > > > 
> > > > > > > > I agree with Guozhang: Great summary! Thank you!
> > > > > > > > 
> > > > > > > > Regarding "aligned with other config class names", there is this DSL
> > > > > > > > grammar that John once specified:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> > > > > > > > We have already used it in the code, and I found the grammar quite
> > > > > > > > useful.
> > > > > > > > 
> > > > > > > > I am undecided whether option 1 is really worth it. What are the
> > > > > > > > actual arguments in favor of it? Is it only that we do not need to
> > > > > > > > overload other methods? That does not seem worth breaking the DSL
> > > > > > > > principles for. An alternative proposal would be to go with option 2
> > > > > > > > and conform to the grammar above:
> > > > > > > > 
> > > > > > > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows,
> > > > > > > >         WindowedByParameters parameters);
> > > > > > > >
> > > > > > > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows,
> > > > > > > >         WindowedByParameters parameters);
> > > > > > > >
> > > > > > > > SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows,
> > > > > > > >         WindowedByParameters parameters);
> > > > > > > > 
> > > > > > > > This is similar to option 2 in the KIP, but it ensures that we put all
> > > > > > > > configs needed in the future into the parameters object, so we do not
> > > > > > > > need to overload the methods anymore.
> > > > > > > > 
> > > > > > > > Then if we also get KAFKA-10298 done, we could even collapse all
> > > > > > > > `windowedBy()` methods into one.
> > > > > > > > 
> > > > > > > > Best,
> > > > > > > > Bruno
> > > > > > > > 
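Bruno's alternative keeps option 2 but funnels every option through one parameter object. A minimal sketch of that idea, assuming an immutable object with `with...` methods; `EmitMode`, `defaults()`, `withEmitMode()` and the simplified `windowedBy(long, ...)` stand-in are hypothetical names, not part of the proposal's text.

```java
// Hypothetical parameter object in the spirit of Bruno's WindowedByParameters
// proposal; the enum and method names are illustrative only.
final class WindowedByParameters {

    enum EmitMode { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    private final EmitMode emitMode;

    private WindowedByParameters(EmitMode emitMode) {
        this.emitMode = emitMode;
    }

    // Defaults keep today's behavior: emit on every update.
    static WindowedByParameters defaults() {
        return new WindowedByParameters(EmitMode.ON_WINDOW_UPDATE);
    }

    // Each future option becomes one more immutable "with" method here,
    // instead of one more windowedBy() overload per window type.
    WindowedByParameters withEmitMode(EmitMode emitMode) {
        return new WindowedByParameters(emitMode);
    }

    EmitMode emitMode() {
        return emitMode;
    }
}

// Stand-in for one of the windowedBy() signatures above; it only describes
// the configuration it received instead of building a topology.
final class GroupedStreamSketch {
    String windowedBy(long windowSizeMs, WindowedByParameters parameters) {
        return "size=" + windowSizeMs + "ms, emit=" + parameters.emitMode();
    }
}
```

The trade-off Hao raises in his third point is visible as well: every new option lands in this one class, which can grow large over time.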
> > > > > > > > On 22.03.22 22:31, Guozhang Wang wrote:
> > > > > > > > > Thanks for the great summary, Hao. I'm still leaning towards option 2)
> > > > > > > > > here, and I'm in favor of `trigger` as the function name and `Triggered`
> > > > > > > > > as the config class name (mainly to be aligned with other config class
> > > > > > > > > names). I'd also like to see others' preferences between the options,
> > > > > > > > > as well as on the naming.
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > Guozhang
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > > > > > > > 
> > > > > > > > > > `windowedStream.onWindowClose()` was the original option 1
> > > > > > > > > > (`windowedStream.emitFinal()`), but it was rejected because we could
> > > > > > > > > > add more emit types, which would mean adding more functions. I still
> > > > > > > > > > prefer the "windowedStream.someFunc(Controlled.onWindowClose)" model,
> > > > > > > > > > since it's flexible and makes it clear that it's configuring the emit
> > > > > > > > > > policy. Let me summarize and compare all the naming options we have:
> > > > > > > > > > 
> > > > > > > > > > *API function name:*
> > > > > > > > > >
> > > > > > > > > > *1. `windowedStream.trigger()`*
> > > > > > > > > >        Pros:
> > > > > > > > > >           i. Simple
> > > > > > > > > >           ii. Similar to Flink's trigger function (is this a con actually?)
> > > > > > > > > >        Cons:
> > > > > > > > > >           i. `trigger()` can be confused with Flink's trigger (raised by John)
> > > > > > > > > >           ii. `trigger()` feels like an operation instead of a configuration
> > > > > > > > > >               function (raised by Bruno)?
> > > > > > > > > >
> > > > > > > > > > *2. `windowedStream.emitTrigger()`*
> > > > > > > > > >        Pros:
> > > > > > > > > >           i. Avoids confusion with Flink's trigger API
> > > > > > > > > >           ii. `emitTrigger` feels like configuring the trigger, because
> > > > > > > > > >               "trigger" here is a noun instead of a verb as in `trigger()`
> > > > > > > > > >        Cons:
> > > > > > > > > >           i. Verbose?
> > > > > > > > > >           ii. Not consistent with `Suppressed.untilWindowClose`?
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > *Config class/object name:*
> > > > > > > > > >
> > > > > > > > > > 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> > > > > > > > > >         Cons:
> > > > > > > > > >         i. Doesn't go along with `trigger` (raised by Bruno)
> > > > > > > > > >
> > > > > > > > > > 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> > > > > > > > > >
> > > > > > > > > > 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> > > > > > > > > >
> > > > > > > > > > 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> > > > > > > > > > *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> > > > > > > > > >         This is a combination of different names like `EmitConfig`,
> > > > > > > > > >         `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > If we settle on option 1), we can add new naming options to these lists
> > > > > > > > > > and comment on their pros and cons.
> > > > > > > > > > 
> > > > > > > > > > Hao
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > 
> > > > > > > > > > > I see what you mean now, and I think it's a fair point that composing
> > > > > > > > > > > `trigger` and `emitted` seems awkward.
> > > > > > > > > > > 
> > > > > > > > > > > Re: data processing operators vs. control operators, I share your
> > > > > > > > > > > concern as well, and here's my train of thought: having only data
> > > > > > > > > > > processing operators was my primary motivation for how we added the
> > > > > > > > > > > suppress operator --- it indeed "suppresses" data. But in hindsight,
> > > > > > > > > > > its disadvantage is that, for example, Suppressed.onWindowClose()
> > > > > > > > > > > should only be related to an earlier windowedBy operator, which may be
> > > > > > > > > > > very far from it in the resulting DSL code. Not only is it a bit
> > > > > > > > > > > awkward for users to write such code, but in such cases the DSL
> > > > > > > > > > > builder also needs to maintain this information and propagate it
> > > > > > > > > > > further down to the suppress operator. So we are now thinking about
> > > > > > > > > > > "putting the control object as close as possible to where the related
> > > > > > > > > > > processing really happens". In that world, my original preference was
> > > > > > > > > > > something like option 2), i.e. just putting the control as a parameter
> > > > > > > > > > > of the related "windowedBy" operator, but the trade-off is that we keep
> > > > > > > > > > > adding overloaded functions to these operators. So after some back and
> > > > > > > > > > > forth, I'm leaning towards relaxing our principle of having only
> > > > > > > > > > > processing operators and no flow-control operators. That being said,
> > > > > > > > > > > if you have any ideas for getting the benefits of both worlds, I'm all
> > > > > > > > > > > ears.
> > > > > > > > > > > 
> > > > > > > > > > > Re: using a direct function like "windowedStream.onWindowClose()" vs.
> > > > > > > > > > > "windowedStream.someFunc(Controlled.onWindowClose)", again my
> > > > > > > > > > > motivation for the latter is extensibility without adding more
> > > > > > > > > > > functions in the future. If people feel this is not worth it, we can
> > > > > > > > > > > go with the first option as well. If we just feel that `trigger` and
> > > > > > > > > > > `emitted` are not composable together, maybe we can consider something
> > > > > > > > > > > like `windowedStream.trigger(Triggered.onWindowClose())`?
> > > > > > > > > > > 
> > > > > > > > > > > Re: windowedBy vs. windowBy, yeah, I don't really have a good reason
> > > > > > > > > > > why we should use the past tense either :P But if it's not bothering
> > > > > > > > > > > people much, I'd say we just keep it rather than deprecate/rename to
> > > > > > > > > > > new APIs.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> > > > > > > > > > > 
> > > > > > > > > > > > Hi Guozhang,
> > > > > > > > > > > > 
> > > > > > > > > > > > There is no semantic difference; it is a cosmetic one.
> > > > > > > > > > > > Conceptually, I associate `Emitted` with the aggregation and not
> > > > > > > > > > > > with `trigger()` in the API flow, because the aggregation emits the
> > > > > > > > > > > > result, not `trigger()`. Therefore, I proposed not to use `Emitted`
> > > > > > > > > > > > as the name of the config object passed to `trigger()`.
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Bruno
> > > > > > > > > > > > 
> > > > > > > > > > > > On 22.03.22 17:24, Guozhang Wang wrote:
> > > > > > > > > > > > > Hi Bruno,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Could you elaborate a bit more here? What's the semantic difference
> > > > > > > > > > > > > between "the aggregation is triggered on window close and all
> > > > > > > > > > > > > aggregation results are emitted" for
> > > > > > > > > > > > > trigger(TriggerParameters.onWindowClose()), and "the aggregation is
> > > > > > > > > > > > > configured to only emit final results" for
> > > > > > > > > > > > > trigger(Emitted.onWindowClose())?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Hi Hao,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thank you for the KIP!
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Regarding option 1, I would not use `Emitted.onWindowClose()`,
> > > > > > > > > > > > > > since that does not seem compatible with the proposed flow.
> > > > > > > > > > > > > > Conceptually, the flow now states that the aggregation is triggered
> > > > > > > > > > > > > > on window close and all aggregation results are emitted. `Emitted`
> > > > > > > > > > > > > > suggests that the aggregation is configured to only emit final
> > > > > > > > > > > > > > results.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thus, I propose the following:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > stream
> > > > > > > > > > > > > >          .groupBy(..)
> > > > > > > > > > > > > >          .windowedBy(..)
> > > > > > > > > > > > > >          .trigger(TriggerParameters.onWindowClose())
> > > > > > > > > > > > > >          .aggregate(..) // results in a KTable<Windowed<..>>
> > > > > > > > > > > > > >          .mapValues(..)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > An alternative to `trigger()` could be `schedule()`, but I do not
> > > > > > > > > > > > > > really like it.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > One thing I noticed with option 1 is that all the other methods in
> > > > > > > > > > > > > > the example above are operations on data: `groupBy()` groups,
> > > > > > > > > > > > > > `windowedBy()` partitions, `aggregate()` computes the aggregate,
> > > > > > > > > > > > > > `mapValues()` maps values, and even `suppress()` suppresses
> > > > > > > > > > > > > > intermediate results. But what does `trigger()` do? `trigger()`
> > > > > > > > > > > > > > seems like a config lost among operations.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > However, if we do not want to restrict ourselves to only using
> > > > > > > > > > > > > > methods to specify operations on data, I have the following
> > > > > > > > > > > > > > proposal:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > stream
> > > > > > > > > > > > > >          .groupBy(..)
> > > > > > > > > > > > > >          .windowedBy(..)
> > > > > > > > > > > > > >          .onWindowClose()
> > > > > > > > > > > > > >          .aggregate(..) // results in a KTable<Windowed<..>>
> > > > > > > > > > > > > >          .mapValues(..)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Bruno
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> > > > > > > > > > > > > > operations use the present tense.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On 22.03.22 06:36, Hao Li wrote:
> > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Yes. For naming, `trigger` is similar to Flink's trigger, but it
> > > > > > > > > > > > > > > has a different meaning in our case. `emit` sounds like an action
> > > > > > > > > > > > > > > to emit? How about `emitTrigger`? I'm open to suggestions for the
> > > > > > > > > > > > > > > naming.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Regarding deprecating `Suppressed.untilWindowClose`, I agree with
> > > > > > > > > > > > > > > Guozhang that we can deprecate the `Suppressed` config as a whole
> > > > > > > > > > > > > > > later. Or we can deprecate `Suppressed.untilWindowClose` in a later
> > > > > > > > > > > > > > > KIP, after the implementation of emit final is done.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > BTW, isn't
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > stream
> > > > > > > > > > > > > > >        .groupBy(..)
> > > > > > > > > > > > > > >        .windowBy(..)
> > > > > > > > > > > > > > >        .aggregate(..) // results in a KTable<Windowed<..>>
> > > > > > > > > > > > > > >        .mapValues(..)
> > > > > > > > > > > > > > >        .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > the same as
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > stream
> > > > > > > > > > > > > > >        .groupBy(..)
> > > > > > > > > > > > > > >        .windowBy(..)
> > > > > > > > > > > > > > >        .trigger(Emitted.onWindowClose)
> > > > > > > > > > > > > > >        .aggregate(..) // results in a KTable<Windowed<..>>
> > > > > > > > > > > > > > >        .mapValues(..)
> > > > > > > > > > > > > > > ?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Hao
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > I think the following case is only doable via `suppress`:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > stream
> > > > > > > > > > > > > > > >        .groupBy(..)
> > > > > > > > > > > > > > > >        .windowBy(..)
> > > > > > > > > > > > > > > >        .aggregate(..) // results in a KTable<Windowed<..>>
> > > > > > > > > > > > > > > >        .mapValues(..)
> > > > > > > > > > > > > > > >        .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
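The "trace back to the parent node" step in Guozhang's snippet, and his later point that the DSL builder must propagate window information down to a possibly distant `suppress`, can be sketched as follows. `DslNode` and its methods are hypothetical and model only the idea, not the Kafka Streams internals.

```java
import java.util.Optional;

// Hypothetical, heavily simplified DSL graph node. Kafka Streams' internal
// graph classes differ; this only shows the "trace back to a parent" idea.
final class DslNode {
    final String name;
    final DslNode parent;    // null for the source node
    final Long windowSizeMs; // non-null only for a windowing node

    DslNode(String name, DslNode parent, Long windowSizeMs) {
        this.name = name;
        this.parent = parent;
        this.windowSizeMs = windowSizeMs;
    }

    // Walk upstream until a window definition is found, if any.
    Optional<Long> findWindowDefinition() {
        for (DslNode node = this; node != null; node = node.parent) {
            if (node.windowSizeMs != null) {
                return Optional.of(node.windowSizeMs);
            }
        }
        return Optional.empty();
    }

    // A suppress-until-window-close step has to validate that such a
    // definition exists somewhere upstream, possibly many operators away.
    DslNode suppressUntilWindowClose() {
        long size = findWindowDefinition().orElseThrow(
                () -> new IllegalStateException("no upstream window definition"));
        return new DslNode("suppress(window=" + size + "ms)", this, null);
    }
}
```

For a chain `stream -> groupBy -> windowBy(60000) -> aggregate -> mapValues`, calling `suppressUntilWindowClose()` on the `mapValues` node finds the 60000 ms window three hops upstream, while calling it on the `groupBy` node fails. Placing the emit strategy directly on the windowed stream, as the KIP now does, avoids this lookup altogether.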
> > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Thanks, Guozhang!
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > To clarify, I was asking specifically about deprecating just the
> > > > > > > > > > > > > > > > > method ‘untilWindowClose’. I might not be thinking clearly about it,
> > > > > > > > > > > > > > > > > though. What does untilWindowClose do that this KIP doesn’t cover?
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > > > > > > > > > > > > > > > > > Just my 2c: Suppressed is used in `suppress`, whose application
> > > > > > > > > > > > > > > > > > scope is much larger and hence more flexible, i.e. it can be used
> > > > > > > > > > > > > > > > > > anywhere on a `KTable` (but internally we would check whether
> > > > > > > > > > > > > > > > > > certain emit policies like `untilWindowClose` are valid or not),
> > > > > > > > > > > > > > > > > > whereas `trigger`, as of now, is only applicable on
> > > > > > > > > > > > > > > > > > XXWindowedKStream. So I think it would not completely replace
> > > > > > > > > > > > > > > > > > Suppressed.untilWindowClose.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > In the future, I'd personally still want to keep a single control
> > > > > > > > > > > > > > > > > > object for all emit policies. Once we have extended Emitted to
> > > > > > > > > > > > > > > > > > cover the other emit policies that Suppressed covers today, we can
> > > > > > > > > > > > > > > > > > discuss whether `KTable.suppress(Emitted..)` could replace
> > > > > > > > > > > > > > > > > > `KTable.suppress(Suppressed..)` as a whole, but for this KIP I
> > > > > > > > > > > > > > > > > > think it's too early.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks for the KIP, Hao!
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > For what it’s worth, I’m also in favor of your latest framing of
> > > > > > > > > > > > > > > > > > > the API.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > I think the name is fine. I assume it’s inspired by Flink? It’s
> > > > > > > > > > > > > > > > > > > not identical to the concept of a trigger in Flink, which
> > > > > > > > > > > > > > > > > > > specifies when to evaluate the window, so it might be confusing to
> > > > > > > > > > > > > > > > > > > some people who have deep experience with Flink. Then again, it
> > > > > > > > > > > > > > > > > > > seems close enough that it should be clear to casual Flink users.
> > > > > > > > > > > > > > > > > > > For people with no other stream processing experience, it might
> > > > > > > > > > > > > > > > > > > seem a bit esoteric compared to something self-documenting like
> > > > > > > > > > > > > > > > > > > ‘emit()’, but the docs should make it clear.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > One small question: it seems like this 
> > > > > > > > > > > > > > > > > > > proposal is
> > > > > identical to
> > > > > > > > > > > > > > > > > > > Suppressed.untilWindowClose, and the KIP 
> > > > > > > > > > > > > > > > > > > states that this
> > > > > API
> > > > > > > > > > is
> > > > > > > > > > > > > > > > > superior.
> > > > > > > > > > > > > > > > > > > In that case, should we deprecate
> > > > > Suppressed.untilWindowClose?
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > > > > > > > > > > > > > > > > > > > Hi Hao,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > For 2), I think it's a good idea in general to use a separate function on
> > > > > > > > > > > > > > > > > > > > the Time/SessionWindowedKStream itself rather than overloading existing
> > > > > > > > > > > > > > > > > > > > functions; it achieves the same effect that, for now, the emitting control
> > > > > > > > > > > > > > > > > > > > is only for windowed aggregations as in this KIP. We can discuss the actual
> > > > > > > > > > > > > > > > > > > > function names further, e.g. whether others like the name `trigger` or not.
> > > > > > > > > > > > > > > > > > > > As for myself, I feel `trigger` is a good one, but I'd like to see if others
> > > > > > > > > > > > > > > > > > > > have opinions as well.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Hi Guozhang,
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 1. I agree to have an `Emitted` control class and two static constructors
> > > > > > > > > > > > > > > > > > > > > named `onWindowClose` and `onEachUpdate`.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > 2. For the API function changes, I'm thinking of adding a new function
> > > > > > > > > > > > > > > > > > > > > called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
> > > > > > > > > > > > > > > > > > > > > takes an `Emitted` config and returns the same stream. Example:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > stream
> > > > > > > > > > > > > > > > > > > > >        .groupBy(...)
> > > > > > > > > > > > > > > > > > > > >        .windowedBy(...)
> > > > > > > > > > > > > > > > > > > > >        .trigger(Emitted.onWindowClose()) // N
> > > > > > > > > > > > > > > > > > > > >        .count()
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > The benefits are:
> > > > > > > > > > > > > > > > > > > > >    1. It's simple and avoids creating overloads of existing functions
> > > > > > > > > > > > > > > > > > > > >       like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add
> > > > > > > > > > > > > > > > > > > > >       it to the `aggregate` functions, we would need to add it to all
> > > > > > > > > > > > > > > > > > > > >       existing `count` and `aggregate` overloads, which is a lot.
> > > > > > > > > > > > > > > > > > > > >    2. It operates directly on the windowed KStream and tells how its
> > > > > > > > > > > > > > > > > > > > >       output should be configured. If later we need to add this to other
> > > > > > > > > > > > > > > > > > > > >       types of streams, we can reuse the same `trigger` API, whereas other
> > > > > > > > > > > > > > > > > > > > >       types of streams/tables may not have an `aggregate` or `windowedBy`
> > > > > > > > > > > > > > > > > > > > >       API to make it consistent.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Hao
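Hao's first benefit (one new method instead of an extra parameter on every `count`/`reduce`/`aggregate` overload) can be illustrated with a small, self-contained Java toy. It is only a sketch under stated assumptions: `TriggerSketch`, `WindowedStream` and the enum-based `Emitted` are hypothetical stand-ins, not the Kafka Streams API, and the eager default is an assumption of the toy.

```java
// Hypothetical toy model, NOT the Kafka Streams API: it only illustrates
// a single fluent setter vs. one extra overload per terminal operation.
public class TriggerSketch {

    // Hypothetical stand-in for the proposed `Emitted` control class.
    public enum Emitted { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    // Hypothetical stand-in for TimeWindowedKStream / SessionWindowedKStream.
    public static final class WindowedStream {
        // Assumption of the toy: without an explicit choice, emit eagerly.
        private Emitted emitted = Emitted.ON_EACH_UPDATE;

        // One new method covers every terminal operation below.
        public WindowedStream trigger(Emitted emitted) {
            this.emitted = emitted;
            return this; // returns the same stream, as proposed in the thread
        }

        // Existing operations keep their signatures and read the stored policy.
        public String count() { return "count/" + emitted; }
        public String reduce() { return "reduce/" + emitted; }
        public String aggregate() { return "aggregate/" + emitted; }
    }

    public static void main(String[] args) {
        System.out.println(new WindowedStream().count()); // count/ON_EACH_UPDATE
        System.out.println(new WindowedStream()
                .trigger(Emitted.ON_WINDOW_CLOSE)
                .aggregate()); // aggregate/ON_WINDOW_CLOSE
    }
}
```

Because `trigger` stores the policy and returns the same stream, adding another terminal operation later needs no new overload for the emit policy; the trade-off Matthias raises is that the setting then sits before, rather than inside, the operator it affects.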
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Hello Hao,
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > I prefer option 2 over the other options, mainly because the added config
> > > > > > > > > > > > > > > > > > > > > > object could potentially be used in other operators as well (not
> > > > > > > > > > > > > > > > > > > > > > necessarily a windowed operator, and hence it need not be piggy-backed on
> > > > > > > > > > > > > > > > > > > > > > `windowedBy`; that's also why I suggested not naming it `WindowConfig` but
> > > > > > > > > > > > > > > > > > > > > > just `EmitConfig`).
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > As for Matthias' question, I think the difference between the windowed
> > > > > > > > > > > > > > > > > > > > > > aggregate operator and the stream-stream join operator is that, for the
> > > > > > > > > > > > > > > > > > > > > > latter, we think emit-final should be the only right emitting policy, and
> > > > > > > > > > > > > > > > > > > > > > hence we should not let users configure it. If users configured it to,
> > > > > > > > > > > > > > > > > > > > > > e.g., emit eagerly, they might get the old spurious emitting behavior,
> > > > > > > > > > > > > > > > > > > > > > which violates the semantics.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > For option 2) itself, I have a few more thoughts:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards
> > > > > > > > > > > > > > > > > > > > > > adding the new param in the overloaded `aggregate` rather than the
> > > > > > > > > > > > > > > > > > > > > > overloaded `windowedBy` function. The reason is that the emitting logic
> > > > > > > > > > > > > > > > > > > > > > could be either window based or non-window based in the long run. Though
> > > > > > > > > > > > > > > > > > > > > > for this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we
> > > > > > > > > > > > > > > > > > > > > > may want to extend it to other non-windowed operators in the future.
> > > > > > > > > > > > > > > > > > > > > > 2. To be consistent with other control class names, I feel maybe we can
> > > > > > > > > > > > > > > > > > > > > > name it "Emitted", not "EmitConfig".
> > > > > > > > > > > > > > > > > > > > > > 3. Following the first comment, I think we can have the static constructor
> > > > > > > > > > > > > > > > > > > > > > names as "onWindowClose" and "onEachUpdate".
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > The resulting code pattern would be like this:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >         stream
> > > > > > > > > > > > > > > > > > > > > >           .groupBy(..)
> > > > > > > > > > > > > > > > > > > > > >           .windowedBy(TimeWindows..)
> > > > > > > > > > > > > > > > > > > > > >           .count(Emitted.onWindowClose())
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > WDYT?
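The two static constructors Guozhang names (`onWindowClose` and `onEachUpdate`) differ only in when results are forwarded. The following toy Java model of a tumbling-window count shows the difference. Everything in it (`EmitPolicySketch`, the `Emitted` enum, the string output) is hypothetical and much simpler than Kafka Streams: it assumes a window `[start, start + size)` closes once stream time (the largest timestamp seen so far) reaches `start + size + grace`, that closed windows are emitted only when a new record advances stream time, and that records for already-closed windows are dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model, NOT Kafka Streams internals: a tumbling-window
// count under the two emit policies discussed in this thread.
public class EmitPolicySketch {

    public enum Emitted { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    // Returns emitted results as "windowStart=count" strings, in emit order.
    // Assumes non-negative timestamps and size > 0.
    public static List<String> count(long[] timestamps, long size, long grace, Emitted emitted) {
        Map<Long, Long> counts = new TreeMap<>(); // per-window counts, keyed by window start
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % size);
            if (start + size + grace <= streamTime) {
                continue; // late record: its window is already closed, so drop it
            }
            long updated = counts.merge(start, 1L, Long::sum);
            if (emitted == Emitted.ON_EACH_UPDATE) {
                out.add(start + "=" + updated); // every update is forwarded downstream
            } else {
                // Emit once, then evict, every window whose end + grace has passed.
                Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Long> w = it.next();
                    if (w.getKey() + size + grace <= streamTime) {
                        out.add(w.getKey() + "=" + w.getValue());
                        it.remove();
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 12, 25};
        System.out.println(count(ts, 10, 0, Emitted.ON_EACH_UPDATE));  // [0=1, 0=2, 10=1, 20=1]
        System.out.println(count(ts, 10, 0, Emitted.ON_WINDOW_CLOSE)); // [0=2, 10=1]
    }
}
```

In the second call, the intermediate update `0=1` is never forwarded and each window is emitted once, after it closes; window 20 stays open because no later record advances stream time. With a grace period of 5 and an out-of-order record at timestamp 5 (input `{1, 3, 12, 5, 25}`), the toy still counts that record (window 0 emits `0=3`), whereas with grace 0 it is dropped as late.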
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > `allowedLateness` may not be a good name. What I have in mind is to use
> > > > > > > > > > > > > > > > > > > > > > > > > > this to control how frequently we try to emit final results. Maybe it's
> > > > > > > > > > > > > > > > > > > > > > > > > > more flexible to use it as a config in properties, as we don't need to
> > > > > > > > > > > > > > > > > > > > > > > > > > recompile the DSL to change it.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > I see; making it a config seems better. Frankly, I am not even sure if we
> > > > > > > > > > > > > > > > > > > > > > > need a config at all, or if we can just hard code it? For the stream-stream
> > > > > > > > > > > > > > > > > > > > > > > left/outer join fix, there is only an internal config but no public config
> > > > > > > > > > > > > > > > > > > > > > > either.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > Option 1: Your proposal is?
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > >         stream
> > > > > > > > > > > > > > > > > > > > > > >           .groupByKey()
> > > > > > > > > > > > > > > > > > > > > > >           .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > > > > > > > > > > > > > > > > > > > > > >           .configure(EmitConfig.emitFinal())
> > > > > > > > > > > > > > > > > > > > > > >           .count()
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > It does not change my argument that it seems to be misplaced from an API
> > > > > > > > > > > > > > > > > > > > > > > flow POV.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Option 1 seems to be the least desirable to me.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > For options 2 and 3, I am not sure which one I like better. It might be
> > > > > > > > > > > > > > > > > > > > > > > good if others could chime in, too. I think I slightly prefer option 2 over
> > > > > > > > > > > > > > > > > > > > > > > option 3.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > On 3/15/22 5:33 PM, Hao Li wrote:
> > > > > > > > > > > > > > > > > > > > > > > > Thanks for the feedback, Matthias.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > `allowedLateness` may not be a good name. What I have in mind is to use
> > > > > > > > > > > > > > > > > > > > > > > > this to control how frequently we try to emit final results. Maybe it's
> > > > > > > > > > > > > > > > > > > > > > > > more flexible to use it as a config in properties, as we don't need to
> > > > > > > > > > > > > > > > > > > > > > > > recompile the DSL to change it.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > For option 1, I intend to use `emitFinal` to configure how
> > > > > > > > > > > > > > > > > > > > > > > > `TimeWindowedKStream` should be output to a `KTable` after aggregation.
> > > > > > > > > > > > > > > > > > > > > > > > But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
> > > > > > > > > > > > > > > > > > > > > > > > Maybe adding `configure(EmitConfig config)` makes more sense?
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > For option 2, the config can be created using `WindowConfig.emitFinal()`
> > > > > > > > > > > > > > > > > > > > > > > > or `EmitConfig.emitFinal()`.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > For option 3, it will be something like
> > > > > > > > > > > > > > > > > > > > > > > > `TimeWindows(..., EmitConfig emitConfig)`.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > As for putting `EmitConfig` in the aggregation operator, I think it
> > > > > > > > > > > > > > > > > > > > > > > > doesn't control how we do aggregation but how we output to `KTable`.
> > > > > > > > > > > > > > > > > > > > > > > > That's why I feel option 1 makes more sense, as it applies to
> > > > > > > > > > > > > > > > > > > > > > > > `TimeWindowedKStream`. But I'm also OK with option 2.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Hao
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > A general comment: it seems that we won't need any new `allowedLateness`
> > > > > > > > > > > > > > > > > > > > > > > > > parameter, because the grace period is already defined on the window
> > > > > > > > > > > > > > > > > > > > > > > > > itself?
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > (On the other hand, if I think about it once more, maybe the
> > > > > > > > > > > > > > > > > > > > > > > > > `grace-period` is actually not a property of the window but a property
> > > > > > > > > > > > > > > > > > > > > > > > > of the aggregation operator? _thinking_)
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > From an API flow point of view, option 1 might not be desirable IMHO:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >          stream
> > > > > > > > > > > > > > > > > > > > > > > > >            .groupByKey()
> > > > > > > > > > > > > > > > > > > > > > > > >            .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > > > > > > > > > > > > > > > > > > > > > > > >            .emitFinal()
> > > > > > > > > > > > > > > > > > > > > > > > >            .count()
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > The call to `emitFinal()` does not seem to be in the right place for this
> > > > > > > > > > > > > > > > > > > > > > > > > case?
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Option 2 might work (I think we need to discuss a few details of the API,
> > > > > > > > > > > > > > > > > > > > > > > > > though):
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >          stream
> > > > > > > > > > > > > > > > > > > > > > > > >            .groupByKey()
> > > > > > > > > > > > > > > > > > > > > > > > >            .windowedBy(
> > > > > > > > > > > > > > > > > > > > > > > > >              TimeWindows.ofSizeWithNoGrace(...),
> > > > > > > > > > > > > > > > > > > > > > > > >              EmitConfig.emitFinal() // just made up; it's not in the KIP
> > > > > > > > > > > > > > > > > > > > > > > > >            )
> > > > > > > > > > > > > > > > > > > > > > > > >            .count()
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I made up the `WindowConfig.emitFinal()` call; from the KIP, it's unclear
> > > > > > > > > > > > > > > > > > > > > > > > > what API you have in mind. `EmitFinalConfig` has neither a public
> > > > > > > > > > > > > > > > > > > > > > > > > constructor nor any builder method.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > For option 3, I am not sure what you really have in mind. Can you give a
> > > > > > > > > > > > > > > > > > > > > > > > > concrete example (similar to above) of how users would write their code?
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Did you consider actually passing the `EmitConfig` into the aggregation
> > > > > > > > > > > > > > > > > > > > > > > > > operator? In the end, it seems not to be a property of the window
> > > > > > > > > > > > > > > > > > > > > > > > > definition or windowing step, but a property of the actual operator:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >          stream
> > > > > > > > > > > > > > > > > > > > > > > > >            .groupByKey()
> > > > > > > > > > > > > > > > > > > > > > > > >            .windowedBy(
> > > > > > > > > > > > > > > > > > > > > > > > >              TimeWindows.ofSizeWithNoGrace(...)
> > > > > > > > > > > > > > > > > > > > > > > > >            )
> > > > > > > > > > > > > > > > > > > > > > > > >            .count(EmitConfig.emitFinal())
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > The API surface area that needs to be updated might be larger for this
> > > > > > > > > > > > > > > > > > > > > > > > > case, though...
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > On 3/14/22 9:21 PM, Hao Li wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Guozhang!
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> > > > > > > > > > > > > > > > > > > > > > > > > > modifies fewer places. What do you think of option 1, which doesn't change
> > > > > > > > > > > > > > > > > > > > > > > > > > the current `windowedBy` API but configures `EmitConfig` separately? The
> > > > > > > > > > > > > > > > > > > > > > > > > > benefit of option 1 is that if we need to configure something else later,
> > > > > > > > > > > > > > > > > > > > > > > > > > we don't need to pile it onto `windowedBy` but can add separate APIs.
> > > > > > > > > > > > > > > > > > > > > > > > > > 2. I added it to `Stores` mainly to conform to
> > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> > > > > > > > > > > > > > > > > > > > > > > > > > But we can also create an internal API to do that without modifying
> > > > > > > > > > > > > > > > > > > > > > > > > > `Stores`.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > Hao
> > > > > > > > > > > > > > > > > > > > > > > > > > 
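A minimal, self-contained Java sketch of option 1 as described just above: `windowedBy` keeps its current signature and the emit behavior is set by a separate chained call, so later options can become further methods instead of extra `windowedBy` parameters. All types and methods here (`EmitConfig`, `GroupedStream`, `WindowedStream`, `emitConfig(...)`) are hypothetical stand-ins, not Kafka Streams classes; the thread's later messages settle on a chained call of the form `.emitStrategy(EmitStrategy.onWindowClose())`.

```java
import java.time.Duration;

// Hypothetical sketch of "option 1": windowedBy(...) keeps its current signature
// and the emit behavior is configured by a separate chained call.
// None of these types are real Kafka Streams classes.
public final class Option1Sketch {

    enum EmitMode { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Stand-in for the proposed EmitConfig object.
    static final class EmitConfig {
        final EmitMode mode;

        private EmitConfig(EmitMode mode) { this.mode = mode; }

        static EmitConfig onWindowUpdate() { return new EmitConfig(EmitMode.ON_WINDOW_UPDATE); }

        static EmitConfig onWindowClose() { return new EmitConfig(EmitMode.ON_WINDOW_CLOSE); }
    }

    // Immutable stand-in for a windowed stream. New options become new methods
    // here instead of extra windowedBy(...) parameters.
    static final class WindowedStream {
        final Duration windowSize;
        final EmitConfig emit;

        WindowedStream(Duration windowSize, EmitConfig emit) {
            this.windowSize = windowSize;
            this.emit = emit;
        }

        // Returns a copy with a different emit config; the receiver is unchanged.
        WindowedStream emitConfig(EmitConfig config) {
            return new WindowedStream(windowSize, config);
        }
    }

    static final class GroupedStream {
        // Unchanged entry point: defaults to emitting on every window update.
        WindowedStream windowedBy(Duration windowSize) {
            return new WindowedStream(windowSize, EmitConfig.onWindowUpdate());
        }
    }

    public static void main(String[] args) {
        WindowedStream windowed = new GroupedStream()
                .windowedBy(Duration.ofMinutes(5))
                .emitConfig(EmitConfig.onWindowClose());
        System.out.println(windowed.emit.mode); // prints ON_WINDOW_CLOSE
    }
}
```

The design point is that `windowedBy` never changes shape; each future knob is one more chained method on the windowed stream.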
> > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Hao,
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the proposal. I have some preferences among the
> > > > > > > > > > > > > > > > > > > > > > > > > > > options here, so I will copy them below:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > I'm now wondering whether it's better not to add this new
> > > > > > > > > > > > > > > > > > > > > > > > > > > config to each of the Window interfaces, but instead to add it
> > > > > > > > > > > > > > > > > > > > > > > > > > > at the KGroupedStream#windowedBy function. Also, instead of
> > > > > > > > > > > > > > > > > > > > > > > > > > > adding just a boolean flag, maybe we can add a config class
> > > > > > > > > > > > > > > > > > > > > > > > > > > like Grouped, Suppressed, etc. For example, let's call it
> > > > > > > > > > > > > > > > > > > > > > > > > > > Emitted, which for now would have just a single construct,
> > > > > > > > > > > > > > > > > > > > > > > > > > > Emitted.atWindowClose, whose semantics are the same as
> > > > > > > > > > > > > > > > > > > > > > > > > > > emitFinal == true. I think the benefits are:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 1) You do not need to modify multiple Window classes, but
> > > > > > > > > > > > > > > > > > > > > > > > > > > just overload one windowedBy function with a second param.
> > > > > > > > > > > > > > > > > > > > > > > > > > > This is a smaller scope for now, and also more extensible for
> > > > > > > > > > > > > > > > > > > > > > > > > > > any future changes.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 2) With a config interface, we maintain its extensibility as
> > > > > > > > > > > > > > > > > > > > > > > > > > > well as being able to reuse this Emitted interface for other
> > > > > > > > > > > > > > > > > > > > > > > > > > > operators if we wanted to expand to them.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > ----------------------------
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > So in general I'm leaning towards option 2). For that, some
> > > > > > > > > > > > > > > > > > > > > > > > > > > more detailed comments:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 1) If we want to reuse that config object for other
> > > > > > > > > > > > > > > > > > > > > > > > > > > non-window stateful operations, I think naming it `EmitConfig`
> > > > > > > > > > > > > > > > > > > > > > > > > > > is probably better than `WindowConfig`.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 2) I saw your PR (https://github.com/apache/kafka/pull/11892)
> > > > > > > > > > > > > > > > > > > > > > > > > > > in which you are also proposing to add new stores to the
> > > > > > > > > > > > > > > > > > > > > > > > > > > public factory `Stores`, but that is not included in the KIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Is that intentional? Personally, I think that although we may
> > > > > > > > > > > > > > > > > > > > > > > > > > > eventually want to add a new store type to the public APIs,
> > > > > > > > > > > > > > > > > > > > > > > > > > > for this KIP we may not have to add them and can delay that
> > > > > > > > > > > > > > > > > > > > > > > > > > > until after we've learned the best way to lay them out. LMK
> > > > > > > > > > > > > > > > > > > > > > > > > > > what you think.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
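A minimal sketch of the config-object alternative Guozhang prefers above, using hypothetical stand-in types rather than the real Kafka Streams API: an `Emitted` class, in the spirit of `Grouped` or `Suppressed`, passed to an overloaded `windowedBy`. `Emitted.atWindowClose()` carries the same meaning as `emitFinal == true`; the `onEachUpdate()` factory is an invented name for today's default behavior.

```java
import java.time.Duration;

// Hypothetical sketch of the "config object" alternative: an Emitted class
// (in the spirit of Grouped or Suppressed) passed to an overloaded windowedBy.
// None of these types are real Kafka Streams classes.
public final class EmittedSketch {

    static final class Emitted {
        // Same meaning as the boolean emitFinal flag it would replace.
        final boolean emitFinal;

        private Emitted(boolean emitFinal) { this.emitFinal = emitFinal; }

        // The single construct proposed for now.
        static Emitted atWindowClose() { return new Emitted(true); }

        // Invented name for today's default: emit on every update.
        static Emitted onEachUpdate() { return new Emitted(false); }
    }

    static final class WindowedStream {
        final Duration windowSize;
        final Emitted emitted;

        WindowedStream(Duration windowSize, Emitted emitted) {
            this.windowSize = windowSize;
            this.emitted = emitted;
        }
    }

    static final class GroupedStream {
        // Existing signature keeps today's behavior.
        WindowedStream windowedBy(Duration windowSize) {
            return windowedBy(windowSize, Emitted.onEachUpdate());
        }

        // One overload takes the config object, so no Window class changes.
        WindowedStream windowedBy(Duration windowSize, Emitted emitted) {
            return new WindowedStream(windowSize, emitted);
        }
    }

    public static void main(String[] args) {
        WindowedStream windowed = new GroupedStream()
                .windowedBy(Duration.ofMinutes(5), Emitted.atWindowClose());
        System.out.println(windowed.emitted.emitFinal); // prints true
    }
}
```

Here the trade-off is that each new config object means another `windowedBy` overload, whereas the individual Window classes stay untouched.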
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Dev team,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd like to start a discussion thread on Kafka Streams KIP-825:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > This KIP aims to add new APIs to support outputting final
> > > > > > > > > > > > > > > > > > > > > > > > > > > > aggregated results for windowed aggregations. I listed several
> > > > > > > > > > > > > > > > > > > > > > > > > > > > options there, and I'm looking forward to your feedback.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hao
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Hao
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > --
> > > > > > > > > > Thanks,
> > > > > > > > > > Hao
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > 
> > > > > 
> > > > 
> > > > 
> > > > --
> > > > Thanks,
> > > > Hao
> > > > 
> > > 
> > > 
> > 
> 
> 
