I see what you mean now, and I think it's a fair point that composing
`trigger` and `emitted` seems awkward.

Re: data processing operators vs. control operators, I share your concern
as well, and here's my train of thought: having only data processing
operators was my primary motivation for how we added the suppress operator,
which indeed "suppresses" data. But in hindsight, its disadvantage is that,
for example, Suppressed.untilWindowClose is only related to an earlier
windowedBy operator, which may be very far from it in the resulting DSL
code. Not only is it a bit awkward for users to write such code, but in such
cases the DSL builder also needs to maintain this information and propagate
it down to the suppress operator. So we are now thinking about "putting the
control object as close as possible to where the related processing actually
happens". In that world, my original preference was something like option
2), i.e. just putting the control as a param of the related "windowedBy"
operator, but the trade-off is that we keep adding overloaded functions to
these operators. So after some back-and-forth I'm leaning towards relaxing
our principle of having only processing operators and no flow-control
operators. That being said, if you have any ideas for how we can get the
best of both worlds, I'm all ears.
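To make the "very far from it" cost concrete, here is a minimal toy sketch in plain Java of what a downstream control such as `suppress(Suppressed.untilWindowClose)` implies: the builder has to walk back through parent nodes until it finds the window definition, whereas a control attached directly to the windowed stream finds it immediately. `ControlPlacementSketch`, `PlanNode` and `hopsToWindow` are hypothetical names made up for illustration; this is not how the Kafka Streams topology builder is actually implemented.

```java
// Toy logical-plan model. PlanNode and hopsToWindow are hypothetical names,
// not Kafka Streams internals.
public class ControlPlacementSketch {

    static final class PlanNode {
        final String name;
        final PlanNode parent;
        final boolean definesWindow; // true only for the windowedBy step

        PlanNode(String name, PlanNode parent, boolean definesWindow) {
            this.name = name;
            this.parent = parent;
            this.definesWindow = definesWindow;
        }
    }

    // Number of parent hops from 'node' back to the window definition,
    // or -1 if no upstream node defines a window (the emit policy is invalid there).
    static int hopsToWindow(PlanNode node) {
        int hops = 0;
        for (PlanNode n = node; n != null; n = n.parent) {
            if (n.definesWindow) {
                return hops;
            }
            hops++;
        }
        return -1;
    }

    public static void main(String[] args) {
        PlanNode stream = new PlanNode("stream", null, false);
        PlanNode grouped = new PlanNode("groupBy", stream, false);
        PlanNode windowed = new PlanNode("windowedBy", grouped, true);
        PlanNode aggregated = new PlanNode("aggregate", windowed, false);
        PlanNode mapped = new PlanNode("mapValues", aggregated, false);

        // Control on a downstream suppress(): walk back past mapValues and aggregate.
        System.out.println(hopsToWindow(mapped));   // 2
        // Control attached right at the windowed stream: found immediately.
        System.out.println(hopsToWindow(windowed)); // 0
        // Control on a non-windowed node: no window upstream, so it must be rejected.
        System.out.println(hopsToWindow(grouped));  // -1
    }
}
```

The sketch only models the lookup; in the real DSL this bookkeeping lives inside the builder, which is exactly the propagation burden described above.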

Re: using a direct function like "windowedStream.onWindowClose()" vs.
"windowedStream.someFunc(Controlled.onWindowClose)", again, my motivation
for the latter is extensibility without adding more functions in the
future. If people feel this is not worth it, we can go with the first option
as well. If the only concern is that `trigger` and `Emitted` do not feel
composable together, maybe we can consider something like
`windowedStream.trigger(Triggered.onWindowClose())`?
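A minimal sketch of the control-object style in plain Java, assuming a `Triggered` class modeled on `Grouped`/`Suppressed` (private constructor, static factories). `Triggered`, `WindowedStreamSketch` and `emitted(...)` are hypothetical names for illustration, not Kafka Streams APIs, and emitting on each update by default is an assumption. The point it shows is that a new emit policy later means a new factory on `Triggered`, while the windowed stream keeps a single `trigger(...)` method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are Kafka Streams APIs.
public class TriggeredSketch {

    // Control object in the style of Grouped/Suppressed: one static factory per policy.
    public static final class Triggered {
        enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

        private final Policy policy;

        private Triggered(Policy policy) {
            this.policy = policy;
        }

        public static Triggered onWindowClose() {
            return new Triggered(Policy.ON_WINDOW_CLOSE);
        }

        public static Triggered onEachUpdate() {
            return new Triggered(Policy.ON_EACH_UPDATE);
        }

        Policy policy() {
            return policy;
        }
    }

    // Toy windowed stream with a single trigger(Triggered) entry point.
    public static final class WindowedStreamSketch {
        private Triggered triggered = Triggered.onEachUpdate(); // assumed default

        public WindowedStreamSketch trigger(Triggered triggered) {
            this.triggered = triggered;
            return this; // same stream, so calls can be chained
        }

        // Given the successive aggregate values of one window, return what would
        // be forwarded downstream under the configured policy.
        public List<Long> emitted(List<Long> updates, boolean windowClosed) {
            switch (triggered.policy()) {
                case ON_EACH_UPDATE:
                    return new ArrayList<>(updates);
                case ON_WINDOW_CLOSE:
                    List<Long> out = new ArrayList<>();
                    if (windowClosed && !updates.isEmpty()) {
                        out.add(updates.get(updates.size() - 1)); // final result only
                    }
                    return out;
                default:
                    throw new IllegalStateException("unknown policy");
            }
        }
    }

    public static void main(String[] args) {
        List<Long> counts = List.of(1L, 2L, 3L);
        WindowedStreamSketch eager = new WindowedStreamSketch();
        WindowedStreamSketch fin = new WindowedStreamSketch().trigger(Triggered.onWindowClose());
        System.out.println(eager.emitted(counts, true)); // [1, 2, 3]
        System.out.println(fin.emitted(counts, false));  // []
        System.out.println(fin.emitted(counts, true));   // [3]
    }
}
```

With the direct-function alternative, each new policy would instead add another method such as `onWindowClose()` to every windowed stream interface.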

Re: windowedBy vs. windowBy, yeah, I do not really have a good reason why
we should use the past tense either :P But if it's not bothering people much,
I'd say we just keep it rather than deprecate it and rename the APIs.


On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I associate `Emitted` with the aggregation and not with
> `trigger()` in the API flow, because the aggregation, not `trigger()`,
> emits the result. Therefore, I proposed not to use `Emitted` as the name
> of the config object passed to `trigger()`.
>
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> > Hi Bruno,
> >
> > Could you elaborate a bit more on what the semantic difference is between
> > "the aggregation is triggered on window close and all aggregation results
> > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > aggregation is configured to only emit final results." for
> > trigger(Emitted.onWindowClose())?
> >
> > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Hao,
> >>
> >> Thank you for the KIP!
> >>
> >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >> does not seem compatible with the proposed flow. Conceptually, now the
> >> flow states that the aggregation is triggered on window close and all
> >> aggregation results are emitted. `Emitted` suggests that the aggregation
> >> is configured to only emit final results.
> >>
> >> Thus, I propose the following:
> >>
> >> stream
> >>       .groupBy(..)
> >>       .windowedBy(..)
> >>       .trigger(TriggerParameters.onWindowClose())
> >>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>       .mapValues(..)
> >>
> >> An alternative to `trigger()` could be `schedule()`, but I do not really
> >> like it.
> >>
> >> One thing I noticed with option 1 is that all other methods in the
> >> example above are operations on data. `groupBy()` groups, `windowedBy()`
> >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> >> values, even `suppress()` suppresses intermediate results. But what does
> >> `trigger()` do? `trigger()` seems like a config lost among the operations.
> >>
> >> However, if we do not want to restrict ourselves to using methods only
> >> to specify operations on data, I have the following proposal:
> >>
> >> stream
> >>       .groupBy(..)
> >>       .windowedBy(..)
> >>       .onWindowClose()
> >>       .aggregate(..) //result in a KTable<Windowed<..>>
> >>       .mapValues(..)
> >>
> >> Best,
> >> Bruno
> >>
> >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >> operations also use present tense.
> >>
> >> On 22.03.22 06:36, Hao Li wrote:
> >>> Hi John,
> >>>
> >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> >>> different meaning in our case. `emit` sounds like an action to emit? How
> >>> about `emitTrigger`? I'm open to suggestions for the naming.
> >>>
> >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
> >>> we can deprecate the `Suppressed` config as a whole later. Or we can
> >>> deprecate `Suppressed.untilWindowClose` in a later KIP after the
> >>> implementation of emit-final is done.
> >>>
> >>> BTW, isn't
> >>>
> >>> stream
> >>>     .groupBy(..)
> >>>     .windowBy(..)
> >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>     .mapValues(..)
> >>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> >>>
> >>> same as
> >>>
> >>> stream
> >>>     .groupBy(..)
> >>>     .windowBy(..)
> >>>     .trigger(Emitted.onWindowClose)
> >>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>     .mapValues(..)
> >>> ?
> >>>
> >>> Hao
> >>>
> >>>
> >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> I think the following case is only doable via `suppress`:
> >>>>
> >>>> stream
> >>>>     .groupBy(..)
> >>>>     .windowBy(..)
> >>>>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>     .mapValues(..)
> >>>>     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>
> >>>>> Thanks, Guozhang!
> >>>>>
> >>>>> To clarify, I was asking specifically about deprecating just the method
> >>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though.
> >>>>> What does untilWindowClose do that this KIP doesn’t cover?
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> >>>>>> Just my 2c: Suppressed is used in `suppress`, whose application scope is
> >>>>>> much larger and hence more flexible. I.e. it can be used anywhere on a
> >>>>>> `KTable` (but internally we would check whether certain emit policies
> >>>>>> like `untilWindowClose` are valid or not), whereas `trigger`, for now, is
> >>>>>> only applicable to XXWindowedKStream. So I think it would not completely
> >>>>>> replace Suppressed.untilWindowClose.
> >>>>>>
> >>>>>> In the future, personally I'd still want to keep one control object
> >>>>>> for all emit policies, and maybe once we have extended Emitted to the
> >>>>>> other emitting policies covered by Suppressed today, we can discuss
> >>>>>> whether we could have `KTable.suppress(Emitted..)` replacing
> >>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think
> >>>>>> it's too early.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Thanks for the KIP, Hao!
> >>>>>>>
> >>>>>>> For what it’s worth, I’m also in favor of your latest framing of the
> >>>>>>> API.
> >>>>>>>
> >>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >>>>>>> identical to the concept of a trigger in Flink, which specifies when
> >>>>>>> to evaluate the window, and that might be confusing to some people who
> >>>>>>> have deep experience with Flink. Then again, it seems close enough that
> >>>>>>> it should be clear to casual Flink users. For people with no other
> >>>>>>> stream processing experience, it might seem a bit esoteric compared to
> >>>>>>> something self-documenting like ‘emit()’, but the docs should make it
> >>>>>>> clear.
> >>>>>>>
> >>>>>>> One small question: it seems like this proposal is identical to
> >>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is
> >>>>>>> superior. In that case, should we deprecate Suppressed.untilWindowClose?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> John
> >>>>>>>
> >>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >>>>>>>> Hi Hao,
> >>>>>>>>
> >>>>>>>> For 2), I think it's a good idea in general to use a separate function
> >>>>>>>> on the Time/SessionWindowedKStream itself rather than overloading
> >>>>>>>> existing functions, which achieves the same effect that, for now, the
> >>>>>>>> emitting control applies only to windowed aggregations as in this KIP.
> >>>>>>>> We can discuss the actual function names further, i.e. whether others
> >>>>>>>> like the name `trigger` or not. As for myself, I feel `trigger` is a
> >>>>>>>> good one, but I'd like to see if others have opinions as well.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Guozhang,
> >>>>>>>>>
> >>>>>>>>> Thanks for the feedback.
> >>>>>>>>>
> >>>>>>>>> 1. I agree to have an `Emitted` control class and two static
> >>>>>>>>> constructors named `onWindowClose` and `onEachUpdate`.
> >>>>>>>>>
> >>>>>>>>> 2. For the API function changes, I'm thinking of adding a new function
> >>>>>>>>> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`.
> >>>>>>>>> It takes an `Emitted` config and returns the same stream. Example:
> >>>>>>>>>
> >>>>>>>>> stream
> >>>>>>>>>     .groupBy(...)
> >>>>>>>>>     .windowedBy(...)
> >>>>>>>>>     .trigger(Emitted.onWindowClose) // N
> >>>>>>>>>     .count()
> >>>>>>>>>
> >>>>>>>>> The benefits are:
> >>>>>>>>>     1. It's simple and avoids overloading existing functions like
> >>>>>>>>> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to
> >>>>>>>>> the `aggregate` functions, we would need to add it to all existing
> >>>>>>>>> `count` and `aggregate` overloads, which is a lot.
> >>>>>>>>>     2. It operates directly on the windowed KStream and tells how its
> >>>>>>>>> output should be configured. If we later need to add this to other
> >>>>>>>>> types of streams, we can reuse the same `trigger` API, whereas other
> >>>>>>>>> types of streams/tables may not have an `aggregate` or `windowedBy`
> >>>>>>>>> API that would keep it consistent.
> >>>>>>>>>
> >>>>>>>>> Hao
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Hao,
> >>>>>>>>>>
> >>>>>>>>>> I prefer option 2 over the other options mainly because the added
> >>>>>>>>>> config object could potentially be used in other operators as well
> >>>>>>>>>> (it does not necessarily have to be a windowed operator and hence
> >>>>>>>>>> piggy-backed on `windowedBy`, and that's also why I suggested not
> >>>>>>>>>> naming it `WindowConfig` but just `EmitConfig`).
> >>>>>>>>>>
> >>>>>>>>>> As for Matthias' question, I think the difference between the windowed
> >>>>>>>>>> aggregate operator and the stream-stream join operator is that, for
> >>>>>>>>>> the latter, we think emit-final should be the only right emitting
> >>>>>>>>>> policy and hence we should not let users configure it. If users
> >>>>>>>>>> configured it to e.g. emit eagerly, they might get the old spurious
> >>>>>>>>>> emitting behavior, which violates the semantics.
> >>>>>>>>>>
> >>>>>>>>>> For option 2) itself, I have a few more thoughts:
> >>>>>>>>>>
> >>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> >>>>>>>>>> towards adding the new param in the overloaded `aggregate` rather than
> >>>>>>>>>> in the overloaded `windowBy` function. The reason is that the emitting
> >>>>>>>>>> logic could be either window based or non-window based in the long
> >>>>>>>>>> run. Though for this KIP we could just add it in
> >>>>>>>>>> `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> >>>>>>>>>> non-windowed operators in the future.
> >>>>>>>>>> 2. To be consistent with other control class names, I feel maybe we
> >>>>>>>>>> can name it "Emitted", not "EmitConfig".
> >>>>>>>>>> 3. Following the first comment, I think we can have the static
> >>>>>>>>>> constructor names as "onWindowClose" and "onEachUpdate".
> >>>>>>>>>>
> >>>>>>>>>> The resulting code pattern would look like this:
> >>>>>>>>>>
> >>>>>>>>>>      stream
> >>>>>>>>>>        .groupBy(..)
> >>>>>>>>>>        .windowBy(TimeWindow..)
> >>>>>>>>>>        .count(Emitted.onWindowClose)
> >>>>>>>>>>
> >>>>>>>>>> WDYT?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>>> Maybe it's more flexible to be used as config in properties as we
> >>>>>>>>>>>>> don't need to recompile DSL to change it.
> >>>>>>>>>>>
> >>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even sure
> >>>>>>>>>>> if we need a config at all or if we can just hard code it? For the
> >>>>>>>>>>> stream-stream left/outer join fix, there is only an internal config
> >>>>>>>>>>> but no public config either.
> >>>>>>>>>>>
> >>>>>>>>>>> Option 1: Your proposal is?
> >>>>>>>>>>>
> >>>>>>>>>>>      stream
> >>>>>>>>>>>        .groupByKey()
> >>>>>>>>>>>        .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>        .configure(EmitConfig.emitFinal())
> >>>>>>>>>>>        .count()
> >>>>>>>>>>>
> >>>>>>>>>>> This does not change my argument that it seems to be misplaced from
> >>>>>>>>>>> an API flow POV.
> >>>>>>>>>>>
> >>>>>>>>>>> Option 1 seems to be the least desirable to me.
> >>>>>>>>>>>
> >>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It might
> >>>>>>>>>>> be good if others could chime in, too. I think I slightly prefer
> >>>>>>>>>>> option 2 over option 3.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> >>>>>>>>>>>> Thanks for the feedback Matthias.
> >>>>>>>>>>>>
> >>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to
> >>>>>>>>>>>> use this to control how frequently we try to emit final results.
> >>>>>>>>>>>> Maybe it's more flexible to be used as a config in properties, as we
> >>>>>>>>>>>> don't need to recompile the DSL to change it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> >>>>>>>>>>>> `TimeWindowedKStream` should be output to a `KTable` after
> >>>>>>>>>>>> aggregation. But `emitFinal` is not an action on the
> >>>>>>>>>>>> `TimeWindowedKStream` interface. Maybe adding
> >>>>>>>>>>>> `configure(EmitConfig config)` makes more sense?
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 2, the config can be created using
> >>>>>>>>>>>> `WindowConfig.emitFinal()` or `EmitConfig.emitFinal`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For option 3, it will be something like `TimeWindows(...,
> >>>>>>>>>>>> EmitConfig emitConfig)`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For putting `EmitConfig` in the aggregation operator, I think it
> >>>>>>>>>>>> doesn't control how we do the aggregation but how we output to the
> >>>>>>>>>>>> `KTable`. That's why I feel option 1 makes more sense, as it applies
> >>>>>>>>>>>> to `TimeWindowedKStream`. But I'm also OK with option 2.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hao
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> A general comment: it seems that we won't need any new
> >>>>>>>>>>>>> `allowedLateness` parameter because the grace period is already
> >>>>>>>>>>>>> defined on the window itself?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> >>>>>>>>>>>>> `grace-period` is actually not a property of the window but a
> >>>>>>>>>>>>> property of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable
> >>>>>>>>>>>>> IMHO:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>         .emitFinal()
> >>>>>>>>>>>>>         .count()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place for
> >>>>>>>>>>>>> this case?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of
> >>>>>>>>>>>>> the API though):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(
> >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...),
> >>>>>>>>>>>>>           EmitConfig.emitFinal() // just made this up; it's not in the KIP
> >>>>>>>>>>>>>         )
> >>>>>>>>>>>>>         .count()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call; from the KIP it's
> >>>>>>>>>>>>> unclear what API you have in mind. `EmitFinalConfig` has no public
> >>>>>>>>>>>>> constructor nor any builder method.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you
> >>>>>>>>>>>>> give a concrete example (similar to the above) of how users would
> >>>>>>>>>>>>> write their code?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> >>>>>>>>>>>>> aggregation operator? In the end, it seems to be a property not of
> >>>>>>>>>>>>> the window definition or windowing step, but of the actual
> >>>>>>>>>>>>> operator:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>         .windowBy(
> >>>>>>>>>>>>>           TimeWindow.ofSizeNoGrace(...)
> >>>>>>>>>>>>>         )
> >>>>>>>>>>>>>         .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The API surface area that needs to be updated might be larger for
> >>>>>>>>>>>>> this case, though...
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> >>>>>>>>>>>>>> modifies fewer places. What do you think of option 1, which doesn't
> >>>>>>>>>>>>>> change the current `windowedBy` API but configures `EmitConfig`
> >>>>>>>>>>>>>> separately? The benefit of option 1 is that if we need to configure
> >>>>>>>>>>>>>> something else later, we don't need to pile it onto `windowedBy` but
> >>>>>>>>>>>>>> can add separate APIs.
> >>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> >>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >>>>>>>>>>>>>> But we can also create an internal API to do that without modifying
> >>>>>>>>>>>>>> `Stores`.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the proposal. I have some preferences among the options,
> >>>>>>>>>>>>>>> so I will copy them here:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm now thinking it may be better not to add this new config on each
> >>>>>>>>>>>>>>> of the Window interfaces, but instead to add it at the
> >>>>>>>>>>>>>>> KGroupedStream#windowedBy function. Also, instead of adding just a
> >>>>>>>>>>>>>>> boolean flag, maybe we can add a Configured class like Grouped,
> >>>>>>>>>>>>>>> Suppressed, etc.; e.g., let's call it Emitted, which for now would
> >>>>>>>>>>>>>>> just have a single constructor, Emitted.atWindowClose, whose
> >>>>>>>>>>>>>>> semantics are the same as emitFinal == true. I think the benefits
> >>>>>>>>>>>>>>> are:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but just
> >>>>>>>>>>>>>>> overload one windowedBy function with a second param. This is a
> >>>>>>>>>>>>>>> smaller scope for now, and also more extensible for any future
> >>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as well as
> >>>>>>>>>>>>>>> being able to reuse this Emitted interface for other operators if we
> >>>>>>>>>>>>>>> want to expand to them.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some more
> >>>>>>>>>>>>>>> detailed comments:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window
> >>>>>>>>>>>>>>> stateful operations, I think naming it `EmitConfig` is probably
> >>>>>>>>>>>>>>> better than `WindowConfig`.
> >>>>>>>>>>>>>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892), in
> >>>>>>>>>>>>>>> which you are also proposing to add new stores to the public factory
> >>>>>>>>>>>>>>> Stores, but that is not included in the KIP. Is that intentional?
> >>>>>>>>>>>>>>> Personally, I think that although we may eventually want to add a new
> >>>>>>>>>>>>>>> store type to the public APIs, for this KIP maybe we do not have to
> >>>>>>>>>>>>>>> add them but can delay that until after we've learned the best way to
> >>>>>>>>>>>>>>> lay them out. LMK what you think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final
> >>>>>>>>>>>>>>>> aggregated results for windowed aggregations. I listed several
> >>>>>>>>>>>>>>>> options there, and I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Thanks,
> >>>>>>>>> Hao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>
> >
> >
>


-- 
-- Guozhang
