I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and write a more step-by-step description:
Example with Current API:

    KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
        .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
    KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
        .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
    KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
        .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
    KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
        .outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values (V1, V2, V3) are going to be. They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so that the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. The updated aggregate is produced as the first intermediate value and sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
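To make the lookup pattern of the current approach a bit more concrete, here is a rough plain-Java model for N inputs. It is only a sketch under assumptions: CurrentApiModel, onRecord, storeReads, and processorCount are made-up names, the "stores" are plain HashMaps, the aggregators and joiners are assumed to be simple sums, and nothing here uses the real Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the current approach: one store per input topic.
class CurrentApiModel {
    private final List<Map<String, Long>> stores = new ArrayList<>();
    private int storeReads = 0;

    CurrentApiModel(int inputs) {
        for (int i = 0; i < inputs; i++) {
            stores.add(new HashMap<>());
        }
    }

    // Every lookup against any store is counted.
    private Long read(int topic, String key) {
        storeReads++;
        return stores.get(topic).get(key);
    }

    // Handles one record on the given topic and returns the final aggregate.
    long onRecord(int topic, String key, long value) {
        // Aggregate step: read and update this topic's own store.
        Long current = read(topic, key);
        long updated = (current == null ? 0L : current) + value;
        stores.get(topic).put(key, updated);

        // Join steps (assumed to be sums): every other store is looked up,
        // and a missing side is skipped as in an outer join.
        long result = updated;
        for (int other = 0; other < stores.size(); other++) {
            if (other == topic) {
                continue;
            }
            Long side = read(other, key);
            if (side != null) {
                result += side;
            }
        }
        return result;
    }

    int storeReads() {
        return storeReads;
    }

    // N aggregates + 2*(N-1) outer joins + (N-1) join mergers.
    static int processorCount(int n) {
        return n + 2 * (n - 1) + (n - 1);
    }

    public static void main(String[] args) {
        CurrentApiModel model = new CurrentApiModel(3);
        System.out.println("aggregate: " + model.onRecord(0, "k", 5L));
        System.out.println("store reads: " + model.storeReads());
        System.out.println("processors for N=3: " + processorCount(3));
    }
}
```

In this model a single record on any one of three topics costs three store reads, and the three-input topology has nine processors by the formula above.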
Example with Proposed API:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
    KTable<K, CG> cogrouped = grouped1
        .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store, and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Kyle,
>
> thanks a lot for the KIP. Maybe I am a little slow, but I could not
> follow completely. Could you maybe add a more concrete example, like 3
> streams with 3 records each (plus expected result), and show the
> difference between current way to to implement it and the proposed API?
> This could also cover the internal processing to see what store calls
> would be required for both approaches etc.
>
> I think, it's pretty advanced stuff you propose, and it would help to
> understand it better.
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > I have made a pull request. It can be found here.
> >
> > https://github.com/apache/kafka/pull/2975
> >
> > I plan to write some more unit tests for my classes and get around to
> > writing documentation for the public api additions.
> >
> > One thing I was curious about is during the
> > KCogroupedStreamImpl#aggregate method I pass null to the
> > KGroupedStream#repartitionIfRequired method. I can't supply the store
> > name because if more than one grouped stream repartitions an error is
> > thrown. Is there some name that someone can recommend or should I leave
> > the null and allow it to fall back to the KGroupedStream.name?
> >
> > Should this be expanded to handle grouped tables? This would be pretty
> > easy for a normal aggregate but one allowing session stores and windowed
> > stores would required KTableSessionWindowAggregate and
> > KTableWindowAggregate implementations.
> >
> > Thanks,
> > Kyle
> >
> > On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >
> >> I’ll look as well asap, sorry, been swamped.
> >>
> >> Eno
> >>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>
> >>> Hi Kyle,
> >>>
> >>> Thanks for the KIP. I apologize that i haven't had the chance to look
> >>> at the KIP yet, but will schedule some time to look into it tomorrow.
> >>> For the implementation, can you raise a PR against kafka trunk and
> >>> mark it as WIP? It will be easier to review what you have done.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com>
> >>> wrote:
> >>>
> >>>> I am replying to this in hopes it will draw some attention to my KIP
> >>>> as I haven't heard from anyone in a couple days. This is my first KIP
> >>>> and my first large contribution to the project so I'm sure I did
> >>>> something wrong.
> >>>> ;)
> >>>>
> >>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hello all,
> >>>>>
> >>>>> I have created KIP-150 to facilitate discussion about adding cogroup
> >>>>> to the streams DSL.
> >>>>>
> >>>>> Please find the KIP here:
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>
> >>>>> Please find my initial implementation here:
> >>>>> https://github.com/KyleWinkelman/kafka
> >>>>>
> >>>>> Thanks,
> >>>>> Kyle Winkelman
> >>>>>
> >>>>
> >>
> >>
> >
> >