Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. Doesn't `emit` sound like an action that performs the emission? How about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we can deprecate the `Suppressed` config as a whole later. Alternatively, we can deprecate `Suppressed.untilWindowClose` in a later KIP, once emit-final has been implemented.

BTW, isn't

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..) // results in a KTable<Windowed<..>>
  .mapValues(..)
  .suppress(Suppressed.untilWindowClose) // we can trace back to the parent node to find the window definition

the same as

stream
  .groupBy(..)
  .windowedBy(..)
  .trigger(Emitted.onWindowClose)
  .aggregate(..) // results in a KTable<Windowed<..>>
  .mapValues(..)

?

Hao

On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> I think the following case is only doable via `suppress`:
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .aggregate(..) // results in a KTable<Windowed<..>>
>   .mapValues(..)
>   .suppress(Suppressed.untilWindowClose) // we can trace back to the parent node to find the window definition
>
> Guozhang
>
> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> > Thanks, Guozhang!
> >
> > To clarify, I was asking specifically about deprecating just the method `untilWindowClose`. I might not be thinking clearly about it, though. What does `untilWindowClose` do that this KIP doesn't cover?
> >
> > Thanks,
> > John
> >
> > On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > > Just my 2c: `Suppressed` is used in `suppress`, whose application scope is much larger and hence more flexible, i.e. it can be used anywhere on a `KTable` (internally we would check whether a given emit policy like `untilWindowClose` is valid there), whereas `trigger`, for now, is only applicable to XXWindowedKStream. So I don't think it would completely replace Suppressed.untilWindowClose.
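To make the two emit policies under discussion concrete, here is a minimal, self-contained sketch in plain Java. It is not Kafka Streams code: the class name `EmitPolicySketch`, the 10 ms window size, the 5 ms grace period, and the close rule (a window counts as closed once stream time reaches window end plus grace) are all assumptions for illustration. It counts records per key in tumbling windows; the on-each-update mode forwards every intermediate count, while the on-window-close mode forwards one final count per window and drops records whose window has already closed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of two emit policies for a per-key tumbling-window count.
// NOT Kafka Streams code: the close rule (windowEnd + grace <= streamTime) is an assumption.
class EmitPolicySketch {
    static final long SIZE_MS = 10; // assumed window size
    static final long GRACE_MS = 5; // assumed grace period

    static List<String> count(String[] keys, long[] timestamps, boolean emitOnWindowClose) {
        Map<String, Long> counts = new LinkedHashMap<>();     // "key@windowStart" -> running count
        Map<String, Long> windowEnds = new LinkedHashMap<>(); // "key@windowStart" -> window end
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < keys.length; i++) {
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts); // stream time only moves forward
            long start = ts - (ts % SIZE_MS);
            long end = start + SIZE_MS;
            if (end + GRACE_MS <= streamTime) {
                continue; // late record: its window already closed, so it is dropped
            }
            String window = keys[i] + "@" + start;
            long updated = counts.merge(window, 1L, Long::sum);
            windowEnds.put(window, end);
            if (!emitOnWindowClose) {
                out.add(window + "=" + updated); // on each update: forward every intermediate count
                continue;
            }
            // On window close: forward each window once, after stream time passes end + grace.
            Iterator<Map.Entry<String, Long>> it = windowEnds.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (entry.getValue() + GRACE_MS <= streamTime) {
                    out.add(entry.getKey() + "=" + counts.remove(entry.getKey()));
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "A", "A", "A", "A"};
        long[] timestamps = {1, 3, 12, 21, 2}; // the last record is late for window [0, 10)
        System.out.println(count(keys, timestamps, false)); // [A@0=1, A@0=2, A@10=1, A@20=1]
        System.out.println(count(keys, timestamps, true));  // [A@0=2]
    }
}
```

In this model the windows starting at 10 and 20 are still open when the input ends, so the on-window-close run has not emitted them yet; the eager run has already forwarded their partial counts.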
> > >
> > > In the future, I'd personally still like to keep a single control object for all emit policies. If we later extend `Emitted` to cover the other emit policies that `Suppressed` covers today, we can discuss whether `KTable.suppress(Emitted..)` should replace `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's too early.
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> > >> Hi all,
> > >>
> > >> Thanks for the KIP, Hao!
> > >>
> > >> For what it's worth, I'm also in favor of your latest framing of the API.
> > >>
> > >> I think the name is fine. I assume it's inspired by Flink? It's not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, so it might confuse people with deep Flink experience. Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream-processing experience, it might seem a bit esoteric compared to something self-documenting like `emit()`, but the docs should make it clear.
> > >>
> > >> One small question: this proposal seems identical to Suppressed.untilWindowClose, and the KIP states that this API is superior. In that case, should we deprecate Suppressed.untilWindowClose?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >> > Hi Hao,
> > >> >
> > >> > For 2), I think it's a good idea in general to add a separate function on the Time/SessionWindowedKStream itself rather than overloading existing functions; that way, for now, the emit control applies only to windowed aggregations, as in this KIP.
> > >> > We can discuss the actual function names further, e.g. whether others like the name `trigger`. I personally think `trigger` is a good one, but I'd like to hear whether others have opinions as well.
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >> >> Hi Guozhang,
> > >> >>
> > >> >> Thanks for the feedback.
> > >> >>
> > >> >> 1. I agree to have an `Emitted` control class with two static constructors named `onWindowClose` and `onEachUpdate`.
> > >> >>
> > >> >> 2. For the API changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes an `Emitted` config and returns the same stream. Example:
> > >> >>
> > >> >> stream
> > >> >>   .groupBy(...)
> > >> >>   .windowedBy(...)
> > >> >>   .trigger(Emitted.onWindowClose)
> > >> >>   .count()
> > >> >>
> > >> >> The benefits are:
> > >> >> 1. It's simple and avoids adding overloads to existing functions like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the `aggregate` functions, we would need to add it to all existing `count` and `aggregate` overloads, which is a lot.
> > >> >> 2. It operates directly on the windowed KStream and specifies how its output should be emitted. If we later need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have `aggregate` or `windowedBy` APIs, so putting the config there would not be consistent.
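A hypothetical sketch of the API shape proposed above, written as plain Java so it compiles without Kafka. `Emitted` and `WindowedStreamSketch` here are illustrative stand-ins, not Kafka Streams classes, and the model covers a single window that is assumed to close right after its last value arrives. It shows the first benefit Hao lists: `trigger(...)` only records the emit policy and returns the same object, so existing terminal operations such as `count()` and `reduce(...)` pick up the policy without new overloads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the proposed API shape. WindowedStreamSketch and Emitted are
// illustrative stand-ins, NOT Kafka Streams classes. One window is modeled, and it is
// assumed to close right after its last value arrives.
class WindowedStreamSketch {
    private final List<Long> values;                  // values of one window, in arrival order
    private Emitted emitted = Emitted.onEachUpdate(); // assumed default: eager emission

    WindowedStreamSketch(List<Long> values) {
        this.values = values;
    }

    // Records the policy and returns the same stream, so no terminal operation needs an overload.
    WindowedStreamSketch trigger(Emitted emitted) {
        this.emitted = emitted;
        return this;
    }

    List<Long> reduce(BinaryOperator<Long> reducer) {
        List<Long> updates = new ArrayList<>();
        Long acc = null;
        for (Long v : values) {
            acc = (acc == null) ? v : reducer.apply(acc, v);
            updates.add(acc); // each intermediate result
        }
        if (emitted.policy == Emitted.Policy.ON_WINDOW_CLOSE && !updates.isEmpty()) {
            return List.of(updates.get(updates.size() - 1)); // only the final result
        }
        return updates;
    }

    // count() reuses reduce(), so it honors whatever trigger() configured.
    List<Long> count() {
        List<Long> ones = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            ones.add(1L);
        }
        return new WindowedStreamSketch(ones).trigger(emitted).reduce(Long::sum);
    }

    public static void main(String[] args) {
        List<Long> window = List.of(3L, 4L, 5L);
        System.out.println(new WindowedStreamSketch(window).count()); // [1, 2, 3]
        System.out.println(new WindowedStreamSketch(window).trigger(Emitted.onWindowClose()).count()); // [3]
        System.out.println(new WindowedStreamSketch(window).trigger(Emitted.onWindowClose()).reduce(Long::sum)); // [12]
    }
}

// Control object with two static constructors, mirroring the names in the proposal.
final class Emitted {
    enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    final Policy policy;

    private Emitted(Policy policy) {
        this.policy = policy;
    }

    static Emitted onWindowClose() {
        return new Emitted(Policy.ON_WINDOW_CLOSE);
    }

    static Emitted onEachUpdate() {
        return new Emitted(Policy.ON_EACH_UPDATE);
    }
}
```

The alternative discussed in the thread, passing the config into each aggregation, would instead need a `count(Emitted)`, `reduce(..., Emitted)`, and so on for every existing overload.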
> > >> >>
> > >> >> Hao
> > >> >>
> > >> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >> >> > Hello Hao,
> > >> >> >
> > >> >> > I prefer option 2 over the other options, mainly because the added config object could potentially be used in other operators as well. It does not necessarily have to be a windowed operator, and hence should not have to be piggy-backed on `windowedBy`; that's also why I suggested naming it `EmitConfig` rather than `WindowConfig`.
> > >> >> >
> > >> >> > As for Matthias' question, I think the difference between the windowed aggregate operator and the stream-stream join operator is that, for the latter, we think emit-final is the only correct emitting policy, and hence we should not let users configure it. If users configured it to, e.g., emit eagerly, they could get the old spurious emitting behavior, which violates the semantics.
> > >> >> >
> > >> >> > For option 2) itself, I have a few more thoughts:
> > >> >> >
> > >> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards adding the new param to an overloaded `aggregate` rather than an overloaded `windowedBy` function. The reason is that, in the long run, the emitting logic could be either window-based or non-window-based. Though for this KIP we could just add it to `XXXWindowedKStream.aggregate()`, we may want to extend it to other non-windowed operators in the future.
> > >> >> > 2. To be consistent with other control class names, I feel we could name it "Emitted" rather than "EmitConfig".
> > >> >> > 3. Following the first comment, I think we can name the static constructors "onWindowClose" and "onEachUpdate".
> > >> >> >
> > >> >> > The resulting code pattern would look like this:
> > >> >> >
> > >> >> > stream
> > >> >> >   .groupBy(..)
> > >> >> >   .windowedBy(TimeWindows..)
> > >> >> >   .count(Emitted.onWindowClose)
> > >> >> >
> > >> >> > WDYT?
> > >> >> >
> > >> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >> >> > > >> `allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results. Maybe it's more flexible to make it a config in the properties, so we don't need to recompile the DSL to change it.
> > >> >> > >
> > >> >> > > I see; making it a config seems better. Frankly, I am not even sure whether we need a config at all, or whether we can just hard-code it. For the stream-stream left/outer join fix, there is only an internal config, no public one.
> > >> >> > >
> > >> >> > > Option 1: Your proposal is this?
> > >> >> > >
> > >> >> > > stream
> > >> >> > >   .groupByKey()
> > >> >> > >   .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > >> >> > >   .configure(EmitConfig.emitFinal())
> > >> >> > >   .count()
> > >> >> > >
> > >> >> > > That does not change my argument that it seems misplaced from an API-flow POV.
> > >> >> > >
> > >> >> > > Option 1 seems to be the least desirable to me.
> > >> >> > >
> > >> >> > > For options 2 and 3, I am not sure which one I like better. It might be good if others could chime in, too. I think I slightly prefer option 2 over option 3.
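Guozhang's point about stream-stream joins (eager emission brings back spurious results) can be illustrated with a small model of a windowed left join on a single key. This is a sketch under stated assumptions, not how Kafka Streams implements joins: the class name, the 5 ms join window, the zero grace period, and the rule that an unmatched left record is emitted with a null right side once stream time passes its timestamp plus window plus grace are all made up for illustration. With eager emission, a left record that is matched later first produces a `(left,null)` result that turns out to be spurious; with emit-final, that result appears only for left records that never find a match.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a windowed left join on one key. NOT Kafka Streams code;
// the join window, grace, and emission rules below are assumptions for illustration.
class LeftJoinEmitSketch {
    static final long WINDOW_MS = 5; // records join when |tLeft - tRight| <= WINDOW_MS
    static final long GRACE_MS = 0;  // assumed grace period

    // Each event is "L:<value>:<ts>" or "R:<value>:<ts>"; all events share one key.
    static List<String> leftJoin(String[] events, boolean emitFinal) {
        List<String[]> lefts = new ArrayList<>();   // {value, ts} of every left record seen
        List<String[]> rights = new ArrayList<>();  // {value, ts} of every right record seen
        List<String[]> pending = new ArrayList<>(); // unmatched lefts held back when emitFinal
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (String event : events) {
            String[] parts = event.split(":");
            boolean isLeft = parts[0].equals("L");
            String[] rec = {parts[1], parts[2]};
            long ts = Long.parseLong(parts[2]);
            streamTime = Math.max(streamTime, ts);
            boolean matched = false;
            for (String[] other : isLeft ? rights : lefts) {
                if (Math.abs(Long.parseLong(other[1]) - ts) <= WINDOW_MS) {
                    matched = true;
                    String left = isLeft ? rec[0] : other[0];
                    String right = isLeft ? other[0] : rec[0];
                    out.add("(" + left + "," + right + ")");
                    if (!isLeft) {
                        pending.remove(other); // this left record is no longer unmatched
                    }
                }
            }
            (isLeft ? lefts : rights).add(rec);
            if (isLeft && !matched) {
                if (emitFinal) {
                    pending.add(rec); // hold it: a matching right record may still arrive
                } else {
                    out.add("(" + rec[0] + ",null)"); // eager: may later prove spurious
                }
            }
            if (emitFinal) {
                // Emit (left,null) only once no right record can match anymore.
                Iterator<String[]> it = pending.iterator();
                while (it.hasNext()) {
                    String[] pendingLeft = it.next();
                    if (Long.parseLong(pendingLeft[1]) + WINDOW_MS + GRACE_MS < streamTime) {
                        out.add("(" + pendingLeft[0] + ",null)");
                        it.remove();
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] events = {"L:a:0", "R:b:3", "L:c:4", "L:d:10", "R:x:20"};
        System.out.println(leftJoin(events, false)); // [(a,null), (a,b), (c,b), (d,null)]
        System.out.println(leftJoin(events, true));  // [(a,b), (c,b), (d,null)]
    }
}
```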
> > >> >> > >
> > >> >> > > -Matthias
> > >> >> > >
> > >> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
> > >> >> > > > Thanks for the feedback, Matthias.
> > >> >> > > >
> > >> >> > > > `allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results. Maybe it's more flexible to make it a config in the properties, so we don't need to recompile the DSL to change it.
> > >> >> > > >
> > >> >> > > > For option 1, I intend to use `emitFinal` to configure how the `TimeWindowedKStream` should be output to a `KTable` after aggregation. But `emitFinal` is not an action on the `TimeWindowedKStream` interface. Maybe adding `configure(EmitConfig config)` makes more sense?
> > >> >> > > >
> > >> >> > > > For option 2, the config can be created using `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
> > >> >> > > >
> > >> >> > > > For option 3, it will be something like `TimeWindows(..., EmitConfig emitConfig)`.
> > >> >> > > >
> > >> >> > > > As for putting `EmitConfig` in the aggregation operator, I think it doesn't control how we do the aggregation but how we output to the `KTable`. That's why I feel option 1 makes more sense, as it applies to `TimeWindowedKStream`. But I'm also OK with option 2.
> > >> >> > > >
> > >> >> > > > Hao
> > >> >> > > >
> > >> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >> >> > > >> Thanks for the KIP.
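Hao's idea of a knob that controls how often to look for final results, set through properties rather than the DSL, could look roughly like the throttle below. Everything here is hypothetical: the class name, the property key `emit.check.interval.ms`, and its 1000 ms default are made up for illustration and are not real Kafka configs. The caller passes in the current time (for example, wall-clock time), and the throttle answers whether a scan for closed windows is due.

```java
import java.util.Properties;

// Hypothetical throttle for "how often do we look for closed windows".
// The property key below is made up for illustration; it is NOT a real Kafka config.
class EmitCheckThrottle {
    static final String INTERVAL_KEY = "emit.check.interval.ms"; // hypothetical key
    private final long intervalMs;
    private long lastCheckMs = Long.MIN_VALUE; // sentinel: no check has happened yet

    EmitCheckThrottle(Properties props) {
        this.intervalMs = Long.parseLong(props.getProperty(INTERVAL_KEY, "1000"));
    }

    // Returns true when at least intervalMs has passed since the last scan (or on the first call).
    boolean shouldCheck(long nowMs) {
        if (lastCheckMs != Long.MIN_VALUE && nowMs - lastCheckMs < intervalMs) {
            return false;
        }
        lastCheckMs = nowMs;
        return true;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(INTERVAL_KEY, "100");
        EmitCheckThrottle throttle = new EmitCheckThrottle(props);
        StringBuilder decisions = new StringBuilder();
        for (long now : new long[]{0, 50, 100, 150, 250}) {
            decisions.append(throttle.shouldCheck(now) ? 'Y' : 'n');
        }
        System.out.println(decisions); // YnYnY
    }
}
```

Because the interval is read from `Properties`, changing it requires no change to the topology code, which is the flexibility Hao describes; Matthias's alternative of hard-coding it would amount to replacing the property lookup with a constant.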
> > >> >> > > >>
> > >> >> > > >> A general comment: it seems that we won't need any new `allowedLateness` parameter, because the grace period is already defined on the window itself?
> > >> >> > > >>
> > >> >> > > >> (On the other hand, thinking about it once more, maybe the grace period is actually not a property of the window but a property of the aggregation operator? _thinking_)
> > >> >> > > >>
> > >> >> > > >> From an API-flow point of view, option 1 might not be desirable IMHO:
> > >> >> > > >>
> > >> >> > > >> stream
> > >> >> > > >>   .groupByKey()
> > >> >> > > >>   .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > >> >> > > >>   .emitFinal()
> > >> >> > > >>   .count()
> > >> >> > > >>
> > >> >> > > >> The call to `emitFinal()` does not seem to be in the right place for this case?
> > >> >> > > >>
> > >> >> > > >> Option 2 might work (though I think we need to discuss a few details of the API):
> > >> >> > > >>
> > >> >> > > >> stream
> > >> >> > > >>   .groupByKey()
> > >> >> > > >>   .windowedBy(
> > >> >> > > >>     TimeWindows.ofSizeWithNoGrace(...),
> > >> >> > > >>     EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >> >> > > >>   )
> > >> >> > > >>   .count()
> > >> >> > > >>
> > >> >> > > >> I made up the `WindowConfig.emitFinal()` call; from the KIP it's unclear what API you have in mind. `EmitFinalConfig` has neither a public constructor nor any builder method.
> > >> >> > > >>
> > >> >> > > >> For option 3, I am not sure what you really have in mind. Can you give a concrete example (similar to the above) of how users would write their code?
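Matthias's observation that the grace period already lives on the window definition can be sketched as follows. `HoppingWindowSpec` is a hypothetical stand-in, not Kafka's `TimeWindows`, and the rule that a window stays open while `start + size + grace > streamTime` is an assumption that only approximates Kafka's behavior. Given the window size, advance, and grace, the spec alone decides which windows a late record can still update, so in this model no separate `allowedLateness` parameter is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical window definition that carries its own grace period. NOT Kafka's
// TimeWindows; the rule "open while start + size + grace > streamTime" is an assumption.
final class HoppingWindowSpec {
    final long sizeMs;
    final long advanceMs;
    final long graceMs;

    HoppingWindowSpec(long sizeMs, long advanceMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
        this.graceMs = graceMs;
    }

    // Start times of all windows [start, start + size) that contain ts (ts >= 0 assumed).
    List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - (ts % advanceMs); start >= 0 && start > ts - sizeMs; start -= advanceMs) {
            starts.add(0, start); // prepend so the result is in ascending order
        }
        return starts;
    }

    // Windows that a record with timestamp ts may still update, given the current stream time.
    List<Long> openWindowsFor(long ts, long streamTime) {
        List<Long> open = new ArrayList<>();
        for (long start : windowStartsFor(ts)) {
            if (start + sizeMs + graceMs > streamTime) {
                open.add(start);
            }
        }
        return open;
    }

    public static void main(String[] args) {
        HoppingWindowSpec noGrace = new HoppingWindowSpec(10, 5, 0);
        HoppingWindowSpec withGrace = new HoppingWindowSpec(10, 5, 5);
        System.out.println(noGrace.windowStartsFor(12));      // [5, 10]
        System.out.println(noGrace.openWindowsFor(12, 21));   // []
        System.out.println(withGrace.openWindowsFor(12, 21)); // [10]
    }
}
```

Matthias's alternative view, that grace belongs to the aggregation operator, would move `graceMs` out of this class and into whatever applies the emit policy; the arithmetic stays the same.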
> > >> >> > > >>
> > >> >> > > >> Did you consider actually passing the `EmitConfig` into the aggregation operator? In the end, it seems to be a property not of the window definition or the windowing step, but of the actual operator:
> > >> >> > > >>
> > >> >> > > >> stream
> > >> >> > > >>   .groupByKey()
> > >> >> > > >>   .windowedBy(
> > >> >> > > >>     TimeWindows.ofSizeWithNoGrace(...)
> > >> >> > > >>   )
> > >> >> > > >>   .count(EmitConfig.emitFinal())
> > >> >> > > >>
> > >> >> > > >> The API surface area that needs to be updated might be larger for this case, though...
> > >> >> > > >>
> > >> >> > > >> -Matthias
> > >> >> > > >>
> > >> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
> > >> >> > > >>> Thanks, Guozhang!
> > >> >> > > >>>
> > >> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2 modifies fewer places. What do you think of option 1, which doesn't change the current `windowedBy` API but configures `EmitConfig` separately? The benefit of option 1 is that if we need to configure something else later, we don't need to pile it onto `windowedBy` but can add separate APIs.
> > >> >> > > >>> 2. I added it to `Stores` mainly to conform to <https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231>. But we can also create an internal API to do that without modifying `Stores`.
> > >> >> > > >>>
> > >> >> > > >>> Hao
> > >> >> > > >>>
> > >> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >> >> > > >>>> Hello Hao,
> > >> >> > > >>>>
> > >> >> > > >>>> Thanks for the proposal. I have some preferences among the options, so I will copy them here:
> > >> >> > > >>>>
> > >> >> > > >>>> I'm now wondering whether it's better not to add this new config to each of the Window interfaces, but instead to add it at the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a control class like Grouped, Suppressed, etc. Let's call it Emitted; for now it would have just a single static constructor, Emitted.atWindowClose, whose semantics are the same as emitFinal == true. I think the benefits are:
> > >> >> > > >>>>
> > >> >> > > >>>> 1) You do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is a smaller scope for now, and also more extensible for future changes.
> > >> >> > > >>>>
> > >> >> > > >>>> 2) With a config interface, we keep it extensible and can reuse this Emitted interface for other operators if we want to expand to them.
> > >> >> > > >>>>
> > >> >> > > >>>> ----------------------------
> > >> >> > > >>>>
> > >> >> > > >>>> So in general I'm leaning towards option 2).
For that, > some > > >> more > > >> >> > > >> detailed > > >> >> > > >>>> comments: > > >> >> > > >>>> > > >> >> > > >>>> 1) If we want to reuse that config object for other > > non-window > > >> >> > > stateful > > >> >> > > >>>> operations, I think naming it as `EmitConfig` is probably > > >> better > > >> >> > than > > >> >> > > >>>> `WindowConfig`. > > >> >> > > >>>> 2) I saw your PR ( > > https://github.com/apache/kafka/pull/11892) > > >> >> that > > >> >> > > you > > >> >> > > >> are > > >> >> > > >>>> also proposing to add new stores into the public factory > > >> Stores, > > >> >> but > > >> >> > > >> it's > > >> >> > > >>>> not included in the KIP. Is that intentional? Personally I > > >> think > > >> >> > that > > >> >> > > >>>> although we may eventually want to add a new store type to > > the > > >> >> > public > > >> >> > > >> APIs, > > >> >> > > >>>> for this KIP maybe we do not have to add them but can > delay > > for > > >> >> > later > > >> >> > > >> after > > >> >> > > >>>> we've learned the best way to layout. LMK what do you > think? > > >> >> > > >>>> > > >> >> > > >>>> > > >> >> > > >>>> > > >> >> > > >>>> Guozhang > > >> >> > > >>>> > > >> >> > > >>>> > > >> >> > > >>>> > > >> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li > > >> <h...@confluent.io.invalid> > > >> >> > > >> wrote: > > >> >> > > >>>> > > >> >> > > >>>>> Hi Dev team, > > >> >> > > >>>>> > > >> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams > > >> KIP-825: > > >> >> > > >>>>> > > >> >> > > >>>>> > > >> >> > > >>>> > > >> >> > > >> > > >> >> > > > > >> >> > > > >> >> > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced > > >> >> > > >>>>> > > >> >> > > >>>>> This KIP is aimed to add new APIs to support outputting > > final > > >> >> > > >> aggregated > > >> >> > > >>>>> results for windowed aggregations. 
> > >> >> > > >>>>> I listed several options there, and I'm looking forward to your feedback.
> > >> >> > > >>>>>
> > >> >> > > >>>>> Thanks,
> > >> >> > > >>>>> Hao
> > >> >> > > >>>>
> > >> >> > > >>>> --
> > >> >> > > >>>> -- Guozhang
> > >> >> >
> > >> >> > --
> > >> >> > -- Guozhang
> > >> >>
> > >> >> --
> > >> >> Thanks,
> > >> >> Hao
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang

--
Thanks,
Hao