Yeah, that's a good way to look at it. I have seen this type of functionality in a couple of other platforms, like Spark and Pig:
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
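For reference, this is roughly what the analogous call looks like through Spark's Java API (a minimal sketch for illustration only; the RDD names and contents are made up, and it targets the current Java API rather than the 0.6.2 Scala API linked above):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkCogroupSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("cogroup-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> clicks = sc.parallelizePairs(
                    Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
            JavaPairRDD<String, String> views = sc.parallelizePairs(
                    Arrays.asList(new Tuple2<>("a", "x"), new Tuple2<>("c", "y")));

            // cogroup gathers, per key, the values from both inputs in a single pass,
            // which is the same "one result per key across N inputs" idea.
            JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> cogrouped =
                    clicks.cogroup(views);

            cogrouped.collect().forEach(System.out::println);
        }
    }
}

Spark hands you the per-key bags of values from every input; the proposal below goes a step further and folds each input stream directly into a single aggregate.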
On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

> Hi Kyle,
>
> If I'm reading this correctly it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct? Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
>
> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
> > I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
> >
> > Example with Current API:
> >
> > KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> >         .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> > KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> >         .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> > KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> >         .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> > KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
> >         .outerJoin(table3, joinerOneTwoAndThree);
> >
> > As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
> >
> > When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and send it through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
> >
> > If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.
> >
> > Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
> >
> > Example with Proposed API:
> >
> > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >         .cogroup(grouped2, aggregator2)
> >         .cogroup(grouped3, aggregator3)
> >         .aggregate();
> >
> > As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
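> > To make that a bit more concrete, here is a minimal sketch of what the single initializer and the per-stream aggregators could look like, using the same placeholder types K, V1, V2, V3, and CG as above. CG's no-arg constructor and the addV1/addV2/addV3 helpers are just placeholders for this example:
> >
> > import org.apache.kafka.streams.kstream.Aggregator;
> > import org.apache.kafka.streams.kstream.Initializer;
> >
> > // One initializer creates the single shared aggregate (assumes CG has a no-arg constructor).
> > Initializer<CG> initializer1 = CG::new;
> >
> > // Each aggregator only folds its own stream's value into the shared CG aggregate;
> > // each addVx helper is assumed to apply the value and return the updated CG.
> > Aggregator<K, V1, CG> aggregator1 = (key, value, aggregate) -> aggregate.addV1(value);
> > Aggregator<K, V2, CG> aggregator2 = (key, value, aggregate) -> aggregate.addV2(value);
> > Aggregator<K, V3, CG> aggregator3 = (key, value, aggregate) -> aggregate.addV3(value);
> >
> > No joiners and no intermediate value types are needed; every stream writes into the same store through the same CG.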
> > When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store, and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
> >
> > Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
> >
> > On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > Kyle,
> > >
> > > thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing, to see what store calls would be required for both approaches etc.
> > >
> > > I think it's pretty advanced stuff you propose, and it would help to understand it better.
> > >
> > > Thanks a lot!
> > >
> > > -Matthias
> > >
> > > On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > > > I have made a pull request. It can be found here:
> > > >
> > > > https://github.com/apache/kafka/pull/2975
> > > >
> > > > I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
> > > >
> > > > One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
> > > >
> > > > Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
> > > >
> > > > Thanks,
> > > > Kyle
> > > >
> > > > On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> > > >
> > > >> I’ll look as well asap, sorry, been swamped.
> > > >>
> > > >> Eno
> > > >>
> > > >>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>
> > > >>> Hi Kyle,
> > > >>>
> > > >>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> > > >>>
> > > >>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)
> > > >>>>
> > > >>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hello all,
> > > >>>>>
> > > >>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
> > > >>>>>
> > > >>>>> Please find the KIP here:
> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > >>>>>
> > > >>>>> Please find my initial implementation here:
> > > >>>>> https://github.com/KyleWinkelman/kafka
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Kyle Winkelman