First, I would prefer not calling it aggregate because there are already plenty of aggregate methods.
Second, I don't think this would really work, because after each aggregate call you now have a unique KTable. Someone may want a table built from 4 streams and then reuse those 4 in another table with one more stream added; unless we completely duplicate everything every time, this isn't really possible. Additionally, the cogroup way requires just one more call to create a second, different table (normal, windowed, or session windowed), whereas this new way would require copying the whole aggregate chain.

Another way to think about it: with cogroup, we know that once they call aggregate they aren't going to add any more aggregators to that table. Your way requires us to assume they are done adding aggregators after each call, so we must return a KTable each time even though it may turn out not to be needed.

On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com> wrote:

> Actually, just had a thought. It started with naming.
>
> Are we actually co-grouping these streams or are we co-aggregating them?
>
> After all, in each of the cogroup calls we are providing an Aggregator
> implementation.
>
>
> If they are really co-aggregated, why don't we turn this around:
> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>
> KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
>         aggValueSerde1) // this is the unchanged aggregate method
>     .aggregate(grouped2, aggregator2) // this is a new method
>     .aggregate(grouped3, aggregator3); // ditto
>
> This means instead of adding cogroup methods on KGroupedStream interface,
> adding aggregate method on KTable interface.
>
> Is that feasible?
>
> Cheers,
> Michał
>
> On 13/06/17 10:56, Michal Borowiecki wrote:
>
> Also, I still feel that putting initializer on the first cogroup can
> mislead users into thinking the first stream is in some way special.
> Just my 5c.
> Michał
>
> On 13/06/17 09:54, Michal Borowiecki wrote:
>
> Agree completely with the argument for serdes belonging in the same place
> as the state store name, which is in the aggregate method.
>
> Cheers,
>
> Michał
>
> On 12/06/17 18:20, Xavier Léauté wrote:
>
> I think we are discussing two separate things here, so it might be worth
> clarifying:
>
> 1) the position of the initializer with respect to the aggregators. If I
> understand correctly, Guozhang seems to think it is more natural to specify
> the initializer first, despite it not bearing any relation to the first
> aggregator. I can see the argument for specifying the initializer first,
> but I think it is debatable whether mixing it into the first cogroup call
> leads to a cleaner API or not.
>
> 2) where the serde should be defined (if necessary). Looking at our
> existing APIs in KGroupedStream, we always offer two aggregate()
> methods. The first one takes the name of the store and associated aggregate
> value serde, e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> aggValueSerde,
> String queryableStoreName)
> The second one only takes a state store supplier, and does not specify any
> serde, e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
> Aggregator<? super K, ? super V, VR> aggregator, final
> StateStoreSupplier<KeyValueStore> storeSupplier)
> Presumably, when specifying a state store supplier it shouldn't be
> necessary to specify an aggregate value serde, since the provided
> state store might not need to serialize the values (e.g. it may just keep
> them as regular objects in heap) or it may have its own
> internal serialization format.
>
> For consistency I think it would be valuable to preserve the same two
> aggregate methods for cogroup as well. Since the serde is only required in
> one of the two cases, I believe the serde has no place in the first
> cogroup() call and should only have to be specified as part of the
> aggregate() method that takes a state store name. In the case of a state
> store supplier, no serde would be necessary.
>
>
> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> I'd agree that the aggregate value serde and the initializer do not
>> bear a direct relationship with the first `cogroup` call, but after I tried
>> to write some example code with these two different sets of APIs I felt the
>> current APIs just program more naturally.
>>
>> I know it is kinda subjective, but I do think that user experience may be
>> more important as a deciding factor than the logical argument for public
>> interfaces. So I'd recommend people also try writing some example
>> lines, and we can circle back and discuss which one feels more natural
>> to write code.
>>
>>
>> Guozhang
>>
>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
>> michal.borowie...@openbet.com> wrote:
>>
>>> I feel it would make more sense to move the initializer and serde to the
>>> final aggregate statement, since the serde only applies to the state
>>> store, and the initializer doesn't bear any relation to the first group in
>>> particular.
>>>
>>> +1 for moving initializer and serde from cogroup() to the aggregate()
>>> for the reasons mentioned above.
>>>
>>> Cheers,
>>>
>>> Michał
>>>
>>> On 08/06/17 22:44, Guozhang Wang wrote:
>>>
>>> Note that although the internal `AbstractStoreSupplier` does maintain the
>>> key-value serdes, we do not enforce the interface of `StateStoreSupplier`
>>> to always retain that information, and hence we cannot assume that
>>> StateStoreSuppliers always retain key / value serdes.
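
To make the placement being argued for in the quoted messages concrete, here is a minimal in-memory sketch of a cogroup chain in which the initializer is supplied only at the final aggregate() call. It is not the Kafka Streams API and not the KIP's implementation: `CogroupSketch`, `Agg`, `Grouped`, and `Cogrouped` are hypothetical stand-ins, a "stream" is just a list of records, and serdes and state stores are left out entirely. The aggregators are declared as typed variables so that the aggregate type is fixed up front rather than left to inference on the first cogroup call.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these types are Kafka Streams classes.
final class CogroupSketch {

    /** Folds one value of type V into an aggregate of type T. */
    interface Agg<K, V, T> { T apply(K key, V value, T aggregate); }

    /** Stand-in for a grouped stream: an in-memory list of key/value records. */
    static final class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new AbstractMap.SimpleEntry<>(key, value));
            return this;
        }

        /** First cogroup call: aggregator only, no initializer and no serde. */
        <T> Cogrouped<K, T> cogroup(Agg<K, V, T> aggregator) {
            return new Cogrouped<K, T>().cogroup(this, aggregator);
        }
    }

    /** Collects (stream, aggregator) pairs; nothing is computed until aggregate(). */
    static final class Cogrouped<K, T> {
        private final List<BiConsumer<Map<K, T>, Supplier<T>>> steps = new ArrayList<>();

        <V> Cogrouped<K, T> cogroup(Grouped<K, V> stream, Agg<K, V, T> aggregator) {
            steps.add((table, init) -> {
                for (Map.Entry<K, V> r : stream.records) {
                    // The initializer runs once per key, whichever stream sees the key first.
                    T current = table.containsKey(r.getKey()) ? table.get(r.getKey()) : init.get();
                    table.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
                }
            });
            return this;
        }

        /** The initializer is supplied here, so the same chain can build several tables. */
        Map<K, T> aggregate(Supplier<T> initializer) {
            Map<K, T> table = new LinkedHashMap<>();
            for (BiConsumer<Map<K, T>, Supplier<T>> step : steps) {
                step.accept(table, initializer);
            }
            return table;
        }
    }

    static Map<String, Integer> demo(int start) {
        Grouped<String, Integer> clicks = new Grouped<String, Integer>()
                .add("a", 1).add("b", 2).add("a", 3);
        Grouped<String, String> labels = new Grouped<String, String>()
                .add("a", "xy").add("c", "xyz");

        Agg<String, Integer, Integer> addClicks = (k, v, agg) -> agg + v;
        Agg<String, String, Integer> addLength = (k, v, agg) -> agg + v.length();

        return clicks.cogroup(addClicks)
                .cogroup(labels, addLength)
                .aggregate(() -> start);
    }

    public static void main(String[] args) {
        System.out.println(demo(0)); // prints {a=6, b=2, c=3}
    }
}
```

In this toy model key "c" appears only in the second stream and is still initialized correctly, which is the sense in which no input is special. Because the initializer is an argument of aggregate(), calling `demo(100)` builds a second table from the same chain with a different starting value.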
>>>
>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:
>>>
>>> Another reason for the serde not to be in the first cogroup call is that
>>> the serde should not be required if you pass a StateStoreSupplier to
>>> aggregate()
>>>
>>> Regarding the aggregated type <T> I don't see why the initializer should be
>>> favored over aggregator to define the type. In my mind separating the
>>> initializer into the last aggregate call clearly indicates that the
>>> initializer is independent of any of the aggregators or streams and that we
>>> don't wait for grouped1 events to initialize the co-group.
>>>
>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> On a second thought... This is the current proposal API
>>>
>>> ```
>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
>>>     final Aggregator<? super K, ? super V, T> aggregator,
>>>     final Serde<T> aggValueSerde)
>>> ```
>>>
>>> If we do not have the initializer in the first co-group it might be a bit
>>> awkward for users to specify the aggregator that returns a typed <T>
>>> value? Maybe it is still better to put these two functions in the same api?
>>>
>>> Guozhang
>>>
>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> This suggestion lgtm. I would vote for the first alternative rather than
>>> adding it to the `KStreamBuilder` though.
>>>
>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:
>>>
>>> I have a minor suggestion to make the API a little bit more symmetric.
>>> I feel it would make more sense to move the initializer and serde to the
>>> final aggregate statement, since the serde only applies to the state
>>> store, and the initializer doesn't bear any relation to the first group in
>>> particular. It would end up looking like this:
>>>
>>> KTable<K, CG> cogrouped =
>>>     grouped1.cogroup(aggregator1)
>>>         .cogroup(grouped2, aggregator2)
>>>         .cogroup(grouped3, aggregator3)
>>>         .aggregate(initializer1, aggValueSerde, storeName1);
>>>
>>> Alternatively, we could move the first cogroup() method to
>>> KStreamBuilder, similar to how we have .merge()
>>> and end up with an api that would be even more symmetric.
>>>
>>> KStreamBuilder.cogroup(grouped1, aggregator1)
>>>     .cogroup(grouped2, aggregator2)
>>>     .cogroup(grouped3, aggregator3)
>>>     .aggregate(initializer1, aggValueSerde, storeName1);
>>>
>>> This doesn't have to be a blocker, but I thought it would make the API
>>> just a tad cleaner.
>>>
>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Kyle,
>>>
>>> Thanks a lot for the updated KIP. It looks good to me.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
>>>
>>> This makes much more sense to me. +1
>>>
>>>
>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> I have updated the KIP and my PR. Let me know what you think.
>>> To create a cogrouped stream just call cogroup on a KGroupedStream and
>>> supply the initializer, aggValueSerde, and an aggregator. Then continue
>>> adding KGroupedStreams and aggregators. Then call one of the many aggregate
>>> calls to create a KTable.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>
>>> Hi Kyle,
>>>
>>> Thanks for the update. I think just one initializer makes sense as it
>>> should only be called once per key and generally it is just going to create
>>> a new instance of whatever the Aggregate class is.
>>>
>>> Cheers,
>>> Damian
>>>
>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> Hello all,
>>>
>>> I have spent some more time on this and the best alternative I have come up
>>> with is:
>>> KGroupedStream has a single cogroup call that takes an initializer and an
>>> aggregator.
>>> CogroupedKStream has a cogroup call that takes additional groupedStream
>>> aggregator pairs.
>>> CogroupedKStream has multiple aggregate methods that create the different
>>> stores.
>>>
>>> I plan on updating the kip but I want people's input on if we should have
>>> the initializer be passed in once at the beginning or if we should instead
>>> have the initializer be required for each call to one of the aggregate
>>> calls. The first makes more sense to me but doesn't allow the user to
>>> specify different initializers for different tables.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>
>>> Yea I really like that idea. I'll see what I can do to update the kip and
>>> my pr when I have some time. I'm not sure how well creating the
>>> kstreamaggregates will go though because at that point I will have thrown
>>> away the type of the values. It will be type safe, I just may need to do a
>>> little forcing.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>
>>> Kyle,
>>>
>>> Thanks for the explanations, my previous read on the wiki examples was
>>> wrong.
>>>
>>> So I guess my motivation should be "reduced" to: can we move the window
>>> specs param from "KGroupedStream#cogroup(..)" to
>>> "CogroupedKStream#aggregate(..)", and my motivations are:
>>>
>>> 1. minor: we can reduce the number of generics in CogroupedKStream from 3
>>> to 2.
>>>
>>> 2. major: this is for extensibility of the APIs, and since we are removing
>>> the "Evolving" annotations on Streams it may be harder to change it again
>>> in the future. The extended use cases are that people wanted to have
>>> windowed running aggregates on different granularities, e.g. "give me the
>>> counts per-minute, per-hour, per-day and per-week", and today in DSL we
>>> need to specify that case in multiple aggregate operators, which each get a
>>> state store / changelog, etc. And it is possible to optimize it as well to
>>> a single state store. Its implementation would be tricky as you need to
>>> contain windows of different lengths within your window store but just from
>>> the public API point of view, it could be specified as:
>>>
>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>>> "state-store-name");
>>>
>>> table1 = stream.aggregate(/*per-minute window*/)
>>> table2 = stream.aggregate(/*per-hour window*/)
>>> table3 = stream.aggregate(/*per-day window*/)
>>>
>>> while underneath we are only using a single store "state-store-name" for
>>> it.
>>>
>>>
>>> Although this feature is out of the scope of this KIP, I'd like to discuss
>>> if we can "leave the door open" to make such changes without modifying the
>>> public APIs.
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> I allow defining a single window/sessionwindow one time when you make the
>>> cogroup call from a KGroupedStream. From then on you are using the cogroup
>>> call from within CogroupedKStream which doesn't accept any additional
>>> windows/sessionwindows.
>>>
>>> Is this what you meant by your question or did I misunderstand?
>>>
>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>
>>> Another question that came to me is on "window alignment": from the KIP it
>>> seems you are allowing users to specify a (potentially different) window
>>> spec in each co-grouped input stream. So if these window specs are
>>> different how should we "align" them with different input streams? I think
>>> it is more natural to only specify one window spec in the
>>>
>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>>>
>>>
>>> And remove it from the cogroup() functions. WDYT?
>>>
>>>
>>> Guozhang
>>>
>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Thanks for the proposal Kyle, this is a quite common use case to support
>>> such multi-way table join (i.e. N source tables with N aggregate func) with
>>> a single store and N+1 serdes, I have seen lots of people using the
>>> low-level PAPI to achieve this goal.
>>>
>>>
>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> I like your point about not handling other cases such as count and reduce.
>>> I think that reduce may not make sense because reduce assumes that the
>>> input values are the same as the output values. With cogroup ...