Kyle,

Thanks a lot for the updated KIP. It looks good to me.

Guozhang

On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:

> This makes much more sense to me. +1
>
> > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >
> > I have updated the KIP and my PR. Let me know what you think.
> > To create a cogrouped stream, just call cogroup on a KGroupedStream and supply the initializer, aggValueSerde, and an aggregator. Then continue adding KGroupedStreams and aggregators. Then call one of the many aggregate calls to create a KTable.
> >
> > Thanks,
> > Kyle
> >
> > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >
> >> Hi Kyle,
> >>
> >> Thanks for the update. I think just one initializer makes sense as it should only be called once per key and generally it is just going to create a new instance of whatever the Aggregate class is.
> >>
> >> Cheers,
> >> Damian
> >>
> >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>
> >>> Hello all,
> >>>
> >>> I have spent some more time on this and the best alternative I have come up with is:
> >>> KGroupedStream has a single cogroup call that takes an initializer and an aggregator.
> >>> CogroupedKStream has a cogroup call that takes additional groupedStream aggregator pairs.
> >>> CogroupedKStream has multiple aggregate methods that create the different stores.
> >>>
> >>> I plan on updating the KIP but I want people's input on whether we should have the initializer be passed in once at the beginning or whether we should instead have the initializer be required for each call to one of the aggregate calls. The first makes more sense to me but doesn't allow the user to specify different initializers for different tables.
> >>>
> >>> Thanks,
> >>> Kyle
> >>>
> >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> >>>
> >>>> Yea, I really like that idea. I'll see what I can do to update the KIP and my PR when I have some time. I'm not sure how well creating the KStreamAggregates will go though, because at that point I will have thrown away the type of the values. It will be type safe, I just may need to do a little forcing.
> >>>>
> >>>> Thanks,
> >>>> Kyle
> >>>>
> >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>>
> >>>>> Kyle,
> >>>>>
> >>>>> Thanks for the explanations, my previous read on the wiki examples was wrong.
> >>>>>
> >>>>> So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)", and my motivations are:
> >>>>>
> >>>>> 1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.
> >>>>> 2. major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams it may be harder to change it again in the future. The extended use cases are that people wanted to have windowed running aggregates on different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in the DSL we need to specify that case in multiple aggregate operators, each of which gets a state store / changelog, etc. And it is possible to optimize it as well to a single state store. Its implementation would be tricky as you need to contain windows of different lengths within your window store, but just from the public API point of view, it could be specified as:
> >>>>>
> >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
> >>>>>
> >>>>> table1 = stream.aggregate(/*per-minute window*/)
> >>>>> table2 = stream.aggregate(/*per-hour window*/)
> >>>>> table3 = stream.aggregate(/*per-day window*/)
> >>>>>
> >>>>> while underneath we are only using a single store "state-store-name" for it.
> >>>>>
> >>>>> Although this feature is out of the scope of this KIP, I'd like to discuss if we can "leave the door open" to make such changes without modifying the public APIs.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>>> I allow defining a single window/sessionwindow one time when you make the cogroup call from a KGroupedStream. From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/sessionwindows.
> >>>>>>
> >>>>>> Is this what you meant by your question or did I misunderstand?
> >>>>>>
> >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>> Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different how should we "align" them with different input streams? I think it is more natural to only specify one window spec in the
> >>>>>>
> >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >>>>>>
> >>>>>> And remove it from the cogroup() functions. WDYT?
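A minimal sketch of the "several granularities, one store" idea discussed above, assuming tumbling windows, non-negative millisecond timestamps and a plain count as the aggregate. `WindowedCountSketch` and its string-encoded store key are hypothetical illustrations only; nothing here is Kafka Streams API or how its window store is laid out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model: one map stands in for the single "state-store-name" store.
final class WindowedCountSketch {
    private final long[] windowSizesMs;
    // Store key is "recordKey@windowSizeMs@windowStartMs" (an assumed encoding).
    private final Map<String, Long> store = new HashMap<>();

    WindowedCountSketch(long... windowSizesMs) {
        this.windowSizesMs = windowSizesMs;
    }

    // Tumbling windows are aligned to the epoch: start = ts - (ts % size).
    private static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // A record from any co-grouped stream updates one counter per granularity.
    void onRecord(String key, long timestampMs) {
        for (long size : windowSizesMs) {
            String storeKey = key + "@" + size + "@" + windowStart(timestampMs, size);
            store.merge(storeKey, 1L, Long::sum);
        }
    }

    // Reads the count of the window of the given size that contains timestampMs.
    long count(String key, long sizeMs, long timestampMs) {
        String storeKey = key + "@" + sizeMs + "@" + windowStart(timestampMs, sizeMs);
        return store.getOrDefault(storeKey, 0L);
    }
}
```

Because every input is bucketed by the same window definitions, the alignment question goes away in this toy model; the cost is one store write per granularity per record.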
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks for the proposal Kyle, this is quite a common use case: supporting such a multi-way table join (i.e. N source tables with N aggregate funcs) with a single store and N+1 serdes. I have seen lots of people using the low-level PAPI to achieve this goal.
> >>>>>>>
> >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> I like your point about not handling other cases such as count and reduce.
> >>>>>>>>
> >>>>>>>> I think that reduce may not make sense because reduce assumes that the input values are the same type as the output values. With cogroup there may be multiple different input types and then your output type can't be multiple different things. In the case where you have all matching value types you can do KStreamBuilder#merge followed by the reduce.
> >>>>>>>>
> >>>>>>>> As for count, I think it is possible to call count on all the individual grouped streams and then do joins. Otherwise we could maybe make a special call in KGroupedStream for this case, because in this case we don't need to do type checking on the values. It could be similar to the current count methods but accept a varargs of additional grouped streams as well and make sure they have a key type of K.
> >>>>>>>>
> >>>>>>>> The way I have put the KIP together is to ensure that we do type checking.
> >>>>>>>> I don't see a way we could group them all first and then make a call to count, reduce, or aggregate, because with aggregate they would need to pass a list of aggregators and we would have no way of type checking that they match the grouped streams.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Kyle
> >>>>>>>>
> >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very useful addition and wanted to provide an additional use-case and some more comments.
> >>>>>>>>>
> >>>>>>>>> This is actually a very common analytics use-case in the ad-tech industry. The typical setup will have an auction stream, an impression stream, and a click stream. Those three streams need to be combined to compute aggregate statistics (e.g. impression statistics, and click-through rates), since most of the attributes of interest are only present in the auction stream.
> >>>>>>>>>
> >>>>>>>>> A simple way to do this is to co-group all the streams by the auction key, and process updates to the co-group as events for each stream come in, keeping only one value from each stream before sending downstream for further processing / aggregation.
> >>>>>>>>>
> >>>>>>>>> One could view the result of that co-group operation as a "KTable" with multiple values per key. The key being the grouping key, and the values consisting of one value per stream.
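A minimal sketch of that "one value per stream" view, assuming string payloads and a plain in-memory map in place of a state store. `AuctionGroup` and `AdTechCogroupSketch` are hypothetical names used only for illustration; they are not part of the KIP or of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical co-grouped value: the latest record seen from each of the three streams.
final class AuctionGroup {
    String auction;     // null until an auction record arrives
    String impression;  // null until an impression record arrives
    String click;       // null until a click record arrives

    @Override
    public String toString() {
        return "[" + auction + ", " + impression + ", " + click + "]";
    }
}

final class AdTechCogroupSketch {
    // Stands in for the single store: auction key -> one slot per input stream.
    private final Map<String, AuctionGroup> store = new HashMap<>();

    private AuctionGroup group(String auctionKey) {
        return store.computeIfAbsent(auctionKey, k -> new AuctionGroup());
    }

    // Each handler overwrites only its own slot and returns the updated group,
    // which is what would be forwarded downstream.
    AuctionGroup onAuction(String auctionKey, String value) {
        AuctionGroup g = group(auctionKey);
        g.auction = value;
        return g;
    }

    AuctionGroup onImpression(String auctionKey, String value) {
        AuctionGroup g = group(auctionKey);
        g.impression = value;
        return g;
    }

    AuctionGroup onClick(String auctionKey, String value) {
        AuctionGroup g = group(auctionKey);
        g.click = value;
        return g;
    }

    AuctionGroup get(String auctionKey) {
        return store.get(auctionKey);
    }
}
```

Slots for streams that have not produced a record yet stay null, which mirrors the outer-join behaviour discussed elsewhere in the thread.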
> >>>>>>>>>
> >>>>>>>>> What I like about Kyle's approach is that it allows elegant co-grouping of multiple streams without having to worry about the number of streams, and avoids dealing with Tuple types or other generic interfaces that could get messy if we wanted to preserve all the value types in the resulting co-grouped stream.
> >>>>>>>>>
> >>>>>>>>> My only concern is that we only allow the cogroup + aggregate combined operation. This forces the user to build their own tuple serialization format if they want to preserve the individual input stream values as a group. It also deviates quite a bit from our approach in KGroupedStream which offers other operations, such as count and reduce, which should also be applicable to a co-grouped stream.
> >>>>>>>>>
> >>>>>>>>> Overall I still think this is a really useful addition, but I feel we haven't spent much time trying to explore alternative DSLs that could maybe generalize better or match our existing syntax more closely.
> >>>>>>>>>
> >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Eno, is there anyone else that is an expert in the Kafka Streams realm that I should reach out to for input?
> >>>>>>>>>>
> >>>>>>>>>> I believe Damian Guy is still planning on reviewing this more in depth so I will wait for his inputs before continuing.
> >>>>>>>>>>
> >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Kyle, good arguments.
> >>>>>>>>>>>
> >>>>>>>>>>> Eno
> >>>>>>>>>>>
> >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> *- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
> >>>>>>>>>>>> I have added a more concrete example to the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
> >>>>>>>>>>>> You would have to make a lot of checks to understand if it is even possible to make this optimization:
> >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
> >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything else.
> >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
> >>>>>>>>>>>> You would then need to make decisions during the optimization:
> >>>>>>>>>>>> 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
> >>>>>>>>>>>> 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
> >>>>>>>>>>>> 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The main argument I have against it is that, even if it could be done, I don't know that we would want to have this be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join) because we want to allow people to join more than 2 streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
> >>>>>>>>>>>> I would argue that cogroup is not just an optimization; it is a new way for the users to look at accomplishing a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
> >>>>>>>>>>>> Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is to be used.)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Kyle
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP again.
> >>>>>>>>>>>>> A couple of comments:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use.
> >>>>>>>>>>>>>> In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Jay
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yea, that's a good way to look at it.
> >>>>>>>>>>>>>>> I have seen this type of functionality in a couple of other platforms like Spark and Pig.
> >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If I'm reading this correctly it is like an N way outer join? So an input on any stream will always produce a new aggregated value - is that correct?
> >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
> >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I sure can. I have added the following description to my KIP.
> >>>>>>>>>>>>>>>>> If this doesn't help let me know and I will take some more time to build a diagram and make more of a step by step description:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Example with Current API:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> >>>>>>>>>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> >>>>>>>>>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As you can see this creates 3 StateStores, requires 3 initializers, and 3 aggValueSerdes. This also adds the pressure to user to define what the intermediate values are going to be (V1, V2, V3).
> >>>>>>>>>>>>>>>>> They are left with a couple of choices: first, to make V1, V2, and V3 all the same as CG, so that the two joiners are more like mergers; or second, to make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
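The walk-through above can be sketched as a toy model, assuming integer values, sum aggregators and null-safe sum joiners. `OuterJoinChainSketch` is a hypothetical name; the maps only stand in for storeName1..3 so that store lookups can be counted, and none of this is the actual Kafka Streams processor code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of aggregate-then-outer-join over three topics.
final class OuterJoinChainSketch {
    private final List<Map<String, Integer>> stores = new ArrayList<>();
    int storeReads = 0; // counts every lookup against any of the three stores

    OuterJoinChainSketch() {
        for (int i = 0; i < 3; i++) {
            stores.add(new HashMap<>());
        }
    }

    private Integer read(int store, String key) {
        storeReads++;
        return stores.get(store).get(key);
    }

    // Outer-join joiner: either side may be missing (null).
    private static Integer joiner(Integer left, Integer right) {
        if (left == null) return right;
        if (right == null) return left;
        return left + right;
    }

    // topic is 0, 1 or 2; returns the final joined value for the key.
    Integer onRecord(int topic, String key, int value) {
        // Aggregation step: look up this topic's own aggregate and update it.
        Integer current = read(topic, key);
        int updated = (current == null ? 0 : current) + value;
        stores.get(topic).put(key, updated);

        // Join steps: the other two stores are looked up to rebuild the result.
        Integer[] sides = new Integer[3];
        for (int i = 0; i < 3; i++) {
            sides[i] = (i == topic) ? Integer.valueOf(updated) : read(i, key);
        }
        Integer oneAndTwo = joiner(sides[0], sides[1]); // plays joinerOneAndTwo
        return joiner(oneAndTwo, sides[2]);             // plays joinerOneTwoAndThree
    }
}
```

In this model every record costs three store lookups and both joiner calls, regardless of which topic it arrived on.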
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming topics you will see that no matter which topic it comes in through all three stores are queried and all of the joiners must get used.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Topology wise for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Example with Proposed API:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >>>>>>>>>>>>>>>>>         .cogroup(grouped2, aggregator2)
> >>>>>>>>>>>>>>>>>         .cogroup(grouped3, aggregator3)
> >>>>>>>>>>>>>>>>>         .aggregate();
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1 initializer, and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
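A minimal sketch of the "one initializer, one store, one aggregator per input" shape, assuming an in-memory map as the store. `Agg` and `CogroupSketch` are hypothetical stand-ins written for this illustration; they are not the interfaces from the KIP or the PR. The generic method is what lets the compiler check that each aggregator matches the value type of the stream it is registered for.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical stand-in for an aggregator of values V into an aggregate T.
interface Agg<K, V, T> {
    T apply(K key, V value, T aggregate);
}

// Hypothetical toy model: one shared store and one initializer for all inputs.
final class CogroupSketch<K, T> {
    private final Supplier<T> initializer;
    private final Map<K, T> store = new HashMap<>();
    int storeReads = 0; // counts lookups against the single store

    CogroupSketch(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    // Registers one input stream with its own value type V and returns the
    // function that processes a record from that stream.
    <V> BiFunction<K, V, T> cogroup(Agg<K, V, T> aggregator) {
        return (key, value) -> {
            storeReads++;
            T current = store.get(key);
            if (current == null) {
                current = initializer.get(); // initializer runs once per new key
            }
            T updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            return updated; // forwarded downstream as the new aggregate
        };
    }
}
```

For example, an Integer stream, a String stream and a Long stream can all feed one `CogroupSketch<String, Integer>`, and each record costs a single store lookup in this model.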
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Topology wise for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Kyle,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would help to understand it better.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks a lot!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public api additions.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> One thing I was curious about is during the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions an error is thrown. Is there some name that someone can recommend or should I leave the null and allow it to fall back to the KGroupedStream.name?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables?
> >>>>>>>>>>>>>>>>>>> This would be pretty easy for a normal aggregate but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Kyle
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP as I haven't heard from anyone in a couple days. This is my first KIP and my first large contribution to the project so I'm sure I did something wrong. ;)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
> >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>> --
> >>>>> -- Guozhang

--
-- Guozhang