I have updated the KIP and my PR. Let me know what you think. To create a cogrouped stream, just call cogroup on a KGroupedStream and supply the initializer, aggValueSerde, and an aggregator. Then continue adding KGroupedStreams and aggregators. Finally, call one of the aggregate overloads to create a KTable.
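For reference, end to end that looks roughly like the example in the KIP (placeholder types K, V1..V3, CG as in the KIP; the windowed and session variants of aggregate are elided, and this is of course a sketch against the proposal, not current trunk):

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();

    // one store, one initializer, one aggValueSerde for the whole cogroup
    KTable<K, CG> cogrouped = grouped1
        .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();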
Thanks, Kyle On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote: > Hi Kyle, > > Thanks for the update. I think just one initializer makes sense as it > should only be called once per key and generally it is just going to create > a new instance of whatever the Aggregate class is. > > Cheers, > Damian > > On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> > wrote: > > > Hello all, > > > > I have spent some more time on this and the best alternative I have come > up > > with is: > > KGroupedStream has a single cogroup call that takes an initializer and an > > aggregator. > > CogroupedKStream has a cogroup call that takes additional groupedStream > > aggregator pairs. > > CogroupedKStream has multiple aggregate methods that create the different > > stores. > > > > I plan on updating the kip but I want people's input on if we should have > > the initializer be passed in once at the beginning or if we should > instead > > have the initializer be required for each call to one of the aggregate > > calls. The first makes more sense to me but doesnt allow the user to > > specify different initializers for different tables. > > > > Thanks, > > Kyle > > > > On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> > > wrote: > > > > > Yea I really like that idea I'll see what I can do to update the kip > and > > > my pr when I have some time. I'm not sure how well creating the > > > kstreamaggregates will go though because at that point I will have > thrown > > > away the type of the values. It will be type safe I just may need to > do a > > > little forcing. > > > > > > Thanks, > > > Kyle > > > > > > On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote: > > > > > >> Kyle, > > >> > > >> Thanks for the explanations, my previous read on the wiki examples was > > >> wrong. > > >> > > >> So I guess my motivation should be "reduced" to: can we move the > window > > >> specs param from "KGroupedStream#cogroup(..)" to > > >> "CogroupedKStream#aggregate(..)", and my motivations are: > > >> > > >> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to > 2. > > >> 2. major: this is for extensibility of the APIs, and since we are > > removing > > >> the "Evolving" annotations on Streams it may be harder to change it > > again > > >> in the future. The extended use cases are that people wanted to have > > >> windowed running aggregates on different granularities, e.g. "give me > > the > > >> counts per-minute, per-hour, per-day and per-week", and today in DSL > we > > >> need to specify that case in multiple aggregate operators, which gets > a > > >> state store / changelog, etc. And it is possible to optimize it as > well > > to > > >> a single state store. Its implementation would be tricky as you need > to > > >> contain different lengthed windows within your window store but just > > from > > >> the public API point of view, it could be specified as: > > >> > > >> CogroupedKStream stream = stream1.cogroup(stream2, ... > > >> "state-store-name"); > > >> > > >> table1 = stream.aggregate(/*per-minute window*/) > > >> table2 = stream.aggregate(/*per-hour window*/) > > >> table3 = stream.aggregate(/*per-day window*/) > > >> > > >> while underlying we are only using a single store "state-store-name" > for > > >> it. > > >> > > >> > > >> Although this feature is out of the scope of this KIP, I'd like to > > discuss > > >> if we can "leave the door open" to make such changes without modifying > > the > > >> public APIs . 
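For concreteness, the multi-granularity case Guozhang describes above might look something like the sketch below if the window spec moved to CogroupedKStream#aggregate(..). None of these signatures are in the KIP today; the Agg type, the window sizes, and the Windowed<K> return type are purely illustrative:

    CogroupedKStream<K, Agg> cogrouped = stream1.groupByKey()
        .cogroup(initializer, aggregator1, aggSerde, "state-store-name")
        .cogroup(stream2.groupByKey(), aggregator2);

    KTable<Windowed<K>, Agg> perMinute = cogrouped.aggregate(TimeWindows.of(60 * 1000L));
    KTable<Windowed<K>, Agg> perHour   = cogrouped.aggregate(TimeWindows.of(60 * 60 * 1000L));
    KTable<Windowed<K>, Agg> perDay    = cogrouped.aggregate(TimeWindows.of(24 * 60 * 60 * 1000L));
    // conceptually all three would share the single store "state-store-name"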
> > >> > > >> Guozhang > > >> > > >> > > >> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman < > > winkelman.k...@gmail.com > > >> > > > >> wrote: > > >> > > >> > I allow defining a single window/sessionwindow one time when you > make > > >> the > > >> > cogroup call from a KGroupedStream. From then on you are using the > > >> cogroup > > >> > call from with in CogroupedKStream which doesnt accept any > additional > > >> > windows/sessionwindows. > > >> > > > >> > Is this what you meant by your question or did I misunderstand? > > >> > > > >> > On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> > wrote: > > >> > > > >> > Another question that came to me is on "window alignment": from the > > KIP > > >> it > > >> > seems you are allowing users to specify a (potentially different) > > window > > >> > spec in each co-grouped input stream. So if these window specs are > > >> > different how should we "align" them with different input streams? I > > >> think > > >> > it is more natural to only specify on window spec in the > > >> > > > >> > KTable<RK, V> CogroupedKStream#aggregate(Windows); > > >> > > > >> > > > >> > And remove it from the cogroup() functions. WDYT? > > >> > > > >> > > > >> > Guozhang > > >> > > > >> > On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> > > >> wrote: > > >> > > > >> > > Thanks for the proposal Kyle, this is a quite common use case to > > >> support > > >> > > such multi-way table join (i.e. N source tables with N aggregate > > func) > > >> > with > > >> > > a single store and N+1 serdes, I have seen lots of people using > the > > >> > > low-level PAPI to achieve this goal. > > >> > > > > >> > > > > >> > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman < > > >> > winkelman.k...@gmail.com > > >> > > > wrote: > > >> > > > > >> > >> I like your point about not handling other cases such as count > and > > >> > reduce. > > >> > >> > > >> > >> I think that reduce may not make sense because reduce assumes > that > > >> the > > >> > >> input values are the same as the output values. With cogroup > there > > >> may > > >> > be > > >> > >> multiple different input types and then your output type cant be > > >> > multiple > > >> > >> different things. In the case where you have all matching value > > types > > >> > you > > >> > >> can do KStreamBuilder#merge followed by the reduce. > > >> > >> > > >> > >> As for count I think it is possible to call count on all the > > >> individual > > >> > >> grouped streams and then do joins. Otherwise we could maybe make > a > > >> > special > > >> > >> call in groupedstream for this case. Because in this case we dont > > >> need > > >> > to > > >> > >> do type checking on the values. It could be similar to the > current > > >> count > > >> > >> methods but accept a var args of additonal grouped streams as > well > > >> and > > >> > >> make > > >> > >> sure they have a key type of K. > > >> > >> > > >> > >> The way I have put the kip together is to ensure that we do type > > >> > checking. > > >> > >> I don't see a way we could group them all first and then make a > > call > > >> to > > >> > >> count, reduce, or aggregate because with aggregate they would > need > > to > > >> > pass > > >> > >> a list of aggregators and we would have no way of type checking > > that > > >> > they > > >> > >> match the grouped streams. 
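To make the type-checking point above concrete: because each cogroup call pairs one grouped stream with its own aggregator, the compiler can bind the aggregator's input type to that stream's value type. A shape along these lines (hypothetical; the generic parameters in the actual KIP differ) shows why a single aggregate(listOfAggregators) call could not do that:

    // hypothetical shape, not the KIP's exact signatures
    public interface CogroupedKStream<K, V> {
        <T> CogroupedKStream<K, V> cogroup(KGroupedStream<K, T> groupedStream,
                                           Aggregator<? super K, ? super T, V> aggregator);
        KTable<K, V> aggregate();
    }

Each call binds T to the value type of the stream being added, so a mismatched aggregator fails at compile time, whereas a heterogeneous list of aggregators would have to be declared with wildcards and checked at runtime.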
> > >> > >> > > >> > >> Thanks, > > >> > >> Kyle > > >> > >> > > >> > >> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> > > >> wrote: > > >> > >> > > >> > >> > Sorry to jump on this thread so late. I agree this is a very > > useful > > >> > >> > addition and wanted to provide an additional use-case and some > > more > > >> > >> > comments. > > >> > >> > > > >> > >> > This is actually a very common analytics use-case in the > ad-tech > > >> > >> industry. > > >> > >> > The typical setup will have an auction stream, an impression > > >> stream, > > >> > >> and a > > >> > >> > click stream. Those three streams need to be combined to > compute > > >> > >> aggregate > > >> > >> > statistics (e.g. impression statistics, and click-through > rates), > > >> > since > > >> > >> > most of the attributes of interest are only present the auction > > >> > stream. > > >> > >> > > > >> > >> > A simple way to do this is to co-group all the streams by the > > >> auction > > >> > >> key, > > >> > >> > and process updates to the co-group as events for each stream > > come > > >> in, > > >> > >> > keeping only one value from each stream before sending > downstream > > >> for > > >> > >> > further processing / aggregation. > > >> > >> > > > >> > >> > One could view the result of that co-group operation as a > > "KTable" > > >> > with > > >> > >> > multiple values per key. The key being the grouping key, and > the > > >> > values > > >> > >> > consisting of one value per stream. > > >> > >> > > > >> > >> > What I like about Kyle's approach is that allows elegant > > >> co-grouping > > >> > of > > >> > >> > multiple streams without having to worry about the number of > > >> streams, > > >> > >> and > > >> > >> > avoids dealing with Tuple types or other generic interfaces > that > > >> could > > >> > >> get > > >> > >> > messy if we wanted to preserve all the value types in the > > resulting > > >> > >> > co-grouped stream. > > >> > >> > > > >> > >> > My only concern is that we only allow the cogroup + aggregate > > >> combined > > >> > >> > operation. This forces the user to build their own tuple > > >> serialization > > >> > >> > format if they want to preserve the individual input stream > > values > > >> as > > >> > a > > >> > >> > group. It also deviates quite a bit from our approach in > > >> > KGroupedStream > > >> > >> > which offers other operations, such as count and reduce, which > > >> should > > >> > >> also > > >> > >> > be applicable to a co-grouped stream. > > >> > >> > > > >> > >> > Overall I still think this is a really useful addition, but I > > feel > > >> we > > >> > >> > haven't spend much time trying to explore alternative DSLs that > > >> could > > >> > >> maybe > > >> > >> > generalize better or match our existing syntax more closely. > > >> > >> > > > >> > >> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman < > > >> > winkelman.k...@gmail.com > > >> > >> > > > >> > >> > wrote: > > >> > >> > > > >> > >> > > Eno, is there anyone else that is an expert in the kafka > > streams > > >> > realm > > >> > >> > that > > >> > >> > > I should reach out to for input? > > >> > >> > > > > >> > >> > > I believe Damian Guy is still planning on reviewing this more > > in > > >> > depth > > >> > >> > so I > > >> > >> > > will wait for his inputs before continuing. > > >> > >> > > > > >> > >> > > On May 9, 2017 7:30 AM, "Eno Thereska" < > eno.there...@gmail.com > > > > > >> > >> wrote: > > >> > >> > > > > >> > >> > > > Thanks Kyle, good arguments. 
> > >> > >> > > > > > >> > >> > > > Eno > > >> > >> > > > > > >> > >> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman < > > >> > >> winkelman.k...@gmail.com > > >> > >> > > > > >> > >> > > > wrote: > > >> > >> > > > > > > >> > >> > > > > *- minor: could you add an exact example (similar to what > > >> Jay’s > > >> > >> > example > > >> > >> > > > is, > > >> > >> > > > > or like your Spark/Pig pointers had) to make this super > > >> > concrete?* > > >> > >> > > > > I have added a more concrete example to the KIP. > > >> > >> > > > > > > >> > >> > > > > *- my main concern is that we’re exposing this > optimization > > >> to > > >> > the > > >> > >> > DSL. > > >> > >> > > > In > > >> > >> > > > > an ideal world, an optimizer would take the existing DSL > > and > > >> do > > >> > >> the > > >> > >> > > right > > >> > >> > > > > thing under the covers (create just one state store, > > arrange > > >> the > > >> > >> > nodes > > >> > >> > > > > etc). The original DSL had a bunch of small, composable > > >> pieces > > >> > >> > (group, > > >> > >> > > > > aggregate, join) that this proposal groups together. I’d > > >> like to > > >> > >> hear > > >> > >> > > > your > > >> > >> > > > > thoughts on whether it’s possible to do this optimization > > >> with > > >> > the > > >> > >> > > > current > > >> > >> > > > > DSL, at the topology builder level.* > > >> > >> > > > > You would have to make a lot of checks to understand if > it > > is > > >> > even > > >> > >> > > > possible > > >> > >> > > > > to make this optimization: > > >> > >> > > > > 1. Make sure they are all KTableKTableOuterJoins > > >> > >> > > > > 2. None of the intermediate KTables are used for anything > > >> else. > > >> > >> > > > > 3. None of the intermediate stores are used. (This may be > > >> > >> impossible > > >> > >> > > > > especially if they use KafkaStreams#store after the > > topology > > >> has > > >> > >> > > already > > >> > >> > > > > been built.) > > >> > >> > > > > You would then need to make decisions during the > > >> optimization: > > >> > >> > > > > 1. Your new initializer would the composite of all the > > >> > individual > > >> > >> > > > > initializers and the valueJoiners. > > >> > >> > > > > 2. I am having a hard time thinking about how you would > > turn > > >> the > > >> > >> > > > > aggregators and valueJoiners into an aggregator that > would > > >> work > > >> > on > > >> > >> > the > > >> > >> > > > > final object, but this may be possible. > > >> > >> > > > > 3. Which state store would you use? The ones declared > would > > >> be > > >> > for > > >> > >> > the > > >> > >> > > > > aggregate values. None of the declared ones would be > > >> guaranteed > > >> > to > > >> > >> > hold > > >> > >> > > > the > > >> > >> > > > > final object. This would mean you must created a new > state > > >> store > > >> > >> and > > >> > >> > > not > > >> > >> > > > > created any of the declared ones. > > >> > >> > > > > > > >> > >> > > > > The main argument I have against it is even if it could > be > > >> done > > >> > I > > >> > >> > don't > > >> > >> > > > > know that we would want to have this be an optimization > in > > >> the > > >> > >> > > background > > >> > >> > > > > because the user would still be required to think about > all > > >> of > > >> > the > > >> > >> > > > > intermediate values that they shouldn't need to worry > about > > >> if > > >> > >> they > > >> > >> > > only > > >> > >> > > > > care about the final object. 
> > >> > >> > > > > > > >> > >> > > > > In my opinion cogroup is a common enough case that it > > should > > >> be > > >> > >> part > > >> > >> > of > > >> > >> > > > the > > >> > >> > > > > composable pieces (group, aggregate, join) because we > want > > to > > >> > >> allow > > >> > >> > > > people > > >> > >> > > > > to join more than 2 or more streams in an easy way. Right > > >> now I > > >> > >> don't > > >> > >> > > > think > > >> > >> > > > > we give them ways of handling this use case easily. > > >> > >> > > > > > > >> > >> > > > > *-I think there will be scope for several such > > optimizations > > >> in > > >> > >> the > > >> > >> > > > future > > >> > >> > > > > and perhaps at some point we need to think about > decoupling > > >> the > > >> > >> 1:1 > > >> > >> > > > mapping > > >> > >> > > > > from the DSL into the physical topology.* > > >> > >> > > > > I would argue that cogroup is not just an optimization it > > is > > >> a > > >> > new > > >> > >> > way > > >> > >> > > > for > > >> > >> > > > > the users to look at accomplishing a problem that > requires > > >> > >> multiple > > >> > >> > > > > streams. I may sound like a broken record but I don't > think > > >> > users > > >> > >> > > should > > >> > >> > > > > have to build the N-1 intermediate tables and deal with > > their > > >> > >> > > > initializers, > > >> > >> > > > > serdes and stores if all they care about is the final > > object. > > >> > >> > > > > Now if for example someone uses cogroup but doesn't > supply > > >> > >> additional > > >> > >> > > > > streams and aggregators this case is equivalent to a > single > > >> > >> grouped > > >> > >> > > > stream > > >> > >> > > > > making an aggregate call. This case is what I view an > > >> > optimization > > >> > >> > as, > > >> > >> > > we > > >> > >> > > > > could remove the KStreamCogroup and act as if there was > > just > > >> a > > >> > >> call > > >> > >> > to > > >> > >> > > > > KGroupedStream#aggregate instead of calling > > >> > >> KGroupedStream#cogroup. > > >> > >> > (I > > >> > >> > > > > would prefer to just write a warning saying that this is > > not > > >> how > > >> > >> > > cogroup > > >> > >> > > > is > > >> > >> > > > > to be used.) > > >> > >> > > > > > > >> > >> > > > > Thanks, > > >> > >> > > > > Kyle > > >> > >> > > > > > > >> > >> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska < > > >> > >> eno.there...@gmail.com > > >> > >> > > > > >> > >> > > > wrote: > > >> > >> > > > > > > >> > >> > > > >> Hi Kyle, > > >> > >> > > > >> > > >> > >> > > > >> Thanks for the KIP again. A couple of comments: > > >> > >> > > > >> > > >> > >> > > > >> - minor: could you add an exact example (similar to what > > >> Jay’s > > >> > >> > example > > >> > >> > > > is, > > >> > >> > > > >> or like your Spark/Pig pointers had) to make this super > > >> > concrete? > > >> > >> > > > >> > > >> > >> > > > >> - my main concern is that we’re exposing this > optimization > > >> to > > >> > the > > >> > >> > DSL. > > >> > >> > > > In > > >> > >> > > > >> an ideal world, an optimizer would take the existing DSL > > >> and do > > >> > >> the > > >> > >> > > > right > > >> > >> > > > >> thing under the covers (create just one state store, > > arrange > > >> > the > > >> > >> > nodes > > >> > >> > > > >> etc). The original DSL had a bunch of small, composable > > >> pieces > > >> > >> > (group, > > >> > >> > > > >> aggregate, join) that this proposal groups together. 
I’d > > >> like > > >> > to > > >> > >> > hear > > >> > >> > > > your > > >> > >> > > > >> thoughts on whether it’s possible to do this > optimization > > >> with > > >> > >> the > > >> > >> > > > current > > >> > >> > > > >> DSL, at the topology builder level. > > >> > >> > > > >> > > >> > >> > > > >> I think there will be scope for several such > optimizations > > >> in > > >> > the > > >> > >> > > future > > >> > >> > > > >> and perhaps at some point we need to think about > > decoupling > > >> the > > >> > >> 1:1 > > >> > >> > > > mapping > > >> > >> > > > >> from the DSL into the physical topology. > > >> > >> > > > >> > > >> > >> > > > >> Thanks > > >> > >> > > > >> Eno > > >> > >> > > > >> > > >> > >> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps < > j...@confluent.io> > > >> > wrote: > > >> > >> > > > >>> > > >> > >> > > > >>> I haven't digested the proposal but the use case is > > pretty > > >> > >> common. > > >> > >> > An > > >> > >> > > > >>> example would be the "customer 360" or "unified > customer > > >> > >> profile" > > >> > >> > use > > >> > >> > > > >> case > > >> > >> > > > >>> we often use. In that use case you have a dozen systems > > >> each > > >> > of > > >> > >> > which > > >> > >> > > > has > > >> > >> > > > >>> some information about your customer (account details, > > >> > settings, > > >> > >> > > > billing > > >> > >> > > > >>> info, customer service contacts, purchase history, > etc). > > >> Your > > >> > >> goal > > >> > >> > is > > >> > >> > > > to > > >> > >> > > > >>> join/munge these into a single profile record for each > > >> > customer > > >> > >> > that > > >> > >> > > > has > > >> > >> > > > >>> all the relevant info in a usable form and is > up-to-date > > >> with > > >> > >> all > > >> > >> > the > > >> > >> > > > >>> source systems. If you implement that with kstreams as > a > > >> > >> sequence > > >> > >> > of > > >> > >> > > > >> joins > > >> > >> > > > >>> i think today we'd fully materialize N-1 intermediate > > >> tables. > > >> > >> But > > >> > >> > > > clearly > > >> > >> > > > >>> you only need a single stage to group all these things > > that > > >> > are > > >> > >> > > already > > >> > >> > > > >>> co-partitioned. A distributed database would do this > > under > > >> the > > >> > >> > covers > > >> > >> > > > >> which > > >> > >> > > > >>> is arguably better (at least when it does the right > > thing) > > >> and > > >> > >> > > perhaps > > >> > >> > > > we > > >> > >> > > > >>> could do the same thing but I'm not sure we know the > > >> > >> partitioning > > >> > >> > so > > >> > >> > > we > > >> > >> > > > >> may > > >> > >> > > > >>> need an explicit cogroup command that impllies they are > > >> > already > > >> > >> > > > >>> co-partitioned. > > >> > >> > > > >>> > > >> > >> > > > >>> -Jay > > >> > >> > > > >>> > > >> > >> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman < > > >> > >> > > > winkelman.k...@gmail.com > > >> > >> > > > >>> > > >> > >> > > > >>> wrote: > > >> > >> > > > >>> > > >> > >> > > > >>>> Yea thats a good way to look at it. > > >> > >> > > > >>>> I have seen this type of functionality in a couple > other > > >> > >> platforms > > >> > >> > > > like > > >> > >> > > > >>>> spark and pig. 
> > >> > >> > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/ > > >> > >> > > > >> PairRDDFunctions.html > > >> > >> > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_ > > >> > >> > > > >> cogroup_operator.htm > > >> > >> > > > >>>> > > >> > >> > > > >>>> > > >> > >> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" < > > >> damian....@gmail.com> > > >> > >> > wrote: > > >> > >> > > > >>>> > > >> > >> > > > >>>>> Hi Kyle, > > >> > >> > > > >>>>> > > >> > >> > > > >>>>> If i'm reading this correctly it is like an N way > outer > > >> > join? > > >> > >> So > > >> > >> > an > > >> > >> > > > >> input > > >> > >> > > > >>>>> on any stream will always produce a new aggregated > > value > > >> - > > >> > is > > >> > >> > that > > >> > >> > > > >>>> correct? > > >> > >> > > > >>>>> Effectively, each Aggregator just looks up the > current > > >> > value, > > >> > >> > > > >> aggregates > > >> > >> > > > >>>>> and forwards the result. > > >> > >> > > > >>>>> I need to look into it and think about it a bit more, > > >> but it > > >> > >> > seems > > >> > >> > > > like > > >> > >> > > > >>>> it > > >> > >> > > > >>>>> could be a useful optimization. > > >> > >> > > > >>>>> > > >> > >> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman < > > >> > >> > > winkelman.k...@gmail.com > > >> > >> > > > > > > >> > >> > > > >>>>> wrote: > > >> > >> > > > >>>>> > > >> > >> > > > >>>>>> I sure can. I have added the following description > to > > my > > >> > >> KIP. If > > >> > >> > > > this > > >> > >> > > > >>>>>> doesn't help let me know and I will take some more > > time > > >> to > > >> > >> > build a > > >> > >> > > > >>>>> diagram > > >> > >> > > > >>>>>> and make more of a step by step description: > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> Example with Current API: > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> KTable<K, V1> table1 = > > >> > >> > > > >>>>>> builder.stream("topic1").groupByKey().aggregate( > > >> > initializer1 > > >> > >> , > > >> > >> > > > >>>>> aggregator1, > > >> > >> > > > >>>>>> aggValueSerde1, storeName1); > > >> > >> > > > >>>>>> KTable<K, V2> table2 = > > >> > >> > > > >>>>>> builder.stream("topic2").groupByKey().aggregate( > > >> > initializer2 > > >> > >> , > > >> > >> > > > >>>>> aggregator2, > > >> > >> > > > >>>>>> aggValueSerde2, storeName2); > > >> > >> > > > >>>>>> KTable<K, V3> table3 = > > >> > >> > > > >>>>>> builder.stream("topic3").groupByKey().aggregate( > > >> > initializer3 > > >> > >> , > > >> > >> > > > >>>>> aggregator3, > > >> > >> > > > >>>>>> aggValueSerde3, storeName3); > > >> > >> > > > >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, > > >> > >> > > > >>>>>> joinerOneAndTwo).outerJoin(table3, > > >> joinerOneTwoAndThree); > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> As you can see this creates 3 StateStores, requires > 3 > > >> > >> > > initializers, > > >> > >> > > > >>>> and 3 > > >> > >> > > > >>>>>> aggValueSerdes. This also adds the pressure to user > to > > >> > define > > >> > >> > what > > >> > >> > > > the > > >> > >> > > > >>>>>> intermediate values are going to be (V1, V2, V3). 
> They > > >> are > > >> > >> left > > >> > >> > > > with a > > >> > >> > > > >>>>>> couple choices, first to make V1, V2, and V3 all the > > >> same > > >> > as > > >> > >> CG > > >> > >> > > and > > >> > >> > > > >> the > > >> > >> > > > >>>>> two > > >> > >> > > > >>>>>> joiners are more like mergers, or second make them > > >> > >> intermediate > > >> > >> > > > states > > >> > >> > > > >>>>> such > > >> > >> > > > >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the > joiners > > >> use > > >> > >> those > > >> > >> > > to > > >> > >> > > > >>>> build > > >> > >> > > > >>>>>> the final aggregate CG value. This is something the > > user > > >> > >> could > > >> > >> > > avoid > > >> > >> > > > >>>>>> thinking about with this KIP. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> When a new input arrives lets say at "topic1" it > will > > >> first > > >> > >> go > > >> > >> > > > through > > >> > >> > > > >>>> a > > >> > >> > > > >>>>>> KStreamAggregate grabbing the current aggregate from > > >> > >> storeName1. > > >> > >> > > It > > >> > >> > > > >>>> will > > >> > >> > > > >>>>>> produce this in the form of the first intermediate > > value > > >> > and > > >> > >> get > > >> > >> > > > sent > > >> > >> > > > >>>>>> through a KTableKTableOuterJoin where it will look > up > > >> the > > >> > >> > current > > >> > >> > > > >> value > > >> > >> > > > >>>>> of > > >> > >> > > > >>>>>> the key in storeName2. It will use the first joiner > to > > >> > >> calculate > > >> > >> > > the > > >> > >> > > > >>>>> second > > >> > >> > > > >>>>>> intermediate value, which will go through an > > additional > > >> > >> > > > >>>>>> KTableKTableOuterJoin. Here it will look up the > > current > > >> > >> value of > > >> > >> > > the > > >> > >> > > > >>>> key > > >> > >> > > > >>>>> in > > >> > >> > > > >>>>>> storeName3 and use the second joiner to build the > > final > > >> > >> > aggregate > > >> > >> > > > >>>> value. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> If you think through all possibilities for incoming > > >> topics > > >> > >> you > > >> > >> > > will > > >> > >> > > > >> see > > >> > >> > > > >>>>>> that no matter which topic it comes in through all > > three > > >> > >> stores > > >> > >> > > are > > >> > >> > > > >>>>> queried > > >> > >> > > > >>>>>> and all of the joiners must get used. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> Topology wise for N incoming streams this creates N > > >> > >> > > > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, > and > > >> N-1 > > >> > >> > > > >>>>>> KTableKTableJoinMergers. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> Example with Proposed API: > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> KGroupedStream<K, V1> grouped1 = > > >> builder.stream("topic1"). > > >> > >> > > > >>>> groupByKey(); > > >> > >> > > > >>>>>> KGroupedStream<K, V2> grouped2 = > > >> builder.stream("topic2"). > > >> > >> > > > >>>> groupByKey(); > > >> > >> > > > >>>>>> KGroupedStream<K, V3> grouped3 = > > >> builder.stream("topic3"). 
> > >> > >> > > > >>>> groupByKey(); > > >> > >> > > > >>>>>> KTable<K, CG> cogrouped = > > grouped1.cogroup(initializer1, > > >> > >> > > > aggregator1, > > >> > >> > > > >>>>>> aggValueSerde1, storeName1) > > >> > >> > > > >>>>>> .cogroup(grouped2, aggregator2) > > >> > >> > > > >>>>>> .cogroup(grouped3, aggregator3) > > >> > >> > > > >>>>>> .aggregate(); > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> As you can see this creates 1 StateStore, requires 1 > > >> > >> > initializer, > > >> > >> > > > and > > >> > >> > > > >> 1 > > >> > >> > > > >>>>>> aggValueSerde. The user no longer has to worry about > > the > > >> > >> > > > intermediate > > >> > >> > > > >>>>>> values and the joiners. All they have to think about > > is > > >> how > > >> > >> each > > >> > >> > > > >> stream > > >> > >> > > > >>>>>> impacts the creation of the final CG object. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> When a new input arrives lets say at "topic1" it > will > > >> first > > >> > >> go > > >> > >> > > > through > > >> > >> > > > >>>> a > > >> > >> > > > >>>>>> KStreamAggreagte and grab the current aggregate from > > >> > >> storeName1. > > >> > >> > > It > > >> > >> > > > >>>> will > > >> > >> > > > >>>>>> add its incoming object to the aggregate, update the > > >> store > > >> > >> and > > >> > >> > > pass > > >> > >> > > > >> the > > >> > >> > > > >>>>> new > > >> > >> > > > >>>>>> aggregate on. This new aggregate goes through the > > >> > >> KStreamCogroup > > >> > >> > > > which > > >> > >> > > > >>>> is > > >> > >> > > > >>>>>> pretty much just a pass through processor and you > are > > >> done. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> Topology wise for N incoming streams the new api > will > > >> only > > >> > >> every > > >> > >> > > > >>>> create N > > >> > >> > > > >>>>>> KStreamAggregates and 1 KStreamCogroup. > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax < > > >> > >> > > > >> matth...@confluent.io > > >> > >> > > > >>>>> > > >> > >> > > > >>>>>> wrote: > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>>>> Kyle, > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, > > >> but I > > >> > >> could > > >> > >> > > not > > >> > >> > > > >>>>>>> follow completely. Could you maybe add a more > > concrete > > >> > >> example, > > >> > >> > > > like > > >> > >> > > > >>>> 3 > > >> > >> > > > >>>>>>> streams with 3 records each (plus expected result), > > and > > >> > show > > >> > >> > the > > >> > >> > > > >>>>>>> difference between current way to to implement it > and > > >> the > > >> > >> > > proposed > > >> > >> > > > >>>> API? > > >> > >> > > > >>>>>>> This could also cover the internal processing to > see > > >> what > > >> > >> store > > >> > >> > > > calls > > >> > >> > > > >>>>>>> would be required for both approaches etc. > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> I think, it's pretty advanced stuff you propose, > and > > it > > >> > >> would > > >> > >> > > help > > >> > >> > > > to > > >> > >> > > > >>>>>>> understand it better. > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> Thanks a lot! > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> -Matthias > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote: > > >> > >> > > > >>>>>>>> I have made a pull request. It can be found here. 
> > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975 > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> I plan to write some more unit tests for my > classes > > >> and > > >> > get > > >> > >> > > around > > >> > >> > > > >>>> to > > >> > >> > > > >>>>>>>> writing documentation for the public api > additions. > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> One thing I was curious about is during the > > >> > >> > > > >>>>>>> KCogroupedStreamImpl#aggregate > > >> > >> > > > >>>>>>>> method I pass null to the KGroupedStream# > > >> > >> > repartitionIfRequired > > >> > >> > > > >>>>> method. > > >> > >> > > > >>>>>> I > > >> > >> > > > >>>>>>>> can't supply the store name because if more than > one > > >> > >> grouped > > >> > >> > > > stream > > >> > >> > > > >>>>>>>> repartitions an error is thrown. Is there some > name > > >> that > > >> > >> > someone > > >> > >> > > > >>>> can > > >> > >> > > > >>>>>>>> recommend or should I leave the null and allow it > to > > >> fall > > >> > >> back > > >> > >> > > to > > >> > >> > > > >>>> the > > >> > >> > > > >>>>>>>> KGroupedStream.name? > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> Should this be expanded to handle grouped tables? > > This > > >> > >> would > > >> > >> > be > > >> > >> > > > >>>>> pretty > > >> > >> > > > >>>>>>> easy > > >> > >> > > > >>>>>>>> for a normal aggregate but one allowing session > > stores > > >> > and > > >> > >> > > > windowed > > >> > >> > > > >>>>>>> stores > > >> > >> > > > >>>>>>>> would required KTableSessionWindowAggregate and > > >> > >> > > > >>>> KTableWindowAggregate > > >> > >> > > > >>>>>>>> implementations. > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> Thanks, > > >> > >> > > > >>>>>>>> Kyle > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < > > >> > >> > eno.there...@gmail.com> > > >> > >> > > > >>>>> wrote: > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>>>> I’ll look as well asap, sorry, been swamped. > > >> > >> > > > >>>>>>>>> > > >> > >> > > > >>>>>>>>> Eno > > >> > >> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < > > >> > >> > damian....@gmail.com> > > >> > >> > > > >>>>> wrote: > > >> > >> > > > >>>>>>>>>> > > >> > >> > > > >>>>>>>>>> Hi Kyle, > > >> > >> > > > >>>>>>>>>> > > >> > >> > > > >>>>>>>>>> Thanks for the KIP. I apologize that i haven't > had > > >> the > > >> > >> > chance > > >> > >> > > to > > >> > >> > > > >>>>> look > > >> > >> > > > >>>>>>> at > > >> > >> > > > >>>>>>>>>> the KIP yet, but will schedule some time to look > > >> into > > >> > it > > >> > >> > > > >>>> tomorrow. > > >> > >> > > > >>>>>> For > > >> > >> > > > >>>>>>>>> the > > >> > >> > > > >>>>>>>>>> implementation, can you raise a PR against kafka > > >> trunk > > >> > >> and > > >> > >> > > mark > > >> > >> > > > >>>> it > > >> > >> > > > >>>>> as > > >> > >> > > > >>>>>>>>> WIP? > > >> > >> > > > >>>>>>>>>> It will be easier to review what you have done. 
> > >> > >> > > > >>>>>>>>>> > > >> > >> > > > >>>>>>>>>> Thanks, > > >> > >> > > > >>>>>>>>>> Damian > > >> > >> > > > >>>>>>>>>> > > >> > >> > > > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman < > > >> > >> > > > >>>>> winkelman.k...@gmail.com > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>>>> wrote: > > >> > >> > > > >>>>>>>>>> > > >> > >> > > > >>>>>>>>>>> I am replying to this in hopes it will draw > some > > >> > >> attention > > >> > >> > to > > >> > >> > > > my > > >> > >> > > > >>>>> KIP > > >> > >> > > > >>>>>>> as > > >> > >> > > > >>>>>>>>> I > > >> > >> > > > >>>>>>>>>>> haven't heard from anyone in a couple days. > This > > >> is my > > >> > >> > first > > >> > >> > > > KIP > > >> > >> > > > >>>>> and > > >> > >> > > > >>>>>>> my > > >> > >> > > > >>>>>>>>>>> first large contribution to the project so I'm > > >> sure I > > >> > >> did > > >> > >> > > > >>>>> something > > >> > >> > > > >>>>>>>>> wrong. > > >> > >> > > > >>>>>>>>>>> ;) > > >> > >> > > > >>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" < > > >> > >> > > > >>>>> winkelman.k...@gmail.com> > > >> > >> > > > >>>>>>>>> wrote: > > >> > >> > > > >>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>>> Hello all, > > >> > >> > > > >>>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>>> I have created KIP-150 to facilitate > discussion > > >> about > > >> > >> > adding > > >> > >> > > > >>>>>> cogroup > > >> > >> > > > >>>>>>> to > > >> > >> > > > >>>>>>>>>>>> the streams DSL. > > >> > >> > > > >>>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>>> Please find the KIP here: > > >> > >> > > > >>>>>>>>>>>> https://cwiki.apache.org/ > > >> > confluence/display/KAFKA/KIP- > > >> > >> > > > >>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup > > >> > >> > > > >>>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>>> Please find my initial implementation here: > > >> > >> > > > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka > > >> > >> > > > >>>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>>> Thanks, > > >> > >> > > > >>>>>>>>>>>> Kyle Winkelman > > >> > >> > > > >>>>>>>>>>>> > > >> > >> > > > >>>>>>>>>>> > > >> > >> > > > >>>>>>>>> > > >> > >> > > > >>>>>>>>> > > >> > >> > > > >>>>>>>> > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>>> > > >> > >> > > > >>>>>> > > >> > >> > > > >>>>> > > >> > >> > > > >>>> > > >> > >> > > > >> > > >> > >> > > > >> > > >> > >> > > > > > >> > >> > > > > > >> > >> > > > > >> > >> > > > >> > >> > > >> > > > > >> > > > > >> > > > > >> > > -- > > >> > > -- Guozhang > > >> > > > > >> > > > >> > > > >> > > > >> > -- > > >> > -- Guozhang > > >> > > > >> > > >> > > >> > > >> -- > > >> -- Guozhang > > >> > > > > > >