Hi all,

Thanks for the KIP, Hao!

For what it's worth, I'm also in favor of your latest framing of the API, and I think the name is fine. I assume it's inspired by Flink? It's not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, so it might be confusing to people who have deep experience with Flink. Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream processing experience, it might seem a bit esoteric compared to something self-documenting like `emit()`, but the docs should make it clear.

One small question: it seems like this proposal is identical to Suppressed.untilWindowCloses, and the KIP states that this API is superior. In that case, should we deprecate Suppressed.untilWindowCloses?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> Hi Hao,
>
> For 2), I think it's a good idea in general to use a separate function on
> the Time/SessionWindowedKStream itself rather than overloading existing
> functions. It achieves the same effect, namely that for now the emitting
> control applies only to windowed aggregations as in this KIP. We can
> discuss the actual function names further, and whether others like the
> name `trigger` or not. As for myself, I feel `trigger` is a good one, but
> I'd like to see if others have opinions as well.
>
>
> Guozhang
>
> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for the feedback.
>>
>> 1. I agree to have an `Emitted` control class and two static constructors
>> named `onWindowClose` and `onEachUpdate`.
>>
>> 2. For the API function changes, I'm thinking of adding a new function
>> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
>> takes an `Emitted` config and returns the same stream. Example:
>>
>> stream
>>   .groupBy(...)
>>   .windowedBy(...)
>>   .trigger(Emitted.onWindowClose) // N
>>   .count()
>>
>> The benefits are:
>> 1. It's simple and avoids creating overloads of existing functions like
>> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it to
>> the `aggregate` functions, we would need to add it to all existing
>> `count` and `aggregate` overloads, which is a lot.
>> 2. It operates directly on the windowed KStream and tells how its output
>> should be configured. If we later need to add this to other types of
>> streams, we can reuse the same `trigger` API, whereas other types of
>> streams/tables may not have an `aggregate` or `windowedBy` API to make it
>> consistent.
>>
>> Hao
>>
>>
>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hello Hao,
>> >
>> > I prefer option 2 over the other options, mainly because the added
>> > config object could potentially be used in other operators as well (it
>> > does not necessarily have to be a windowed operator and hence be
>> > piggy-backed on `windowedBy`; that's also why I suggested not naming it
>> > `WindowConfig` but just `EmitConfig`).
>> >
>> > As for Matthias' question, I think the difference between the windowed
>> > aggregate operator and the stream-stream join operator is that, for the
>> > latter, we think emit-final should be the only right emitting policy and
>> > hence we should not let users configure it. If users configure it to,
>> > e.g., emit eagerly, they may get the old spurious emitting behavior,
>> > which violates the semantics.
>> >
>> > For option 2) itself, I have a few more thoughts:
>> >
>> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
>> > towards adding the new param in the overloaded `aggregate` rather than
>> > the overloaded `windowedBy` function. The reason is that the emitting
>> > logic could be either window based or non-window based in the long run.
>> > Though for this KIP we could just add it in
>> > `XXXWindowedKStream.aggregate()`, we may want to extend to other
>> > non-windowed operators in the future.
>> > 2. To be consistent with other control class names, I feel maybe we can
>> > name it "Emitted", not "EmitConfig".
>> > 3. Following the first comment, I think we can have the static
>> > constructor names as "onWindowClose" and "onEachUpdate".
>> >
>> > The resulting code pattern would be like this:
>> >
>> > stream
>> >   .groupBy(..)
>> >   .windowedBy(TimeWindows..)
>> >   .count(Emitted.onWindowClose)
>> >
>> > WDYT?
>> >
>> >
>> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> > > >> `allowedLateness` may not be a good name. What I have in mind is to use
>> > > >> this to control how frequently we try to emit final results. Maybe it's
>> > > >> more flexible to be used as config in properties as we don't need to
>> > > >> recompile DSL to change it.
>> > >
>> > > I see; making it a config seems better. Frankly, I am not even sure if
>> > > we need a config at all or if we can just hard code it? For the
>> > > stream-stream left/outer join fix, there is only an internal config
>> > > but no public config either.
>> > >
>> > > Option 1: Your proposal is?
>> > >
>> > > stream
>> > >   .groupByKey()
>> > >   .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>> > >   .configure(EmitConfig.emitFinal())
>> > >   .count()
>> > >
>> > > This does not change my argument that it seems to be misplaced from an
>> > > API flow POV.
>> > >
>> > > Option 1 seems to be the least desirable to me.
>> > >
>> > > For options 2 and 3, I am not sure which one I like better. It might be
>> > > good if others could chime in, too. I think I slightly prefer option 2
>> > > over option 3.
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 3/15/22 5:33 PM, Hao Li wrote:
>> > > > Thanks for the feedback Matthias.
>> > > >
>> > > > `allowedLateness` may not be a good name. What I have in mind is to use
>> > > > this to control how frequently we try to emit final results. Maybe it's
>> > > > more flexible to be used as config in properties as we don't need to
>> > > > recompile DSL to change it.
>> > > >
>> > > > For option 1, I intend to use `emitFinal` to configure how
>> > > > `TimeWindowedKStream` should be output to `KTable` after aggregation.
>> > > > But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
>> > > > Maybe adding `configure(EmitConfig config)` makes more sense?
>> > > >
>> > > > For option 2, the config can be created using
>> > > > `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
>> > > >
>> > > > For option 3, it will be something like `TimeWindows(..., EmitConfig
>> > > > emitConfig)`.
>> > > >
>> > > > As for putting `EmitConfig` in the aggregation operator, I think it
>> > > > doesn't control how we do aggregation but how we output to `KTable`.
>> > > > That's why I feel option 1 makes more sense, as it applies to
>> > > > `TimeWindowedKStream`. But I'm also OK with option 2.
>> > > >
>> > > > Hao
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
>> > > >
>> > > >> Thanks for the KIP.
>> > > >>
>> > > >> A general comment: it seems that we won't need any new
>> > > >> `allowedLateness` parameter because the grace-period is defined on the
>> > > >> window itself already?
>> > > >>
>> > > >> (On the other hand, if I think about it once more, maybe the
>> > > >> `grace-period` is actually not a property of the window but a property
>> > > >> of the aggregation operator? _thinking_)
>> > > >>
>> > > >> From an API flow point of view, option 1 might not be desirable IMHO:
>> > > >>
>> > > >> stream
>> > > >>   .groupByKey()
>> > > >>   .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>> > > >>   .emitFinal()
>> > > >>   .count()
>> > > >>
>> > > >> The call to `emitFinal()` seems not to be in the right place for this
>> > > >> case?
>> > > >>
>> > > >>
>> > > >> Option 2 might work (I think we need to discuss a few details of the API
>> > > >> though):
>> > > >>
>> > > >> stream
>> > > >>   .groupByKey()
>> > > >>   .windowedBy(
>> > > >>     TimeWindows.ofSizeWithNoGrace(...),
>> > > >>     EmitConfig.emitFinal() // just made this up; it's not in the KIP
>> > > >>   )
>> > > >>   .count()
>> > > >>
>> > > >> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
>> > > >> unclear what API you have in mind? `EmitFinalConfig` has no public
>> > > >> constructor nor any builder method.
>> > > >>
>> > > >>
>> > > >> For option 3, I am not sure what you really have in mind. Can you give
>> > > >> a concrete example (similar to above) of how users would write their
>> > > >> code?
>> > > >>
>> > > >>
>> > > >>
>> > > >> Did you consider actually passing the `EmitConfig` into the
>> > > >> aggregation operator? In the end, it seems not to be a property of the
>> > > >> window definition or windowing step, but a property of the actual
>> > > >> operator:
>> > > >>
>> > > >> stream
>> > > >>   .groupByKey()
>> > > >>   .windowedBy(
>> > > >>     TimeWindows.ofSizeWithNoGrace(...)
>> > > >>   )
>> > > >>   .count(EmitConfig.emitFinal())
>> > > >>
>> > > >> The API surface area that needs to be updated might be larger for this
>> > > >> case though...
>> > > >>
>> > > >>
>> > > >> -Matthias
>> > > >>
>> > > >>
>> > > >>
>> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
>> > > >>> Thanks Guozhang!
>> > > >>>
>> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
>> > > >>> modifies fewer places. What do you think of option 1, which doesn't
>> > > >>> change the current `windowedBy` API but configures `EmitConfig`
>> > > >>> separately? The benefit of option 1 is that if we need to configure
>> > > >>> something else later, we don't need to pile it onto `windowedBy` but
>> > > >>> can add separate APIs.
>> > > >>> 2. I added it to `Stores` mainly to conform to
>> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
>> > > >>> But we can also create an internal API to do that without modifying
>> > > >>> `Stores`.
>> > > >>>
>> > > >>> Hao
>> > > >>>
>> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >>>
>> > > >>>> Hello Hao,
>> > > >>>>
>> > > >>>> Thanks for the proposal. I have some preference among the options
>> > > >>>> here, so I will copy them here:
>> > > >>>>
>> > > >>>> I'm now thinking if it's better to not add this new config on each of
>> > > >>>> the Window interfaces, but instead add that at the
>> > > >>>> KGroupedStream#windowedBy function. Also, instead of adding just a
>> > > >>>> boolean flag, maybe we can add a Configured class like Grouped,
>> > > >>>> Suppressed, etc., e.g. let's call it Emitted, which for now would
>> > > >>>> just have a single constructor, Emitted.atWindowClose, whose
>> > > >>>> semantics is the same as emitFinal == true. I think the benefits are:
>> > > >>>>
>> > > >>>> 1) you do not need to modify multiple Window classes, but just overload
>> > > >>>> one windowedBy function with a second param. This is less of a scope
>> > > >>>> for now, and also more extensible for any future changes.
>> > > >>>>
>> > > >>>> 2) With a config interface, we maintain its extensibility as well as
>> > > >>>> being able to reuse this Emitted interface for other operators if we
>> > > >>>> wanted to expand to them.
>> > > >>>>
>> > > >>>> ----------------------------
>> > > >>>>
>> > > >>>> So in general I'm leaning towards option 2). For that, some more
>> > > >>>> detailed comments:
>> > > >>>>
>> > > >>>> 1) If we want to reuse that config object for other non-window stateful
>> > > >>>> operations, I think naming it as `EmitConfig` is probably better than
>> > > >>>> `WindowConfig`.
>> > > >>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that
>> > > >>>> you are also proposing to add new stores into the public factory
>> > > >>>> Stores, but it's not included in the KIP. Is that intentional?
>> > > >>>> Personally I think that although we may eventually want to add a new
>> > > >>>> store type to the public APIs, for this KIP maybe we do not have to
>> > > >>>> add them but can delay until later, after we've learned the best way
>> > > >>>> to lay it out. LMK what you think.
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> Guozhang
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
>> > > >>>>
>> > > >>>>> Hi Dev team,
>> > > >>>>>
>> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
>> > > >>>>>
>> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>> > > >>>>>
>> > > >>>>> This KIP aims to add new APIs to support outputting final aggregated
>> > > >>>>> results for windowed aggregations. I listed several options there and
>> > > >>>>> I'm looking forward to your feedback.
>> > > >>>>>
>> > > >>>>> Thanks,
>> > > >>>>> Hao
>> > > >>>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> --
>> > > >>>> -- Guozhang
>> > > >>>>
>> > > >>>
>> > > >>>
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>> --
>> Thanks,
>> Hao
>>
>
>
> --
> -- Guozhang