Hi John,

Yes. On naming: `trigger` resembles Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action that emits
something. How about `emitTrigger`? I'm open to other suggestions.

On deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we
can deprecate the `Suppressed` config as a whole later. Alternatively, we can
deprecate `Suppressed.untilWindowClose` in a later KIP, after the emit-final
implementation is done.

BTW, isn't

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..) // results in a KTable<Windowed<..>>
  .mapValues(..)
  // since we can trace back to the parent node to find a window definition
  .suppress(Suppressed.untilWindowClose)

the same as

stream
  .groupBy(..)
  .windowedBy(..)
  .trigger(Emitted.onWindowClose)
  .aggregate(..) // results in a KTable<Windowed<..>>
  .mapValues(..)
?
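
To make the question concrete, here is a toy model in plain Java. It is only
a sketch under my own assumptions: `EmitSketch`, `Result`, `finalOnly` and the
other names are hypothetical and are not Kafka Streams APIs; windows are
tumbling with no grace period, input timestamps arrive in order, and a window
counts as closed once stream time reaches its end. In this model, filtering
to final results after a stateless `mapValues` gives the same output as
filtering right at the aggregation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.LongUnaryOperator;

// Toy model only: none of these types or methods are Kafka Streams APIs.
final class EmitSketch {
    static final long WINDOW_SIZE = 10; // tumbling windows, no grace period

    // One windowed result: window start, key, and the current value.
    static final class Result {
        final long windowStart;
        final String key;
        final long value;

        Result(long windowStart, String key, long value) {
            this.windowStart = windowStart;
            this.key = key;
            this.value = value;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Result)) return false;
            Result r = (Result) o;
            return windowStart == r.windowStart && key.equals(r.key) && value == r.value;
        }

        @Override public int hashCode() { return Objects.hash(windowStart, key, value); }

        @Override public String toString() { return "[" + windowStart + "," + key + "]=" + value; }
    }

    // Windowed count that emits one updated result per input event.
    // Input: parallel arrays of keys and in-order event timestamps.
    static List<Result> countOnEachUpdate(String[] keys, long[] timestamps) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<Result> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            long start = timestamps[i] - timestamps[i] % WINDOW_SIZE;
            long count = counts.merge(start + "|" + keys[i], 1L, Long::sum);
            out.add(new Result(start, keys[i], count));
        }
        return out;
    }

    // Keep only the last update of each window that is closed at streamTime.
    static List<Result> finalOnly(List<Result> updates, long streamTime) {
        Map<String, Result> last = new LinkedHashMap<>();
        for (Result r : updates) {
            if (r.windowStart + WINDOW_SIZE <= streamTime) {
                last.put(r.windowStart + "|" + r.key, r);
            }
        }
        return new ArrayList<>(last.values());
    }

    // Stateless per-record value mapping, standing in for mapValues(..).
    static List<Result> mapValues(List<Result> in, LongUnaryOperator f) {
        List<Result> out = new ArrayList<>();
        for (Result r : in) {
            out.add(new Result(r.windowStart, r.key, f.applyAsLong(r.value)));
        }
        return out;
    }

    // Pipeline 1: aggregate -> mapValues -> suppress until window close.
    static List<Result> suppressAfterMap(String[] k, long[] t, long streamTime, LongUnaryOperator f) {
        return finalOnly(mapValues(countOnEachUpdate(k, t), f), streamTime);
    }

    // Pipeline 2: emit on window close at the aggregation -> mapValues.
    static List<Result> emitFinalThenMap(String[] k, long[] t, long streamTime, LongUnaryOperator f) {
        return mapValues(finalOnly(countOnEachUpdate(k, t), streamTime), f);
    }
}
```

In this toy model both pipelines return the same list for the same input. It
says nothing about differences in the real implementation, such as where
buffering happens.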

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

> I think the following case is only doable via `suppress`:
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .aggregate(..) // results in a KTable<Windowed<..>>
>   .mapValues(..)
>   // since we can trace back to the parent node to find a window definition
>   .suppress(Suppressed.untilWindowClose)
>
>
> Guozhang
>
>
> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks, Guozhang!
> >
> > To clarify, I was asking specifically about deprecating just the method
> > ‘untilWindowClose’. I might not be thinking clearly about it, though.
> > What does untilWindowClose do that this KIP doesn’t cover?
> >
> > Thanks,
> > John
> >
> > On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > > Just my 2c: Suppressed is in `suppress` whose application scope is much
> > > larger and hence more flexible. I.e. it can be used anywhere for a
> > > `KTable` (but internally we would check whether certain emit policies
> > > like `untilWindowClose` are valid or not), whereas `trigger` as of now
> > > is only applicable in XXWindowedKStream. So I think it would not be
> > > completely replacing Suppressed.untilWindowClose.
> > >
> > > In the future, personally I'd still want to keep one control object
> > > for all emit policies, and maybe if we have extended Emitted for other
> > > emitting policies covered by Suppressed today, we can discuss if we
> > > could have `KTable.suppress(Emitted..)` replacing
> > > `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think
> > > it's too early.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the Kip, Hao!
> > >>
> > >> For what it’s worth, I’m also in favor of your latest framing of the
> > >> API.
> > >>
> > >> I think the name is fine. I assume it’s inspired by Flink? It’s not
> > >> identical to the concept of a trigger in Flink, which specifies when
> > >> to evaluate the window, which might be confusing to some people who
> > >> have deep experience with Flink. Then again, it seems close enough
> > >> that it should be clear to casual Flink users. For people with no
> > >> other stream processing experience, it might seem a bit esoteric
> > >> compared to something self-documenting like ‘emit()’, but the docs
> > >> should make it clear.
> > >>
> > >> One small question: it seems like this proposal is identical to
> > >> Suppressed.untilWindowClose, and the KIP states that this API is
> > >> superior. In that case, should we deprecate
> > >> Suppressed.untilWindowClose?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >> > Hi Hao,
> > >> >
> > >> > For 2), I think it's a good idea in general to use a separate
> > >> > function on the Time/SessionWindowedKStream itself, rather than
> > >> > overloading existing functions. It achieves the same effect, given
> > >> > that for now the emitting control is only for windowed aggregations
> > >> > as in this KIP. We can discuss the actual function names further,
> > >> > and whether others like the name `trigger` or not. As for myself, I
> > >> > feel `trigger` is a good one, but I'd like to see if others have
> > >> > opinions as well.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid>
> > wrote:
> > >> >
> > >> >> Hi Guozhang,
> > >> >>
> > >> >> Thanks for the feedback.
> > >> >>
> > >> >> 1. I agree to have an `Emitted` control class and two static
> > >> >> constructors named `onWindowClose` and `onEachUpdate`.
> > >> >>
> > >> >> 2. For the API function changes, I'm thinking of adding a new
> > >> >> function called `trigger` to `TimeWindowedKStream` and
> > >> >> `SessionWindowedKStream`. It takes an `Emitted` config and returns
> > >> >> the same stream. Example:
> > >> >>
> > >> >> stream
> > >> >>   .groupBy(...)
> > >> >>   .windowedBy(...)
> > >> >>   .trigger(Emitted.onWindowClose) // N
> > >> >>   .count()
> > >> >>
> > >> >> The benefits are:
> > >> >>   1. It's simple and avoids overloading existing functions like
> > >> >> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it
> > >> >> to the `aggregate` functions, we would need to add it to all
> > >> >> existing `count` and `aggregate` overloads, which is a lot.
> > >> >>   2. It operates directly on the windowed KStream and tells how its
> > >> >> output should be configured. If we later need to add this to other
> > >> >> types of streams, we can reuse the same `trigger` API, whereas other
> > >> >> types of streams/tables may not have an `aggregate` or `windowedBy`
> > >> >> API to keep it consistent.
> > >> >>
> > >> >> Hao
> > >> >>
> > >> >>
> > >> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >> >>
> > >> >> > Hello Hao,
> > >> >> >
> > >> >> > I prefer option 2 over the other options, mainly because the added
> > >> >> > config object could potentially be used in other operators as well
> > >> >> > (it does not necessarily have to be a windowed operator and hence
> > >> >> > piggy-backed on `windowedBy`, and that's also why I suggested not
> > >> >> > naming it `WindowConfig` but just `EmitConfig`).
> > >> >> >
> > >> >> > As for Matthias' question, I think the difference between the
> > >> >> > windowed aggregate operator and the stream-stream join operator is
> > >> >> > that, for the latter, we think emit-final should be the only right
> > >> >> > emitting policy and hence we should not let users configure it. If
> > >> >> > users configure it to e.g. emit eagerly, they may get the old
> > >> >> > spurious emitting behavior, which violates the semantics.
> > >> >> >
> > >> >> > For option 2) itself, I have a few more thoughts:
> > >> >> >
> > >> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> > >> >> > towards adding the new param in the overloaded `aggregate` rather
> > >> >> > than the overloaded `windowBy` function. The reason is that the
> > >> >> > emitting logic could be either window based or non-window based in
> > >> >> > the long run. Though for this KIP we could just add it in
> > >> >> > `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> > >> >> > non-windowed operators in the future.
> > >> >> > 2. To be consistent with other control class names, I feel maybe we
> > >> >> > can name it "Emitted", not "EmitConfig".
> > >> >> > 3. Following the first comment, I think we can have the static
> > >> >> > constructor names as "onWindowClose" and "onEachUpdate".
> > >> >> >
> > >> >> > The resulting code pattern would be like this:
> > >> >> >
> > >> >> >    stream
> > >> >> >      .groupBy(..)
> > >> >> >      .windowBy(TimeWindow..)
> > >> >> >      .count(Emitted.onWindowClose)
> > >> >> >
> > >> >> > WDYT?
> > >> >> >
> > >> >> >
> > >> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <
> mj...@apache.org
> > >
> > >> >> wrote:
> > >> >> >
> > >> >> > > >> `allowedLateness` may not be a good name. What I have in
> mind
> > is
> > >> to
> > >> >> > use
> > >> >> > > >> this to control how frequently we try to emit final results.
> > >> Maybe
> > >> >> > it's
> > >> >> > > >> more flexible to be used as config in properties as we don't
> > >> need to
> > >> >> > > >> recompile DSL to change it.
> > >> >> > >
> > >> >> > > I see; making it a config seems better. Frankly, I am not even
> > >> >> > > sure if we need a config at all or if we can just hard code it?
> > >> >> > > For the stream-stream left/outer join fix, there is only an
> > >> >> > > internal config but no public config either.
> > >> >> > >
> > >> >> > > Option 1: Your proposal is?
> > >> >> > >
> > >> >> > >    stream
> > >> >> > >      .groupByKey()
> > >> >> > >      .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >> >> > >      .configure(EmitConfig.emitFinal())
> > >> >> > >      .count()
> > >> >> > >
> > >> >> > > That does not change my argument that it seems to be misplaced
> > >> >> > > from an API flow POV.
> > >> >> > >
> > >> >> > > Option 1 seems to be the least desirable to me.
> > >> >> > >
> > >> >> > > For options 2 and 3, I am not sure which one I like better. It
> > >> >> > > might be good if others could chime in, too. I think I slightly
> > >> >> > > prefer option 2 over option 3.
> > >> >> > >
> > >> >> > >
> > >> >> > > -Matthias
> > >> >> > >
> > >> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
> > >> >> > > > Thanks for the feedback Matthias.
> > >> >> > > >
> > >> >> > > > `allowedLateness` may not be a good name. What I have in mind is
> > >> >> > > > to use this to control how frequently we try to emit final
> > >> >> > > > results. Maybe it's more flexible as a config in properties, as
> > >> >> > > > we don't need to recompile the DSL to change it.
> > >> >> > > >
> > >> >> > > > For option 1, I intend to use `emitFinal` to configure how
> > >> >> > > > `TimeWindowedKStream` should be output to `KTable` after
> > >> >> > > > aggregation. But `emitFinal` is not an action on the
> > >> >> > > > `TimeWindowedKStream` interface. Maybe adding
> > >> >> > > > `configure(EmitConfig config)` makes more sense?
> > >> >> > > >
> > >> >> > > > For option 2, the config can be created using
> > >> >> > > > `WindowConfig.emitFinal()` or `EmitConfig.emitFinal`.
> > >> >> > > >
> > >> >> > > > For option 3, it will be something like
> > >> >> > > > `TimeWindows(..., EmitConfig emitConfig)`.
> > >> >> > > >
> > >> >> > > > As for putting `EmitConfig` in the aggregation operator, I think
> > >> >> > > > it doesn't control how we do the aggregation but how we output to
> > >> >> > > > `KTable`. That's why I feel option 1 makes more sense, as it
> > >> >> > > > applies to `TimeWindowedKStream`. But I'm also OK with option 2.
> > >> >> > > >
> > >> >> > > > Hao
> > >> >> > > >
> > >> >> > > >
> > >> >> > > >
> > >> >> > > >
> > >> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <
> > mj...@apache.org
> > >> >
> > >> >> > > wrote:
> > >> >> > > >
> > >> >> > > >> Thanks for the KIP.
> > >> >> > > >>
> > >> >> > > >> A general comment: it seems that we won't need any new
> > >> >> > > >> `allowedLateness` parameter because the grace period is already
> > >> >> > > >> defined on the window itself?
> > >> >> > > >>
> > >> >> > > >> (On the other hand, if I think about it once more, maybe the
> > >> >> > > >> `grace-period` is actually not a property of the window but a
> > >> >> > > >> property of the aggregation operator? _thinking_)
> > >> >> > > >>
> > >> >> > > >> From an API flow point of view, option 1 might not be desirable
> > >> >> > > >> IMHO:
> > >> >> > > >>
> > >> >> > > >>     stream
> > >> >> > > >>       .groupByKey()
> > >> >> > > >>       .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >> >> > > >>       .emitFinal()
> > >> >> > > >>       .count()
> > >> >> > > >>
> > >> >> > > >> The call to `emitFinal()` does not seem to be in the right place
> > >> >> > > >> for this case?
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> Option 2 might work (I think we need to discuss a few details of
> > >> >> > > >> the API though):
> > >> >> > > >>
> > >> >> > > >>     stream
> > >> >> > > >>       .groupByKey()
> > >> >> > > >>       .windowBy(
> > >> >> > > >>         TimeWindow.ofSizeNoGrace(...),
> > >> >> > > >>         EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >> >> > > >>       )
> > >> >> > > >>       .count()
> > >> >> > > >>
> > >> >> > > >> I made up the `WindowConfig.emitFinal()` call -- from the KIP
> > >> >> > > >> it's unclear what API you have in mind? `EmitFinalConfig` has no
> > >> >> > > >> public constructor nor any builder method.
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> For option 3, I am not sure what you really have in mind. Can you
> > >> >> > > >> give a concrete example (similar to above) of how users would
> > >> >> > > >> write their code?
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> Did you consider actually passing the `EmitConfig` into the
> > >> >> > > >> aggregation operator? In the end, it seems not to be a property
> > >> >> > > >> of the window definition or windowing step, but a property of
> > >> >> > > >> the actual operator:
> > >> >> > > >>
> > >> >> > > >>     stream
> > >> >> > > >>       .groupByKey()
> > >> >> > > >>       .windowBy(
> > >> >> > > >>         TimeWindow.ofSizeNoGrace(...)
> > >> >> > > >>       )
> > >> >> > > >>       .count(EmitConfig.emitFinal())
> > >> >> > > >>
> > >> >> > > >> The API surface area that needs to be updated might be larger
> > >> >> > > >> for this case though...
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> -Matthias
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
> > >> >> > > >>> Thanks Guozhang!
> > >> >> > > >>>
> > >> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option
> > >> >> > > >>> 2 modifies fewer places. What do you think of option 1, which
> > >> >> > > >>> doesn't change the current `windowedBy` API but configures
> > >> >> > > >>> `EmitConfig` separately? The benefit of option 1 is that if we
> > >> >> > > >>> need to configure something else later, we don't need to pile it
> > >> >> > > >>> on `windowedBy` but can add separate APIs.
> > >> >> > > >>> 2. I added it to `Stores` mainly to conform to
> > >> >> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> > >> >> > > >>> But we can also create an internal API to do that without
> > >> >> > > >>> modifying `Stores`.
> > >> >> > > >>>
> > >> >> > > >>> Hao
> > >> >> > > >>>
> > >> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >> >> > > >> wrote:
> > >> >> > > >>>
> > >> >> > > >>>> Hello Hao,
> > >> >> > > >>>>
> > >> >> > > >>>> Thanks for the proposal, I have some preference among the
> > >> >> > > >>>> options here so I will copy them here:
> > >> >> > > >>>>
> > >> >> > > >>>> I'm now thinking if it's better to not add this new config on
> > >> >> > > >>>> each of the Window interfaces, but instead add that at the
> > >> >> > > >>>> KGroupedStream#windowedBy function. Also instead of adding just
> > >> >> > > >>>> a boolean flag, maybe we can add a Configured class like
> > >> >> > > >>>> Grouped, Suppressed, etc, e.g. let's call it an Emitted which
> > >> >> > > >>>> for now would just have a single construct as
> > >> >> > > >>>> Emitted.atWindowClose whose semantics is the same as
> > >> >> > > >>>> emitFinal == true. I think the benefits are:
> > >> >> > > >>>>
> > >> >> > > >>>> 1) you do not need to modify multiple Window classes, but just
> > >> >> > > >>>> overload one windowedBy function with a second param. This is
> > >> >> > > >>>> less of a scope for now, and also more extensible for any
> > >> >> > > >>>> future changes.
> > >> >> > > >>>>
> > >> >> > > >>>> 2) With a config interface, we maintain its extensibility as
> > >> >> > > >>>> well as being able to reuse this Emitted interface for other
> > >> >> > > >>>> operators if we want to expand to them.
> > >> >> > > >>>>
> > >> >> > > >>>> ----------------------------
> > >> >> > > >>>>
> > >> >> > > >>>> So in general I'm leaning towards option 2). For that, some
> > >> >> > > >>>> more detailed comments:
> > >> >> > > >>>>
> > >> >> > > >>>> 1) If we want to reuse that config object for other non-window
> > >> >> > > >>>> stateful operations, I think naming it as `EmitConfig` is
> > >> >> > > >>>> probably better than `WindowConfig`.
> > >> >> > > >>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892)
> > >> >> > > >>>> that you are also proposing to add new stores into the public
> > >> >> > > >>>> factory Stores, but it's not included in the KIP. Is that
> > >> >> > > >>>> intentional? Personally I think that although we may eventually
> > >> >> > > >>>> want to add a new store type to the public APIs, for this KIP
> > >> >> > > >>>> maybe we do not have to add them but can delay for later after
> > >> >> > > >>>> we've learned the best way to lay them out. LMK what you think.
> > >> >> > > >>>>
> > >> >> > > >>>>
> > >> >> > > >>>>
> > >> >> > > >>>> Guozhang
> > >> >> > > >>>>
> > >> >> > > >>>>
> > >> >> > > >>>>
> > >> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li
> > >> <h...@confluent.io.invalid>
> > >> >> > > >> wrote:
> > >> >> > > >>>>
> > >> >> > > >>>>> Hi Dev team,
> > >> >> > > >>>>>
> > >> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >> >> > > >>>>>
> > >> >> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >> >> > > >>>>>
> > >> >> > > >>>>> This KIP aims to add new APIs to support outputting final
> > >> >> > > >>>>> aggregated results for windowed aggregations. I listed several
> > >> >> > > >>>>> options there and I'm looking forward to your feedback.
> > >> >> > > >>>>>
> > >> >> > > >>>>> Thanks,
> > >> >> > > >>>>> Hao
> > >> >> > > >>>>>
> > >> >> > > >>>>
> > >> >> > > >>>>
> > >> >> > > >>>> --
> > >> >> > > >>>> -- Guozhang
> > >> >> > > >>>>
> > >> >> > > >>>
> > >> >> > > >>>
> > >> >> > > >>
> > >> >> > > >
> > >> >> > > >
> > >> >> > >
> > >> >> >
> > >> >> >
> > >> >> > --
> > >> >> > -- Guozhang
> > >> >> >
> > >> >>
> > >> >>
> > >> >> --
> > >> >> Thanks,
> > >> >> Hao
> > >> >>
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
> --
> -- Guozhang
>


-- 
Thanks,
Hao
