For

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..)
  .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
  .mapValues(..)

I think after `aggregate` it's already a table, so the emit strategy comes too late to control how the windowed stream is output to the table. This is the concern Guozhang raised about having this in the existing `suppress` operator as well.

Thanks,
Hao

On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi,
>
> Thank you for your answers to my questions!
>
> I see the argument about conciseness of configuring a stream with
> methods instead of config objects. I just miss a bit the descriptive
> aspect.
>
> What about
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .aggregate(..)
>   .mapValues(..)
>
> I have also another question. Why should emitting of results be
> controlled by the window level api? If I want to emit results for each
> input record the emit strategy is quite independent from the window. So
> I somehow share Matthias' and Guozhang's concern that the emit strategy
> seems misplaced there.
>
> What are the arguments against?
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .aggregate(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .mapValues(..)
>
>
> A final administrative request: Hao, could you please add the rejected
> alternatives to the KIP so that future us will know why we rejected them?
>
> Best,
> Bruno
>
> On 23.03.22 19:38, John Roesler wrote:
> > Hi all,
> >
> > I can see both sides of this.
> >
> > On one hand, when we say
> > "stream.groupBy().windowBy().count()", it seems like we're
> > telling KS to take the raw stream, group it based on key,
> > then window it based on time, and then compute an
> > aggregation on the windows. In that model, "trigger()" would
> > have to mean something like "trigger it", which doesn't
> > really make sense, since we aren't "triggering" the
> > aggregation (then again, to an outside observer, it would
> > appear that way... food for thought).
> >
> > Another way to look at it is that all we're really doing is
> > configuring a windowed aggregation on the stream, and we're
> > doing it with a progressive builder interface. In other
> > words, the above is just a progressive builder for
> > configuring an operation like
> > "stream.aggregate(groupingConfig, windowingConfig,
> > countFn)". Under the latter interpretation of the DSL, it
> > makes perfect sense to add more optional progressive builder
> > methods like trigger() to the WindowedKStream interfaces.
> >
> > Since part of the motivation for choosing the word "trigger"
> > here is to stay close to what Flink defines, I'll also point
> > out that Flink's syntax is also
> > "stream.keyBy().window().trigger().aggregate()". Not that
> > their API is the holy grail or anything, but it's at least
> > an indication that this API isn't a horrible mistake.
> >
> > All other things being equal, I also prefer to leave tie-breakers
> > in the hands of the contributor. So, if we've all
> > said our piece and Hao still prefers option 1, then (as long
> > as we don't think it's a horrible mistake), I think we
> > should just let him go for it.
> >
> > Speaking of which, after reviewing the responses regarding
> > deprecating `Suppressed#onWindowClose`, I still think we
> > should just go ahead and deprecate it. Although it's not
> > expressed exactly the same way, it still does exactly the
> > same thing, or so close that it seems confusing to keep
> > both. But again, if Hao really prefers to keep both, I won't
> > insist on it :)
> >
> > Thanks all,
> > -John
> >
> > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> >> Thanks Bruno!
> >>
> >> Argument for option 1 is:
> >> 1. Concise and descriptive. It avoids overloading existing functions and
> >> it's very clear what it's doing. Imagine if there's an autocomplete feature
> >> in IntelliJ or other IDE for our DSL in the future, it's not favorable to
> >> show 6 `windowedBy` functions.
> >> 2. Option 1 operates on `windowedStream` to configure how it should be
> >> outputted. Option 2 operates on `KGroupedStream` to produce
> >> `windowedStream` as well as configure how `windowedStream` should be
> >> outputted. I feel it's better to have a `windowedStream` and then
> >> configure how it can be outputted. Somehow I feel option 2 breaks the
> >> builder pattern.
> >> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> >> kinds of different parameters into it to avoid future overloading, it's too
> >> bloated and not very user friendly.
> >>
> >> I agree option 1's `trigger` function is configuring the stream which feels
> >> different from existing `count` or `aggregate` etc. Configuring might be
> >> also a kind of action to stream :) I'm not sure if it breaks DSL principle
> >> and if it does,
> >> can we relax the principle given the benefits compared to option 2)? Maybe
> >> John can chime in as the DSL grammar author.
> >>
> >> Thanks,
> >> Hao
> >>
> >> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >>> Hi Hao,
> >>>
> >>> I agree with Guozhang: Great summary! Thank you!
> >>>
> >>> Regarding "aligned with other config class names", there is this DSL
> >>> grammar John once specified
> >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>> and we have already used it in the code. I found the grammar quite useful.
> >>>
> >>> I am undecided if option 1 is really worth it. What are actually the
> >>> arguments in favor of it? Is it only that we do not need to overload
> >>> other methods? This does not seem worth breaking DSL principles. An
> >>> alternative proposal would be to go with option 2 and conform with the
> >>> grammar above:
> >>>
> >>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W>
> >>> windows, WindowedByParameters parameters);
> >>>
> >>> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows,
> >>> WindowedByParameters parameters);
> >>>
> >>> SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows,
> >>> WindowedByParameters parameters);
> >>>
> >>> This is similar to option 2 in the KIP, but it ensures that we put all
> >>> future needed configs in the parameters object and we do not need to
> >>> overload the methods anymore.
> >>>
> >>> Then if we also get KAFKA-10298 done, we could even collapse all
> >>> `windowedBy()` methods into one.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 22.03.22 22:31, Guozhang Wang wrote:
> >>>> Thanks for the great summary Hao. I'm still leaning towards option 2)
> >>>> here, and I'm in favor of `trigger` as function name, and `Triggered` as
> >>>> config class name (mainly to be aligned with other config class names).
> >>>> Also want to see others' preferences between the options, as well as the
> >>>> namings.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>
> >>>>> `windowedStream.onWindowClose()` was the original option 1
> >>>>> (`windowedStream.emitFinal()`) but was rejected
> >>>>> because we could add more emit types and this will result in adding more
> >>>>> functions. I still prefer the
> >>>>> "windowedStream.someFunc(Controlled.onWindowClose)"
> >>>>> model since it's flexible and clear that it's configuring the emit policy.
> >>>>> Let me summarize all the naming options we have and compare:
> >>>>>
> >>>>> *API function name:*
> >>>>>
> >>>>> *1. `windowedStream.trigger()`*
> >>>>>     Pros:
> >>>>>        i. Simple
> >>>>>        ii. Similar to Flink's trigger function (is this a con actually?)
> >>>>>     Cons:
> >>>>>        i. `trigger()` can be confused with Flink trigger (raised by John)
> >>>>>        ii. `trigger()` feels like an operation instead of a configure
> >>>>>            function (raised by Bruno)?
> >>>>>
> >>>>> *2. `windowedStream.emitTrigger()`*
> >>>>>     Pros:
> >>>>>        i. Avoid confusion from Flink's trigger API
> >>>>>        ii. `emitTrigger` feels like configuring the trigger because
> >>>>>            "trigger" here is a noun instead of a verb as in `trigger()`
> >>>>>     Cons:
> >>>>>        i. Verbose?
> >>>>>        ii. Not consistent with `Suppressed.untilWindowClose`?
> >>>>>
> >>>>>
> >>>>> *Config class/object name:*
> >>>>>
> >>>>> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> >>>>>     Cons:
> >>>>>        i. Doesn't go along with `trigger` (raised by Bruno)
> >>>>>
> >>>>> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> >>>>>
> >>>>> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> >>>>>
> >>>>> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> >>>>> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> >>>>>     This is a combination of different names like: `EmitConfig`,
> >>>>> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> >>>>>
> >>>>>
> >>>>> If we are settled with option 1), we can add new options to these names and
> >>>>> comment on their Pros and Cons.
> >>>>>
> >>>>> Hao
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> I see what you mean now, and I think it's a fair point that composing
> >>>>>> `trigger` and `emitted` seems awkward.
> >>>>>>
> >>>>>> Re: data process operator v.s. control operator, I shared your concern as
> >>>>>> well, and here's my train of thought: Having only data process operators
> >>>>>> was my primary motivation for how we added the suppress operator --- it
> >>>>>> indeed "suppresses" data. But in hindsight its disadvantage is that,
> >>>>>> for example, Suppressed.onWindowClose() should be only related to an
> >>>>>> earlier windowedBy operator which is possibly very far from it in the
> >>>>>> resulting DSL code. It's not only a bit awkward for users to write such
> >>>>>> code, but also in such cases the DSL builder needs to maintain and
> >>>>>> propagate this information to the suppress operator further down. So we are
> >>>>>> now thinking about "putting the control object as close as possible to where the
> >>>>>> related processor really happens". And in that world my original
> >>>>>> preference was somewhere in option 2), i.e. just put the control as a param
> >>>>>> of the related "windowedBy" operator, but the trade-off is we keep adding
> >>>>>> overloaded functions to these operators. So after some back and forth
> >>>>>> thoughts I'm leaning towards relaxing our principle of having only
> >>>>>> processing operators and no flow-control operators. That being said, if you
> >>>>>> have any ideas that give us the benefits of both worlds I'm all ears.
> >>>>>>
> >>>>>> Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> >>>>>> "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> >>>>>> for the latter is for extensibility without adding more functions in the
> >>>>>> future. If people feel this is not worth it we can do the first option as
> >>>>>> well. If we just feel that `trigger` and `emitted` are not composable
> >>>>>> together, maybe we can consider something like
> >>>>>> `windowedStream.trigger(Triggered.onWindowClose())`?
> >>>>>>
> >>>>>> Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
> >>>>>> we should use past tense as well :P But if it's not bothering people much
> >>>>>> I'd say we just keep it rather than deprecate it and rename to new APIs.
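
To make the `windowedStream.trigger(Triggered.onWindowClose())` shape discussed above concrete, here is a minimal, self-contained sketch. It is a toy model, not Kafka Streams code: `ToyWindowedStream`, this `Triggered` class, and the `count(long...)` signature are hypothetical names made up for illustration, and the model assumes tumbling windows, non-negative in-order timestamps, and no grace period. It only shows that a configuration method can return the same stream so the aggregation methods need no extra overloads, and how emitting on each update differs from emitting only on window close.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical config object, named after the `Triggered` suggestion in the thread.
// This is NOT a Kafka Streams class.
final class Triggered {
    final boolean emitFinalOnly;

    private Triggered(boolean emitFinalOnly) {
        this.emitFinalOnly = emitFinalOnly;
    }

    static Triggered onWindowClose() { return new Triggered(true); }
    static Triggered onEachUpdate() { return new Triggered(false); }
}

// Toy stand-in for a time-windowed stream with tumbling windows (hypothetical).
final class ToyWindowedStream {
    private final long windowSizeMs;
    private Triggered triggered = Triggered.onEachUpdate(); // assumed default

    ToyWindowedStream(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Configuration step: records the emit policy and returns the same stream,
    // so no aggregation method needs an additional overload.
    ToyWindowedStream trigger(Triggered triggered) {
        this.triggered = triggered;
        return this;
    }

    // Counts records per window and returns the emitted "windowStart=count" results.
    // Assumes non-negative, in-order timestamps and no grace period.
    List<String> count(long... timestamps) {
        List<String> emitted = new ArrayList<>();
        long openWindowStart = -1;
        long count = 0;
        for (long ts : timestamps) {
            long windowStart = ts - (ts % windowSizeMs);
            if (windowStart != openWindowStart) {
                // Stream time has moved past the previous window, so it is closed.
                if (triggered.emitFinalOnly && openWindowStart >= 0) {
                    emitted.add(openWindowStart + "=" + count);
                }
                openWindowStart = windowStart;
                count = 0;
            }
            count++;
            if (!triggered.emitFinalOnly) {
                emitted.add(windowStart + "=" + count);
            }
        }
        return emitted;
    }
}

class TriggerSketch {
    public static void main(String[] args) {
        // prints [0=1, 0=2, 10=1]
        System.out.println(new ToyWindowedStream(10).count(1, 2, 11));
        // prints [0=2]; the window starting at 10 is still open, so nothing is emitted for it
        System.out.println(new ToyWindowedStream(10)
                .trigger(Triggered.onWindowClose())
                .count(1, 2, 11));
    }
}
```

In this toy model a later emit policy would only need another static factory on `Triggered`, which is the extensibility argument made above for the config-object style over a dedicated method per policy.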
> >>>>>> > >>>>>> > >>>>>> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> > >>>>> wrote: > >>>>>> > >>>>>>> Hi Guozhang, > >>>>>>> > >>>>>>> There is no semantic difference. It is a cosmetic difference. > >>>>>>> Conceptually, I relate `Emitted` with the aggregation and not with > >>>>>>> `trigger()` in the API flow, because the aggregation emits the > result > >>>>>>> not `trigger()`. Therefore, I proposed to not use `Emitted` as the > >>> name > >>>>>>> of the config object passed to `trigger()`. > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Bruno > >>>>>>> > >>>>>>> On 22.03.22 17:24, Guozhang Wang wrote: > >>>>>>>> Hi Bruno, > >>>>>>>> > >>>>>>>> Could you elaborate a bit more here, what's the semantic > difference > >>>>>>> between > >>>>>>>> "the aggregation is triggered on window close and all aggregation > >>>>>> results > >>>>>>>> are emitted." for trigger(TriggerParameters.onWindowClose()), and > >>>>> "the > >>>>>>>> aggregation is configured to only emit final results." for > >>>>>>>> trigger(Emitted.onWindowClose())? > >>>>>>>> > >>>>>>>> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org > > > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Hao, > >>>>>>>>> > >>>>>>>>> Thank you for the KIP! > >>>>>>>>> > >>>>>>>>> Regarding option 1, I would not use `Emitted.onWindowClose()` > since > >>>>>> that > >>>>>>>>> does not seem compatible with the proposed flow. Conceptually, > now > >>>>> the > >>>>>>>>> flow states that the aggregation is triggered on window close and > >>>>> all > >>>>>>>>> aggregation results are emitted. `Emitted` suggests that the > >>>>>> aggregation > >>>>>>>>> is configured to only emit final results. > >>>>>>>>> > >>>>>>>>> Thus, I propose the following: > >>>>>>>>> > >>>>>>>>> stream > >>>>>>>>> .groupBy(..) > >>>>>>>>> .windowedBy(..) > >>>>>>>>> .trigger(TriggerParameters.onWindowClose()) > >>>>>>>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>>>>>>> .mapValues(..) 
> >>>>>>>>> > >>>>>>>>> An alternative to `trigger()` could be `schedule()`, but I do not > >>>>>> really > >>>>>>>>> like it. > >>>>>>>>> > >>>>>>>>> One thing I noticed with option 1 is that all other methods in > the > >>>>>>>>> example above are operations on data. `groupBy()` groups, > >>>>>> `windowedBy()` > >>>>>>>>> partitions, `aggregate()` computes the aggregate, `mapValues()` > maps > >>>>>>>>> values, even `suppress()` suppresses intermediate results. But > what > >>>>>> does > >>>>>>>>> `trigger()` do? `trigger()` seems a config lost among operations. > >>>>>>>>> > >>>>>>>>> However, if we do not want to restrict ourselves to only use > methods > >>>>>>>>> when we want to specify operations on data, I have the following > >>>>>>> proposal: > >>>>>>>>> > >>>>>>>>> stream > >>>>>>>>> .groupBy(..) > >>>>>>>>> .windowedBy(..) > >>>>>>>>> .onWindowClose() > >>>>>>>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>>>>>>> .mapValues(..) > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Bruno > >>>>>>>>> > >>>>>>>>> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other > >>>>>>>>> operations also use present tense. > >>>>>>>>> > >>>>>>>>> On 22.03.22 06:36, Hao Li wrote: > >>>>>>>>>> Hi John, > >>>>>>>>>> > >>>>>>>>>> Yes. For naming, `trigger` is similar to Flink's trigger, but it > >>>>> has > >>>>>> a > >>>>>>>>>> different meaning in our case. `emit` sounds like an action to > >>>>> emit? > >>>>>>> How > >>>>>>>>>> about `emitTrigger`? I'm open to suggestions for the naming. > >>>>>>>>>> > >>>>>>>>>> For deprecating `Suppressed.untilWindowClose`, I agree with > >>>>> Guozhang > >>>>>> we > >>>>>>>>> can > >>>>>>>>>> deprecate `Suppressed` config as a whole later. Or we can > deprecate > >>>>>>>>>> `Suppressed.untilWindowClose` in later KIP after implementation > of > >>>>>> emit > >>>>>>>>>> final is done. > >>>>>>>>>> > >>>>>>>>>> BTW, isn't > >>>>>>>>>> > >>>>>>>>>> stream > >>>>>>>>>> .groupBy(..) > >>>>>>>>>> .windowBy(..) 
> >>>>>>>>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>>>>>>>> .mapValues(..) > >>>>>>>>>> .suppress(Suppressed.untilWindowClose) // since we can > trace > >>>>> back > >>>>>>> to > >>>>>>>>>> parent node, to find a window definition > >>>>>>>>>> > >>>>>>>>>> same as > >>>>>>>>>> > >>>>>>>>>> stream > >>>>>>>>>> .groupBy(..) > >>>>>>>>>> .windowBy(..) > >>>>>>>>>> .trigger(Emitted.onWindowClose) > >>>>>>>>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>>>>>>>> .mapValues(..) > >>>>>>>>>> ? > >>>>>>>>>> > >>>>>>>>>> Hao > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang < > wangg...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> I think the following case is only doable via `suppress`: > >>>>>>>>>>> > >>>>>>>>>>> stream > >>>>>>>>>>> .groupBy(..) > >>>>>>>>>>> .windowBy(..) > >>>>>>>>>>> .aggregate(..) //result in a KTable<Windowed<..>> > >>>>>>>>>>> .mapValues(..) > >>>>>>>>>>> .suppress(Suppressed.untilWindowClose) // since we can > trace > >>>>>> back > >>>>>>> to > >>>>>>>>>>> parent node, to find a window definition > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Guozhang > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler < > vvcep...@apache.org > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Thanks, Guozhang! > >>>>>>>>>>>> > >>>>>>>>>>>> To clarify, I was asking specifically about deprecating just > the > >>>>>>> method > >>>>>>>>>>>> ‘untilWindowClose’. I might not be thinking clearly about it, > >>>>>> though. > >>>>>>>>>>> What > >>>>>>>>>>>> does untilWindowClose do that this KIP doesn’t cover? > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> John > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote: > >>>>>>>>>>>>> Just my 2c: Suppressed is in `suppress` whose application > scope > >>>>> is > >>>>>>>>> much > >>>>>>>>>>>>> larger and hence more flexible. I.e. 
it can be used anywhere > >>>>> for a > >>>>>>>>>>>> `KTable` > >>>>>>>>>>>>> (but internally we would check whether certain emit policies > >>>>> like > >>>>>>>>>>>>> `untilWindowClose` is valid or not), whereas `trigger` as for > >>>>> now > >>>>>> is > >>>>>>>>>>> only > >>>>>>>>>>>>> applicable in XXWindowedKStream. So I think it would not be > >>>>>>> completely > >>>>>>>>>>>>> replacing Suppressed.untilWindowClose. > >>>>>>>>>>>>> > >>>>>>>>>>>>> In the future, personally I'd still want to keep one control > >>>>>> object > >>>>>>>>>>> still > >>>>>>>>>>>>> for all emit policies, and maybe if we have extended Emitted > for > >>>>>>> other > >>>>>>>>>>>>> emitting policies covered by Suppressed today, we can > discuss if > >>>>>> we > >>>>>>>>>>> could > >>>>>>>>>>>>> have `KTable.suppress(Emitted..)` replacing > >>>>>>>>>>>> `KTable.suppress(Suppressed..)` > >>>>>>>>>>>>> as a whole, but for this KIP I think it's too early. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler < > >>>>> vvcep...@apache.org > >>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for the Kip, Hao! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> For what it’s worth, I’m also in favor of your latest > framing > >>>>> of > >>>>>>> the > >>>>>>>>>>>> API, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I think the name is fine. I assume it’s inspired by Flink? > It’s > >>>>>> not > >>>>>>>>>>>>>> identical to the concept of a trigger in Flink, which > specifies > >>>>>>> when > >>>>>>>>>>> to > >>>>>>>>>>>>>> evaluate the window, which might be confusing to some people > >>>>> who > >>>>>>> have > >>>>>>>>>>>> deep > >>>>>>>>>>>>>> experience with Flink. Then again, it seems close enough > that > >>>>> it > >>>>>>>>>>> should > >>>>>>>>>>>> be > >>>>>>>>>>>>>> clear to casual Flink users. 
For people with no other stream > >>>>>>>>>>> processing > >>>>>>>>>>>>>> experience, it might seem a bit esoteric compared to > something > >>>>>>>>>>>>>> self-documenting like ‘emit()’, but the docs should make it > >>>>>> clear. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> One small question: it seems like this proposal is > identical to > >>>>>>>>>>>>>> Suppressed.untilWindowClose, and the KIP states that this > API > >>>>> is > >>>>>>>>>>>> superior. > >>>>>>>>>>>>>> In that case, should we deprecate > Suppressed.untilWindowClose? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> John > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote: > >>>>>>>>>>>>>>> Hi Hao, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> For 2), I think it's a good idea in general to use a > separate > >>>>>>>>>>>> function on > >>>>>>>>>>>>>>> the Time/SessionWindowedKStream itself, to achieve the same > >>>>>> effect > >>>>>>>>>>>> that, > >>>>>>>>>>>>>>> for now, the emitting control is only for windowed > >>>>> aggregations > >>>>>> as > >>>>>>>>>>> in > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>> KIP, than overloading existing functions. We can discuss > >>>>> further > >>>>>>>>>>> about > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> actual function names, whether others like the name > `trigger` > >>>>> or > >>>>>>>>>>> not. > >>>>>>>>>>>> As > >>>>>>>>>>>>>>> for myself I feel `trigger` is a good one but I'd like to > see > >>>>> if > >>>>>>>>>>>> others > >>>>>>>>>>>>>>> have opinions as well. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li > >>>>> <h...@confluent.io.invalid > >>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Guozhang, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the feedback. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 1. 
I agree to have an `Emitted` control class and two > static > >>>>>>>>>>>>>> constructors > >>>>>>>>>>>>>>>> named `onWindowClose` and `onEachUpdate`. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 2. For the API function changes, I'm thinking of adding a > new > >>>>>>>>>>>> function > >>>>>>>>>>>>>>>> called `trigger` to `TimeWindowedKStream` and > >>>>>>>>>>>> `SessionWindowedKStream`. > >>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>> takes `Emitted` config and returns the same stream. > Example: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> stream > >>>>>>>>>>>>>>>> .groupBy(...) > >>>>>>>>>>>>>>>> .windowedBy(...) > >>>>>>>>>>>>>>>> .trigger(Emitted.onWindowClose). // N > >>>>>>>>>>>>>>>> .count() > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> The benefits are: > >>>>>>>>>>>>>>>> 1. It's simple and avoids creating overloading of > >>>>> existing > >>>>>>>>>>>> functions > >>>>>>>>>>>>>> like > >>>>>>>>>>>>>>>> `windowedBy` or `count`, `reduce` or `aggregate`. In > fact, to > >>>>>> add > >>>>>>>>>>> it > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>> `aggregate` functions, we need to add it to all existing > >>>>>> `count`, > >>>>>>>>>>>>>>>> `aggregate` overloading functions which is a lot. > >>>>>>>>>>>>>>>> 2. It operates directly on windowed kstream and > tells > >>> how > >>>>>> its > >>>>>>>>>>>> output > >>>>>>>>>>>>>>>> should be configured, if later we need to add this other > type > >>>>>> of > >>>>>>>>>>>>>> streams, > >>>>>>>>>>>>>>>> we can reuse same `trigger` API whereas other type of > >>>>>>>>>>> streams/tables > >>>>>>>>>>>> may > >>>>>>>>>>>>>>>> not have `aggregate`, `windowedby` api to make it > consistent. 
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hao > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang < > >>>>>>> wangg...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hello Hao, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I'm preferring option 2 over the other options mainly > >>>>> because > >>>>>>> the > >>>>>>>>>>>>>> added > >>>>>>>>>>>>>>>>> config object could potentially be used in other > operators > >>>>> as > >>>>>>>>>>> well > >>>>>>>>>>>>>> (not > >>>>>>>>>>>>>>>>> necessarily has to be a windowed operator and hence have > to > >>>>> be > >>>>>>>>>>>>>>>> piggy-backed > >>>>>>>>>>>>>>>>> on `windowedBy`, and that's also why I suggested not > naming > >>>>> it > >>>>>>>>>>>>>>>>> `WindowConfig` but just `EmitConfig`). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> As for Matthias' question, I think the difference between > >>>>> the > >>>>>>>>>>>> windowed > >>>>>>>>>>>>>>>>> aggregate operator and the stream-stream join operator is > >>>>>> that, > >>>>>>>>>>> for > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> latter we think emit-final should be the only right > emitting > >>>>>>>>>>> policy > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> hence we should not let users to configure it. If users > >>>>>>> configure > >>>>>>>>>>>> it > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> e.g. emit eager they may get the old spurious emitting > >>>>>> behavior > >>>>>>>>>>>> which > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> violating the semantics. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also > leaning a > >>>>>> bit > >>>>>>>>>>>>>>>>> towards adding the new param in the overloaded > `aggregate`, > >>>>>> than > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> overloaded `windowBy` function. 
The reason is that the > >>>>>> emitting > >>>>>>>>>>>> logic > >>>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>> be either window based or non-window based, in the long > run. > >>>>>>>>>>> Though > >>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>> this KIP we could just add it in > >>>>>>>>>>> `XXXWindowedKStream.aggregate()`, > >>>>>>>>>>>> we > >>>>>>>>>>>>>> may > >>>>>>>>>>>>>>>>> want to extend to other non-windowed operators in the > >>>>> future. > >>>>>>>>>>>>>>>>> 2. To be consistent with other control class names, I > feel > >>>>>> maybe > >>>>>>>>>>> we > >>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>> name it "Emitted", not "EmitConfig". > >>>>>>>>>>>>>>>>> 3. Following the first comment, I think we can have the > >>>>> static > >>>>>>>>>>>>>>>> constructor > >>>>>>>>>>>>>>>>> names as "onWindowClose" and "onEachUpdate". > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> The resulted code pattern would be like this: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> stream > >>>>>>>>>>>>>>>>> .groupBy(..) > >>>>>>>>>>>>>>>>> .windowBy(TimeWindow..) > >>>>>>>>>>>>>>>>> .count(Emitted.onWindowClose) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> WDYT? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax < > >>>>>>>>>>> mj...@apache.org > >>>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have > in > >>>>>>>>>>> mind > >>>>>>>>>>>> is > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final > >>>>>> results. > >>>>>>>>>>>>>> Maybe > >>>>>>>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as we > >>>>>> don't > >>>>>>>>>>>>>> need to > >>>>>>>>>>>>>>>>>>>> recompile DSL to change it. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I see; making it a config seems better. 
Frankly, I am > not > >>>>>> even > >>>>>>>>>>>> sure > >>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>>> we need a config at all or if we can just hard code it? > For > >>>>>> the > >>>>>>>>>>>>>>>>>> stream-stream join left/outer join fix, there is only an > >>>>>>>>>>> internal > >>>>>>>>>>>>>>>> config > >>>>>>>>>>>>>>>>>> but no public config either. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Option 1: Your proposal is? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> stream > >>>>>>>>>>>>>>>>>> .groupByKey() > >>>>>>>>>>>>>>>>>> .windowBy(TimeWindow.ofSizeNoGrace(...)) > >>>>>>>>>>>>>>>>>> .configure(EmitConfig.emitFinal() > >>>>>>>>>>>>>>>>>> .count() > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Does not change my argument that it seems to be misplace > >>>>> from > >>>>>>>>>>> an > >>>>>>>>>>>> API > >>>>>>>>>>>>>>>>>> flow POV. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> For option 2 and 3, and not sure which one I like > better. > >>>>>> Might > >>>>>>>>>>>> be > >>>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>> if other could chime in, too. I think I slightly prefer > >>>>>> option > >>>>>>>>>>> 2 > >>>>>>>>>>>>>> over > >>>>>>>>>>>>>>>>>> option 3. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote: > >>>>>>>>>>>>>>>>>>> Thanks for the feedback Matthias. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have > in > >>>>>> mind > >>>>>>>>>>>> is > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final > >>>>> results. > >>>>>>>>>>>> Maybe > >>>>>>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as we > >>>>> don't > >>>>>>>>>>>> need > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> recompile DSL to change it. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure > how > >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` should be outputted to `KTable` > >>>>> after > >>>>>>>>>>>>>>>>> aggregation. > >>>>>>>>>>>>>>>>>>> But `emitFinal` is not an action to the > >>>>>> `TimeWindowedKStream` > >>>>>>>>>>>>>>>>> interface. > >>>>>>>>>>>>>>>>>>> Maybe adding `configure(EmitConfig config)` makes more > >>>>>> sense? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> For option 2, config can be created using > >>>>>>>>>>>>>> `WindowConfig.emitFinal()` > >>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>> `EmitConfig.emitFinal` > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> For option 3, it will be something like > `TimeWindows(..., > >>>>>>>>>>>>>> EmitConfig > >>>>>>>>>>>>>>>>>>> emitConfig)`. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> For putting `EmitConfig` in aggregation operator, I > think > >>>>> it > >>>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>>>>> control how we do aggregation but how we output to > >>>>> `KTable`. > >>>>>>>>>>>>>> That's > >>>>>>>>>>>>>>>>> why I > >>>>>>>>>>>>>>>>>>> feel option 1 makes more sense as it applies to > >>>>>>>>>>>>>>>> `TimeWindowedKStream`. > >>>>>>>>>>>>>>>>>> But > >>>>>>>>>>>>>>>>>>> I'm also OK with option 2. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hao > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax < > >>>>>>>>>>>> mj...@apache.org > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks for the KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> A general comment: it seem that we won't need any new > >>>>>>>>>>>>>>>>> `allowedLateness` > >>>>>>>>>>>>>>>>>>>> parameter because the grace-period is defined on the > >>>>> window > >>>>>>>>>>>>>> itself > >>>>>>>>>>>>>>>>>> already? 
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> >>>>>>>>>>>>>>>>>>>> `grace-period` is actually not a property of the window but a property
> >>>>>>>>>>>>>>>>>>>> of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable IMHO:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>>   .groupByKey()
> >>>>>>>>>>>>>>>>>>>>   .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>>>>>>>>   .emitFinal()
> >>>>>>>>>>>>>>>>>>>>   .count()
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place for this
> >>>>>>>>>>>>>>>>>>>> case?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the API
> >>>>>>>>>>>>>>>>>>>> though):
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>>   .groupByKey()
> >>>>>>>>>>>>>>>>>>>>   .windowBy(
> >>>>>>>>>>>>>>>>>>>>     TimeWindow.ofSizeNoGrace(...),
> >>>>>>>>>>>>>>>>>>>>     EmitConfig.emitFinal() -- just made this up; it's not in the KIP
> >>>>>>>>>>>>>>>>>>>>   )
> >>>>>>>>>>>>>>>>>>>>   .count()
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> >>>>>>>>>>>>>>>>>>>> unclear what API you have in mind? `EmitFinalConfig` has no public
> >>>>>>>>>>>>>>>>>>>> constructor nor any builder method.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For option 3, I am not sure what you really have in mind.
> >>>>>>>>>>>>>>>>>>>> Can you give a concrete example (similar to the ones above) of
> >>>>>>>>>>>>>>>>>>>> how users would write their code?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the
> >>>>>>>>>>>>>>>>>>>> aggregation operator? In the end, it seems not to be a property
> >>>>>>>>>>>>>>>>>>>> of the window definition or windowing step, but a property of
> >>>>>>>>>>>>>>>>>>>> the actual operator:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>>   .groupByKey()
> >>>>>>>>>>>>>>>>>>>>   .windowedBy(
> >>>>>>>>>>>>>>>>>>>>     TimeWindows.ofSizeWithNoGrace(...)
> >>>>>>>>>>>>>>>>>>>>   )
> >>>>>>>>>>>>>>>>>>>>   .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The API surface area that needs to be updated might be larger
> >>>>>>>>>>>>>>>>>>>> in this case though...
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and
> >>>>>>>>>>>>>>>>>>>>> option 2 modifies fewer places. What do you think of option 1,
> >>>>>>>>>>>>>>>>>>>>> which doesn't change the current `windowedBy` API but
> >>>>>>>>>>>>>>>>>>>>> configures `EmitConfig` separately?
> >>>>>>>>>>>>>>>>>>>>> The benefit of option 1 is that if we need to configure
> >>>>>>>>>>>>>>>>>>>>> something else later, we don't need to pile it onto
> >>>>>>>>>>>>>>>>>>>>> `windowedBy` but can add separate APIs.
> >>>>>>>>>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >>>>>>>>>>>>>>>>>>>>> . But we can also create an internal API to do that without
> >>>>>>>>>>>>>>>>>>>>> modifying `Stores`.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal. I have some preference among the
> >>>>>>>>>>>>>>>>>>>>>> options, so I will copy them here:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I'm now thinking if it's better to not add this new config
> >>>>>>>>>>>>>>>>>>>>>> on each of the Window interfaces, but instead add that at
> >>>>>>>>>>>>>>>>>>>>>> the KGroupedStream#windowedBy function.
> >>>>>>>>>>>>>>>>>>>>>> Also, instead of adding just a boolean flag, maybe we can add
> >>>>>>>>>>>>>>>>>>>>>> a Configured class like Grouped, Suppressed, etc. For example,
> >>>>>>>>>>>>>>>>>>>>>> let's call it Emitted, which for now would just have a single
> >>>>>>>>>>>>>>>>>>>>>> construct, Emitted.atWindowClose, whose semantics are the same
> >>>>>>>>>>>>>>>>>>>>>> as emitFinal == true. I think the benefits are:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but just
> >>>>>>>>>>>>>>>>>>>>>> overload one windowedBy function with a second param. This is
> >>>>>>>>>>>>>>>>>>>>>> a smaller scope for now, and also more extensible for any
> >>>>>>>>>>>>>>>>>>>>>> future changes.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as
> >>>>>>>>>>>>>>>>>>>>>> well as being able to reuse this Emitted interface for other
> >>>>>>>>>>>>>>>>>>>>>> operators if we want to expand to them.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> So in general I'm leaning towards option 2).
> >>>>>>>>>>>>>>>>>>>>>> For that, some more detailed comments:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window
> >>>>>>>>>>>>>>>>>>>>>> stateful operations, I think naming it `EmitConfig` is
> >>>>>>>>>>>>>>>>>>>>>> probably better than `WindowConfig`.
> >>>>>>>>>>>>>>>>>>>>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892)
> >>>>>>>>>>>>>>>>>>>>>> that you are also proposing to add new stores to the public
> >>>>>>>>>>>>>>>>>>>>>> factory Stores, but that's not included in the KIP. Is that
> >>>>>>>>>>>>>>>>>>>>>> intentional? Personally I think that although we may
> >>>>>>>>>>>>>>>>>>>>>> eventually want to add a new store type to the public APIs,
> >>>>>>>>>>>>>>>>>>>>>> for this KIP maybe we do not have to add them but can delay
> >>>>>>>>>>>>>>>>>>>>>> that until after we've learned the best way to lay them out.
> >>>>>>>>>>>>>>>>>>>>>> LMK what you think.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This KIP is aimed to add new APIs to support outputting final
> >>>>>>>>>>>>>>>>>>>>>>> aggregated results for windowed aggregations. I listed several
> >>>>>>>>>>>>>>>>>>>>>>> options there and I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>> -- Guozhang

--
Thanks,
Hao
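
For readers following the thread, the behaviour under discussion (emit a windowed aggregate only once its window has closed, rather than on every update) can be sketched with a small self-contained model. This is not the Kafka Streams API and not the KIP's implementation: the class `WindowCloseCounter`, its constructor parameters, and the `windowStart=count` output format are all hypothetical. The sketch assumes non-negative timestamps, tumbling windows, and that a window closes once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "emit final" for a tumbling-window count.
// It only illustrates the semantics discussed in the thread; it does not
// use or mirror any Kafka Streams class.
final class WindowCloseCounter {
    private final long windowSizeMs;
    private final long graceMs;
    private final boolean emitFinal;
    // Open windows, keyed by window start, holding the running count.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    // Highest timestamp seen so far.
    private long streamTime = Long.MIN_VALUE;

    WindowCloseCounter(long windowSizeMs, long graceMs, boolean emitFinal) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
        this.emitFinal = emitFinal;
    }

    // Processes one record timestamp and returns whatever is emitted as a
    // result, formatted as "windowStart=count".
    List<String> process(long timestampMs) {
        List<String> out = new ArrayList<>();
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % windowSizeMs);

        // Accept the record only if its window is still open.
        if (start + windowSizeMs + graceMs > streamTime) {
            long count = openWindows.merge(start, 1L, Long::sum);
            if (!emitFinal) {
                out.add(start + "=" + count); // emit on every update
            }
        }

        // Close every window whose end plus grace has been reached.
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + windowSizeMs + graceMs > streamTime) {
                break;
            }
            openWindows.pollFirstEntry();
            if (emitFinal) {
                out.add(oldest.getKey() + "=" + oldest.getValue()); // emit once, on close
            }
        }
        return out;
    }
}
```

With a window size of 10 ms and no grace, feeding timestamps 1, 5 and 12 into an instance created with `emitFinal = true` emits nothing until 12 arrives, at which point `0=2` is emitted. The same input with `emitFinal = false` emits `0=1`, `0=2` and `10=1`, one result per update.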