`windowedStream.onWindowClose()` is essentially the original option 1 (`windowedStream.emitFinal()`), which was rejected because every new emit type would require adding another function. I still prefer the `windowedStream.someFunc(Controlled.onWindowClose)` model since it's flexible and makes it clear that we are configuring the emit policy. Let me summarize the naming options we have and compare them:

*API function name:*

*1. `windowedStream.trigger()`*
Pros:
   i. Simple
   ii. Similar to Flink's trigger function (is this actually a con?)
Cons:
   i. `trigger()` can be confused with Flink's trigger (raised by John)
   ii. `trigger()` feels like an operation rather than a configuration function (raised by Bruno)?

*2. `windowedStream.emitTrigger()`*
Pros:
   i. Avoids confusion with Flink's trigger API
   ii. `emitTrigger` feels like configuring the trigger, because "trigger" here is a noun rather than a verb as in `trigger()`
Cons:
   i. Verbose?
   ii. Not consistent with `Suppressed.untilWindowClose`?

*Config class/object name:*

1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
   Cons:
   i. Doesn't go along with `trigger` (raised by Bruno)

2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*

3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*

4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
   This is a combination of different names such as `EmitConfig`, `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...

If we settle on option 1), we can add new options to these names and comment on their pros and cons.

Hao

On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:

> I see what you mean now, and I think it's a fair point that composing `trigger` and `emitted` seems awkward.
>
> Re: data process operator vs. control operator, I shared your concern as well, and here's my train of thought: having only data process operators was my primary motivation for how we added the suppress operator --- it indeed "suppresses" data. But in hindsight its disadvantage is that, for example, Suppressed.onWindowClose() is only related to an earlier windowedBy operator, which is possibly very far from it in the resulting DSL code.
> It's not only a bit awkward for users to write such code, but in such cases the DSL builder also needs to maintain and propagate this information to the suppress operator further down. So we are now thinking about "putting the control object as close as possible to where the related processing really happens". And in that world my original preference was somewhere in option 2), i.e. just put the control as a param of the related "windowedBy" operator, but the trade-off is that we keep adding overloaded functions to these operators. So after some back-and-forth thoughts I'm leaning towards relaxing our principle of having only processing operators and no flow-control operators. That being said, if you have any ideas for how we can have the benefits of both worlds, I'm all ears.
>
> Re: using a direct function like "windowedStream.onWindowClose()" vs. "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation for the latter is extensibility without adding more functions in the future. If people feel this is not worth it, we can do the first option as well. If we just feel that `trigger` and `emitted` do not compose well together, maybe we can consider something like `windowedStream.trigger(Triggered.onWindowClose())`?
>
> Re: windowedBy vs. windowBy, yeah, I do not really have a good reason why we should use past tense either :P But if it's not bothering people much, I'd say we just keep it rather than deprecate/rename to new APIs.
>
>
> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi Guozhang,
> >
> > There is no semantic difference. It is a cosmetic difference. Conceptually, I relate `Emitted` with the aggregation and not with `trigger()` in the API flow, because the aggregation emits the result, not `trigger()`. Therefore, I proposed not to use `Emitted` as the name of the config object passed to `trigger()`.
> >
> > Best,
> > Bruno
> >
> > On 22.03.22 17:24, Guozhang Wang wrote:
> > > Hi Bruno,
> > >
> > > Could you elaborate a bit more here: what's the semantic difference between "the aggregation is triggered on window close and all aggregation results are emitted." for trigger(TriggerParameters.onWindowClose()), and "the aggregation is configured to only emit final results." for trigger(Emitted.onWindowClose())?
> > >
> > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >
> > >> Hi Hao,
> > >>
> > >> Thank you for the KIP!
> > >>
> > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that does not seem compatible with the proposed flow. Conceptually, now the flow states that the aggregation is triggered on window close and all aggregation results are emitted. `Emitted` suggests that the aggregation is configured to only emit final results.
> > >>
> > >> Thus, I propose the following:
> > >>
> > >> stream
> > >>   .groupBy(..)
> > >>   .windowedBy(..)
> > >>   .trigger(TriggerParameters.onWindowClose())
> > >>   .aggregate(..) //result in a KTable<Windowed<..>>
> > >>   .mapValues(..)
> > >>
> > >> An alternative to `trigger()` could be `schedule()`, but I do not really like it.
> > >>
> > >> One thing I noticed with option 1 is that all other methods in the example above are operations on data. `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems a config lost among operations.
> > >>
> > >> However, if we do not want to restrict ourselves to only use methods when we want to specify operations on data, I have the following proposal:
> > >>
> > >> stream
> > >>   .groupBy(..)
> > >>   .windowedBy(..)
> > >>   .onWindowClose()
> > >>   .aggregate(..) //result in a KTable<Windowed<..>>
> > >>   .mapValues(..)
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use present tense.
> > >>
> > >> On 22.03.22 06:36, Hao Li wrote:
> > >>> Hi John,
> > >>>
> > >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. `emit` sounds like an action to emit? How about `emitTrigger`? I'm open to suggestions for the naming.
> > >>>
> > >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we can deprecate the `Suppressed` config as a whole later. Or we can deprecate `Suppressed.untilWindowClose` in a later KIP after the implementation of emit final is done.
> > >>>
> > >>> BTW, isn't
> > >>>
> > >>> stream
> > >>>   .groupBy(..)
> > >>>   .windowBy(..)
> > >>>   .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>   .mapValues(..)
> > >>>   .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> > >>>
> > >>> same as
> > >>>
> > >>> stream
> > >>>   .groupBy(..)
> > >>>   .windowBy(..)
> > >>>   .trigger(Emitted.onWindowClose)
> > >>>   .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>   .mapValues(..)
> > >>> ?
> > >>>
> > >>> Hao
> > >>>
> > >>>
> > >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> I think the following case is only doable via `suppress`:
> > >>>>
> > >>>> stream
> > >>>>   .groupBy(..)
> > >>>>   .windowBy(..)
> > >>>>   .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>   .mapValues(..)
> > >>>>   .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>
> > >>>>> Thanks, Guozhang!
> > >>>>>
> > >>>>> To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn’t cover?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> John
> > >>>>>
> > >>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > >>>>>> Just my 2c: Suppressed is in `suppress`, whose application scope is much larger and hence more flexible. I.e. it can be used anywhere for a `KTable` (but internally we would check whether certain emit policies like `untilWindowClose` are valid or not), whereas `trigger` as of now is only applicable in XXWindowedKStream. So I think it would not completely replace Suppressed.untilWindowClose.
> > >>>>>>
> > >>>>>> In the future, personally I'd still want to keep one control object for all emit policies, and maybe if we have extended Emitted for other emitting policies covered by Suppressed today, we can discuss if we could have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's too early.
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> Thanks for the KIP, Hao!
> > >>>>>>>
> > >>>>>>> For what it’s worth, I’m also in favor of your latest framing of the API.
> > >>>>>>>
> > >>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, which might be confusing to some people who have deep experience with Flink.
> > >>>>>>> Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream processing experience, it might seem a bit esoteric compared to something self-documenting like ‘emit()’, but the docs should make it clear.
> > >>>>>>>
> > >>>>>>> One small question: it seems like this proposal is identical to Suppressed.untilWindowClose, and the KIP states that this API is superior. In that case, should we deprecate Suppressed.untilWindowClose?
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> John
> > >>>>>>>
> > >>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >>>>>>>> Hi Hao,
> > >>>>>>>>
> > >>>>>>>> For 2), I think it's a good idea in general to use a separate function on the Time/SessionWindowedKStream itself, rather than overloading existing functions, to achieve the same effect that, for now, the emitting control is only for windowed aggregations as in this KIP. We can discuss the actual function names further, and whether others like the name `trigger` or not. As for myself, I feel `trigger` is a good one, but I'd like to see if others have opinions as well.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Guozhang,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the feedback.
> > >>>>>>>>>
> > >>>>>>>>> 1. I agree to have an `Emitted` control class and two static constructors named `onWindowClose` and `onEachUpdate`.
> > >>>>>>>>>
> > >>>>>>>>> 2. For the API function changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes an `Emitted` config and returns the same stream. Example:
> > >>>>>>>>>
> > >>>>>>>>> stream
> > >>>>>>>>>   .groupBy(...)
> > >>>>>>>>>   .windowedBy(...)
> > >>>>>>>>>   .trigger(Emitted.onWindowClose) // N
> > >>>>>>>>>   .count()
> > >>>>>>>>>
> > >>>>>>>>> The benefits are:
> > >>>>>>>>> 1. It's simple and avoids creating overloads of existing functions like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the `aggregate` functions, we would need to add it to all existing `count` and `aggregate` overloads, which is a lot.
> > >>>>>>>>> 2. It operates directly on the windowed KStream and tells how its output should be configured. If later we need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have an `aggregate` or `windowedBy` API to keep it consistent.
> > >>>>>>>>>
> > >>>>>>>>> Hao
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hello Hao,
> > >>>>>>>>>>
> > >>>>>>>>>> I prefer option 2 over the other options mainly because the added config object could potentially be used in other operators as well (it does not necessarily have to be a windowed operator and hence piggy-backed on `windowedBy`, and that's also why I suggested naming it not `WindowConfig` but just `EmitConfig`).
> > >>>>>>>>>>
> > >>>>>>>>>> As for Matthias' question, I think the difference between the windowed aggregate operator and the stream-stream join operator is that, for the latter, we think emit-final should be the only right emitting policy and hence we should not let users configure it. If users configure it to e.g. emit eagerly, they may get the old spurious emitting behavior, which violates the semantics.
> > >>>>>>>>>>
> > >>>>>>>>>> For option 2) itself, I have a few more thoughts:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards adding the new param in the overloaded `aggregate` rather than the overloaded `windowBy` function. The reason is that the emitting logic could be either window based or non-window based in the long run. Though for this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may want to extend it to other non-windowed operators in the future.
> > >>>>>>>>>> 2. To be consistent with other control class names, I feel maybe we can name it "Emitted", not "EmitConfig".
> > >>>>>>>>>> 3. Following the first comment, I think we can have the static constructor names as "onWindowClose" and "onEachUpdate".
> > >>>>>>>>>>
> > >>>>>>>>>> The resulting code pattern would be like this:
> > >>>>>>>>>>
> > >>>>>>>>>> stream
> > >>>>>>>>>>   .groupBy(..)
> > >>>>>>>>>>   .windowBy(TimeWindow..)
> > >>>>>>>>>>   .count(Emitted.onWindowClose)
> > >>>>>>>>>>
> > >>>>>>>>>> WDYT?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results. Maybe it's more flexible to be used as config in properties as we don't need to recompile DSL to change it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even sure if we need a config at all or if we can just hard code it? For the stream-stream join left/outer join fix, there is only an internal config but no public config either.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Option 1: Your proposal is?
> > >>>>>>>>>>>
> > >>>>>>>>>>> stream
> > >>>>>>>>>>>   .groupByKey()
> > >>>>>>>>>>>   .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >>>>>>>>>>>   .configure(EmitConfig.emitFinal())
> > >>>>>>>>>>>   .count()
> > >>>>>>>>>>>
> > >>>>>>>>>>> Does not change my argument that it seems to be misplaced from an API flow POV.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Option 1 seems to be the least desirable to me.
> > >>>>>>>>>>>
> > >>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It might be good if others could chime in, too. I think I slightly prefer option 2 over option 3.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> > >>>>>>>>>>>> Thanks for the feedback Matthias.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results.
> > >>>>>>>>>>>> Maybe it's more flexible to be used as config in properties as we don't need to recompile DSL to change it.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how `TimeWindowedKStream` should be outputted to `KTable` after aggregation. But `emitFinal` is not an action on the `TimeWindowedKStream` interface. Maybe adding `configure(EmitConfig config)` makes more sense?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 2, the config can be created using `WindowConfig.emitFinal()` or `EmitConfig.emitFinal`.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For option 3, it will be something like `TimeWindows(..., EmitConfig emitConfig)`.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For putting `EmitConfig` in the aggregation operator, I think it doesn't control how we do aggregation but how we output to `KTable`. That's why I feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But I'm also OK with option 2.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hao
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> A general comment: it seems that we won't need any new `allowedLateness` parameter because the grace-period is defined on the window itself already?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the `grace-period` is actually not a property of the window but a property of the aggregation operator? _thinking_)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable IMHO:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>   .groupByKey()
> > >>>>>>>>>>>>>   .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >>>>>>>>>>>>>   .emitFinal()
> > >>>>>>>>>>>>>   .count()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The call to `emitFinal()` seems not to be in the right place for this case?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the API though):
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>   .groupByKey()
> > >>>>>>>>>>>>>   .windowBy(
> > >>>>>>>>>>>>>     TimeWindow.ofSizeNoGrace(...),
> > >>>>>>>>>>>>>     EmitConfig.emitFinal() -- just made this up; it's not in the KIP
> > >>>>>>>>>>>>>   )
> > >>>>>>>>>>>>>   .count()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's unclear what API you have in mind? `EmitFinalConfig` has no public constructor nor any builder method.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you give a concrete example (similar to above) of how users would write their code?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the aggregation operator? In the end, it seems not to be a property of the window definition or windowing step, but a property of the actual operator:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>   .groupByKey()
> > >>>>>>>>>>>>>   .windowBy(
> > >>>>>>>>>>>>>     TimeWindow.ofSizeNoGrace(...)
> > >>>>>>>>>>>>>   )
> > >>>>>>>>>>>>>   .count(EmitConfig.emitFinal())
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The API surface area that needs to be updated might be larger for this case though...
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> > >>>>>>>>>>>>>> Thanks Guozhang!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies fewer places. What do you think of option 1, which doesn't change the current `windowedBy` API but configures `EmitConfig` separately? The benefit of option 1 is that if we need to configure something else later, we don't need to pile it onto `windowedBy` but can add separate APIs.
> > >>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231 .
> > >>>>>>>>>>>>>> But we can also create an internal API to do that without modifying `Stores`.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hello Hao,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the proposal. I have some preference among the options here, so I will copy them here:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm now thinking it may be better not to add this new config on each of the Window interfaces, but instead add it at the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a Configured class like Grouped, Suppressed, etc. E.g. let's call it Emitted, which for now would just have a single construct, Emitted.atWindowClose, whose semantics is the same as emitFinal == true. I think the benefits are:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is less of a scope for now, and also more extensible for any future changes.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as well as being able to reuse this Emitted interface for other operators if we wanted to expand to them.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> ----------------------------
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some more detailed comments:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window stateful operations, I think naming it `EmitConfig` is probably better than `WindowConfig`.
> > >>>>>>>>>>>>>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you are also proposing to add new stores into the public factory Stores, but it's not included in the KIP. Is that intentional? Personally I think that although we may eventually want to add a new store type to the public APIs, for this KIP maybe we do not have to add them but can delay until later, after we've learned the best way to lay it out. LMK what you think.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Dev team,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final aggregated results for windowed aggregations. I listed several options there and I'm looking forward to your feedback.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> --
> > >>>>>>>>>> -- Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Hao
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
>
> --
> -- Guozhang

--
Thanks,
Hao
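
For reference, the two API shapes compared at the top of this thread (one dedicated method per emit type versus a single method that takes a config object) can be sketched roughly as below. This is only a toy model under stated assumptions: `EmitPolicySketch`, `WindowedStream`, `Emitted`, `trigger` and the integer "records" are hypothetical stand-ins chosen for illustration and are not the Kafka Streams API or the final KIP naming.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitPolicySketch {

    // Hypothetical config object, modeled on the Emitted/Triggered naming options above.
    static final class Emitted {
        enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

        final Policy policy;

        private Emitted(Policy policy) {
            this.policy = policy;
        }

        static Emitted onWindowClose() {
            return new Emitted(Policy.ON_WINDOW_CLOSE);
        }

        static Emitted onEachUpdate() {
            return new Emitted(Policy.ON_EACH_UPDATE);
        }
    }

    // Toy stand-in for a windowed stream that counts the records of a single window.
    static final class WindowedStream {
        private Emitted emitted = Emitted.onEachUpdate();

        // Config-object shape: a new emit policy only needs a new factory on Emitted.
        WindowedStream trigger(Emitted emitted) {
            this.emitted = emitted;
            return this;
        }

        // Dedicated-method shape: every new emit policy needs another method like this one.
        WindowedStream onWindowClose() {
            return trigger(Emitted.onWindowClose());
        }

        // Returns the counts that would be emitted downstream for one window.
        List<Integer> count(int recordsInWindow) {
            List<Integer> emittedCounts = new ArrayList<>();
            for (int i = 1; i <= recordsInWindow; i++) {
                boolean windowClosed = (i == recordsInWindow);
                if (emitted.policy == Emitted.Policy.ON_EACH_UPDATE || windowClosed) {
                    emittedCounts.add(i);
                }
            }
            return emittedCounts;
        }
    }

    public static void main(String[] args) {
        // Emits every intermediate count.
        System.out.println(new WindowedStream().count(3));
        // Emits only the final count of the window.
        System.out.println(new WindowedStream().trigger(Emitted.onWindowClose()).count(3));
    }
}
```

In this sketch both shapes produce the same result; the difference is only where future growth lands, on the config class or on the stream interface.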