As for where we should ask users to set serdes: I think I'm convinced by Xavier that they should be in the `aggregate` call (but only in the overloads that do not take a state store supplier) instead of the `KStream#cogroup` call, to be consistent with the other aggregate functions.
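
To make that shape concrete, here is a rough, dependency-free sketch. Everything in it is a hypothetical stand-in (the `Grouped`, `Cogrouped`, `Aggregator` and `Serde` types are toy in-memory classes, not the real Kafka Streams ones, and the streams are folded one after another rather than interleaved); it only shows what it looks like when the `cogroup` calls take aggregators and the initializer, serde and store name all arrive at `aggregate`:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Toy in-memory model of the API shape under discussion; NOT the Kafka Streams API. */
public class CogroupSketch {

    /** Hypothetical stand-in for Aggregator<K, V, T>. */
    interface Aggregator<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    /** Hypothetical stand-in for a value serde; this toy never serializes anything. */
    static final class Serde<T> {
        final String name;
        Serde(String name) { this.name = name; }
    }

    /** A "grouped stream" here is just a list of key/value records. */
    static final class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
            return this;
        }

        /** First cogroup call: takes only an aggregator, no initializer and no serde. */
        <T> Cogrouped<K, T> cogroup(Aggregator<K, V, T> aggregator) {
            return new Cogrouped<K, T>().cogroup(this, aggregator);
        }
    }

    static final class Cogrouped<K, T> {
        private final List<BiConsumer<Map<K, T>, Supplier<T>>> steps = new ArrayList<>();

        /** Later cogroup calls: one more grouped stream plus its aggregator. */
        <V> Cogrouped<K, T> cogroup(Grouped<K, V> grouped, Aggregator<K, V, T> aggregator) {
            steps.add((store, initializer) -> {
                for (Map.Entry<K, V> rec : grouped.records) {
                    K key = rec.getKey();
                    // Assumption of this toy: initialize on the first record seen for a key,
                    // whichever stream it came from.
                    T current = store.containsKey(key) ? store.get(key) : initializer.get();
                    store.put(key, aggregator.apply(key, rec.getValue(), current));
                }
            });
            return this;
        }

        /** Initializer, serde and store name are supplied only here (serde and name are unused by the toy). */
        Map<K, T> aggregate(Supplier<T> initializer, Serde<T> aggValueSerde, String storeName) {
            Map<K, T> store = new LinkedHashMap<>();
            for (BiConsumer<Map<K, T>, Supplier<T>> step : steps) {
                step.accept(store, initializer);
            }
            return store;
        }
    }

    public static Map<String, String> demo() {
        Grouped<String, Integer> clicks = new Grouped<String, Integer>().add("alice", 3);
        Grouped<String, String> views =
            new Grouped<String, String>().add("bob", "home").add("alice", "cart");
        return clicks
            .<String>cogroup((k, v, agg) -> agg + "clicks=" + v + ";")
            .cogroup(views, (k, v, agg) -> agg + "view=" + v + ";")
            .aggregate(() -> "", new Serde<>("string"), "store");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {alice=clicks=3;view=cart;, bob=view=home;}
    }
}
```

One thing the toy makes visible: with the chained form the aggregate type has to be spelled out on the first call (`.<String>cogroup(...)`), because nothing before `aggregate` pins it down. Also note that initializing on the first record of any stream is just how this sketch behaves, not a claim about what the PR does.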

BTW another motivation for me to suggest keeping the initializer on the first stream is that, from reviewing the PR (some time ago though, so again I might be wrong), we will trigger the initializer only when we receive an incoming record from the first stream whose key is not in the state store yet, while for other streams we will just drop it on the floor. If that is actually not the case, and we call the initializer on any one of the co-grouped streams' incoming records, then I'm open to setting the initializer at the `aggregate` call as well.

Guozhang

On Wed, Jun 14, 2017 at 2:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> I'd suggest we do not block this KIP until the serde work has been sorted
> out: we cannot estimate yet how long it will take. Instead let's
> make an agreement on where we want to specify the serdes: whether on the
> first co-group call or on the aggregate call.
>
> Also about the initializer specification I actually felt that the first
> cogrouped stream is special (Kyle please feel free to correct me if I'm
> wrong) and that is why I thought it is better to specify the initializer at
> the beginning: since from the typing you can see that the final aggregated
> value type is defined to be the same as the first co-grouped stream, and
> for any intermediate stream to co-group, their value types are not inherited
> but the values are "incorporated" into the original stream:
>
> <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T>
> groupedStream, final Aggregator<? super K, ? super T, V> aggregator)
>
> Note that we do not have a cogroup function that returns
> CogroupedKStream<K, T>.
>
>
> Guozhang
>
>
> On Tue, Jun 13, 2017 at 2:31 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>
>> +1 on deferring discussion on Serdes until API improvements are ironed
>> out.
>>
>> On Tue, Jun 13, 2017 at 2:06 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>> > Hi,
>> >
>> > I am just catching up on this thread.
>> > (1) as most people agree, we
>> > should not add anything to KStreamBuilder (btw: we actually plan to move
>> > #merge() to KStream and deprecate it on KStreamBuilder as it's a quite
>> > unnatural API atm).
>> >
>> > About specifying Serdes: there is still the idea to improve the overall
>> > API from the current "we are adding more overloads"-pattern to a
>> > builder-like pattern. This might make the whole discussion void if we do
>> > this. Thus, it might make sense to keep this in mind (or even delay this
>> > KIP?). It seems a waste of time to discuss all this if we are going to
>> > change it in 2 months anyway... Just saying.
>> >
>> >
>> > -Matthias
>> >
>> > On 6/13/17 8:05 AM, Michal Borowiecki wrote:
>> > > You're right, I haven't thought of that.
>> > >
>> > > Cheers,
>> > >
>> > > Michał
>> > >
>> > >
>> > > On 13/06/17 13:00, Kyle Winkelman wrote:
>> > >> First, I would prefer not calling it aggregate because there are already
>> > >> plenty of aggregate methods.
>> > >>
>> > >> Second, I don't think this would really work because after each aggregate
>> > >> you now have a unique KTable (someone may want a table with 4 streams and
>> > >> reuse those 4 in another table but with one more stream added) and unless
>> > >> we completely duplicate everything every time this isn't really possible.
>> > >> Additionally, the cogroup way just requires 1 more call to create two
>> > >> different tables (normal, windowed, and session windowed); this new way
>> > >> would require copying the aggregate chain.
>> > >>
>> > >> Another way to think about it is with cogroup we know that when they call
>> > >> aggregate they aren't going to be adding any more aggregators to that table
>> > >> but your way requires us to assume they are done adding aggregators after
>> > >> each call so we must return a ktable just to possibly not need to have
>> > >> created it.
>> > >>
>> > >> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com>
>> > >> wrote:
>> > >>
>> > >>> Actually, just had a thought. It started with naming.
>> > >>>
>> > >>> Are we actually co-grouping these streams or are we co-aggregating them?
>> > >>>
>> > >>> After all, in each of the cogroup calls we are providing an Aggregator
>> > >>> implementation.
>> > >>>
>> > >>>
>> > >>> If they are really co-aggregated, why don't we turn this around:
>> > >>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>> > >>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>> > >>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>> > >>>
>> > >>> KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
>> > >>>     aggValueSerde1) // this is the unchanged aggregate method
>> > >>>     .aggregate(grouped2, aggregator2) // this is a new method
>> > >>>     .aggregate(grouped3, aggregator3); // ditto
>> > >>>
>> > >>> This means instead of adding cogroup methods on KGroupedStream interface,
>> > >>> adding aggregate method on KTable interface.
>> > >>>
>> > >>> Is that feasible?
>> > >>>
>> > >>> Cheers,
>> > >>> Michał
>> > >>>
>> > >>> On 13/06/17 10:56, Michal Borowiecki wrote:
>> > >>>
>> > >>> Also, I still feel that putting initializer on the first cogroup can
>> > >>> mislead users into thinking the first stream is in some way special.
>> > >>> Just my 5c.
>> > >>> Michał
>> > >>>
>> > >>> On 13/06/17 09:54, Michal Borowiecki wrote:
>> > >>>
>> > >>> Agree completely with the argument for serdes belonging in the same place
>> > >>> as the state store name, which is in the aggregate method.
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Michał
>> > >>>
>> > >>> On 12/06/17 18:20, Xavier Léauté wrote:
>> > >>>
>> > >>> I think we are discussing two separate things here, so it might be worth
>> > >>> clarifying:
>> > >>>
>> > >>> 1) the position of the initializer with respect to the aggregators. If I
>> > >>> understand correctly, Guozhang seems to think it is more natural to specify
>> > >>> the initializer first, despite it not bearing any relation to the first
>> > >>> aggregator. I can see the argument for specifying the initializer first,
>> > >>> but I think it is debatable whether mixing it into the first cogroup call
>> > >>> leads to a cleaner API or not.
>> > >>>
>> > >>> 2) where the serde should be defined (if necessary). Looking at our
>> > >>> existing APIs in KGroupedStreams, we always offer two aggregate()
>> > >>> methods. The first one takes the name of the store and associated aggregate
>> > >>> value serde e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
>> > >>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> aggValueSerde,
>> > >>> String queryableStoreName)
>> > >>> The second one only takes a state store supplier, and does not specify any
>> > >>> serde, e.g. KGroupedStream.aggregate(Initializer<VR>
>> > >>> initializer, Aggregator<? super K, ? super V, VR> aggregator, final
>> > >>> StateStoreSupplier<KeyValueStore> storeSupplier)
>> > >>> Presumably, when specifying a state store supplier it shouldn't be
>> > >>> necessary to specify an aggregate value serde, since the provided
>> > >>> statestore might not need to serialize the values (e.g. it may just keep
>> > >>> them as regular objects in heap) or it may have its own
>> > >>> internal serialization format.
>> > >>>
>> > >>> For consistency I think it would be valuable to preserve the same two
>> > >>> aggregate methods for cogroup as well.
>> > >>> Since the serde is only required in
>> > >>> one of the two cases, I believe the serde has no place in the first
>> > >>> cogroup() call and should only have to be specified as part of the
>> > >>> aggregate() method that takes a state store name. In the case of a state
>> > >>> store supplier, no serde would be necessary.
>> > >>>
>> > >>>
>> > >>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>
>> > >>>> I'd agree that the aggregate value serde and the initializer do not
>> > >>>> bear a direct relationship with the first `cogroup` call, but after I tried
>> > >>>> to write some example code with these two different sets of APIs I felt the
>> > >>>> current APIs just program more naturally.
>> > >>>>
>> > >>>> I know it is kinda subjective, but I do think that user experience may be
>> > >>>> more important as a deciding factor than the logical argument for public
>> > >>>> interfaces. So I'd recommend people also try writing some example
>> > >>>> lines and we can circle back and discuss which one feels more natural
>> > >>>> to write code with.
>> > >>>>
>> > >>>>
>> > >>>> Guozhang
>> > >>>>
>> > >>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
>> > >>>> michal.borowie...@openbet.com> wrote:
>> > >>>>
>> > >>>>> I feel it would make more sense to move the initializer and serde to the
>> > >>>>> final aggregate statement, since the serde only applies to the state
>> > >>>>> store,
>> > >>>>> and the initializer doesn't bear any relation to the first group in
>> > >>>>> particular.
>> > >>>>>
>> > >>>>> +1 for moving initializer and serde from cogroup() to the aggregate()
>> > >>>>> for the reasons mentioned above.
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>>
>> > >>>>> Michał
>> > >>>>>
>> > >>>>> On 08/06/17 22:44, Guozhang Wang wrote:
>> > >>>>>
>> > >>>>> Note that although the internal `AbstractStoreSupplier` does maintain the
>> > >>>>> key-value serdes, we do not enforce the interface of `StateStoreSupplier`
>> > >>>>> to always retain that information, and hence we cannot assume that
>> > >>>>> StateStoreSuppliers always retain key / value serdes.
>> > >>>>>
>> > >>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:
>> > >>>>>
>> > >>>>> Another reason for the serde not to be in the first cogroup call is that
>> > >>>>> the serde should not be required if you pass a StateStoreSupplier to
>> > >>>>> aggregate()
>> > >>>>>
>> > >>>>> Regarding the aggregated type <T> I don't see why the initializer should be
>> > >>>>> favored over aggregator to define the type. In my mind separating the
>> > >>>>> initializer into the last aggregate call clearly indicates that the
>> > >>>>> initializer is independent of any of the aggregators or streams and that we
>> > >>>>> don't wait for grouped1 events to initialize the co-group.
>> > >>>>>
>> > >>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> On a second thought... This is the current proposal API
>> > >>>>>
>> > >>>>> ```
>> > >>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer, final
>> > >>>>> Aggregator<? super K, ? super V, T> aggregator, final Serde<T>
>> > >>>>> aggValueSerde)
>> > >>>>> ```
>> > >>>>>
>> > >>>>> If we do not have the initializer in the first co-group it might be a bit
>> > >>>>> awkward for users to specify the aggregator that returns a typed <T>
>> > >>>>> value?
>> > >>>>>
>> > >>>>> Maybe it is still better to put these two functions in the same api?
>> > >>>>>
>> > >>>>>
>> > >>>>> Guozhang
>> > >>>>>
>> > >>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> This suggestion lgtm. I would vote for the first alternative rather than
>> > >>>>> adding it to the `KStreamBuilder` though.
>> > >>>>>
>> > >>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:
>> > >>>>>
>> > >>>>> I have a minor suggestion to make the API a little bit more symmetric.
>> > >>>>> I feel it would make more sense to move the initializer and serde to the
>> > >>>>> final aggregate statement, since the serde only applies to the state
>> > >>>>> store, and the initializer doesn't bear any relation to the first group in
>> > >>>>> particular. It would end up looking like this:
>> > >>>>>
>> > >>>>> KTable<K, CG> cogrouped =
>> > >>>>>     grouped1.cogroup(aggregator1)
>> > >>>>>         .cogroup(grouped2, aggregator2)
>> > >>>>>         .cogroup(grouped3, aggregator3)
>> > >>>>>         .aggregate(initializer1, aggValueSerde, storeName1);
>> > >>>>>
>> > >>>>> Alternatively, we could move the first cogroup() method to
>> > >>>>> KStreamBuilder, similar to how we have .merge()
>> > >>>>> and end up with an api that would be even more symmetric.
>> > >>>>>
>> > >>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
>> > >>>>>     .cogroup(grouped2, aggregator2)
>> > >>>>>     .cogroup(grouped3, aggregator3)
>> > >>>>>     .aggregate(initializer1, aggValueSerde, storeName1);
>> > >>>>>
>> > >>>>> This doesn't have to be a blocker, but I thought it would make the API
>> > >>>>> just a tad cleaner.
>> > >>>>>
>> > >>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Kyle,
>> > >>>>>
>> > >>>>> Thanks a lot for the updated KIP. It looks good to me.
>> > >>>>>
>> > >>>>>
>> > >>>>> Guozhang
>> > >>>>>
>> > >>>>>
>> > >>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
>> > >>>>>
>> > >>>>> This makes much more sense to me. +1
>> > >>>>>
>> > >>>>>
>> > >>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> I have updated the KIP and my PR. Let me know what you think.
>> > >>>>> To create a cogrouped stream just call cogroup on a KGroupedStream and
>> > >>>>> supply the initializer, aggValueSerde, and an aggregator. Then continue
>> > >>>>> adding KGroupedStreams and aggregators. Then call one of the many aggregate
>> > >>>>> calls to create a KTable.
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Kyle
>> > >>>>>
>> > >>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Hi Kyle,
>> > >>>>>
>> > >>>>> Thanks for the update. I think just one initializer makes sense as it
>> > >>>>> should only be called once per key and generally it is just going to create
>> > >>>>> a new instance of whatever the Aggregate class is.
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>> Damian
>> > >>>>>
>> > >>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> Hello all,
>> > >>>>>
>> > >>>>> I have spent some more time on this and the best alternative I have come up
>> > >>>>> with is:
>> > >>>>> KGroupedStream has a single cogroup call that takes an initializer and an
>> > >>>>> aggregator.
>> > >>>>> CogroupedKStream has a cogroup call that takes additional groupedStream
>> > >>>>> aggregator pairs.
>> > >>>>> CogroupedKStream has multiple aggregate methods that create the different
>> > >>>>> stores.
>> > >>>>>
>> > >>>>> I plan on updating the kip but I want people's input on if we should have
>> > >>>>> the initializer be passed in once at the beginning or if we should instead
>> > >>>>> have the initializer be required for each call to one of the aggregate
>> > >>>>> calls. The first makes more sense to me but doesnt allow the user to
>> > >>>>> specify different initializers for different tables.
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Kyle
>> > >>>>>
>> > >>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Yea I really like that idea I'll see what I can do to update the kip and
>> > >>>>> my pr when I have some time.
>> > >>>>> I'm not sure how well creating the
>> > >>>>> kstreamaggregates will go though because at that point I will have thrown
>> > >>>>> away the type of the values. It will be type safe I just may need to do a
>> > >>>>> little forcing.
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Kyle
>> > >>>>>
>> > >>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Kyle,
>> > >>>>>
>> > >>>>> Thanks for the explanations, my previous read on the wiki examples was
>> > >>>>> wrong.
>> > >>>>>
>> > >>>>> So I guess my motivation should be "reduced" to: can we move the window
>> > >>>>> specs param from "KGroupedStream#cogroup(..)" to
>> > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
>> > >>>>>
>> > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to 2.
>> > >>>>>
>> > >>>>> 2. major: this is for extensibility of the APIs, and since we are removing
>> > >>>>> the "Evolving" annotations on Streams it may be harder to change it again
>> > >>>>> in the future. The extended use cases are that people wanted to have
>> > >>>>> windowed running aggregates on different granularities, e.g.
>> > >>>>> "give me the
>> > >>>>> counts per-minute, per-hour, per-day and per-week", and today in DSL we
>> > >>>>> need to specify that case in multiple aggregate operators, each of which
>> > >>>>> gets a state store / changelog, etc. And it is possible to optimize it as
>> > >>>>> well to a single state store. Its implementation would be tricky as you
>> > >>>>> need to contain windows of different lengths within your window store but
>> > >>>>> just from the public API point of view, it could be specified as:
>> > >>>>>
>> > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>> > >>>>> "state-store-name");
>> > >>>>>
>> > >>>>> table1 = stream.aggregate(/*per-minute window*/)
>> > >>>>> table2 = stream.aggregate(/*per-hour window*/)
>> > >>>>> table3 = stream.aggregate(/*per-day window*/)
>> > >>>>>
>> > >>>>> while underneath we are only using a single store "state-store-name" for
>> > >>>>> it.
>> > >>>>>
>> > >>>>>
>> > >>>>> Although this feature is out of the scope of this KIP, I'd like to discuss
>> > >>>>> if we can "leave the door open" to make such changes without modifying the
>> > >>>>> public APIs.
>> > >>>>>
>> > >>>>> Guozhang
>> > >>>>>
>> > >>>>>
>> > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> I allow defining a single window/sessionwindow one time when you make the
>> > >>>>> cogroup call from a KGroupedStream. From then on you are using the cogroup
>> > >>>>> call from within CogroupedKStream which doesn't accept any additional
>> > >>>>> windows/sessionwindows.
>> > >>>>>
>> > >>>>> Is this what you meant by your question or did I misunderstand?
>> > >>>>>
>> > >>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Another question that came to me is on "window alignment": from the KIP it
>> > >>>>> seems you are allowing users to specify a (potentially different) window
>> > >>>>> spec in each co-grouped input stream. So if these window specs are
>> > >>>>> different how should we "align" them with different input streams? I think
>> > >>>>> it is more natural to only specify one window spec in the
>> > >>>>>
>> > >>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>> > >>>>>
>> > >>>>>
>> > >>>>> And remove it from the cogroup() functions. WDYT?
>> > >>>>>
>> > >>>>>
>> > >>>>> Guozhang
>> > >>>>>
>> > >>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> Thanks for the proposal Kyle, this is a quite common use case to support
>> > >>>>> such multi-way table join (i.e. N source tables with N aggregate func) with
>> > >>>>> a single store and N+1 serdes, I have seen lots of people using the
>> > >>>>> low-level PAPI to achieve this goal.
>> > >>>>>
>> > >>>>>
>> > >>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>> I like your point about not handling other cases such as count and reduce.
>> > >>>>>
>> > >>>>> I think that reduce may not make sense because reduce assumes that the
>> > >>>>> input values are the same as the output values. With cogroup
>> > >>>>>
>> > >>>>> ...
>> > >
>> > > --
>> > > Michal Borowiecki
>> >
>
> --
> -- Guozhang

--
-- Guozhang