+1 on deferring discussion on Serdes until API improvements are ironed out.
On Tue, Jun 13, 2017 at 2:06 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> I am just catching up on this thread. (1) As most people agree, we should not add anything to KStreamBuilder (btw: we actually plan to move #merge() to KStream and deprecate it on KStreamBuilder, as it's quite an unnatural API atm).
>
> About specifying Serdes: there is still the idea to improve the overall API from the current "we are adding more overloads" pattern to a builder-like pattern. If we do this, it might make the whole discussion void. Thus, it might make sense to keep this in mind (or even delay this KIP?). It seems a waste of time to discuss all this if we are going to change it in 2 months anyway... Just saying.
>
>
> -Matthias
>
> On 6/13/17 8:05 AM, Michal Borowiecki wrote:
> > You're right, I haven't thought of that.
> >
> > Cheers,
> >
> > Michał
> >
> >
> > On 13/06/17 13:00, Kyle Winkelman wrote:
> >> First, I would prefer not calling it aggregate because there are already plenty of aggregate methods.
> >>
> >> Second, I don't think this would really work, because after each aggregate you now have a unique KTable (someone may want a table with 4 streams and reuse those 4 in another table but with one more stream added), and unless we completely duplicate everything every time, this isn't really possible. Additionally, the cogroup way just requires 1 more call to create two different tables (normal, windowed, or session windowed); this new way would require copying the aggregate chain.
> >>
> >> Another way to think about it is that with cogroup we know that when they call aggregate they aren't going to be adding any more aggregators to that table, but your way requires us to assume they are done adding aggregators after each call, so we must return a KTable just to possibly not need to have created it.
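Kyle's point about when the table gets created is easier to see with a small model of the pattern. The sketch below is plain Java with no Kafka dependency, and every name in it (`CogroupSketch`, `Agg`, `Input`, `Cogrouped`, `Summary`, `cogroup`, `aggregate`) is a hypothetical stand-in for the proposed `KGroupedStream#cogroup` / `CogroupedKStream#aggregate` API, not the real Kafka Streams interface. A `HashMap` plays the role of the single state store. `cogroup()` only records (input, aggregator) pairs. The table is built once, when `aggregate()` is called, and the single initializer runs once per key, as Damian describes further down the thread. For simplicity it folds each input in turn rather than interleaving records by timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class CogroupSketch {

    // Hypothetical stand-in for Kafka's Aggregator<K, V, VR>: folds one value into the aggregate.
    interface Agg<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    // One grouped input plus the aggregator for its own value type V.
    static final class Input<K, V, T> {
        private final List<Map.Entry<K, V>> records;
        private final Agg<K, V, T> aggregator;

        Input(List<Map.Entry<K, V>> records, Agg<K, V, T> aggregator) {
            this.records = records;
            this.aggregator = aggregator;
        }

        // Fold this input into the shared store; the initializer only runs for unseen keys.
        void foldInto(Map<K, T> store, Supplier<T> initializer) {
            for (Map.Entry<K, V> r : records) {
                T current = store.computeIfAbsent(r.getKey(), k -> initializer.get());
                store.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
            }
        }
    }

    // cogroup() just collects (input, aggregator) pairs; nothing is materialized yet.
    static final class Cogrouped<K, T> {
        private final List<Input<K, ?, T>> inputs = new ArrayList<>();

        <V> Cogrouped<K, T> cogroup(List<Map.Entry<K, V>> records, Agg<K, V, T> aggregator) {
            inputs.add(new Input<>(records, aggregator));
            return this;
        }

        // aggregate() is the single point where the one shared table/store is built.
        Map<K, T> aggregate(Supplier<T> initializer) {
            Map<K, T> store = new HashMap<>();
            for (Input<K, ?, T> input : inputs) {
                input.foldInto(store, initializer);
            }
            return store;
        }
    }

    // Hypothetical aggregate type combining two differently typed inputs.
    static final class Summary {
        int clicks;
        double spent;
    }

    // Clicks (String page) and purchases (Double amount) cogrouped into one Summary per key.
    static Map<String, Summary> demo(int[] initializerCalls) {
        List<Map.Entry<String, String>> clicks = List.of(
            Map.entry("alice", "/home"), Map.entry("alice", "/cart"), Map.entry("bob", "/home"));
        List<Map.Entry<String, Double>> purchases = List.of(
            Map.entry("alice", 20.0), Map.entry("bob", 5.5));

        return new Cogrouped<String, Summary>()
            .cogroup(clicks, (key, page, s) -> { s.clicks++; return s; })
            .cogroup(purchases, (key, amount, s) -> { s.spent += amount; return s; })
            .aggregate(() -> { initializerCalls[0]++; return new Summary(); });
    }

    public static void main(String[] args) {
        int[] initializerCalls = {0};
        Map<String, Summary> table = demo(initializerCalls);
        Summary alice = table.get("alice");
        System.out.println("alice: clicks=" + alice.clicks + ", spent=" + alice.spent); // clicks=2, spent=20.0
        System.out.println("initializer calls: " + initializerCalls[0]);              // 2 (one per key)
    }
}
```

Calling `aggregate()` again on the same `Cogrouped` instance builds a second, independent table from the same inputs. This mirrors Kyle's remark that the cogroup style needs just one more call per extra table instead of a copied aggregate chain.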
> >>
> >> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com> wrote:
> >>
> >>> Actually, just had a thought. It started with naming.
> >>>
> >>> Are we actually co-grouping these streams or are we co-aggregating them?
> >>>
> >>> After all, in each of the cogroup calls we are providing an Aggregator implementation.
> >>>
> >>>
> >>> If they are really co-aggregated, why don't we turn this around:
> >>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >>>
> >>> KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1, aggValueSerde1) // this is the unchanged aggregate method
> >>>         .aggregate(grouped2, aggregator2)  // this is a new method
> >>>         .aggregate(grouped3, aggregator3); // ditto
> >>>
> >>> This means that instead of adding cogroup methods to the KGroupedStream interface, we would add an aggregate method to the KTable interface.
> >>>
> >>> Is that feasible?
> >>>
> >>> Cheers,
> >>> Michał
> >>>
> >>> On 13/06/17 10:56, Michal Borowiecki wrote:
> >>>
> >>> Also, I still feel that putting the initializer on the first cogroup can mislead users into thinking the first stream is in some way special.
> >>> Just my 5c.
> >>> Michał
> >>>
> >>> On 13/06/17 09:54, Michal Borowiecki wrote:
> >>>
> >>> Agree completely with the argument for serdes belonging in the same place as the state store name, which is in the aggregate method.
> >>>
> >>> Cheers,
> >>>
> >>> Michał
> >>>
> >>> On 12/06/17 18:20, Xavier Léauté wrote:
> >>>
> >>> I think we are discussing two separate things here, so it might be worth clarifying:
> >>>
> >>> 1) the position of the initializer with respect to the aggregators.
> >>> If I understand correctly, Guozhang seems to think it is more natural to specify the initializer first, despite it not bearing any relation to the first aggregator. I can see the argument for specifying the initializer first, but I think it is debatable whether mixing it into the first cogroup call leads to a cleaner API or not.
> >>>
> >>> 2) where the serde should be defined (if necessary). Looking at our existing APIs in KGroupedStreams, we always offer two aggregate() methods. The first one takes the name of the store and the associated aggregate value serde, e.g.
> >>> KGroupedStream.aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> aggValueSerde, String queryableStoreName)
> >>> The second one only takes a state store supplier and does not specify any serde, e.g.
> >>> KGroupedStream.aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier)
> >>> Presumably, when specifying a state store supplier it shouldn't be necessary to specify an aggregate value serde, since the provided state store might not need to serialize the values (e.g. it may just keep them as regular objects in heap) or it may have its own internal serialization format.
> >>>
> >>> For consistency I think it would be valuable to preserve the same two aggregate methods for cogroup as well. Since the serde is only required in one of the two cases, I believe the serde has no place in the first cogroup() call and should only have to be specified as part of the aggregate() method that takes a state store name. In the case of a state store supplier, no serde would be necessary.
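Xavier's argument that the serde belongs only to the store-name overload can be modelled the same way. This is again plain Java, and `AggregateOverloadsSketch`, `Agg`, `Serde`, `Store`, `HeapStore`, `BytesStore` and the two `aggregate` overloads are simplified hypothetical stand-ins, not Kafka's `Serde` or `StateStoreSupplier` types. The store-name overload has to build a byte-oriented store itself, so it needs the value serde. The supplier overload receives a ready-made store, which in this sketch keeps plain objects on the heap, so no serde parameter appears at all.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class AggregateOverloadsSketch {

    // Hypothetical stand-in for an aggregator: folds one value into the aggregate.
    interface Agg<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    // Hypothetical minimal serde: turns aggregate values into bytes and back.
    interface Serde<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    // Hypothetical minimal key-value store.
    interface Store<K, T> {
        T get(K key);
        void put(K key, T value);
    }

    // Keeps plain objects on the heap, so it needs no serde.
    static final class HeapStore<K, T> implements Store<K, T> {
        private final Map<K, T> map = new HashMap<>();
        public T get(K key) { return map.get(key); }
        public void put(K key, T value) { map.put(key, value); }
    }

    // Keeps bytes (as a persistent or changelogged store would), so it needs a serde.
    static final class BytesStore<K, T> implements Store<K, T> {
        private final Map<K, byte[]> map = new HashMap<>();
        private final Serde<T> serde;
        BytesStore(Serde<T> serde) { this.serde = serde; }
        public T get(K key) {
            byte[] bytes = map.get(key);
            return bytes == null ? null : serde.deserialize(bytes);
        }
        public void put(K key, T value) { map.put(key, serde.serialize(value)); }
    }

    // Shared folding logic used by both overloads.
    private static <K, V, T> Store<K, T> fold(List<Map.Entry<K, V>> records, Supplier<T> initializer,
                                              Agg<K, V, T> aggregator, Store<K, T> store) {
        for (Map.Entry<K, V> r : records) {
            T current = store.get(r.getKey());
            if (current == null) {
                current = initializer.get();
            }
            store.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
        }
        return store;
    }

    // Overload 1: store name + value serde. We build the store ourselves, so the serde is required.
    static <K, V, T> Store<K, T> aggregate(List<Map.Entry<K, V>> records, Supplier<T> initializer,
                                           Agg<K, V, T> aggregator, Serde<T> aggValueSerde, String storeName) {
        // storeName is unused in this toy; a real implementation would register the store under it.
        return fold(records, initializer, aggregator, new BytesStore<>(aggValueSerde));
    }

    // Overload 2: the caller supplies the store, so no serde parameter is needed.
    static <K, V, T> Store<K, T> aggregate(List<Map.Entry<K, V>> records, Supplier<T> initializer,
                                           Agg<K, V, T> aggregator, Supplier<Store<K, T>> storeSupplier) {
        return fold(records, initializer, aggregator, storeSupplier.get());
    }

    static final Serde<Long> LONG_SERDE = new Serde<Long>() {
        public byte[] serialize(Long value) { return Long.toString(value).getBytes(StandardCharsets.UTF_8); }
        public Long deserialize(byte[] bytes) { return Long.parseLong(new String(bytes, StandardCharsets.UTF_8)); }
    };

    // Count records per key through either the serde-backed overload or the supplier overload.
    static long countFor(String key, boolean useSupplier) {
        List<Map.Entry<String, String>> records =
            List.of(Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
        Agg<String, String, Long> counter = (k, v, n) -> n + 1;
        Store<String, Long> store;
        if (useSupplier) {
            store = aggregate(records, () -> 0L, counter, HeapStore::new);
        } else {
            store = aggregate(records, () -> 0L, counter, LONG_SERDE, "counts-store");
        }
        return store.get(key);
    }

    public static void main(String[] args) {
        System.out.println("serde overload, a: " + countFor("a", false));   // 2
        System.out.println("supplier overload, a: " + countFor("a", true)); // 2
    }
}
```

Both paths produce the same counts, but only the store-name path ever touches the serde. That is the asymmetry Xavier points to when he argues the serde should live on that `aggregate()` overload rather than on the first `cogroup()` call.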
> >>>
> >>>
> >>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> I'd agree that the aggregate value serde and the initializer do not bear a direct relationship to the first `cogroup` call, but after I tried to write some example code with these two different sets of APIs, I felt the current APIs just program more naturally.
> >>>>
> >>>> I know it is kinda subjective, but I do think that user experience may be more important as a deciding factor than the logical argument for public interfaces. So I'd recommend that people also try writing some example lines, and we can circle back and discuss which one feels more natural to write code with.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <michal.borowie...@openbet.com> wrote:
> >>>>
> >>>>> I feel it would make more sense to move the initializer and serde to the final aggregate statement, since the serde only applies to the state store, and the initializer doesn't bear any relation to the first group in particular.
> >>>>>
> >>>>> +1 for moving initializer and serde from cogroup() to the aggregate() for the reasons mentioned above.
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Michał
> >>>>>
> >>>>> On 08/06/17 22:44, Guozhang Wang wrote:
> >>>>>
> >>>>> Note that although the internal `AbstractStoreSupplier` does maintain the key-value serdes, we do not enforce the interface of `StateStoreSupplier` to always retain that information, and hence we cannot assume that StateStoreSuppliers always retain key / value serdes.
> >>>>>
> >>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:
> >>>>>
> >>>>> Another reason for the serde not to be in the first cogroup call is that the serde should not be required if you pass a StateStoreSupplier to aggregate().
> >>>>>
> >>>>> Regarding the aggregated type <T>, I don't see why the initializer should be favored over the aggregator to define the type. In my mind, separating the initializer into the last aggregate call clearly indicates that the initializer is independent of any of the aggregators or streams, and that we don't wait for grouped1 events to initialize the co-group.
> >>>>>
> >>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> On second thought... This is the current proposal API:
> >>>>>
> >>>>> ```
> >>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
> >>>>>                                    final Aggregator<? super K, ? super V, T> aggregator,
> >>>>>                                    final Serde<T> aggValueSerde)
> >>>>> ```
> >>>>>
> >>>>> If we do not have the initializer in the first co-group, it might be a bit awkward for users to specify the aggregator that returns a typed <T> value?
> >>>>>
> >>>>> Maybe it is still better to put these two functions in the same API?
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> This suggestion LGTM. I would vote for the first alternative rather than adding it to the `KStreamBuilder`, though.
> >>>>>
> >>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:
> >>>>>
> >>>>> I have a minor suggestion to make the API a little bit more symmetric.
> >>>>> I feel it would make more sense to move the initializer and serde to the final aggregate statement, since the serde only applies to the state store, and the initializer doesn't bear any relation to the first group in particular. It would end up looking like this:
> >>>>>
> >>>>> KTable<K, CG> cogrouped =
> >>>>>     grouped1.cogroup(aggregator1)
> >>>>>             .cogroup(grouped2, aggregator2)
> >>>>>             .cogroup(grouped3, aggregator3)
> >>>>>             .aggregate(initializer1, aggValueSerde, storeName1);
> >>>>>
> >>>>> Alternatively, we could move the first cogroup() method to KStreamBuilder, similar to how we have .merge(), and end up with an API that would be even more symmetric:
> >>>>>
> >>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
> >>>>>               .cogroup(grouped2, aggregator2)
> >>>>>               .cogroup(grouped3, aggregator3)
> >>>>>               .aggregate(initializer1, aggValueSerde, storeName1);
> >>>>>
> >>>>> This doesn't have to be a blocker, but I thought it would make the API just a tad cleaner.
> >>>>>
> >>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Kyle,
> >>>>>
> >>>>> Thanks a lot for the updated KIP. It looks good to me.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
> >>>>>
> >>>>> This makes much more sense to me. +1
> >>>>>
> >>>>>
> >>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>> I have updated the KIP and my PR. Let me know what you think.
> >>>>> To create a cogrouped stream, just call cogroup on a KGroupedStream and supply the initializer, aggValueSerde, and an aggregator.
> >>>>> Then continue adding KGroupedStreams and aggregators. Then call one of the many aggregate calls to create a KTable.
> >>>>>
> >>>>> Thanks,
> >>>>> Kyle
> >>>>>
> >>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >>>>>
> >>>>> Hi Kyle,
> >>>>>
> >>>>> Thanks for the update. I think just one initializer makes sense, as it should only be called once per key, and generally it is just going to create a new instance of whatever the Aggregate class is.
> >>>>>
> >>>>> Cheers,
> >>>>> Damian
> >>>>>
> >>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>>
> >>>>> Hello all,
> >>>>>
> >>>>> I have spent some more time on this and the best alternative I have come up with is:
> >>>>> KGroupedStream has a single cogroup call that takes an initializer and an aggregator.
> >>>>> CogroupedKStream has a cogroup call that takes additional groupedStream/aggregator pairs.
> >>>>> CogroupedKStream has multiple aggregate methods that create the different stores.
> >>>>>
> >>>>> I plan on updating the KIP, but I want people's input on whether we should have the initializer be passed in once at the beginning, or whether we should instead have the initializer be required for each call to one of the aggregate methods.
> >>>>> The first makes more sense to me but doesn't allow the user to specify different initializers for different tables.
> >>>>>
> >>>>> Thanks,
> >>>>> Kyle
> >>>>>
> >>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>>
> >>>>> Yea, I really like that idea. I'll see what I can do to update the KIP and my PR when I have some time. I'm not sure how well creating the KStreamAggregates will go, though, because at that point I will have thrown away the type of the values. It will be type safe; I just may need to do a little forcing.
> >>>>>
> >>>>> Thanks,
> >>>>> Kyle
> >>>>>
> >>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Kyle,
> >>>>>
> >>>>> Thanks for the explanations; my previous read of the wiki examples was wrong.
> >>>>>
> >>>>> So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)"? My motivations are:
> >>>>>
> >>>>> 1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
> >>>>> 2. major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams, it may be harder to change it again in the future.
> >>>>> The extended use cases are that people wanted to have windowed running aggregates at different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in the DSL we need to specify that case in multiple aggregate operators, each of which gets its own state store / changelog, etc. It is also possible to optimize this to a single state store. The implementation would be tricky, as you need to contain windows of different lengths within your window store, but just from the public API point of view, it could be specified as:
> >>>>>
> >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
> >>>>>
> >>>>> table1 = stream.aggregate(/*per-minute window*/)
> >>>>> table2 = stream.aggregate(/*per-hour window*/)
> >>>>> table3 = stream.aggregate(/*per-day window*/)
> >>>>>
> >>>>> while underneath we are only using a single store, "state-store-name", for it.
> >>>>>
> >>>>>
> >>>>> Although this feature is out of the scope of this KIP, I'd like to discuss whether we can "leave the door open" to make such changes without modifying the public APIs.
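Guozhang's multi-granularity idea can be illustrated with a deliberately naive model: tumbling windows of several sizes counted into one shared map, with the window size folded into the store key. This is only a sketch under that assumption. The class and method names are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and a real window store is organized very differently. It only shows that per-minute and per-hour results can coexist in a single store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiGranularitySketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    // Start of the tumbling window of length sizeMs that contains timestampMs
    // (assumes non-negative timestamps, as with epoch milliseconds).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Composite store key: window size, window start, record key.
    static String storeKey(long sizeMs, long startMs, String key) {
        return sizeMs + "|" + startMs + "|" + key;
    }

    // Count every event once per requested granularity, all in ONE shared map.
    static Map<String, Long> countAll(List<Map.Entry<String, Long>> events, long... windowSizes) {
        Map<String, Long> store = new HashMap<>();
        for (Map.Entry<String, Long> event : events) {
            for (long size : windowSizes) {
                long start = windowStart(event.getValue(), size);
                store.merge(storeKey(size, start, event.getKey()), 1L, Long::sum);
            }
        }
        return store;
    }

    // Read back one window's count (0 if the window never saw a record for the key).
    static long count(Map<String, Long> store, long sizeMs, long startMs, String key) {
        return store.getOrDefault(storeKey(sizeMs, startMs, key), 0L);
    }

    static Map<String, Long> demo() {
        // (key, timestamp in ms): 0 s, 30 s, 90 s and roughly 61.7 minutes.
        List<Map.Entry<String, Long>> events = List.of(
            Map.entry("user1", 0L),
            Map.entry("user1", 30_000L),
            Map.entry("user1", 90_000L),
            Map.entry("user1", 3_700_000L));
        return countAll(events, MINUTE, HOUR);
    }

    public static void main(String[] args) {
        Map<String, Long> store = demo();
        System.out.println("minute [0, 60s): " + count(store, MINUTE, 0L, "user1")); // 2
        System.out.println("hour [0, 1h): " + count(store, HOUR, 0L, "user1"));      // 3
        System.out.println("hour [1h, 2h): " + count(store, HOUR, HOUR, "user1"));   // 1
        System.out.println("entries in the single store: " + store.size());          // 5
    }
}
```

The tricky part Guozhang mentions, such as retention, segmenting and range queries over windows of mixed lengths, is exactly what this toy ignores.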
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>> I allow defining a single window/session window one time, when you make the cogroup call from a KGroupedStream. From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/session windows.
> >>>>>
> >>>>> Is this what you meant by your question, or did I misunderstand?
> >>>>>
> >>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different, how should we "align" them across the different input streams? I think it is more natural to only specify one window spec in
> >>>>>
> >>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >>>>>
> >>>>>
> >>>>> and remove it from the cogroup() functions. WDYT?
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Thanks for the proposal Kyle, this is a quite common use case: supporting such a multi-way table join (i.e. N source tables with N
N source tables with N > >>>>> > >>>>> aggregate > >>>>> > >>>>> func) > >>>>> > >>>>> with > >>>>> > >>>>> a single store and N+1 serdes, I have seen lots of people > >>>>> > >>>>> using > >>>>> > >>>>> the > >>>>> > >>>>> low-level PAPI to achieve this goal. > >>>>> > >>>>> > >>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman < > >>>>> > >>>>> winkelman.k...@gmail.com > >>>>> > >>>>> wrote: > >>>>> > >>>>> I like your point about not handling other cases such as > >>>>> > >>>>> count > >>>>> > >>>>> and > >>>>> > >>>>> reduce. > >>>>> > >>>>> I think that reduce may not make sense because reduce > >>>>> > >>>>> assumes > >>>>> > >>>>> that > >>>>> > >>>>> the > >>>>> > >>>>> input values are the same as the output values. With > >>>>> > >>>>> cogroup > >>>>> > >>>>> ... > > > > -- > > Signature > > <http://www.openbet.com/> Michal Borowiecki > > Senior Software Engineer L4 > > T: +44 208 742 1600 > > > > > > +44 203 249 8448 > > > > > > > > E: michal.borowie...@openbet.com > > W: www.openbet.com <http://www.openbet.com/> > > > > > > OpenBet Ltd > > > > Chiswick Park Building 9 > > > > 566 Chiswick High Rd > > > > London > > > > W4 5XT > > > > UK > > > > > > <https://www.openbet.com/email_promo> > > > > This message is confidential and intended only for the addressee. If you > > have received this message in error, please immediately notify the > > postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it > > from your system as well as any copies. The content of e-mails as well > > as traffic data may be monitored by OpenBet for employment and security > > purposes. To protect the environment please do not print this e-mail > > unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building > > 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company > > registered in England and Wales. Registered no. 3134634. VAT no. > > GB927523612 > > > >