Note that although the internal `AbstractStoreSupplier` does maintain the key-value serdes, we do not enforce the interface of `StateStoreSupplier` to always retain that information, and hence we cannot assume that StateStoreSuppliers always retain key / value serdes.
On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:

Another reason for the serde not to be in the first cogroup call is that the serde should not be required if you pass a StateStoreSupplier to aggregate().

Regarding the aggregated type <T>, I don't see why the initializer should be favored over the aggregator to define the type. In my mind, separating the initializer into the last aggregate call clearly indicates that the initializer is independent of any of the aggregators or streams, and that we don't wait for grouped1 events to initialize the co-group.

On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:

On second thought... This is the currently proposed API:

```
<T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
                                   final Aggregator<? super K, ? super V, T> aggregator,
                                   final Serde<T> aggValueSerde)
```

If we do not have the initializer in the first co-group it might be a bit awkward for users to specify the aggregator that returns a typed <T> value? Maybe it is still better to put these two functions in the same API?

Guozhang

On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:

This suggestion lgtm. I would vote for the first alternative rather than adding it to the `KStreamBuilder`, though.

On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:

I have a minor suggestion to make the API a little bit more symmetric. I feel it would make more sense to move the initializer and serde to the final aggregate statement, since the serde only applies to the state store, and the initializer doesn't bear any relation to the first group in particular.
It would end up looking like this:

    KTable<K, CG> cogrouped =
        grouped1.cogroup(aggregator1)
            .cogroup(grouped2, aggregator2)
            .cogroup(grouped3, aggregator3)
            .aggregate(initializer1, aggValueSerde, storeName1);

Alternatively, we could move the first cogroup() method to KStreamBuilder, similar to how we have .merge(), and end up with an API that would be even more symmetric.

    KStreamBuilder.cogroup(grouped1, aggregator1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate(initializer1, aggValueSerde, storeName1);

This doesn't have to be a blocker, but I thought it would make the API just a tad cleaner.

On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

Kyle,

Thanks a lot for the updated KIP. It looks good to me.

Guozhang

On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:

This makes much more sense to me. +1

On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I have updated the KIP and my PR. Let me know what you think. To create a cogrouped stream just call cogroup on a KGroupedStream and supply the initializer, aggValueSerde, and an aggregator. Then continue adding KGroupedStreams and aggregators. Then call one of the many aggregate calls to create a KTable.

Thanks,
Kyle

On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the update.
I think just one initializer makes sense as it should only be called once per key and generally it is just going to create a new instance of whatever the Aggregate class is.

Cheers,
Damian

On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Hello all,

I have spent some more time on this and the best alternative I have come up with is:
KGroupedStream has a single cogroup call that takes an initializer and an aggregator.
CogroupedKStream has a cogroup call that takes additional groupedStream aggregator pairs.
CogroupedKStream has multiple aggregate methods that create the different stores.

I plan on updating the KIP but I want people's input on whether we should have the initializer be passed in once at the beginning or instead have the initializer be required for each call to one of the aggregate calls. The first makes more sense to me but doesn't allow the user to specify different initializers for different tables.

Thanks,
Kyle

On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Yea I really like that idea. I'll see what I can do to update the KIP and my PR when I have some time.
I'm not sure how well creating the KStreamAggregates will go though, because at that point I will have thrown away the type of the values. It will be type safe; I just may need to do a little forcing.

Thanks,
Kyle

On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Kyle,

Thanks for the explanations, my previous read on the wiki examples was wrong.

So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)", and my motivations are:

1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
2. major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams it may be harder to change it again in the future. The extended use cases are that people wanted to have windowed running aggregates on different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in the DSL we need to specify that case in multiple aggregate operators, each of which gets a state store / changelog, etc. And it is possible to optimize it as well to a single state store.
Its implementation would be tricky as you need to contain windows of different lengths within your window store, but just from the public API point of view, it could be specified as:

    CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");

    table1 = stream.aggregate(/*per-minute window*/)
    table2 = stream.aggregate(/*per-hour window*/)
    table3 = stream.aggregate(/*per-day window*/)

while underneath we are only using a single store "state-store-name" for it.

Although this feature is out of the scope of this KIP, I'd like to discuss if we can "leave the door open" to make such changes without modifying the public APIs.

Guozhang

On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I allow defining a single window/sessionwindow one time when you make the cogroup call from a KGroupedStream. From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/sessionwindows.

Is this what you meant by your question or did I misunderstand?
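
To picture the multi-granularity idea raised above (per-minute, per-hour and per-day counts backed by one store), here is a minimal Java sketch. It is not Kafka Streams code and not how a window store is implemented: the class name `MultiWindowCountSketch`, the `granularity@windowStart` key format, and the plain `TreeMap` standing in for the single "state-store-name" store are all assumptions made for illustration. It also assumes non-negative timestamps in milliseconds and tumbling (non-overlapping) windows.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class MultiWindowCountSketch {

    /** Start of the tumbling window of the given size that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /**
     * Counts events per window for every granularity, writing all counts into
     * one shared map (the hypothetical stand-in for a single state store).
     */
    static Map<String, Long> count(long[] timestampsMs, Map<String, Long> windowSizesMs) {
        Map<String, Long> store = new TreeMap<>();
        for (long ts : timestampsMs) {
            for (Map.Entry<String, Long> granularity : windowSizesMs.entrySet()) {
                // One entry per (granularity, window start) pair.
                String storeKey = granularity.getKey() + "@" + windowStart(ts, granularity.getValue());
                store.merge(storeKey, 1L, Long::sum);
            }
        }
        return store;
    }

    /** Three events: two in the first minute, one in the second, all in the first hour. */
    static Map<String, Long> demo() {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("minute", 60_000L);
        sizes.put("hour", 3_600_000L);
        long[] events = {5_000L, 59_000L, 61_000L};
        return count(events, sizes);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each event updates one entry per granularity, so the cost per record grows with the number of window sizes while the number of stores stays at one.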
On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different how should we "align" them with different input streams? I think it is more natural to only specify one window spec in the

    KTable<RK, V> CogroupedKStream#aggregate(Windows);

and remove it from the cogroup() functions. WDYT?

Guozhang

On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the proposal Kyle, this is a quite common use case to support such multi-way table join (i.e. N source tables with N aggregate func) with a single store and N+1 serdes. I have seen lots of people using the low-level PAPI to achieve this goal.

On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I like your point about not handling other cases such as count and reduce.
I think that reduce may not make sense because reduce assumes that the input values are the same type as the output values. With cogroup there may be multiple different input types and then your output type can't be multiple different things. In the case where you have all matching value types you can do KStreamBuilder#merge followed by the reduce.

As for count I think it is possible to call count on all the individual grouped streams and then do joins. Otherwise we could maybe make a special call in groupedstream for this case, because in this case we don't need to do type checking on the values. It could be similar to the current count methods but accept a varargs of additional grouped streams as well and make sure they have a key type of K.

The way I have put the KIP together is to ensure that we do type checking. I don't see a way we could group them all first and then make a call to count, reduce, or aggregate because with aggregate they would need to pass a list of aggregators and we would have no way of type checking that they match the grouped streams.
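
The type-checking argument above can be illustrated with a small, self-contained Java sketch. It is an in-memory toy under stated assumptions, not the KIP's implementation: `CogroupSketch`, `Grouped`, `Cogrouped` and the local `Aggregator` interface are hypothetical stand-ins for the proposed KGroupedStream, CogroupedKStream and Aggregator types, records are processed stream by stream rather than in arrival order, and a `HashMap` plays the role of the single state store. The point is only that pairing each stream with its aggregator in one `cogroup` call lets the compiler check the value type `V` of that pair, after which `V` can be forgotten.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

final class CogroupSketch {

    /** Folds one input value of type V into the running aggregate of type T. */
    interface Aggregator<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    /** Hypothetical stand-in for a grouped stream: a list of keyed records. */
    static final class Grouped<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        Grouped<K, V> add(K key, V value) {
            records.add(new SimpleImmutableEntry<>(key, value));
            return this;
        }

        <T> Cogrouped<K, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
            return new Cogrouped<K, T>().cogroup(this, aggregator);
        }
    }

    /** Hypothetical stand-in for the co-grouped stream; only K and T survive. */
    static final class Cogrouped<K, T> {
        // Each step captures one (stream, aggregator) pair. V is erased here,
        // but the compiler already checked the pairing in cogroup().
        private final List<BiConsumer<Map<K, T>, Supplier<T>>> steps = new ArrayList<>();

        <V> Cogrouped<K, T> cogroup(Grouped<K, V> stream,
                                    Aggregator<? super K, ? super V, T> aggregator) {
            steps.add((store, initializer) -> {
                for (Map.Entry<K, V> record : stream.records) {
                    K key = record.getKey();
                    // The initializer runs only the first time a key is seen.
                    T current = store.containsKey(key) ? store.get(key) : initializer.get();
                    store.put(key, aggregator.apply(key, record.getValue(), current));
                }
            });
            return this;
        }

        /** Runs every step against one shared map, the single "store". */
        Map<K, T> aggregate(Supplier<T> initializer) {
            Map<K, T> store = new HashMap<>();
            for (BiConsumer<Map<K, T>, Supplier<T>> step : steps) {
                step.accept(store, initializer);
            }
            return store;
        }
    }

    static Map<String, String> demo() {
        Grouped<String, Integer> clicks = new Grouped<String, Integer>().add("a", 2).add("b", 5);
        Grouped<String, String> names = new Grouped<String, String>().add("a", "alice");
        // With no initializer in the first call, T must be spelled out explicitly.
        return clicks.<String>cogroup((key, n, agg) -> agg + "clicks=" + n + ";")
                .cogroup(names, (key, name, agg) -> agg + "name=" + name + ";")
                .aggregate(() -> "");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch, passing `names` together with an aggregator whose value parameter is declared as `Integer` is rejected at compile time, which a plain list of aggregators could not guarantee. The explicit `<String>` on the first call also shows the awkwardness discussed earlier in the thread when the initializer is not there to fix the aggregate type.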
Thanks,
Kyle

On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote:

Sorry to jump on this thread so late. I agree this is a very useful addition and wanted to provide an additional use-case and some more comments.

This is actually a very common analytics use-case in the ad-tech industry. The typical setup will have an auction stream, an impression stream, and a click stream. Those three streams need to be combined to compute aggregate statistics (e.g. impression statistics, and click-through rates), since most of the attributes of interest are only present in the auction stream.

A simple way to do this is to co-group all the streams by the auction key, and process updates to the co-group as events for each stream come in, keeping only one value from each stream before sending downstream for further processing / aggregation.

One could view the result of that co-group operation as a "KTable" with multiple values per key. The key being the grouping key, and the values consisting of one value per stream.
What I like about Kyle's approach is that it allows elegant co-grouping of multiple streams without having to worry about the number of streams, and avoids dealing with Tuple types or other generic interfaces that could get messy if we wanted to preserve all the value types in the resulting co-grouped stream.

My only concern is that we only allow the cogroup + aggregate combined operation. This forces the user to build their own tuple serialization format if they want to preserve the individual input stream values as a group. It also deviates quite a bit from our approach in KGroupedStream which offers other operations, such as count and reduce, which should also be applicable to a co-grouped stream.

Overall I still think this is a really useful addition, but I feel we haven't spent much time trying to explore alternative DSLs that could maybe generalize better or match our existing syntax more closely.
On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Eno, is there anyone else that is an expert in the kafka streams realm that I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth so I will wait for his inputs before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Thanks Kyle, good arguments.

Eno

On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

*- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
I have added a more concrete example to the KIP.

*- my main concern is that we’re exposing this optimization to the DSL.
In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
You would have to make a lot of checks to understand if it is even possible to make this optimization:
1. Make sure they are all KTableKTableOuterJoins.
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used. (This may be impossible especially if they use KafkaStreams#store after the topology has already been built.)
You would then need to make decisions during the optimization:
1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.

The main argument I have against it is even if it could be done I don't know that we would want to have this be an optimization in the background because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join) because we want to allow people to join more than 2 streams in an easy way. Right now I don't think we give them ways of handling this use case easily.

*-I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
I would argue that cogroup is not just an optimization; it is a new way for the users to look at accomplishing a problem that requires multiple streams. I may sound like a broken record but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
Now if for example someone uses cogroup but doesn't supply additional streams and aggregators this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL.
In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:

I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use.
In that use case you have a dozen systems each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned.
A distributed database would do this under the covers which is arguably better (at least when it does the right thing) and perhaps we could do the same thing but I'm not sure we know the partitioning so we may need an explicit cogroup command that implies they are already co-partitioned.

-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Yea that's a good way to look at it.
I have seen this type of functionality in a couple other platforms like spark and pig.
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

If I'm reading this correctly it is like an N way outer join?
So an input on any stream will always produce a new aggregated value - is that correct?
Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I sure can. I have added the following description to my KIP.
If this doesn't help let me know and I will take some more time to build a diagram and make more of a step by step description:

Example with Current API:

KTable<K, V1> table1 =
    builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
KTable<K, V2> table2 =
    builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
KTable<K, V3> table3 =
    builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);

As you can see this creates 3 StateStores, requires 3 initializers, and 3 aggValueSerdes. This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3).
They are left with a couple of choices: first, to make V1, V2, and V3 all the same as CG, so that the two joiners are more like mergers; or second, to make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, which the joiners use to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value, which gets sent through a KTableKTableOuterJoin where it will look up the current value of the key in storeName2.
It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

If you think through all possibilities for incoming topics you will see that no matter which topic it comes in through all three stores are queried and all of the joiners must get used.

Topology wise for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

Example with Proposed API:

KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
    groupByKey();
KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
    .cogroup(grouped2, aggregator2)
    .cogroup(grouped3, aggregator3)
    .aggregate();

As you can see this creates 1 StateStore, requires 1 initializer, and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
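
To make the single-store flow described above concrete, here is a minimal plain-Java sketch. It deliberately does not use the Kafka Streams API: `CogroupSketch`, `SharedStore`, `Agg`, and `demo` are hypothetical names, a `HashMap` stands in for the state store, and string concatenation stands in for building the CG value. Each incoming record looks up the current aggregate (or asks the single initializer for one), applies the aggregator of the stream it arrived on, writes the result back, and forwards it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java illustration only; none of these types are Kafka Streams classes.
final class CogroupSketch {

    /** Stand-in for a per-stream aggregator: (key, value, current aggregate) -> new aggregate. */
    interface Agg<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    /** One shared store and one initializer for every co-grouped stream. */
    static final class SharedStore<K, T> {
        private final Map<K, T> store = new HashMap<>();
        private final Supplier<T> initializer;

        SharedStore(Supplier<T> initializer) {
            this.initializer = initializer;
        }

        /** Look up the current aggregate, apply this stream's aggregator, write back, forward. */
        <V> T process(K key, V value, Agg<K, V, T> aggregator) {
            T current = store.containsKey(key) ? store.get(key) : initializer.get();
            T updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            return updated; // what would be passed downstream
        }
    }

    /** Feeds one record from each of three differently typed "streams" into the same store. */
    static String demo() {
        SharedStore<String, String> cg = new SharedStore<>(() -> "");
        Agg<String, Integer, String> aggregator1 = (k, v, agg) -> agg + "[t1:" + v + "]";
        Agg<String, String, String> aggregator2 = (k, v, agg) -> agg + "[t2:" + v + "]";
        Agg<String, Double, String> aggregator3 = (k, v, agg) -> agg + "[t3:" + v + "]";
        cg.process("a", 1, aggregator1);
        cg.process("a", "x", aggregator2);
        return cg.process("a", 2.5, aggregator3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running it prints `[t1:1][t2:x][t3:2.5]`: three input types, one store lookup per record, and no intermediate V1/V2/V3 values or joiners.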

Topology wise for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Kyle,

thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.

I think, it's pretty advanced stuff you propose, and it would help to understand it better.

Thanks a lot!

-Matthias

On 5/4/17 11:39 AM, Kyle Winkelman wrote:

I have made a pull request. It can be found here.

https://github.com/apache/kafka/pull/2975

I plan to write some more unit tests for my classes and get around to writing documentation for the public api additions.

One thing I was curious about is during the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions an error is thrown. Is there some name that someone can recommend or should I leave the null and allow it to fall back to the KGroupedStream.name?

Should this be expanded to handle grouped tables?
This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.

Thanks,
Kyle

On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:

I’ll look as well asap, sorry, been swamped.

Eno

On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP?
It will be easier to review what you have done.

Thanks,
Damian

On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I am replying to this in hopes it will draw some attention to my KIP as I haven't heard from anyone in a couple days. This is my first KIP and my first large contribution to the project so I'm sure I did something wrong. ;)

On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Hello all,

I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.

Please find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

Please find my initial implementation here:
https://github.com/KyleWinkelman/kafka

Thanks,
Kyle Winkelman

--
-- Guozhang
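
The processor counts quoted earlier in the thread (N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers with the current API, versus N KStreamAggregates and 1 KStreamCogroup with the proposed one) can be tallied with a few lines of plain Java. This is only arithmetic on the numbers stated above; `TopologyCount`, `currentApi`, and `proposedApi` are hypothetical names and nothing here inspects a real topology.

```java
// Arithmetic on the processor counts quoted in the thread; not a Kafka Streams API.
final class TopologyCount {

    /** Processors created by N aggregate() calls chained with outer joins. */
    static int currentApi(int n) {
        requireAtLeastOneStream(n);
        int aggregates = n;           // one KStreamAggregate per stream
        int outerJoins = 2 * (n - 1); // two KTableKTableOuterJoins per join step
        int joinMergers = n - 1;      // one KTableKTableJoinMerger per join step
        return aggregates + outerJoins + joinMergers;
    }

    /** Processors created by the proposed cogroup chain. */
    static int proposedApi(int n) {
        requireAtLeastOneStream(n);
        return n + 1;                 // N KStreamAggregates plus 1 KStreamCogroup
    }

    private static void requireAtLeastOneStream(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one stream, got " + n);
        }
    }

    public static void main(String[] args) {
        int n = 3; // the three-topic example from the thread
        System.out.println("N=" + n + " current=" + currentApi(n) + " proposed=" + proposedApi(n));
    }
}
```

For the three-topic example this prints `N=3 current=9 proposed=4`. The current count grows as 4N-3 and the proposed count as N+1.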