`windowedStream.onWindowClose()` is essentially the original option 1
(`windowedStream.emitFinal()`), which was rejected because every emit type
we add later would require adding another function. I still prefer the
"windowedStream.someFunc(Controlled.onWindowClose)"
model since it's flexible and makes it clear that it's configuring the emit
policy. Let me summarize all the naming options we have and compare them:

*API function name:*

*1. `windowedStream.trigger()`*
    Pros:
       i. Simple
       ii. Similar to Flink's trigger function (is this a con actually?)
    Cons:
       i. `trigger()` can be confused with Flink's trigger (raised by John)
       ii. `trigger()` feels like an operation rather than a configuration
           function (raised by Bruno)

*2. `windowedStream.emitTrigger()`*
    Pros:
       i. Avoids confusion with Flink's trigger API
       ii. `emitTrigger` feels like configuring the trigger because
           "trigger" here is a noun rather than a verb as in `trigger()`
    Cons:
       i. Verbose?
       ii. Not consistent with `Suppressed.untilWindowClose`?

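A minimal Java sketch of the "someFunc(Controlled.onWindowClose)" model, to
make the API-name comparison above concrete. Every name in it (`EmitTrigger`,
`WindowedStreamSketch`, `emitTrigger()`) is hypothetical and stands in for
whichever option we pick; it is not the Kafka Streams API, and the default of
emitting on each update is an assumption of the sketch.

```java
import java.util.Objects;

// Hypothetical control object (one of the naming candidates in this thread).
// This is NOT the Kafka Streams API; it only models the proposal.
final class EmitTrigger {
    enum Kind { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Kind kind;

    private EmitTrigger(final Kind kind) {
        this.kind = kind;
    }

    // Static constructors: a future emit type becomes one more factory
    // method here instead of one more method on the windowed stream.
    static EmitTrigger onWindowClose() {
        return new EmitTrigger(Kind.ON_WINDOW_CLOSE);
    }

    static EmitTrigger onEachUpdate() {
        return new EmitTrigger(Kind.ON_EACH_UPDATE);
    }

    Kind kind() {
        return kind;
    }
}

// Hypothetical stand-in for TimeWindowedKStream / SessionWindowedKStream.
final class WindowedStreamSketch {
    // Assumption of this sketch: without an explicit trigger, results are
    // emitted on every update.
    private EmitTrigger trigger = EmitTrigger.onEachUpdate();

    // Configuring method: records the policy and returns the same stream,
    // so the DSL chain can continue with count()/aggregate().
    WindowedStreamSketch emitTrigger(final EmitTrigger trigger) {
        this.trigger = Objects.requireNonNull(trigger, "trigger");
        return this;
    }

    // Stand-in for count(): a real implementation would build the windowed
    // aggregation processor here; this sketch only reports the chosen policy.
    String count() {
        return "count(emit=" + trigger.kind() + ")";
    }
}

class EmitTriggerDemo {
    public static void main(final String[] args) {
        System.out.println(new WindowedStreamSketch().count());
        System.out.println(new WindowedStreamSketch()
            .emitTrigger(EmitTrigger.onWindowClose())
            .count());
    }
}
```

The configuring call returns the same stream, as proposed for `trigger`, so
it sits in the chain without transforming any data (Bruno's concern), while a
new emit type only adds a factory method to the control class rather than a
new method on every windowed stream interface.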

*Config class/object name:*

1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
    Cons:
       i. Doesn't go along with `trigger` (raised by Bruno)

2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*

3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*

4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
*`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
     This is a combination of different names like: `EmitConfig`,
`EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...

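All of the config-class names above describe the same two behaviours. As a
rough illustration of what `onEachUpdate` versus `onWindowClose` would mean
for a windowed count, here is a toy, in-memory Java sketch. `EmitPolicy` and
`TumblingCountSketch` are hypothetical names, and the close rule (a window
closes once stream time reaches its end plus the grace period, after which
records for it are dropped) is an assumption of the sketch, not a statement
about the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical emit policies mirroring onEachUpdate / onWindowClose.
enum EmitPolicy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

// Toy tumbling-window count, NOT Kafka Streams internals. Assumption: a
// window [start, start + size) closes once stream time >= end + grace, and
// records for closed windows are dropped as late.
final class TumblingCountSketch {
    private final long windowSize;
    private final long grace;
    private final EmitPolicy policy;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // start -> count
    private final List<String> output = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    TumblingCountSketch(final long windowSize, final long grace, final EmitPolicy policy) {
        this.windowSize = windowSize;
        this.grace = grace;
        this.policy = policy;
    }

    void process(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        final long start = timestamp - Math.floorMod(timestamp, windowSize);
        if (start + windowSize + grace <= streamTime) {
            return; // late record: its window is already closed
        }
        final long count = openWindows.merge(start, 1L, Long::sum);
        if (policy == EmitPolicy.ON_EACH_UPDATE) {
            output.add(format(start, count)); // eager: every update goes downstream
        }
        closeExpiredWindows();
    }

    private void closeExpiredWindows() {
        // Windows are ordered by start time, so stop at the first still-open one.
        while (!openWindows.isEmpty()) {
            final Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + windowSize + grace > streamTime) {
                break;
            }
            openWindows.pollFirstEntry(); // state is dropped under both policies
            if (policy == EmitPolicy.ON_WINDOW_CLOSE) {
                output.add(format(oldest.getKey(), oldest.getValue())); // final result, once
            }
        }
    }

    private String format(final long start, final long count) {
        return "[" + start + "," + (start + windowSize) + ")=" + count;
    }

    List<String> output() {
        return new ArrayList<>(output);
    }
}

class EmitPolicyDemo {
    public static void main(final String[] args) {
        for (final EmitPolicy policy : EmitPolicy.values()) {
            final TumblingCountSketch counter = new TumblingCountSketch(10, 0, policy);
            for (final long ts : new long[] {1, 3, 12, 5, 25}) {
                counter.process(ts);
            }
            System.out.println(policy + " -> " + counter.output());
        }
    }
}
```

With window size 10, no grace period and timestamps 1, 3, 12, 5, 25, the
eager policy emits `[0,10)=1`, `[0,10)=2`, `[10,20)=1`, `[20,30)=1`, while
the on-close policy emits only `[0,10)=2` and `[10,20)=1`; the record at 5 is
dropped as late in both cases, and `[20,30)` is still open at the end.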

If we settle on option 1), feel free to add new naming options to this list
and comment on their pros and cons.

Hao





On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:

> I see what you mean now, and I think it's a fair point that composing
> `trigger` and `emitted` seems awkward.
>
> Re: data processing operators v.s. control operators, I share your concern
> as well, and here's my train of thought: having only data processing
> operators was my primary motivation for how we added the suppress operator
> --- it indeed "suppresses" data. But in hindsight, its disadvantage is
> that, for example, Suppressed.onWindowClose() should only be related to an
> earlier windowedBy operator, which is possibly very far from it in the
> resulting DSL code. It's not only a bit awkward for users to write such
> code, but in such cases the DSL builder also needs to maintain and
> propagate this information to the suppress operator further down. So we
> are now thinking about "putting the control object as close as possible to
> where the related processing really happens". In that world my original
> preference was somewhere in option 2), i.e. just put the control as a param
> of the related "windowedBy" operator, but the trade-off is that we keep
> adding overloaded functions to these operators. So after some back-and-forth
> I'm leaning towards relaxing our principle of having only processing
> operators and no flow-control operators. That being said, if you have any
> ideas for getting the benefits of both worlds, I'm all ears.
>
> Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> for the latter is extensibility without adding more functions in the
> future. If people feel this is not worth it, we can go with the first
> option as well. If the only concern is that `trigger` and `Emitted` do not
> compose well together, maybe we can consider something like
> `windowedStream.trigger(Triggered.onWindowClose())`?
>
> Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
> we should use the past tense either :P But if it's not bothering people
> much, I'd say we just keep it rather than deprecate it and rename the APIs.
>
>
> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi Guozhang,
> >
> > There is no semantic difference. It is a cosmetic difference.
> > Conceptually, I relate `Emitted` with the aggregation and not with
> > `trigger()` in the API flow, because the aggregation emits the result,
> > not `trigger()`. Therefore, I proposed not to use `Emitted` as the name
> > of the config object passed to `trigger()`.
> >
> >
> > Best,
> > Bruno
> >
> > On 22.03.22 17:24, Guozhang Wang wrote:
> > > Hi Bruno,
> > >
> > > Could you elaborate a bit more here, what's the semantic difference
> > > between "the aggregation is triggered on window close and all
> > > aggregation results are emitted." for
> > > trigger(TriggerParameters.onWindowClose()), and "the aggregation is
> > > configured to only emit final results." for
> > > trigger(Emitted.onWindowClose())?
> > >
> > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >
> > >> Hi Hao,
> > >>
> > >> Thank you for the KIP!
> > >>
> > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> > >> does not seem compatible with the proposed flow. Conceptually, now the
> > >> flow states that the aggregation is triggered on window close and all
> > >> aggregation results are emitted. `Emitted` suggests that the aggregation
> > >> is configured to only emit final results.
> > >>
> > >> Thus, I propose the following:
> > >>
> > >> stream
> > >>       .groupBy(..)
> > >>       .windowedBy(..)
> > >>       .trigger(TriggerParameters.onWindowClose())
> > >>       .aggregate(..) //result in a KTable<Windowed<..>>
> > >>       .mapValues(..)
> > >>
> > >> An alternative to `trigger()` could be `schedule()`, but I do not really
> > >> like it.
> > >>
> > >> One thing I noticed with option 1 is that all other methods in the
> > >> example above are operations on data. `groupBy()` groups, `windowedBy()`
> > >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> > >> values, even `suppress()` suppresses intermediate results. But what does
> > >> `trigger()` do? `trigger()` seems like a config lost among operations.
> > >>
> > >> However, if we do not want to restrict ourselves to using methods only
> > >> for specifying operations on data, I have the following proposal:
> > >>
> > >> stream
> > >>       .groupBy(..)
> > >>       .windowedBy(..)
> > >>       .onWindowClose()
> > >>       .aggregate(..) //result in a KTable<Windowed<..>>
> > >>       .mapValues(..)
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> > >> operations also use present tense.
> > >>
> > >> On 22.03.22 06:36, Hao Li wrote:
> > >>> Hi John,
> > >>>
> > >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> > >>> different meaning in our case. `emit` sounds like an action to emit?
> > >>> How about `emitTrigger`? I'm open to suggestions for the naming.
> > >>>
> > >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang
> > >>> that we can deprecate the `Suppressed` config as a whole later. Or we
> > >>> can deprecate `Suppressed.untilWindowClose` in a later KIP after the
> > >>> emit-final implementation is done.
> > >>>
> > >>> BTW, isn't
> > >>>
> > >>> stream
> > >>>     .groupBy(..)
> > >>>     .windowBy(..)
> > >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>     .mapValues(..)
> > >>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> > >>>
> > >>> same as
> > >>>
> > >>> stream
> > >>>     .groupBy(..)
> > >>>     .windowBy(..)
> > >>>     .trigger(Emitted.onWindowClose)
> > >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>     .mapValues(..)
> > >>> ?
> > >>>
> > >>> Hao
> > >>>
> > >>>
> > >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> I think the following case is only doable via `suppress`:
> > >>>>
> > >>>> stream
> > >>>>     .groupBy(..)
> > >>>>     .windowBy(..)
> > >>>>     .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>     .mapValues(..)
> > >>>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>
> > >>>>> Thanks, Guozhang!
> > >>>>>
> > >>>>> To clarify, I was asking specifically about deprecating just the method
> > >>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though.
> > >>>>> What does untilWindowClose do that this KIP doesn’t cover?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> John
> > >>>>>
> > >>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > >>>>>> Just my 2c: Suppressed is used in `suppress`, whose application scope
> > >>>>>> is much larger and hence more flexible. I.e. it can be used anywhere
> > >>>>>> for a `KTable` (but internally we would check whether certain emit
> > >>>>>> policies like `untilWindowClose` are valid or not), whereas `trigger`
> > >>>>>> is, for now, only applicable in XXWindowedKStream. So I think it would
> > >>>>>> not completely replace Suppressed.untilWindowClose.
> > >>>>>>
> > >>>>>> In the future, personally I'd still want to keep a single control
> > >>>>>> object for all emit policies, and if we extend Emitted to the other
> > >>>>>> emitting policies covered by Suppressed today, we can discuss whether
> > >>>>>> `KTable.suppress(Emitted..)` could replace
> > >>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think
> > >>>>>> it's too early.
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> Thanks for the Kip, Hao!
> > >>>>>>>
> > >>>>>>> For what it’s worth, I’m also in favor of your latest framing of the
> > >>>>>>> API.
> > >>>>>>>
> > >>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not
> > >>>>>>> identical to the concept of a trigger in Flink, which specifies when
> > >>>>>>> to evaluate the window, so it might be confusing to some people who
> > >>>>>>> have deep experience with Flink. Then again, it seems close enough
> > >>>>>>> that it should be clear to casual Flink users. For people with no
> > >>>>>>> other stream processing experience, it might seem a bit esoteric
> > >>>>>>> compared to something self-documenting like ‘emit()’, but the docs
> > >>>>>>> should make it clear.
> > >>>>>>>
> > >>>>>>> One small question: it seems like this proposal is identical to
> > >>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is
> > >>>>>>> superior. In that case, should we deprecate
> > >>>>>>> Suppressed.untilWindowClose?
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> John
> > >>>>>>>
> > >>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >>>>>>>> Hi Hao,
> > >>>>>>>>
> > >>>>>>>> For 2), I think it's a good idea in general to use a separate
> > >>>>>>>> function on the Time/SessionWindowedKStream itself rather than
> > >>>>>>>> overloading existing functions, which achieves the same effect of
> > >>>>>>>> scoping the emitting control, for now, to windowed aggregations only
> > >>>>>>>> as in this KIP. We can discuss the actual function name further,
> > >>>>>>>> e.g. whether others like the name `trigger` or not. Personally I
> > >>>>>>>> feel `trigger` is a good one, but I'd like to see if others have
> > >>>>>>>> opinions as well.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Guozhang,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the feedback.
> > >>>>>>>>>
> > >>>>>>>>> 1. I agree to have an `Emitted` control class and two static
> > >>>>>>>>> constructors named `onWindowClose` and `onEachUpdate`.
> > >>>>>>>>>
> > >>>>>>>>> 2. For the API function changes, I'm thinking of adding a new
> > >>>>>>>>> function called `trigger` to `TimeWindowedKStream` and
> > >>>>>>>>> `SessionWindowedKStream`. It takes an `Emitted` config and returns
> > >>>>>>>>> the same stream. Example:
> > >>>>>>>>>
> > >>>>>>>>> stream
> > >>>>>>>>>     .groupBy(...)
> > >>>>>>>>>     .windowedBy(...)
> > >>>>>>>>>     .trigger(Emitted.onWindowClose) // N
> > >>>>>>>>>     .count()
> > >>>>>>>>>
> > >>>>>>>>> The benefits are:
> > >>>>>>>>>     1. It's simple and avoids adding overloads of existing
> > >>>>>>>>> functions like `windowedBy`, `count`, `reduce` or `aggregate`. In
> > >>>>>>>>> fact, to add it to the `aggregate` functions, we would need to add
> > >>>>>>>>> it to all existing `count` and `aggregate` overloads, which is a
> > >>>>>>>>> lot.
> > >>>>>>>>>     2. It operates directly on the windowed KStream and tells how
> > >>>>>>>>> its output should be configured. If we later need to add this to
> > >>>>>>>>> other types of streams, we can reuse the same `trigger` API,
> > >>>>>>>>> whereas other types of streams/tables may not have the `aggregate`
> > >>>>>>>>> or `windowedBy` APIs needed to keep it consistent.
> > >>>>>>>>>
> > >>>>>>>>> Hao
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hello Hao,
> > >>>>>>>>>>
> > >>>>>>>>>> I prefer option 2 over the other options mainly because the added
> > >>>>>>>>>> config object could potentially be used in other operators as
> > >>>>>>>>>> well (not necessarily windowed ones, so it does not have to be
> > >>>>>>>>>> piggy-backed on `windowedBy`; that's also why I suggested not
> > >>>>>>>>>> naming it `WindowConfig` but just `EmitConfig`).
> > >>>>>>>>>>
> > >>>>>>>>>> As for Matthias' question, I think the difference between the
> > >>>>>>>>>> windowed aggregate operator and the stream-stream join operator is
> > >>>>>>>>>> that, for the latter, we think emit-final is the only correct
> > >>>>>>>>>> emitting policy, and hence we should not let users configure it.
> > >>>>>>>>>> If users configured it to e.g. emit eagerly, they might get the
> > >>>>>>>>>> old spurious emitting behavior, which violates the semantics.
> > >>>>>>>>>>
> > >>>>>>>>>> For option 2) itself, I have a few more thoughts:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> > >>>>>>>>>> towards adding the new param in the overloaded `aggregate` rather
> > >>>>>>>>>> than the overloaded `windowBy` function. The reason is that the
> > >>>>>>>>>> emitting logic could be either window based or non-window based in
> > >>>>>>>>>> the long run. Though for this KIP we could just add it in
> > >>>>>>>>>> `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> > >>>>>>>>>> non-windowed operators in the future.
> > >>>>>>>>>> 2. To be consistent with other control class names, I feel maybe
> > >>>>>>>>>> we can name it "Emitted", not "EmitConfig".
> > >>>>>>>>>> 3. Following the first comment, I think we can have the static
> > >>>>>>>>>> constructor names as "onWindowClose" and "onEachUpdate".
> > >>>>>>>>>>
> > >>>>>>>>>> The resulting code pattern would be like this:
> > >>>>>>>>>>
> > >>>>>>>>>>      stream
> > >>>>>>>>>>        .groupBy(..)
> > >>>>>>>>>>        .windowBy(TimeWindow..)
> > >>>>>>>>>>        .count(Emitted.onWindowClose)
> > >>>>>>>>>>
> > >>>>>>>>>> WDYT?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind
> > >>>>>>>>>>>>> is to use this to control how frequently we try to emit final
> > >>>>>>>>>>>>> results. Maybe it's more flexible to be used as config in
> > >>>>>>>>>>>>> properties as we don't need to recompile DSL to change it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even
> > >>>>>>>>>>> sure if we need a config at all or if we can just hard code it?
> > >>>>>>>>>>> For the stream-stream left/outer join fix, there is only an
> > >>>>>>>>>>> internal config but no public config either.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Option 1: Your proposal is?
> > >>>>>>>>>>>
> > >>>>>>>>>>>      stream
> > >>>>>>>>>>>        .groupByKey()
> > >>>>>>>>>>>        .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >>>>>>>>>>>        .configure(EmitConfig.emitFinal())
> > >>>>>>>>>>>        .count()
> > >>>>>>>>>>>
> > >>>>>>>>>>> That does not change my argument that it seems to be misplaced
> > >>>>>>>>>>> from an API flow POV.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Option 1 seems to be the least desirable to me.
> > >>>>>>>>>>>
> > >>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It
> > >>>>>>>>>>> might be good if others could chime in, too. I think I slightly
> > >>>>>>>>>>> prefer option 2 over option 3.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> > >>>>>>>>>>>> Thanks for the feedback Matthias.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is
> > >>>>>>>>>>>> to use this to control how frequently we try to emit final
> > >>>>>>>>>>>> results. Maybe it's more flexible to be used as a config in
> > >>>>>>>>>>>> properties, as we don't need to recompile the DSL to change it.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> > >>>>>>>>>>>> `TimeWindowedKStream` should be output to `KTable` after
> > >>>>>>>>>>>> aggregation. But `emitFinal` is not an action on the
> > >>>>>>>>>>>> `TimeWindowedKStream` interface. Maybe adding
> > >>>>>>>>>>>> `configure(EmitConfig config)` makes more sense?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 2, the config can be created using
> > >>>>>>>>>>>> `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 3, it will be something like
> > >>>>>>>>>>>> `TimeWindows(..., EmitConfig emitConfig)`.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For putting `EmitConfig` in the aggregation operator, I think it
> > >>>>>>>>>>>> doesn't control how we do the aggregation but how we output to
> > >>>>>>>>>>>> `KTable`. That's why I feel option 1 makes more sense, as it
> > >>>>>>>>>>>> applies to `TimeWindowedKStream`. But I'm also OK with option 2.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hao
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> A general comment: it seems that we won't need any new
> > >>>>>>>>>>>>> `allowedLateness` parameter because the grace period is already
> > >>>>>>>>>>>>> defined on the window itself?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> > >>>>>>>>>>>>> `grace-period` is actually not a property of the window but a
> > >>>>>>>>>>>>> property of the aggregation operator? _thinking_)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable
> > >>>>>>>>>>>>> IMHO:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>       stream
> > >>>>>>>>>>>>>         .groupByKey()
> > >>>>>>>>>>>>>         .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >>>>>>>>>>>>>         .emitFinal()
> > >>>>>>>>>>>>>         .count()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place
> > >>>>>>>>>>>>> for this case?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of
> > >>>>>>>>>>>>> the API though):
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>       stream
> > >>>>>>>>>>>>>         .groupByKey()
> > >>>>>>>>>>>>>         .windowBy(
> > >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...),
> > >>>>>>>>>>>>>           EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >>>>>>>>>>>>>         )
> > >>>>>>>>>>>>>         .count()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the KIP
> > >>>>>>>>>>>>> it's unclear what API you have in mind? `EmitFinalConfig` has no
> > >>>>>>>>>>>>> public constructor nor any builder method.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can
> > >>>>>>>>>>>>> you give a concrete example (similar to above) of how users
> > >>>>>>>>>>>>> would write their code?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> > >>>>>>>>>>>>> aggregation operator? In the end, it seems not to be a property
> > >>>>>>>>>>>>> of the window definition or windowing step, but a property of
> > >>>>>>>>>>>>> the actual operator:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>       stream
> > >>>>>>>>>>>>>         .groupByKey()
> > >>>>>>>>>>>>>         .windowBy(
> > >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...)
> > >>>>>>>>>>>>>         )
> > >>>>>>>>>>>>>         .count(EmitConfig.emitFinal())
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The API surface area that needs to be updated might be larger
> > >>>>>>>>>>>>> for this case though...
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> > >>>>>>>>>>>>>> Thanks Guozhang!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and
> > >>>>>>>>>>>>>> option 2 modifies fewer places. What do you think of option 1,
> > >>>>>>>>>>>>>> which doesn't change the current `windowedBy` API but configures
> > >>>>>>>>>>>>>> `EmitConfig` separately? The benefit of option 1 is that if we
> > >>>>>>>>>>>>>> need to configure something else later, we don't need to pile
> > >>>>>>>>>>>>>> it onto `windowedBy` but can add separate APIs.
> > >>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> > >>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> > >>>>>>>>>>>>>> But we can also create an internal API to do that without
> > >>>>>>>>>>>>>> modifying `Stores`.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hello Hao,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the proposal. I have some preferences among the
> > >>>>>>>>>>>>>>> options here, so I will copy them here:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm now thinking it's better not to add this new config on
> > >>>>>>>>>>>>>>> each of the Window interfaces, but instead add it at the
> > >>>>>>>>>>>>>>> KGroupedStream#windowedBy function. Also, instead of adding
> > >>>>>>>>>>>>>>> just a boolean flag, maybe we can add a Configured class like
> > >>>>>>>>>>>>>>> Grouped, Suppressed, etc., e.g. let's call it Emitted, which
> > >>>>>>>>>>>>>>> for now would just have a single constructor,
> > >>>>>>>>>>>>>>> Emitted.atWindowClose, whose semantics is the same as
> > >>>>>>>>>>>>>>> emitFinal == true. I think the benefits are:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) you do not need to modify multiple Window classes, but just
> > >>>>>>>>>>>>>>> overload one windowedBy function with a second param. This is
> > >>>>>>>>>>>>>>> a smaller scope for now, and also more extensible for any
> > >>>>>>>>>>>>>>> future changes.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as
> > >>>>>>>>>>>>>>> well as being able to reuse this Emitted interface for other
> > >>>>>>>>>>>>>>> operators if we want to expand to them.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> ----------------------------
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some
> > >>>>>>>>>>>>>>> more detailed comments:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window
> > >>>>>>>>>>>>>>> stateful operations, I think naming it `EmitConfig` is
> > >>>>>>>>>>>>>>> probably better than `WindowConfig`.
> > >>>>>>>>>>>>>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892)
> > >>>>>>>>>>>>>>> where you are also proposing to add new stores to the public
> > >>>>>>>>>>>>>>> factory Stores, but it's not included in the KIP. Is that
> > >>>>>>>>>>>>>>> intentional? Personally I think that although we may
> > >>>>>>>>>>>>>>> eventually want to add a new store type to the public APIs,
> > >>>>>>>>>>>>>>> for this KIP maybe we do not have to add them, but can delay
> > >>>>>>>>>>>>>>> that until we've learned the best way to lay them out. LMK
> > >>>>>>>>>>>>>>> what you think?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Dev team,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final
> > >>>>>>>>>>>>>>>> aggregated results for windowed aggregations. I listed several
> > >>>>>>>>>>>>>>>> options there and I'm looking forward to your feedback.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> --
> > >>>>>>>>>> -- Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Hao
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> > >
> >
>
>
> --
> -- Guozhang
>


-- 
Thanks,
Hao
