Hi Bruno, Guozhang,

I think `Emitted.onWindowClose` is inspired by `Suppressed.untilWindowClose`. Since `Suppressed.untilWindowClose` is used inside `suppress()`, the name `Suppressed` fits there. However, `trigger()` is a builder-style setting rather than an action, so `Emitted` may be a bit off.
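To illustrate what I mean by a setting versus an action, here is a rough sketch in plain Java. Everything in it (`EmitTriggerSketch`, `WindowedStream`, the `Strategy` enum, `onEachUpdate`, and the toy `count()`) is a hypothetical stand-in, not the real Kafka Streams API and not the final KIP design. It only assumes that `trigger()` records a setting and returns the same kind of stream, while the aggregation is what actually emits results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the proposed flow. NOT the real Kafka Streams API. */
final class EmitTriggerSketch {

    /** Hypothetical config object describing when results are emitted. */
    static final class Emitted {
        enum Strategy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

        final Strategy strategy;

        private Emitted(Strategy strategy) {
            this.strategy = strategy;
        }

        static Emitted onEachUpdate() {
            return new Emitted(Strategy.ON_EACH_UPDATE);
        }

        static Emitted onWindowClose() {
            return new Emitted(Strategy.ON_WINDOW_CLOSE);
        }
    }

    /** Hypothetical stand-in for a windowed stream holding one window's records. */
    static final class WindowedStream {
        private final List<String> records;
        private final Emitted emitted;

        WindowedStream(List<String> records) {
            this(records, Emitted.onEachUpdate()); // assumed default: emit on every update
        }

        private WindowedStream(List<String> records, Emitted emitted) {
            this.records = records;
            this.emitted = emitted;
        }

        /** Builder-style setting: touches no data, returns the same kind of stream. */
        WindowedStream trigger(Emitted emitted) {
            return new WindowedStream(records, emitted);
        }

        /** The actual operation: counts records and emits according to the setting. */
        List<Long> count() {
            List<Long> out = new ArrayList<>();
            long running = 0;
            for (String ignored : records) {
                running++;
                if (emitted.strategy == Emitted.Strategy.ON_EACH_UPDATE) {
                    out.add(running); // every intermediate result is emitted
                }
            }
            if (emitted.strategy == Emitted.Strategy.ON_WINDOW_CLOSE && running > 0) {
                out.add(running); // only the final result is emitted
            }
            return out;
        }
    }

    public static void main(String[] args) {
        WindowedStream window = new WindowedStream(Arrays.asList("a", "b", "c"));
        System.out.println(window.count());                                  // [1, 2, 3]
        System.out.println(window.trigger(Emitted.onWindowClose()).count()); // [3]
    }
}
```

In this sketch, calling `trigger(...)` alone produces nothing; only `count()` emits, which is why the method reads more like configuration than like an operation on data.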

How about using `emitTrigger()` as the function name? The benefits are:
1. It cannot be confused with Flink's `trigger`, as John mentioned.
2. It sounds like a setting instead of an action. `trigger()` sounds like we are triggering some action.

For the config type name, how about `EmitConfig` or `EmitTriggerConfig`, to make it clear that it's a config for the emit trigger?

Hao

On Tue, Mar 22, 2022 at 9:43 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I relate `Emitted` with the aggregation and not with
> `trigger()` in the API flow, because the aggregation emits the result
> not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> of the config object passed to `trigger()`.
>
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> > Hi Bruno,
> >
> > Could you elaborate a bit more here, what's the semantic difference between
> > "the aggregation is triggered on window close and all aggregation results
> > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > aggregation is configured to only emit final results." for
> > trigger(Emitted.onWindowClose())?
> >
> > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Hao,
> >>
> >> Thank you for the KIP!
> >>
> >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >> does not seem compatible with the proposed flow. Conceptually, now the
> >> flow states that the aggregation is triggered on window close and all
> >> aggregation results are emitted. `Emitted` suggests that the aggregation
> >> is configured to only emit final results.
> >>
> >> Thus, I propose the following:
> >>
> >> stream
> >>     .groupBy(..)
> >>     .windowedBy(..)
> >>     .trigger(TriggerParameters.onWindowClose())
> >>     .aggregate(..) //result in a KTable<Windowed<..>>
> >>     .mapValues(..)
> >> > >> An alternative to `trigger()` could be `schedule()`, but I do not really > >> like it. > >> > >> One thing I noticed with option 1 is that all other methods in the > >> example above are operations on data. `groupBy()` groups, `windowedBy()` > >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps > >> values, even `suppress()` suppresses intermediate results. But what does > >> `trigger()` do? `trigger()` seems a config lost among operations. > >> > >> However, if we do not want to restrict ourselves to only use methods > >> when we want to specify operations on data, I have the following > proposal: > >> > >> stream > >> .groupBy(..) > >> .windowedBy(..) > >> .onWindowClose() > >> .aggregate(..) //result in a KTable<Windowed<..>> > >> .mapValues(..) > >> > >> Best, > >> Bruno > >> > >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other > >> operations also use present tense. > >> > >> On 22.03.22 06:36, Hao Li wrote: > >>> Hi John, > >>> > >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a > >>> different meaning in our case. `emit` sounds like an action to emit? > How > >>> about `emitTrigger`? I'm open to suggestions for the naming. > >>> > >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we > >> can > >>> deprecate `Suppressed` config as a whole later. Or we can deprecate > >>> `Suppressed.untilWindowClose` in later KIP after implementation of emit > >>> final is done. > >>> > >>> BTW, isn't > >>> > >>> stream > >>> .groupBy(..) > >>> .windowBy(..) > >>> .aggregate(..) //result in a KTable<Windowed<..>> > >>> .mapValues(..) > >>> .suppress(Suppressed.untilWindowClose) // since we can trace back > to > >>> parent node, to find a window definition > >>> > >>> same as > >>> > >>> stream > >>> .groupBy(..) > >>> .windowBy(..) > >>> .trigger(Emitted.onWindowClose) > >>> .aggregate(..) //result in a KTable<Windowed<..>> > >>> .mapValues(..) > >>> ? 
> >>> > >>> Hao > >>> > >>> > >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> > >> wrote: > >>> > >>>> I think the following case is only doable via `suppress`: > >>>> > >>>> stream > >>>> .groupBy(..) > >>>> .windowBy(..) > >>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>> .mapValues(..) > >>>> .suppress(Suppressed.untilWindowClose) // since we can trace back > to > >>>> parent node, to find a window definition > >>>> > >>>> > >>>> Guozhang > >>>> > >>>> > >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> > >> wrote: > >>>> > >>>>> Thanks, Guozhang! > >>>>> > >>>>> To clarify, I was asking specifically about deprecating just the > method > >>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though. > >>>> What > >>>>> does untilWindowClose do that this KIP doesn’t cover? > >>>>> > >>>>> Thanks, > >>>>> John > >>>>> > >>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote: > >>>>>> Just my 2c: Suppressed is in `suppress` whose application scope is > >> much > >>>>>> larger and hence more flexible. I.e. it can be used anywhere for a > >>>>> `KTable` > >>>>>> (but internally we would check whether certain emit policies like > >>>>>> `untilWindowClose` is valid or not), whereas `trigger` as for now is > >>>> only > >>>>>> applicable in XXWindowedKStream. So I think it would not be > completely > >>>>>> replacing Suppressed.untilWindowClose. > >>>>>> > >>>>>> In the future, personally I'd still want to keep one control object > >>>> still > >>>>>> for all emit policies, and maybe if we have extended Emitted for > other > >>>>>> emitting policies covered by Suppressed today, we can discuss if we > >>>> could > >>>>>> have `KTable.suppress(Emitted..)` replacing > >>>>> `KTable.suppress(Suppressed..)` > >>>>>> as a whole, but for this KIP I think it's too early. 
> >>>>>> > >>>>>> > >>>>>> Guozhang > >>>>>> > >>>>>> > >>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> > >>>>> wrote: > >>>>>> > >>>>>>> Hi all, > >>>>>>> > >>>>>>> Thanks for the Kip, Hao! > >>>>>>> > >>>>>>> For what it’s worth, I’m also in favor of your latest framing of > the > >>>>> API, > >>>>>>> > >>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not > >>>>>>> identical to the concept of a trigger in Flink, which specifies > when > >>>> to > >>>>>>> evaluate the window, which might be confusing to some people who > have > >>>>> deep > >>>>>>> experience with Flink. Then again, it seems close enough that it > >>>> should > >>>>> be > >>>>>>> clear to casual Flink users. For people with no other stream > >>>> processing > >>>>>>> experience, it might seem a bit esoteric compared to something > >>>>>>> self-documenting like ‘emit()’, but the docs should make it clear. > >>>>>>> > >>>>>>> One small question: it seems like this proposal is identical to > >>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is > >>>>> superior. > >>>>>>> In that case, should we deprecate Suppressed.untilWindowClose? > >>>>>>> > >>>>>>> Thanks, > >>>>>>> John > >>>>>>> > >>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote: > >>>>>>>> Hi Hao, > >>>>>>>> > >>>>>>>> For 2), I think it's a good idea in general to use a separate > >>>>> function on > >>>>>>>> the Time/SessionWindowedKStream itself, to achieve the same effect > >>>>> that, > >>>>>>>> for now, the emitting control is only for windowed aggregations as > >>>> in > >>>>>>> this > >>>>>>>> KIP, than overloading existing functions. We can discuss further > >>>> about > >>>>>>> the > >>>>>>>> actual function names, whether others like the name `trigger` or > >>>> not. > >>>>> As > >>>>>>>> for myself I feel `trigger` is a good one but I'd like to see if > >>>>> others > >>>>>>>> have opinions as well. 
> >>>>>>>> > >>>>>>>> > >>>>>>>> Guozhang > >>>>>>>> > >>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> > >>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Guozhang, > >>>>>>>>> > >>>>>>>>> Thanks for the feedback. > >>>>>>>>> > >>>>>>>>> 1. I agree to have an `Emitted` control class and two static > >>>>>>> constructors > >>>>>>>>> named `onWindowClose` and `onEachUpdate`. > >>>>>>>>> > >>>>>>>>> 2. For the API function changes, I'm thinking of adding a new > >>>>> function > >>>>>>>>> called `trigger` to `TimeWindowedKStream` and > >>>>> `SessionWindowedKStream`. > >>>>>>> It > >>>>>>>>> takes `Emitted` config and returns the same stream. Example: > >>>>>>>>> > >>>>>>>>> stream > >>>>>>>>> .groupBy(...) > >>>>>>>>> .windowedBy(...) > >>>>>>>>> .trigger(Emitted.onWindowClose). // N > >>>>>>>>> .count() > >>>>>>>>> > >>>>>>>>> The benefits are: > >>>>>>>>> 1. It's simple and avoids creating overloading of existing > >>>>> functions > >>>>>>> like > >>>>>>>>> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add > >>>> it > >>>>> to > >>>>>>>>> `aggregate` functions, we need to add it to all existing `count`, > >>>>>>>>> `aggregate` overloading functions which is a lot. > >>>>>>>>> 2. It operates directly on windowed kstream and tells how its > >>>>> output > >>>>>>>>> should be configured, if later we need to add this other type of > >>>>>>> streams, > >>>>>>>>> we can reuse same `trigger` API whereas other type of > >>>> streams/tables > >>>>> may > >>>>>>>>> not have `aggregate`, `windowedby` api to make it consistent. 
> >>>>>>>>> > >>>>>>>>> Hao > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang < > wangg...@gmail.com> > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hello Hao, > >>>>>>>>>> > >>>>>>>>>> I'm preferring option 2 over the other options mainly because > the > >>>>>>> added > >>>>>>>>>> config object could potentially be used in other operators as > >>>> well > >>>>>>> (not > >>>>>>>>>> necessarily has to be a windowed operator and hence have to be > >>>>>>>>> piggy-backed > >>>>>>>>>> on `windowedBy`, and that's also why I suggested not naming it > >>>>>>>>>> `WindowConfig` but just `EmitConfig`). > >>>>>>>>>> > >>>>>>>>>> As for Matthias' question, I think the difference between the > >>>>> windowed > >>>>>>>>>> aggregate operator and the stream-stream join operator is that, > >>>> for > >>>>>>> the > >>>>>>>>>> latter we think emit-final should be the only right emitting > >>>> policy > >>>>>>> and > >>>>>>>>>> hence we should not let users to configure it. If users > configure > >>>>> it > >>>>>>> to > >>>>>>>>>> e.g. emit eager they may get the old spurious emitting behavior > >>>>> which > >>>>>>> is > >>>>>>>>>> violating the semantics. > >>>>>>>>>> > >>>>>>>>>> For option 2) itself, I have a few more thoughts: > >>>>>>>>>> > >>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit > >>>>>>>>>> towards adding the new param in the overloaded `aggregate`, than > >>>>> the > >>>>>>>>>> overloaded `windowBy` function. The reason is that the emitting > >>>>> logic > >>>>>>>>> could > >>>>>>>>>> be either window based or non-window based, in the long run. > >>>> Though > >>>>>>> for > >>>>>>>>>> this KIP we could just add it in > >>>> `XXXWindowedKStream.aggregate()`, > >>>>> we > >>>>>>> may > >>>>>>>>>> want to extend to other non-windowed operators in the future. > >>>>>>>>>> 2. To be consistent with other control class names, I feel maybe > >>>> we > >>>>>>> can > >>>>>>>>>> name it "Emitted", not "EmitConfig". 
> >>>>>>>>>> 3. Following the first comment, I think we can have the static > >>>>>>>>> constructor > >>>>>>>>>> names as "onWindowClose" and "onEachUpdate". > >>>>>>>>>> > >>>>>>>>>> The resulted code pattern would be like this: > >>>>>>>>>> > >>>>>>>>>> stream > >>>>>>>>>> .groupBy(..) > >>>>>>>>>> .windowBy(TimeWindow..) > >>>>>>>>>> .count(Emitted.onWindowClose) > >>>>>>>>>> > >>>>>>>>>> WDYT? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax < > >>>> mj...@apache.org > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in > >>>> mind > >>>>> is > >>>>>>> to > >>>>>>>>>> use > >>>>>>>>>>>>> this to control how frequently we try to emit final results. > >>>>>>> Maybe > >>>>>>>>>> it's > >>>>>>>>>>>>> more flexible to be used as config in properties as we don't > >>>>>>> need to > >>>>>>>>>>>>> recompile DSL to change it. > >>>>>>>>>>> > >>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even > >>>>> sure > >>>>>>> if > >>>>>>>>>>> we need a config at all or if we can just hard code it? For the > >>>>>>>>>>> stream-stream join left/outer join fix, there is only an > >>>> internal > >>>>>>>>> config > >>>>>>>>>>> but no public config either. > >>>>>>>>>>> > >>>>>>>>>>> Option 1: Your proposal is? > >>>>>>>>>>> > >>>>>>>>>>> stream > >>>>>>>>>>> .groupByKey() > >>>>>>>>>>> .windowBy(TimeWindow.ofSizeNoGrace(...)) > >>>>>>>>>>> .configure(EmitConfig.emitFinal() > >>>>>>>>>>> .count() > >>>>>>>>>>> > >>>>>>>>>>> Does not change my argument that it seems to be misplace from > >>>> an > >>>>> API > >>>>>>>>>>> flow POV. > >>>>>>>>>>> > >>>>>>>>>>> Option 1 seems to be the least desirable to me. > >>>>>>>>>>> > >>>>>>>>>>> For option 2 and 3, and not sure which one I like better. Might > >>>>> be > >>>>>>> good > >>>>>>>>>>> if other could chime in, too. I think I slightly prefer option > >>>> 2 > >>>>>>> over > >>>>>>>>>>> option 3. 
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Matthias > >>>>>>>>>>> > >>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote: > >>>>>>>>>>>> Thanks for the feedback Matthias. > >>>>>>>>>>>> > >>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind > >>>>> is > >>>>>>> to > >>>>>>>>> use > >>>>>>>>>>>> this to control how frequently we try to emit final results. > >>>>> Maybe > >>>>>>>>> it's > >>>>>>>>>>>> more flexible to be used as config in properties as we don't > >>>>> need > >>>>>>> to > >>>>>>>>>>>> recompile DSL to change it. > >>>>>>>>>>>> > >>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how > >>>>>>>>>>>> `TimeWindowedKStream` should be outputted to `KTable` after > >>>>>>>>>> aggregation. > >>>>>>>>>>>> But `emitFinal` is not an action to the `TimeWindowedKStream` > >>>>>>>>>> interface. > >>>>>>>>>>>> Maybe adding `configure(EmitConfig config)` makes more sense? > >>>>>>>>>>>> > >>>>>>>>>>>> For option 2, config can be created using > >>>>>>> `WindowConfig.emitFinal()` > >>>>>>>>> or > >>>>>>>>>>>> `EmitConfig.emitFinal` > >>>>>>>>>>>> > >>>>>>>>>>>> For option 3, it will be something like `TimeWindows(..., > >>>>>>> EmitConfig > >>>>>>>>>>>> emitConfig)`. > >>>>>>>>>>>> > >>>>>>>>>>>> For putting `EmitConfig` in aggregation operator, I think it > >>>>>>> doesn't > >>>>>>>>>>>> control how we do aggregation but how we output to `KTable`. > >>>>>>> That's > >>>>>>>>>> why I > >>>>>>>>>>>> feel option 1 makes more sense as it applies to > >>>>>>>>> `TimeWindowedKStream`. > >>>>>>>>>>> But > >>>>>>>>>>>> I'm also OK with option 2. > >>>>>>>>>>>> > >>>>>>>>>>>> Hao > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax < > >>>>> mj...@apache.org > >>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the KIP. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> A general comment: it seem that we won't need any new > >>>>>>>>>> `allowedLateness` > >>>>>>>>>>>>> parameter because the grace-period is defined on the window > >>>>>>> itself > >>>>>>>>>>> already? > >>>>>>>>>>>>> > >>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the > >>>>>>>>>>>>> `grace-period` is actually not a property of the window but > >>>> a > >>>>>>>>> property > >>>>>>>>>>>>> of the aggregation operator? _thinking_) > >>>>>>>>>>>>> > >>>>>>>>>>>>> From an API flow point of view, option 1 might not be > >>>>> desirable > >>>>>>>>>> IMHO: > >>>>>>>>>>>>> > >>>>>>>>>>>>> stream > >>>>>>>>>>>>> .groupByKey() > >>>>>>>>>>>>> .windowBy(TimeWindow.ofSizeNoGrace(...)) > >>>>>>>>>>>>> .emitFinal() > >>>>>>>>>>>>> .count() > >>>>>>>>>>>>> > >>>>>>>>>>>>> The call to `emitFinal(0` seems not to be on the right place > >>>>> for > >>>>>>>>> this > >>>>>>>>>>> case? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few > >>>> details > >>>>> of > >>>>>>> the > >>>>>>>>>> API > >>>>>>>>>>>>> though): > >>>>>>>>>>>>> > >>>>>>>>>>>>> stream > >>>>>>>>>>>>> .groupByKey() > >>>>>>>>>>>>> .windowBy( > >>>>>>>>>>>>> TimeWindow.ofSizeNoGrace(...), > >>>>>>>>>>>>> EmitConfig.emitFinal() -- just made this up; it's > >>>> not > >>>>> in > >>>>>>> the > >>>>>>>>>> KIP > >>>>>>>>>>>>> ) > >>>>>>>>>>>>> .count() > >>>>>>>>>>>>> > >>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the > >>>> KIP > >>>>>>> it's > >>>>>>>>>>>>> unclear what API you have in mind? `EmitFinalConfig` has not > >>>>>>> public > >>>>>>>>>>>>> constructor not any builder method. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. > >>>> Can > >>>>> you > >>>>>>>>>> given > >>>>>>>>>>>>> a concrete example (similar to above) how users would write > >>>>> their > >>>>>>>>>> code? 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Did you consider to actually pass in the `EmitConfig` into > >>>> the > >>>>>>>>>>>>> aggregation operator? In the end, it seems not to be > >>>> property > >>>>> of > >>>>>>> the > >>>>>>>>>>>>> window definition or windowing step, but a property of the > >>>>> actual > >>>>>>>>>>> operator: > >>>>>>>>>>>>> > >>>>>>>>>>>>> stream > >>>>>>>>>>>>> .groupByKey() > >>>>>>>>>>>>> .windowBy( > >>>>>>>>>>>>> TimeWindow.ofSizeNoGrace(...) > >>>>>>>>>>>>> ) > >>>>>>>>>>>>> .count(EmitConfig.emitFinal()) > >>>>>>>>>>>>> > >>>>>>>>>>>>> The API surface area that need to be updated might be larger > >>>>> for > >>>>>>>>> this > >>>>>>>>>>>>> case though... > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote: > >>>>>>>>>>>>>> Thanks Guozhang! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig` and > >>>>>>> option 2 > >>>>>>>>>>>>> modifies > >>>>>>>>>>>>>> less places. What do you think of option 1 which doesn't > >>>>> change > >>>>>>> the > >>>>>>>>>>>>> current > >>>>>>>>>>>>>> `windowedBy` api but configures `EmitConfig` separately. > >>>> The > >>>>>>>>> benefit > >>>>>>>>>> of > >>>>>>>>>>>>>> option 1 is if we need to configure something else later, > >>>> we > >>>>>>> don't > >>>>>>>>>> need > >>>>>>>>>>>>> to > >>>>>>>>>>>>>> pile them on `windowedBy` but can add separate APIs. > >>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>>> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231 > >>>>>>>>>>>>> . > >>>>>>>>>>>>>> But We can also create an internal API to do that without > >>>>>>> modifying > >>>>>>>>>>>>>> `Stores`. 
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Hao > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang < > >>>>>>> wangg...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hello Hao, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the proposal, I have some preference among the > >>>>>>> options > >>>>>>>>>> here > >>>>>>>>>>>>> so I > >>>>>>>>>>>>>>> will copy them here: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I'm now thinking if it's better to not add this new config > >>>>> on > >>>>>>> each > >>>>>>>>>> of > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> Window interfaces, but instead add that at the > >>>>>>>>>>> KGroupedStream#windowedBy > >>>>>>>>>>>>>>> function. Also instead of adding just a boolean flag, > >>>> maybe > >>>>> we > >>>>>>> can > >>>>>>>>>>> add a > >>>>>>>>>>>>>>> Configured class like Grouped, Suppressed, etc, e.g. let's > >>>>> call > >>>>>>>>> it a > >>>>>>>>>>>>>>> Emitted which for now would just have a single construct > >>>> as > >>>>>>>>>>>>>>> Emitted.atWindowClose whose semantics is the same as > >>>>> emitFinal > >>>>>>> == > >>>>>>>>>>> true. > >>>>>>>>>>>>> I > >>>>>>>>>>>>>>> think the benefits are: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1) you do not need to modify multiple Window classes, but > >>>>> just > >>>>>>>>>>> overload > >>>>>>>>>>>>> one > >>>>>>>>>>>>>>> windowedBy function with a second param. This is less of a > >>>>>>> scope > >>>>>>>>> for > >>>>>>>>>>>>> now, > >>>>>>>>>>>>>>> and also more extensible for any future changes. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility > >>>> as > >>>>>>> well > >>>>>>>>> as > >>>>>>>>>>>>> being > >>>>>>>>>>>>>>> able to reuse this Emitted interface for other operators > >>>> if > >>>>> we > >>>>>>>>>> wanted > >>>>>>>>>>> to > >>>>>>>>>>>>>>> expand to. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> ---------------------------- > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> So in general I'm leaning towards option 2). 
For that, > >>>> some > >>>>>>> more > >>>>>>>>>>>>> detailed > >>>>>>>>>>>>>>> comments: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1) If we want to reuse that config object for other > >>>>> non-window > >>>>>>>>>>> stateful > >>>>>>>>>>>>>>> operations, I think naming it as `EmitConfig` is probably > >>>>>>> better > >>>>>>>>>> than > >>>>>>>>>>>>>>> `WindowConfig`. > >>>>>>>>>>>>>>> 2) I saw your PR ( > >>>>> https://github.com/apache/kafka/pull/11892) > >>>>>>>>> that > >>>>>>>>>>> you > >>>>>>>>>>>>> are > >>>>>>>>>>>>>>> also proposing to add new stores into the public factory > >>>>>>> Stores, > >>>>>>>>> but > >>>>>>>>>>>>> it's > >>>>>>>>>>>>>>> not included in the KIP. Is that intentional? Personally I > >>>>>>> think > >>>>>>>>>> that > >>>>>>>>>>>>>>> although we may eventually want to add a new store type to > >>>>> the > >>>>>>>>>> public > >>>>>>>>>>>>> APIs, > >>>>>>>>>>>>>>> for this KIP maybe we do not have to add them but can > >>>> delay > >>>>> for > >>>>>>>>>> later > >>>>>>>>>>>>> after > >>>>>>>>>>>>>>> we've learned the best way to layout. LMK what do you > >>>> think? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li > >>>>>>> <h...@confluent.io.invalid> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Dev team, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams > >>>>>>> KIP-825: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> This KIP is aimed to add new APIs to support outputting > >>>>> final > >>>>>>>>>>>>> aggregated > >>>>>>>>>>>>>>>> results for windowed aggregations. 
I listed several > >>>> options > >>>>>>> there > >>>>>>>>>> and > >>>>>>>>>>>>> I'm > >>>>>>>>>>>>>>>> looking forward to your feedback. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>> Hao > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> -- > >>>>>>>>>> -- Guozhang > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> Thanks, > >>>>>>>>> Hao > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -- > >>>>>>>> -- Guozhang > >>>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> -- Guozhang > >>>>> > >>>> > >>>> > >>>> -- > >>>> -- Guozhang > >>>> > >>> > >>> > >> > > > > > -- Thanks, Hao