Hi Bruno, Guozhang,

I think `Emitted.onWindowClose` is inspired by `Suppressed.untilWindowClose`.
Since `Suppressed.untilWindowClose` is passed to `suppress()`, the name
`Suppressed` sounds good. However, since `trigger()` is a builder-style
setting rather than an action, `Emitted` may be a bit off.

How about using `emitTrigger()` as the function name? The benefits are:
1. It won't be confused with Flink's `trigger`, as John mentioned.
2. It sounds like a setting instead of an action. `trigger()` sounds like
we are triggering some action.

For the config type name, how about `EmitConfig` or `EmitTriggerConfig`, to
make it clear that it's a config for the emit trigger?
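A minimal sketch of what the `emitTrigger()` suggestion implies: a config object created through static constructors, and a builder-style setter on the windowed stream that only records the config and returns the same stream, rather than performing an action. `EmitConfig`, `ToyWindowedStream`, `emitTrigger`, and the eager default are hypothetical names taken from or assumed for this discussion; none of this is existing Kafka Streams API.

```java
import java.util.Objects;

public class EmitTriggerSketch {

    // Hypothetical config type named after the proposal in this thread.
    static final class EmitConfig {
        enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

        private final Policy policy;

        private EmitConfig(Policy policy) {
            this.policy = policy;
        }

        // Static constructors, mirroring the names discussed for `Emitted`.
        static EmitConfig onWindowClose() {
            return new EmitConfig(Policy.ON_WINDOW_CLOSE);
        }

        static EmitConfig onEachUpdate() {
            return new EmitConfig(Policy.ON_EACH_UPDATE);
        }

        Policy policy() {
            return policy;
        }
    }

    // Hypothetical stand-in for a windowed stream; it models only the setter.
    static final class ToyWindowedStream {
        // Assumed default: eager emission, as today without suppress().
        private EmitConfig emitConfig = EmitConfig.onEachUpdate();

        // Builder-style setting: records the config and returns this same
        // stream, so it reads as a setting rather than as an action.
        ToyWindowedStream emitTrigger(EmitConfig config) {
            this.emitConfig = Objects.requireNonNull(config, "config");
            return this;
        }

        EmitConfig emitConfig() {
            return emitConfig;
        }
    }

    public static void main(String[] args) {
        ToyWindowedStream windowed = new ToyWindowedStream();
        ToyWindowedStream same = windowed.emitTrigger(EmitConfig.onWindowClose());
        System.out.println(same == windowed);           // true
        System.out.println(same.emitConfig().policy()); // ON_WINDOW_CLOSE
    }
}
```

In a real implementation, the recorded policy would only take effect when the downstream aggregation (`count()`, `aggregate()`, ...) is built, which is the sense in which the call is a setting.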

Hao

On Tue, Mar 22, 2022 at 9:43 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I relate `Emitted` with the aggregation and not with
> `trigger()` in the API flow, because the aggregation emits the result,
> not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> of the config object passed to `trigger()`.
>
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> > Hi Bruno,
> >
> > Could you elaborate a bit more here: what's the semantic difference between
> > "the aggregation is triggered on window close and all aggregation results
> > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > aggregation is configured to only emit final results." for
> > trigger(Emitted.onWindowClose())?
> >
> > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Hao,
> >>
> >> Thank you for the KIP!
> >>
> >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >> does not seem compatible with the proposed flow. Conceptually, now the
> >> flow states that the aggregation is triggered on window close and all
> >> aggregation results are emitted. `Emitted` suggests that the aggregation
> >> is configured to only emit final results.
> >>
> >> Thus, I propose the following:
> >>
> >> stream
> >>       .groupBy(..)
> >>       .windowedBy(..)
> >>       .trigger(TriggerParameters.onWindowClose())
> >>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>       .mapValues(..)
> >>
> >> An alternative to `trigger()` could be `schedule()`, but I do not really
> >> like it.
> >>
> >> One thing I noticed with option 1 is that all other methods in the
> >> example above are operations on data. `groupBy()` groups, `windowedBy()`
> >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> >> values, even `suppress()` suppresses intermediate results. But what does
> >> `trigger()` do? `trigger()` seems to be a config lost among operations.
> >>
> >> However, if we do not want to restrict ourselves to using methods only
> >> to specify operations on data, I have the following proposal:
> >>
> >> stream
> >>       .groupBy(..)
> >>       .windowedBy(..)
> >>       .onWindowClose()
> >>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>       .mapValues(..)
> >>
> >> Best,
> >> Bruno
> >>
> >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >> operations also use present tense.
> >>
> >> On 22.03.22 06:36, Hao Li wrote:
> >>> Hi John,
> >>>
> >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> >>> different meaning in our case. `emit` sounds like an action to emit? How
> >>> about `emitTrigger`? I'm open to suggestions for the naming.
> >>>
> >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
> >>> we can deprecate the `Suppressed` config as a whole later. Or we can
> >>> deprecate `Suppressed.untilWindowClose` in a later KIP, after the
> >>> implementation of emit-final is done.
> >>>
> >>> BTW, isn't
> >>>
> >>> stream
> >>>     .groupBy(..)
> >>>     .windowBy(..)
> >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>     .mapValues(..)
> >>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to
> >>>     // the parent node to find a window definition
> >>>
> >>> same as
> >>>
> >>> stream
> >>>     .groupBy(..)
> >>>     .windowBy(..)
> >>>     .trigger(Emitted.onWindowClose)
> >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>     .mapValues(..)
> >>> ?
> >>>
> >>> Hao
> >>>
> >>>
> >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> I think the following case is only doable via `suppress`:
> >>>>
> >>>> stream
> >>>>     .groupBy(..)
> >>>>     .windowBy(..)
> >>>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>     .mapValues(..)
> >>>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to
> >>>>     // the parent node to find a window definition
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>
> >>>>> Thanks, Guozhang!
> >>>>>
> >>>>> To clarify, I was asking specifically about deprecating just the method
> >>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though. What
> >>>>> does untilWindowClose do that this KIP doesn’t cover?
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> >>>>>> Just my 2c: Suppressed is used in `suppress`, whose application scope is
> >>>>>> much larger and hence more flexible. I.e. it can be used anywhere for a
> >>>>>> `KTable` (but internally we would check whether certain emit policies
> >>>>>> like `untilWindowClose` are valid or not), whereas `trigger`, for now, is
> >>>>>> only applicable in XXWindowedKStream. So I think it would not completely
> >>>>>> replace Suppressed.untilWindowClose.
> >>>>>>
> >>>>>> In the future, I'd personally still want to keep one control object for
> >>>>>> all emit policies. If we have extended Emitted for the other emitting
> >>>>>> policies covered by Suppressed today, we can discuss whether
> >>>>>> `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)`
> >>>>>> as a whole, but for this KIP I think it's too early.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks for the KIP, Hao!
> >>>>>>>
> >>>>>>> For what it’s worth, I’m also in favor of your latest framing of the
> >>>>>>> API.
> >>>>>>>
> >>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >>>>>>> identical to the concept of a trigger in Flink, which specifies when to
> >>>>>>> evaluate the window, which might be confusing to some people who have
> >>>>>>> deep experience with Flink. Then again, it seems close enough that it
> >>>>>>> should be clear to casual Flink users. For people with no other stream
> >>>>>>> processing experience, it might seem a bit esoteric compared to
> >>>>>>> something self-documenting like ‘emit()’, but the docs should make it
> >>>>>>> clear.
> >>>>>>>
> >>>>>>> One small question: it seems like this proposal is identical to
> >>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is superior.
> >>>>>>> In that case, should we deprecate Suppressed.untilWindowClose?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> John
> >>>>>>>
> >>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >>>>>>>> Hi Hao,
> >>>>>>>>
> >>>>>>>> For 2), I think it's a good idea in general to use a separate function
> >>>>>>>> on the Time/SessionWindowedKStream itself rather than overloading
> >>>>>>>> existing functions; it achieves the same effect that, for now, the
> >>>>>>>> emitting control applies only to windowed aggregations as in this KIP.
> >>>>>>>> We can discuss the actual function names further, and whether others
> >>>>>>>> like the name `trigger` or not. For myself, I feel `trigger` is a good
> >>>>>>>> one, but I'd like to see if others have opinions as well.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Guozhang,
> >>>>>>>>>
> >>>>>>>>> Thanks for the feedback.
> >>>>>>>>>
> >>>>>>>>> 1. I agree to have an `Emitted` control class with two static
> >>>>>>>>> constructors named `onWindowClose` and `onEachUpdate`.
> >>>>>>>>>
> >>>>>>>>> 2. For the API function changes, I'm thinking of adding a new function
> >>>>>>>>> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`.
> >>>>>>>>> It takes an `Emitted` config and returns the same stream. Example:
> >>>>>>>>>
> >>>>>>>>> stream
> >>>>>>>>>     .groupBy(...)
> >>>>>>>>>     .windowedBy(...)
> >>>>>>>>>     .trigger(Emitted.onWindowClose())
> >>>>>>>>>     .count()
> >>>>>>>>>
> >>>>>>>>> The benefits are:
> >>>>>>>>>     1. It's simple and avoids overloading existing functions like
> >>>>>>>>> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
> >>>>>>>>> aggregation functions, we would need to add it to all existing `count`
> >>>>>>>>> and `aggregate` overloads, which is a lot.
> >>>>>>>>>     2. It operates directly on the windowed KStream and specifies how its
> >>>>>>>>> output should be configured. If we later need to add this to other
> >>>>>>>>> types of streams, we can reuse the same `trigger` API, whereas other
> >>>>>>>>> types of streams/tables may not have the `aggregate` or `windowedBy`
> >>>>>>>>> APIs needed to keep it consistent.
> >>>>>>>>>
> >>>>>>>>> Hao
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Hao,
> >>>>>>>>>>
> >>>>>>>>>> I prefer option 2 over the other options, mainly because the added
> >>>>>>>>>> config object could potentially be used in other operators as well (not
> >>>>>>>>>> necessarily windowed operators, so it does not have to be piggy-backed
> >>>>>>>>>> on `windowedBy`; that's also why I suggested naming it `EmitConfig`
> >>>>>>>>>> rather than `WindowConfig`).
> >>>>>>>>>>
> >>>>>>>>>> As for Matthias' question, I think the difference between the windowed
> >>>>>>>>>> aggregate operator and the stream-stream join operator is that, for the
> >>>>>>>>>> latter, we think emit-final should be the only correct emitting policy
> >>>>>>>>>> and hence we should not let users configure it. If users configured it
> >>>>>>>>>> to, e.g., emit eagerly, they may get the old spurious emitting behavior,
> >>>>>>>>>> which violates the semantics.
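To make the eager vs. emit-final distinction concrete, here is a toy model in plain Java; it is not Kafka Streams code. It runs a single-key tumbling-window count, and the class name, the `run` method, and the close rule (a window closes once stream time, the maximum timestamp seen so far, reaches window end plus grace) are assumptions for illustration. It also ignores record caching, which in the real library can coalesce eager updates.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EmitFinalSketch {

    // Toy tumbling-window count for one key. Assumes non-negative timestamps;
    // window start = ts - ts % size; window [start, start + size) is closed
    // once streamTime >= start + size + grace.
    static List<String> run(long[] timestamps, long size, long grace, boolean emitFinal) {
        Map<Long, Long> openWindows = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();      // "windowStart=count" sent downstream
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - ts % size;

            // Records whose window has already closed are dropped as late.
            if (streamTime < start + size + grace) {
                long count = openWindows.merge(start, 1L, Long::sum);
                if (!emitFinal) {
                    // Eager: every intermediate update is forwarded.
                    emitted.add(start + "=" + count);
                }
            }

            // Evict windows closed by the advanced stream time; in emit-final
            // mode this is the only point where a result is forwarded.
            Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (streamTime >= window.getKey() + size + grace) {
                    if (emitFinal) {
                        emitted.add(window.getKey() + "=" + window.getValue());
                    }
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 3, 12, 5, 25};
        System.out.println("eager: " + run(timestamps, 10, 0, false)); // eager: [0=1, 0=2, 10=1, 20=1]
        System.out.println("final: " + run(timestamps, 10, 0, true));  // final: [0=2, 10=1]
    }
}
```

With size 10 and no grace, eager mode forwards every update while emit-final forwards one result per closed window. The out-of-order record at 5 arrives after window 0 has closed and is dropped in both modes, and window 20 is never emitted in emit-final mode because stream time never reaches 30. With a grace of 5, the record at 5 is still accepted and the final result for window 0 becomes 3.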
> >>>>>>>>>>
> >>>>>>>>>> For option 2) itself, I have a few more thoughts:
> >>>>>>>>>>
> >>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> >>>>>>>>>> towards adding the new param in an overloaded `aggregate` rather than
> >>>>>>>>>> an overloaded `windowBy` function. The reason is that the emitting
> >>>>>>>>>> logic could be either window-based or non-window-based in the long run.
> >>>>>>>>>> Though for this KIP we could just add it in
> >>>>>>>>>> `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> >>>>>>>>>> non-windowed operators in the future.
> >>>>>>>>>> 2. To be consistent with other control class names, I feel maybe we
> >>>>>>>>>> can name it "Emitted", not "EmitConfig".
> >>>>>>>>>> 3. Following the first comment, I think we can name the static
> >>>>>>>>>> constructors "onWindowClose" and "onEachUpdate".
> >>>>>>>>>>
> >>>>>>>>>> The resulting code pattern would look like this:
> >>>>>>>>>>
> >>>>>>>>>>      stream
> >>>>>>>>>>        .groupBy(..)
> >>>>>>>>>>        .windowBy(TimeWindow..)
> >>>>>>>>>>        .count(Emitted.onWindowClose)
> >>>>>>>>>>
> >>>>>>>>>> WDYT?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>>> Maybe it's more flexible to use it as a config in properties, so we
> >>>>>>>>>>>>> don't need to recompile the DSL to change it.
> >>>>>>>>>>>
> >>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even sure if
> >>>>>>>>>>> we need a config at all, or if we can just hard-code it. For the
> >>>>>>>>>>> stream-stream left/outer join fix, there is only an internal config
> >>>>>>>>>>> but no public config either.
> >>>>>>>>>>>
> >>>>>>>>>>> Option 1: Is your proposal the following?
> >>>>>>>>>>>
> >>>>>>>>>>>      stream
> >>>>>>>>>>>        .groupByKey()
> >>>>>>>>>>>        .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>        .configure(EmitConfig.emitFinal())
> >>>>>>>>>>>        .count()
> >>>>>>>>>>>
> >>>>>>>>>>> That does not change my argument that it seems misplaced from an API
> >>>>>>>>>>> flow POV.
> >>>>>>>>>>>
> >>>>>>>>>>> Option 1 seems to be the least desirable to me.
> >>>>>>>>>>>
> >>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It might
> >>>>>>>>>>> be good if others could chime in, too. I think I slightly prefer
> >>>>>>>>>>> option 2 over option 3.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> >>>>>>>>>>>> Thanks for the feedback Matthias.
> >>>>>>>>>>>>
> >>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>> Maybe it's more flexible to use it as a config in properties, so we
> >>>>>>>>>>>> don't need to recompile the DSL to change it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> >>>>>>>>>>>> `TimeWindowedKStream` should be output to a `KTable` after
> >>>>>>>>>>>> aggregation. But `emitFinal` is not an action on the
> >>>>>>>>>>>> `TimeWindowedKStream` interface. Maybe adding
> >>>>>>>>>>>> `configure(EmitConfig config)` makes more sense?
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 2, the config can be created using
> >>>>>>>>>>>> `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 3, it will be something like
> >>>>>>>>>>>> `TimeWindows(..., EmitConfig emitConfig)`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As for putting `EmitConfig` in the aggregation operator, I think it
> >>>>>>>>>>>> doesn't control how we do the aggregation but how we output to the
> >>>>>>>>>>>> `KTable`. That's why I feel option 1 makes more sense, as it applies
> >>>>>>>>>>>> to `TimeWindowedKStream`. But I'm also OK with option 2.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hao
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> A general comment: it seems that we won't need any new
> >>>>>>>>>>>>> `allowedLateness` parameter, because the grace period is already
> >>>>>>>>>>>>> defined on the window itself?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> >>>>>>>>>>>>> `grace-period` is actually not a property of the window but a
> >>>>>>>>>>>>> property of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable IMHO:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>         .emitFinal()
> >>>>>>>>>>>>>         .count()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place for
> >>>>>>>>>>>>> this case?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the
> >>>>>>>>>>>>> API though):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(
> >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...),
> >>>>>>>>>>>>>           EmitConfig.emitFinal() // just made this up; it's not in the KIP
> >>>>>>>>>>>>>         )
> >>>>>>>>>>>>>         .count()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call; from the KIP, it's
> >>>>>>>>>>>>> unclear what API you have in mind. `EmitFinalConfig` has no public
> >>>>>>>>>>>>> constructor or any builder method.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you
> >>>>>>>>>>>>> give a concrete example (similar to the above) of how users would
> >>>>>>>>>>>>> write their code?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> >>>>>>>>>>>>> aggregation operator? In the end, it seems to be a property not of
> >>>>>>>>>>>>> the window definition or windowing step, but of the actual operator:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(
> >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...)
> >>>>>>>>>>>>>         )
> >>>>>>>>>>>>>         .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The API surface area that needs to be updated might be larger for
> >>>>>>>>>>>>> this case, though...
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> >>>>>>>>>>>>>> modifies fewer places. What do you think of option 1, which doesn't
> >>>>>>>>>>>>>> change the current `windowedBy` API but configures `EmitConfig`
> >>>>>>>>>>>>>> separately? The benefit of option 1 is that if we need to configure
> >>>>>>>>>>>>>> something else later, we don't need to pile it onto `windowedBy` but
> >>>>>>>>>>>>>> can add separate APIs.
> >>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to the existing code at
> >>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >>>>>>>>>>>>>> But we can also create an internal API to do that without modifying
> >>>>>>>>>>>>>> `Stores`.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the proposal. I have some preferences among the options
> >>>>>>>>>>>>>>> here, so I will copy my comments here:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm now thinking it may be better not to add this new config to each
> >>>>>>>>>>>>>>> of the Window interfaces, but instead add it to the
> >>>>>>>>>>>>>>> KGroupedStream#windowedBy function. Also, instead of adding just a
> >>>>>>>>>>>>>>> boolean flag, maybe we can add a Configured class like Grouped,
> >>>>>>>>>>>>>>> Suppressed, etc.; e.g., let's call it Emitted, which for now would
> >>>>>>>>>>>>>>> just have a single constructor, Emitted.atWindowClose, whose
> >>>>>>>>>>>>>>> semantics are the same as emitFinal == true. I think the benefits
> >>>>>>>>>>>>>>> are:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but just
> >>>>>>>>>>>>>>> overload one windowedBy function with a second param. This is a
> >>>>>>>>>>>>>>> smaller scope for now, and also more extensible for any future
> >>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as well as
> >>>>>>>>>>>>>>> being able to reuse this Emitted interface for other operators if we
> >>>>>>>>>>>>>>> want to expand to them.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some more
> >>>>>>>>>>>>>>> detailed comments:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window
> >>>>>>>>>>>>>>> stateful operations, I think naming it `EmitConfig` is probably
> >>>>>>>>>>>>>>> better than `WindowConfig`.
> >>>>>>>>>>>>>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) where
> >>>>>>>>>>>>>>> you are also proposing to add new stores to the public factory
> >>>>>>>>>>>>>>> Stores, but it's not included in the KIP. Is that intentional?
> >>>>>>>>>>>>>>> Personally, I think that although we may eventually want to add a
> >>>>>>>>>>>>>>> new store type to the public APIs, for this KIP maybe we do not have
> >>>>>>>>>>>>>>> to add them but can delay it until after we've learned the best way
> >>>>>>>>>>>>>>> to lay it out. LMK what you think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final
> >>>>>>>>>>>>>>>> aggregated results for windowed aggregations. I listed several
> >>>>>>>>>>>>>>>> options there, and I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Thanks,
> >>>>>>>>> Hao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>
> >
> >
>


-- 
Thanks,
Hao
