Hi all, I can see both sides of this.
On one hand, when we say "stream.groupBy().windowBy().count()", it seems like we're telling KS to take the raw stream, group it based on key, then window it based on time, and then compute an aggregation on the windows. In that model, "trigger()" would have to mean something like "trigger it", which doesn't really make sense, since we aren't "triggering" the aggregation (then again, to an outside observer, it would appear that way... food for thought).

Another way to look at it is that all we're really doing is configuring a windowed aggregation on the stream, and we're doing it with a progressive builder interface. In other words, the above is just a progressive builder for configuring an operation like "stream.aggregate(groupingConfig, windowingConfig, countFn)". Under the latter interpretation of the DSL, it makes perfect sense to add more optional progressive builder methods like trigger() to the WindowedKStream interfaces.

Since part of the motivation for choosing the word "trigger" here is to stay close to what Flink defines, I'll also point out that Flink's syntax is also "stream.keyBy().window().trigger().aggregate()". Not that their API is the holy grail or anything, but it's at least an indication that this API isn't a horrible mistake.

All other things being equal, I also prefer to leave tie-breakers in the hands of the contributor. So, if we've all said our piece and Hao still prefers option 1, then (as long as we don't think it's a horrible mistake) I think we should just let him go for it.

Speaking of which, after reviewing the responses regarding deprecating `Suppressed#onWindowClose`, I still think we should just go ahead and deprecate it. Although it's not expressed exactly the same way, it does exactly the same thing, or something so close that it seems confusing to keep both. But again, if Hao really prefers to keep both, I won't insist on it :)

Thanks all,
-John

On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote: > Thanks Bruno!
> > Arguments for option 1 are: > 1. Concise and descriptive. It avoids overloading existing functions and > it's very clear what it's doing. Imagine if there's an autocomplete feature > in IntelliJ or another IDE for our DSL in the future; it's not favorable to > show 6 `windowedBy` functions. > 2. Option 1 operates on `windowedStream` to configure how it should be > outputted. Option 2 operates on `KGroupedStream` to produce > `windowedStream` as well as configure how `windowedStream` should be > outputted. I feel it's better to have a `windowedStream` and then > configure how it can be outputted. Somehow I feel option 2 breaks the > builder pattern. > 3. `WindowedByParameters` doesn't seem very descriptive. If we put all > kinds of different parameters into it to avoid future overloading, it's too > bloated and not very user friendly. > > I agree option 1's `trigger` function is configuring the stream, which feels > different from existing `count` or `aggregate` etc. Configuring might > also be a kind of action on a stream :) I'm not sure if it breaks the DSL principle, > and if it does, > can we relax the principle given the benefits compared to option 2)? Maybe > John can chime in as the DSL grammar author. > > Thanks, > Hao > > On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote: > > > Hi Hao, > > > > I agree with Guozhang: Great summary! Thank you! > > > > Regarding "aligned with other config class names", there is this DSL > > grammar John once specified > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar > > and we have already used it in the code. I found the grammar quite useful. > > > > I am undecided if option 1 is really worth it. What are actually the > > arguments in favor of it? Is it only that we do not need to overload > > other methods? This does not seem worth breaking DSL principles.
An > > alternative proposal would be to go with option 2 and conform with the > > grammar above: > > > > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> > > windows, WindowedByParameters parameters); > > > > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, > > WindowedByParameters parameters); > > > > SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, > > WindowedByParameters parameters); > > > > This is similar to option 2 in the KIP, but it ensures that we put all > > configs needed in the future into the parameters object and we do not need to > > overload the methods anymore. > > > > Then if we also get KAFKA-10298 done, we could even collapse all > > `windowedBy()` methods into one. > > > > Best, > > Bruno > > > > On 22.03.22 22:31, Guozhang Wang wrote: > > > Thanks for the great summary, Hao. I'm still leaning towards option 2) > > > here, and I'm in favor of `trigger` as the function name, and `Triggered` as > > > the config class name (mainly to be aligned with other config class names). > > > I also want to see others' preferences between the options, as well as the > > > naming. > > > > > > > > > Guozhang > > > > > > > > > > > > On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> > > wrote: > > > > > > > `windowedStream.onWindowClose()` was the original option 1 > > > > (`windowedStream.emitFinal()`) but was rejected > > > > because we could add more emit types and this would result in adding more > > > > functions. I still prefer the > > > > "windowedStream.someFunc(Controlled.onWindowClose)" > > > > model since it's flexible and clear that it's configuring the emit > > policy. > > > > Let me summarize all the naming options we have and compare them: > > > > > > > > *API function name:* > > > > > > > > *1. `windowedStream.trigger()`* > > > > Pros: > > > > i. Simple > > > > ii. Similar to Flink's trigger function (is this a con > > actually?) > > > > Cons: > > > > i.
`trigger()` can be confused with Flink's trigger (raised by > > John) > > > > ii. `trigger()` feels like an operation instead of a configuration > > > > function (raised by Bruno)? > > > > > > > > *2. `windowedStream.emitTrigger()`* > > > > Pros: > > > > i. Avoids confusion with Flink's trigger API > > > > ii. `emitTrigger` feels like configuring the trigger because > > > > "trigger" here is a noun instead of a verb as in `trigger()` > > > > Cons: > > > > i. Verbose? > > > > ii. Not consistent with `Suppressed.untilWindowClose`? > > > > > > > > > > > > *Config class/object name:* > > > > > > > > 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`* > > > > Cons: > > > > i. Doesn't go along with `trigger` (raised by Bruno) > > > > > > > > 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`* > > > > > > > > 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`* > > > > > > > > 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and > > > > *`(Emit|Trigger)(Config|Policy).onEachUpdate()`* > > > > This is a combination of different names like: `EmitConfig`, > > > > `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`... > > > > > > > > > > > > If we settle on option 1), we can add new options to these names > > and > > > > comment on their pros and cons. > > > > > > > > Hao > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > > > I see what you mean now, and I think it's a fair point that composing > > > > > `trigger` and `emitted` seems awkward. > > > > > > > > > > Re: data process operator v.s. control operator, I shared your concern > > as > > > > > well, and here's my train of thought: Having only data process > > operators > > > > > was my primary motivation for how we added the suppress operator --- it > > > > > indeed "suppresses" data.
But in hindsight its disadvantage is > > that, > > > > > for example, Suppressed.onWindowClose() should only be related to an > > > > > earlier windowedBy operator, which is possibly very far from it in the > > > > > resulting DSL code. It's not only a bit awkward for users to write > > > > > such > > > > > code, but in such cases the DSL builder also needs to maintain and > > > > > propagate this information to the suppress operator further down. So > > > > > we > > > > are > > > > > now thinking about "putting the control object as close as possible to where > > > > > the > > > > > related processing really happens". And in that world my original > > > > > preference was somewhere in option 2), i.e. just put the control as a > > > > param > > > > > of the related "windowedBy" operator, but the trade-off is that we keep > > adding > > > > > overloaded functions to these operators. So after some back-and-forth > > > > > thought I'm leaning towards relaxing our principle of only having > > > > > processing operators and no flow-control operators. That being said, > > > > > if > > > > you > > > > > have any ideas for getting the benefits of both worlds, I'm all ears. > > > > > > > > > > Re: using a direct function like "windowedStream.onWindowClose()" v.s. > > > > > "windowedStream.someFunc(Controlled.onWindowClose)", again my > > motivation > > > > > for the latter is extensibility without adding more functions in > > the > > > > > future. If people feel this is not worth it, we can do the first option > > > > > as > > > > > well. If we just feel that `trigger` and `emitted` are not > > > > composable > > > > > together, maybe we can consider something like > > > > > `windowedStream.trigger(Triggered.onWindowClose())`? > > > > > > > > > > Re: windowedBy v.s. windowBy, yeah I do not really have a good reason > > why > > > > > we should use the past tense either :P But if it's not bothering people > > much > > > > > I'd say we just keep it rather than deprecate and rename APIs.
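To make the two API shapes concrete, here is a minimal Java sketch built on hypothetical toy types (`GroupedStream`, `WindowedStream`, `EmitStrategy`, `WindowedByParameters`, `AggregationSpec`). None of them are the real Kafka Streams API; `trigger()` and `WindowedByParameters` only mirror names proposed in this thread. It illustrates John's "progressive builder" reading: `trigger()` on the windowed stream (option 1) and a parameter object passed to `windowedBy()` (option 2) both just fill in the same configuration, which is only materialized by the terminal `count()` call.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical toy types modeling the two API shapes from this thread; not the Kafka Streams API.
public class EmitApiShapes {

    // Hypothetical emit policy, standing in for the proposed Emitted/Triggered.onWindowClose()/onEachUpdate().
    enum EmitStrategy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // The fully configured operation that a builder chain ultimately describes,
    // i.e. something like "aggregate(groupingConfig, windowingConfig, countFn)".
    record AggregationSpec(String grouping, Duration windowSize, EmitStrategy emit, String aggregator) {}

    // Option 2 style: a parameter object passed alongside the window definition.
    record WindowedByParameters(EmitStrategy emit) {}

    static final class GroupedStream {
        private final String grouping;

        GroupedStream(String grouping) {
            this.grouping = grouping;
        }

        // Option 1 entry point: window first; emission can be configured by a later step.
        WindowedStream windowedBy(Duration size) {
            return new WindowedStream(grouping, size, EmitStrategy.ON_EACH_UPDATE);
        }

        // Option 2 entry point: window definition and emit config in one call.
        WindowedStream windowedBy(Duration size, WindowedByParameters params) {
            return new WindowedStream(grouping, size, params.emit());
        }
    }

    static final class WindowedStream {
        private final String grouping;
        private final Duration size;
        private final EmitStrategy emit;

        WindowedStream(String grouping, Duration size, EmitStrategy emit) {
            this.grouping = grouping;
            this.size = size;
            this.emit = Objects.requireNonNull(emit);
        }

        // Option 1: an optional progressive-builder step that returns another windowed stream.
        WindowedStream trigger(EmitStrategy newEmit) {
            return new WindowedStream(grouping, size, newEmit);
        }

        // Terminal step: only here is the accumulated configuration turned into an operation.
        AggregationSpec count() {
            return new AggregationSpec(grouping, size, emit, "count");
        }
    }

    public static void main(String[] args) {
        GroupedStream grouped = new GroupedStream("byKey");

        AggregationSpec option1 = grouped
            .windowedBy(Duration.ofMinutes(5))
            .trigger(EmitStrategy.ON_WINDOW_CLOSE)
            .count();

        AggregationSpec option2 = grouped
            .windowedBy(Duration.ofMinutes(5), new WindowedByParameters(EmitStrategy.ON_WINDOW_CLOSE))
            .count();

        // Both chains describe the same operation, so this prints true.
        System.out.println(option1.equals(option2));
    }
}
```

Under this reading the difference between the options is ergonomic rather than semantic: option 1 adds one optional step to each windowed stream type, while option 2 adds an overload to each `windowedBy()` variant.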
> > > > > > > > > > > On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> > > > > wrote: > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > There is no semantic difference. It is a cosmetic difference. > > > > > > Conceptually, I associate `Emitted` with the aggregation and not with > > > > > > `trigger()` in the API flow, because the aggregation emits the > > > > > > result, > > > > > > not `trigger()`. Therefore, I proposed not to use `Emitted` as the > > name > > > > > > of the config object passed to `trigger()`. > > > > > > > > > > > > > > > > > > Best, > > > > > > Bruno > > > > > > > > > > > > On 22.03.22 17:24, Guozhang Wang wrote: > > > > > > > Hi Bruno, > > > > > > > > > > > > > > Could you elaborate a bit more here: what's the semantic > > > > > > > difference > > > > > > between > > > > > > > "the aggregation is triggered on window close and all aggregation > > > > > results > > > > > > > are emitted." for trigger(TriggerParameters.onWindowClose()), and > > > > "the > > > > > > > aggregation is configured to only emit final results." for > > > > > > > trigger(Emitted.onWindowClose())? > > > > > > > > > > > > > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > > > Hi Hao, > > > > > > > > > > > > > > > > Thank you for the KIP! > > > > > > > > > > > > > > > > Regarding option 1, I would not use `Emitted.onWindowClose()` > > > > > > > > since > > > > > that > > > > > > > > does not seem compatible with the proposed flow. Conceptually, > > > > > > > > now > > > > the > > > > > > > > flow states that the aggregation is triggered on window close > > > > > > > > and > > > > all > > > > > > > > aggregation results are emitted. `Emitted` suggests that the > > > > > aggregation > > > > > > > > is configured to only emit final results. > > > > > > > > > > > > > > > > Thus, I propose the following: > > > > > > > > > > > > > > > > stream > > > > > > > > .groupBy(..)
> > > > > > > > .windowedBy(..) > > > > > > > > .trigger(TriggerParameters.onWindowClose()) > > > > > > > > .aggregate(..) // results in a KTable<Windowed<..>> > > > > > > > > .mapValues(..) > > > > > > > > > > > > > > > > An alternative to `trigger()` could be `schedule()`, but I do > > > > > > > > not > > > > > really > > > > > > > > like it. > > > > > > > > > > > > > > > > One thing I noticed with option 1 is that all other methods in > > > > > > > > the > > > > > > > > example above are operations on data. `groupBy()` groups, > > > > > `windowedBy()` > > > > > > > > partitions, `aggregate()` computes the aggregate, `mapValues()` > > > > > > > > maps > > > > > > > > values, even `suppress()` suppresses intermediate results. But > > > > > > > > what > > > > > does > > > > > > > > `trigger()` do? `trigger()` seems like a config lost among > > > > > > > > operations. > > > > > > > > > > > > > > > > However, if we do not want to restrict ourselves to only using > > > > > > > > methods > > > > > > > > to specify operations on data, I have the following > > > > > > proposal: > > > > > > > > > > > > > > > > stream > > > > > > > > .groupBy(..) > > > > > > > > .windowedBy(..) > > > > > > > > .onWindowClose() > > > > > > > > .aggregate(..) // results in a KTable<Windowed<..>> > > > > > > > > .mapValues(..) > > > > > > > > > > > > > > > > Best, > > > > > > > > Bruno > > > > > > > > > > > > > > > > P.S.: Why is it `windowedBy()` and not `windowBy()`? All other > > > > > > > > operations also use the present tense. > > > > > > > > > > > > > > > > On 22.03.22 06:36, Hao Li wrote: > > > > > > > > > Hi John, > > > > > > > > > > > > > > > > > > Yes. For naming, `trigger` is similar to Flink's trigger, but > > > > > > > > > it > > > > has > > > > > a > > > > > > > > > different meaning in our case. `emit` sounds like an action to > > > > emit? > > > > > > How > > > > > > > > > about `emitTrigger`? I'm open to suggestions for the naming.
> > > > > > > > > > > > > > > > > > For deprecating `Suppressed.untilWindowClose`, I agree with > > > > Guozhang > > > > > that we > > > > > > > > can > > > > > > > > > deprecate the `Suppressed` config as a whole later. Or we can > > > > > > > > > deprecate > > > > > > > > > `Suppressed.untilWindowClose` in a later KIP after the > > > > > > > > > implementation of > > > > > emit > > > > > > > > > final is done. > > > > > > > > > > > > > > > > > > BTW, isn't > > > > > > > > > > > > > > > > > > stream > > > > > > > > > .groupBy(..) > > > > > > > > > .windowBy(..) > > > > > > > > > .aggregate(..) // results in a KTable<Windowed<..>> > > > > > > > > > .mapValues(..) > > > > > > > > > .suppress(Suppressed.untilWindowClose) // since we can > > > > > > > > > trace > > > > back > > > > > > to > > > > > > > > > the parent node to find a window definition > > > > > > > > > > > > > > > > > > the same as > > > > > > > > > > > > > > > > > > stream > > > > > > > > > .groupBy(..) > > > > > > > > > .windowBy(..) > > > > > > > > > .trigger(Emitted.onWindowClose) > > > > > > > > > .aggregate(..) // results in a KTable<Windowed<..>> > > > > > > > > > .mapValues(..) > > > > > > > > > ? > > > > > > > > > > > > > > > > > > Hao > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang > > > > > > > > > <wangg...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > I think the following case is only doable via `suppress`: > > > > > > > > > > > > > > > > > > > > stream > > > > > > > > > > .groupBy(..) > > > > > > > > > > .windowBy(..) > > > > > > > > > > .aggregate(..) // results in a KTable<Windowed<..>> > > > > > > > > > > .mapValues(..)
> > > > > > > > > > .suppress(Suppressed.untilWindowClose) // since we can > > > > > > > > > > trace > > > > > back > > > > > > to > > > > > > > > > > the parent node to find a window definition > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 6:36 PM John Roesler > > > > > > > > > > <vvcep...@apache.org > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Thanks, Guozhang! > > > > > > > > > > > > > > > > > > > > > > To clarify, I was asking specifically about deprecating > > > > > > > > > > > just the > > > > > > method > > > > > > > > > > > ‘untilWindowClose’. I might not be thinking clearly about > > > > > > > > > > > it, > > > > > though. > > > > > > > > > > What > > > > > > > > > > > does untilWindowClose do that this KIP doesn’t cover? > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote: > > > > > > > > > > > > Just my 2c: Suppressed is used in `suppress`, whose > > > > > > > > > > > > application scope > > > > is > > > > > > > > much > > > > > > > > > > > > larger and hence more flexible. I.e. it can be used > > > > > > > > > > > > anywhere > > > > for a > > > > > > > > > > > `KTable` > > > > > > > > > > > > (but internally we would check whether certain emit > > > > > > > > > > > > policies > > > > like > > > > > > > > > > > > `untilWindowClose` are valid or not), whereas `trigger` > > > > > > > > > > > > as of > > > > now > > > > > is > > > > > > > > > > only > > > > > > > > > > > > applicable in XXWindowedKStream. So I think it would > > > > > > > > > > > > not be > > > > > > completely > > > > > > > > > > > > replacing Suppressed.untilWindowClose.
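Guozhang's point that `suppress` can sit anywhere downstream, and must then be validated against an upstream window definition, can be sketched with a toy topology model in Java. `Node`, `findWindowGrace` and the error message are hypothetical and are not Kafka Streams internals; for reference, the released API spells the operator `Suppressed.untilWindowCloses(...)`, which takes a strict buffer config. The sketch walks from the node feeding `suppress` back towards the source and either finds a window grace period or rejects the topology.

```java
import java.time.Duration;
import java.util.Optional;

// Toy topology model (hypothetical, not Kafka Streams internals): each node may carry a window
// definition; a downstream "suppress until window close" must find one by walking up its parents.
public class SuppressGraceSearch {

    record Node(String name, Node parent, Optional<Duration> windowGrace) {
        static Node source(String name) {
            return new Node(name, null, Optional.empty());
        }

        // A stateless step such as mapValues: no window definition of its own.
        Node then(String childName) {
            return new Node(childName, this, Optional.empty());
        }

        // A windowed aggregation: records the window's grace period.
        Node windowedAggregate(String childName, Duration grace) {
            return new Node(childName, this, Optional.of(grace));
        }
    }

    // Walks from the node feeding suppress towards the source; returns the first window grace found.
    static Duration findWindowGrace(Node suppressParent) {
        for (Node n = suppressParent; n != null; n = n.parent()) {
            if (n.windowGrace().isPresent()) {
                return n.windowGrace().get();
            }
        }
        throw new IllegalStateException(
            "untilWindowClose needs an upstream windowed aggregation; none found above " + suppressParent.name());
    }

    public static void main(String[] args) {
        Node valid = Node.source("stream")
            .then("groupBy")
            .windowedAggregate("windowedBy+aggregate", Duration.ofSeconds(30))
            .then("mapValues");
        System.out.println(findWindowGrace(valid)); // PT30S

        Node invalid = Node.source("table").then("mapValues");
        try {
            findWindowGrace(invalid);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is the extra bookkeeping Guozhang describes: the builder has to carry the window information down to the suppress node, whereas a `trigger()` on the windowed stream already has it at hand.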
> > > > > > > > > > > > > > > > > > > > > > > > In the future, personally I'd still want to keep one > > > > > > > > > > > > control > > > > > object > > > > > > > > > > > > for all emit policies, and maybe if we have extended > > > > > > > > > > > > Emitted for > > > > > > other > > > > > > > > > > > > emitting policies covered by Suppressed today, we can > > > > > > > > > > > > discuss if > > > > > we > > > > > > > > > > could > > > > > > > > > > > > have `KTable.suppress(Emitted..)` replacing > > > > > > > > > > > `KTable.suppress(Suppressed..)` > > > > > > > > > > > > as a whole, but for this KIP I think it's too early. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 6:18 PM John Roesler < > > > > vvcep...@apache.org > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP, Hao! > > > > > > > > > > > > > > > > > > > > > > > > > > For what it’s worth, I’m also in favor of your latest > > > > > > > > > > > > > framing > > > > of > > > > > > the > > > > > > > > > > > API. > > > > > > > > > > > > > > > > > > > > > > > > > > I think the name is fine. I assume it’s inspired by > > > > > > > > > > > > > Flink? It’s > > > > > not > > > > > > > > > > > > > identical to the concept of a trigger in Flink, which > > > > > > > > > > > > > specifies > > > > > > when > > > > > > > > > > to > > > > > > > > > > > > > evaluate the window, which might be confusing to some > > > > > > > > > > > > > people > > > > who > > > > > > have > > > > > > > > > > > deep > > > > > > > > > > > > > experience with Flink. Then again, it seems close > > > > > > > > > > > > > enough that > > > > it > > > > > > > > > > should > > > > > > > > > > > be > > > > > > > > > > > > > clear to casual Flink users.
For people with no other > > > > > > > > > > > > > stream > > > > > > > > > > processing > > > > > > > > > > > > > experience, it might seem a bit esoteric compared to > > > > > > > > > > > > > something > > > > > > > > > > > > > self-documenting like ‘emit()’, but the docs should > > > > > > > > > > > > > make it > > > > > clear. > > > > > > > > > > > > > > > > > > > > > > > > > > One small question: it seems like this proposal is > > > > > > > > > > > > > identical to > > > > > > > > > > > > > Suppressed.untilWindowClose, and the KIP states that > > > > > > > > > > > > > this API > > > > is > > > > > > > > > > > superior. > > > > > > > > > > > > > In that case, should we deprecate > > > > > > > > > > > > > Suppressed.untilWindowClose? > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote: > > > > > > > > > > > > > > Hi Hao, > > > > > > > > > > > > > > > > > > > > > > > > > > > > For 2), I think it's a good idea in general to use > > > > > > > > > > > > > > a separate > > > > > > > > > > > function on > > > > > > > > > > > > > > the Time/SessionWindowedKStream itself, to achieve > > > > > > > > > > > > > > the same > > > > > effect > > > > > > > > > > > that, > > > > > > > > > > > > > > for now, the emitting control is only for windowed > > > > aggregations > > > > > as > > > > > > > > > > in > > > > > > > > > > > > > this > > > > > > > > > > > > > > KIP, rather than overloading existing functions. We can > > > > > > > > > > > > > > discuss > > > > further > > > > > > > > > > > > > the > > > > > > > > > > > > > > actual function names, including whether others like the name > > > > > > > > > > > > > > `trigger` > > > > or > > > > > > > > > > not.
> > > > > > > > > > > As > > > > > > > > > > > > > > for myself, I feel `trigger` is a good one but I'd > > > > > > > > > > > > > > like to see > > > > if > > > > > > > > > > > others > > > > > > > > > > > > > > have opinions as well. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Mar 21, 2022 at 5:18 PM Hao Li > > > > <h...@confluent.io.invalid > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Guozhang, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the feedback. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. I agree with having an `Emitted` control class and > > > > > > > > > > > > > > > two static > > > > > > > > > > > > > constructors > > > > > > > > > > > > > > > named `onWindowClose` and `onEachUpdate`. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. For the API function changes, I'm thinking of > > > > > > > > > > > > > > > adding a new > > > > > > > > > > > function > > > > > > > > > > > > > > > called `trigger` to `TimeWindowedKStream` and > > > > > > > > > > > `SessionWindowedKStream`. > > > > > > > > > > > > > It > > > > > > > > > > > > > > > takes an `Emitted` config and returns the same > > > > > > > > > > > > > > > stream. Example: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > stream > > > > > > > > > > > > > > > .groupBy(...) > > > > > > > > > > > > > > > .windowedBy(...) > > > > > > > > > > > > > > > .trigger(Emitted.onWindowClose) // N > > > > > > > > > > > > > > > .count() > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The benefits are: > > > > > > > > > > > > > > > 1. It's simple and avoids creating > > > > > > > > > > > > > > > overloads of > > > > existing > > > > > > > > > > > functions > > > > > > > > > > > > > like > > > > > > > > > > > > > > > `windowedBy` or `count`, `reduce` or `aggregate`.
> > > > > > > > > > > > > > > In fact, to > > > > > add > > > > > > > > > > it > > > > > > > > > > > to > > > > > > > > > > > > > > > `aggregate` functions, we need to add it to all > > > > > > > > > > > > > > > existing > > > > > `count`, > > > > > > > > > > > > > > > `aggregate` overloads, which is a lot. > > > > > > > > > > > > > > > 2. It operates directly on the windowed KStream > > > > > > > > > > > > > > > and tells > > how > > > > > its > > > > > > > > > > > output > > > > > > > > > > > > > > > should be configured; if later we need to add it to > > > > > > > > > > > > > > > other types > > > > > of > > > > > > > > > > > > > streams, > > > > > > > > > > > > > > > we can reuse the same `trigger` API, whereas other > > > > > > > > > > > > > > > types of > > > > > > > > > > streams/tables > > > > > > > > > > > may > > > > > > > > > > > > > > > not have `aggregate` or `windowedBy` APIs to make it > > > > > > > > > > > > > > > consistent. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hao > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang < > > > > > > wangg...@gmail.com> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Hao, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I prefer option 2 over the other options > > > > > > > > > > > > > > > > mainly > > > > because > > > > > > the > > > > > > > > > > > > > added > > > > > > > > > > > > > > > > config object could potentially be used in > > > > > > > > > > > > > > > > other operators > > > > as > > > > > > > > > > well > > > > > > > > > > > > > (it does > > > > > > > > > > > > > > > > not necessarily have to be a windowed operator and > > > > > > > > > > > > > > > > hence does not have to > > > > be > > > > > > > > > > > > > > > piggy-backed > > > > > > > > > > > > > > > > on `windowedBy`, and that's also why I > > > > > > > > > > > > > > > > suggested not naming
> > > it > > > > > > > > > > > > > > > > `WindowConfig` but just `EmitConfig`). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > As for Matthias' question, I think the > > > > > > > > > > > > > > > > difference between > > > > the > > > > > > > > > > > windowed > > > > > > > > > > > > > > > > aggregate operator and the stream-stream join > > > > > > > > > > > > > > > > operator is > > > > > that > > > > > > > > > > for > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > latter we think emit-final should be the only > > > > > > > > > > > > > > > > right emitting > > > > > > > > > > policy > > > > > > > > > > > > > and > > > > > > > > > > > > > > > > hence we should not let users configure it. > > > > > > > > > > > > > > > > If users > > > > > > configure > > > > > > > > > > > it > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > e.g. emit eagerly, they may get the old spurious > > > > > > > > > > > > > > > > emitting > > > > > behavior, > > > > > > > > > > > which > > > > > > > > > > > > > > > > > violates the semantics. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For option 2) itself, I have a few more > > > > > > > > > > > > > > > > thoughts: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Thinking about Matthias' suggestions, I'm > > > > > > > > > > > > > > > > also leaning a > > > > > bit > > > > > > > > > > > > > > > > towards adding the new param to the overloaded > > > > > > > > > > > > > > > > `aggregate` > > > > > rather than > > > > > > > > > > > the > > > > > > > > > > > > > > > > overloaded `windowBy` function. The reason is > > > > > > > > > > > > > > > > that the > > > > > emitting > > > > > > > > > > > logic > > > > > > > > > > > > > > > could > > > > > > > > > > > > > > > > be either window-based or non-window-based, in > > > > > > > > > > > > > > > > the long run.
> > > > > > > > > > Though > > > > > > > > > > > > > for > > > > > > > > > > > > > > > > this KIP we could just add it in > > > > > > > > > > `XXXWindowedKStream.aggregate()`, > > > > > > > > > > > we > > > > > > > > > > > > > may > > > > > > > > > > > > > > > > want to extend it to other non-windowed operators > > > > > > > > > > > > > > > > in the > > > > future. > > > > > > > > > > > > > > > > 2. To be consistent with other control class > > > > > > > > > > > > > > > > names, I feel > > > > > maybe > > > > > > > > > > we > > > > > > > > > > > > > can > > > > > > > > > > > > > > > > name it "Emitted", not "EmitConfig". > > > > > > > > > > > > > > > > 3. Following the first comment, I think we can > > > > > > > > > > > > > > > > name the > > > > static > > > > > > > > > > > > > > > constructors > > > > > > > > > > > > > > > > "onWindowClose" and "onEachUpdate". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The resulting code pattern would be like this: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > stream > > > > > > > > > > > > > > > > .groupBy(..) > > > > > > > > > > > > > > > > .windowBy(TimeWindow..) > > > > > > > > > > > > > > > > .count(Emitted.onWindowClose) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. > > > > > > > > > > > > > > > > Sax < > > > > > > > > > > mj...@apache.org > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > `allowedLateness` may not be a good name. > > > > > > > > > > > > > > > > > > > What I have in > > > > > > > > > > mind > > > > > > > > > > > is > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > use > > > > > > > > > > > > > > > > > > > this to control how frequently we try to > > > > > > > > > > > > > > > > > > > emit final > > > > > results.
> > > > > > > > > > > > > Maybe > > > > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > > > > > more flexible to be used as config in > > > > > > > > > > > > > > > > > > > properties as we > > > > > don't > > > > > > > > > > > > > need to > > > > > > > > > > > > > > > > > > > recompile the DSL to change it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I see; making it a config seems better. > > > > > > > > > > > > > > > > > Frankly, I am not > > > > > even > > > > > > > > > > > sure > > > > > > > > > > > > > if > > > > > > > > > > > > > > > > > we need a config at all or if we can just > > > > > > > > > > > > > > > > > hard-code it? For > > > > > the > > > > > > > > > > > > > > > > > stream-stream join left/outer join fix, there > > > > > > > > > > > > > > > > > is only an > > > > > > > > > > internal > > > > > > > > > > > > > > > config > > > > > > > > > > > > > > > > > but no public config either. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Option 1: Is your proposal the following? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > stream > > > > > > > > > > > > > > > > > .groupByKey() > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > .windowBy(TimeWindow.ofSizeNoGrace(...)) > > > > > > > > > > > > > > > > > .configure(EmitConfig.emitFinal()) > > > > > > > > > > > > > > > > > .count() > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > That does not change my argument that it seems to > > > > > > > > > > > > > > > > > be misplaced > > > > from > > > > > > > > > > an > > > > > > > > > > > API > > > > > > > > > > > > > > > > > flow POV. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Option 1 seems to be the least desirable to > > > > > > > > > > > > > > > > > me. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For options 2 and 3, I'm not sure which one I > > > > > > > > > > > > > > > > > like better.
> > > > > Might > > > > > > > > > > > be > > > > > > > > > > > > > good > > > > > > > > > > > > > > > > > if others could chime in, too. I think I > > > > > > > > > > > > > > > > > slightly prefer > > > > > option > > > > > > > > > > 2 > > > > > > > > > > > > > over > > > > > > > > > > > > > > > > > option 3. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 3/15/22 5:33 PM, Hao Li wrote: > > > > > > > > > > > > > > > > > > Thanks for the feedback, Matthias. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > `allowedLateness` may not be a good name. > > > > > > > > > > > > > > > > > > What I have in > > > > > mind > > > > > > > > > > > is > > > > > > > > > > > > > to > > > > > > > > > > > > > > > use > > > > > > > > > > > > > > > > > > this to control how frequently we try to > > > > > > > > > > > > > > > > > > emit final > > > > results. > > > > > > > > > > > Maybe > > > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > > > > more flexible to be used as config in > > > > > > > > > > > > > > > > > > properties as we > > > > don't > > > > > > > > > > > need > > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > recompile the DSL to change it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For option 1, I intend to use `emitFinal` > > > > > > > > > > > > > > > > > > to configure how > > > > > > > > > > > > > > > > > > `TimeWindowedKStream` should be outputted > > > > > > > > > > > > > > > > > > to `KTable` > > > > after > > > > > > > > > > > > > > > > aggregation. > > > > > > > > > > > > > > > > > > But `emitFinal` is not an action on the > > > > > `TimeWindowedKStream` > > > > > > > > > > > > > > > > interface. > > > > > > > > > > > > > > > > > > Maybe adding `configure(EmitConfig config)` > > > > > > > > > > > > > > > > > > makes more > > > > > sense?
> >
> > For option 2, config can be created using `WindowConfig.emitFinal()` or
> > `EmitConfig.emitFinal()`.
> >
> > For option 3, it will be something like
> > `TimeWindows(..., EmitConfig emitConfig)`.
> >
> > As for putting `EmitConfig` in the aggregation operator, I think it
> > doesn't control how we do the aggregation but how we output to `KTable`.
> > That's why I feel option 1 makes more sense, as it applies to
> > `TimeWindowedKStream`. But I'm also OK with option 2.
> >
> > Hao
> >
> > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > Thanks for the KIP.
> > >
> > > A general comment: it seems that we won't need any new
> > > `allowedLateness` parameter, because the grace period is already
> > > defined on the window itself?
> > >
> > > (On the other hand, if I think about it once more, maybe the
> > > `grace-period` is actually not a property of the window but a property
> > > of the aggregation operator? _thinking_)
> > >
> > > From an API-flow point of view, option 1 might not be desirable, IMHO:
> > >
> > >   stream
> > >     .groupByKey()
> > >     .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >     .emitFinal()
> > >     .count()
> > >
> > > The call to `emitFinal()` does not seem to be in the right place for
> > > this case?
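To make the grace-period point concrete, here is a minimal, self-contained Java sketch. It is a toy model, not Kafka Streams code: `EmitModeSketch` and `run` are made-up names, and it assumes a single key, tumbling windows, non-negative timestamps, and that a window closes once stream time (the max timestamp seen) reaches window end plus grace. The same counts are maintained in both modes; only the moment results are emitted differs, which is the emit-on-update vs. `emitFinal` distinction being discussed, with no separate `allowedLateness` knob needed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a single-key tumbling-window count; NOT Kafka Streams code.
// Assumption: window [start, start + size) closes once stream time
// (the max timestamp seen so far) reaches start + size + grace.
class EmitModeSketch {

    // Returns the emitted "windowStart=count" results, in emission order.
    static List<String> run(long[] timestamps, long size, long grace, boolean emitFinal) {
        Map<Long, Long> open = new TreeMap<>(); // open windows: start -> count
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % size); // assumes non-negative timestamps

            if (streamTime < start + size + grace) {
                long count = open.merge(start, 1L, Long::sum);
                if (!emitFinal) {
                    out.add(start + "=" + count); // emit on every update
                }
            } // else: the record's window has already closed, so it is dropped

            // Close every window whose close time has been reached;
            // only emit-final mode forwards a result at this point.
            Iterator<Map.Entry<Long, Long>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> w = it.next();
                if (streamTime >= w.getKey() + size + grace) {
                    if (emitFinal) {
                        out.add(w.getKey() + "=" + w.getValue());
                    }
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 12, 4, 25, 2}; // window size 10, grace 5
        System.out.println(run(ts, 10, 5, false)); // [0=1, 0=2, 10=1, 0=3, 20=1]
        System.out.println(run(ts, 10, 5, true));  // [0=3, 10=1]
    }
}
```

With size 10 and grace 5, the out-of-order record at timestamp 4 still updates window 0 because it arrives within grace, while the record at timestamp 2 arrives after window 0 has closed and is dropped in both modes. Window 20 is still open at the end, so emit-final mode has not produced a result for it yet.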
> > >
> > > Option 2 might work (I think we need to discuss a few details of the
> > > API, though):
> > >
> > >   stream
> > >     .groupByKey()
> > >     .windowBy(
> > >       TimeWindow.ofSizeNoGrace(...),
> > >       EmitConfig.emitFinal() -- just made this up; it's not in the KIP
> > >     )
> > >     .count()
> > >
> > > I made up the `WindowConfig.emitFinal()` call; from the KIP, it's
> > > unclear what API you have in mind. `EmitFinalConfig` has no public
> > > constructor nor any builder method.
> > >
> > > For option 3, I am not sure what you really have in mind. Can you give
> > > a concrete example (similar to the above) of how users would write
> > > their code?
> > >
> > > Did you consider actually passing the `EmitConfig` into the
> > > aggregation operator? In the end, it seems not to be a property of the
> > > window definition or windowing step, but a property of the actual
> > > operator:
> > >
> > >   stream
> > >     .groupByKey()
> > >     .windowBy(
> > >       TimeWindow.ofSizeNoGrace(...)
> > >     )
> > >     .count(EmitConfig.emitFinal())
> > >
> > > The API surface area that needs to be updated might be larger for this
> > > case, though...
> > >
> > > -Matthias
> > >
> > > On 3/14/22 9:21 PM, Hao Li wrote:
> > > > Thanks Guozhang!
> > > >
> > > > 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> > > >    modifies fewer places. What do you think of option 1, which
> > > >    doesn't change the current `windowedBy` API but configures
> > > >    `EmitConfig` separately? The benefit of option 1 is that if we need
> > > >    to configure something else later, we don't need to pile it onto
> > > >    `windowedBy` but can add separate APIs.
> > > > 2. I added it to `Stores` mainly to conform to the existing code
> > > >    (https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231).
> > > >    But we can also create an internal API to do that without
> > > >    modifying `Stores`.
> > > >
> > > > Hao
> > > >
> > > > On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > Hello Hao,
> > > > >
> > > > > Thanks for the proposal. I have some preference among the options
> > > > > here, so I will copy them here:
> > > > >
> > > > > I'm now thinking it may be better not to add this new config on each
> > > > > of the Window interfaces, but instead to add it at the
> > > > > KGroupedStream#windowedBy function. Also, instead of adding just a
> > > > > boolean flag, maybe we can add a Configured class like Grouped,
> > > > > Suppressed, etc. For example, let's call it Emitted, which for now
> > > > > would just have a single construct, Emitted.atWindowClose, whose
> > > > > semantics are the same as emitFinal == true. I think the benefits
> > > > > are:
> > > > >
> > > > > 1) You do not need to modify multiple Window classes, but just
> > > > >    overload one windowedBy function with a second param. This is
> > > > >    smaller in scope for now, and also more extensible for any future
> > > > >    changes.
> > > > >
> > > > > 2) With a config interface, we maintain its extensibility, as well
> > > > >    as being able to reuse this Emitted interface for other operators
> > > > >    if we want to expand to them.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > ---------------------------- > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So in general I'm leaning towards > > > > > > > > > > > > > > > > > > > > > option 2). For that, > > > > > > > > > > some > > > > > > > > > > > > > more > > > > > > > > > > > > > > > > > > > detailed > > > > > > > > > > > > > > > > > > > > > comments: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) If we want to reuse that config > > > > > > > > > > > > > > > > > > > > > object for other > > > > > > > > > > > non-window > > > > > > > > > > > > > > > > > stateful > > > > > > > > > > > > > > > > > > > > > operations, I think naming it as > > > > > > > > > > > > > > > > > > > > > `EmitConfig` is > > > > probably > > > > > > > > > > > > > better > > > > > > > > > > > > > > > > than > > > > > > > > > > > > > > > > > > > > > `WindowConfig`. > > > > > > > > > > > > > > > > > > > > > 2) I saw your PR ( > > > > > > > > > > > https://github.com/apache/kafka/pull/11892) > > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > you > > > > > > > > > > > > > > > > > > > are > > > > > > > > > > > > > > > > > > > > > also proposing to add new stores into > > > > > > > > > > > > > > > > > > > > > the public > > > > factory > > > > > > > > > > > > > Stores, > > > > > > > > > > > > > > > but > > > > > > > > > > > > > > > > > > > it's > > > > > > > > > > > > > > > > > > > > > not included in the KIP. Is that > > > > > > > > > > > > > > > > > > > > > intentional? 
> > > > >    Personally, I think that although we may eventually want to add
> > > > >    a new store type to the public APIs, for this KIP maybe we do not
> > > > >    have to add them but can delay that until later, after we've
> > > > >    learned the best way to lay it out. LMK what you think.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > > > > Hi Dev team,
> > > > > >
> > > > > > I'd like to start a discussion thread on Kafka Streams KIP-825:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > > > > >
> > > > > > This KIP aims to add new APIs to support outputting final
> > > > > > aggregated results for windowed aggregations. I listed several
> > > > > > options there, and I'm looking forward to your feedback.
> > > > > >
> > > > > > Thanks,
> > > > > > Hao
> > > > >
> > > > > --
> > > > > -- Guozhang