Hi Kyle,

Thanks for the KIP again. A couple of comments:
- minor: could you add an exact example (similar to Jay’s example, or like the ones your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes, etc.). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future, and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>
> I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc.). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
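Jay's single-stage idea can be illustrated with a minimal plain-Java sketch (no Kafka dependency), assuming every source system's records are keyed and partitioned by the same customer id. Under that assumption, one keyed map plus one updater function per source is enough to keep the unified profile current, with no intermediate joined tables. `CustomerProfileSketch`, `Profile`, its fields and the `on*` updaters are hypothetical names chosen for illustration; this is neither the Kafka Streams API nor how the KIP implements it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch (plain Java, not the Kafka Streams API): several sources
// that share the customer-id key update ONE keyed state map directly.
public class CustomerProfileSketch {

    // Hypothetical unified profile; each source owns some of the fields.
    static final class Profile {
        final String email;        // from the account system
        final String billingPlan;  // from the billing system
        final int purchaseCount;   // from purchase history
        final int totalCents;      // from purchase history

        Profile(String email, String billingPlan, int purchaseCount, int totalCents) {
            this.email = email;
            this.billingPlan = billingPlan;
            this.purchaseCount = purchaseCount;
            this.totalCents = totalCents;
        }

        static Profile empty() {
            return new Profile(null, null, 0, 0);
        }
    }

    // The single "state store": customer id -> current profile.
    private final Map<String, Profile> store = new HashMap<>();

    // Handle one event from any source: read the current profile (or start
    // empty), let that source's updater build the new one, write it back and
    // return it as the value that would be forwarded downstream.
    <V> Profile apply(String customerId, V value, BiFunction<Profile, V, Profile> updater) {
        Profile current = store.getOrDefault(customerId, Profile.empty());
        Profile updated = updater.apply(current, value);
        store.put(customerId, updated);
        return updated;
    }

    // Per-source updaters (hypothetical field mappings).
    static Profile onAccount(Profile p, String email) {
        return new Profile(email, p.billingPlan, p.purchaseCount, p.totalCents);
    }

    static Profile onBilling(Profile p, String plan) {
        return new Profile(p.email, plan, p.purchaseCount, p.totalCents);
    }

    static Profile onPurchase(Profile p, Integer cents) {
        return new Profile(p.email, p.billingPlan, p.purchaseCount + 1, p.totalCents + cents);
    }

    Profile get(String customerId) {
        return store.get(customerId);
    }

    int storeSize() {
        return store.size();
    }

    public static void main(String[] args) {
        CustomerProfileSketch s = new CustomerProfileSketch();
        s.apply("c1", "ann@example.com", CustomerProfileSketch::onAccount);
        s.apply("c1", 1999, CustomerProfileSketch::onPurchase);
        s.apply("c1", "gold", CustomerProfileSketch::onBilling);
        s.apply("c1", 500, CustomerProfileSketch::onPurchase);
        Profile p = s.get("c1");
        // prints: ann@example.com gold 2 2499
        System.out.println(p.email + " " + p.billingPlan + " " + p.purchaseCount + " " + p.totalCents);
    }
}
```

In this sketch each event costs one lookup and one write to the single map, whichever source it comes from; that is the property an explicit cogroup, or an optimizer that knows the inputs are co-partitioned, could take advantage of.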
>
> -Jay
>
> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
>> Yeah, that's a good way to look at it.
>> I have seen this type of functionality in a couple of other platforms, like Spark and Pig:
>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>
>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>
>>> Hi Kyle,
>>>
>>> If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct?
>>> Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
>>>
>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>>> I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
>>>>
>>>> Example with Current API:
>>>>
>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>>>>
>>>> As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3).
>>>> They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
>>>>
>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value, which gets sent through a KTableKTableOuterJoin where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
>>>>
>>>> If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.
>>>>
>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
>>>>
>>>> Example with Proposed API:
>>>>
>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
>>>>     .cogroup(grouped2, aggregator2)
>>>>     .cogroup(grouped3, aggregator3)
>>>>     .aggregate();
>>>>
>>>> As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde.
>>>> The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
>>>>
>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
>>>>
>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
>>>>
>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Kyle,
>>>>>
>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
>>>>>
>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>> I have made a pull request. It can be found here:
>>>>>>
>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>
>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
>>>>>>
>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method.
>>>>>> I can't supply the store name because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
>>>>>>
>>>>>> Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
>>>>>>
>>>>>> Thanks,
>>>>>> Kyle
>>>>>>
>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>
>>>>>>> Eno
>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)
>>>>>>>>>
>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
>>>>>>>>>>
>>>>>>>>>> Please find the KIP here:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>
>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kyle Winkelman
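Matthias asked for three streams with three records each, plus the expected result. The sketch below is a simplified, hypothetical model in plain Java (no Kafka dependency) of the two topologies Kyle walks through: `joinChain` keeps one store per topic and, on every record, also looks up the other two stores as the outer joins would; `cogroup` keeps a single store holding the final `CG` value. The class and method names, the use of per-topic running sums as the aggregate, and the read counting (only the lookups named in Kyle's walkthrough) are assumptions; a real Kafka Streams topology may perform additional store accesses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model (plain Java, not Kafka Streams) comparing the
// current join-chain topology with the proposed cogroup for 3 input topics.
public class CogroupVsJoinChain {

    // Hypothetical final aggregate CG: one running sum per input topic.
    static final class CG {
        final int sum1, sum2, sum3;

        CG(int sum1, int sum2, int sum3) {
            this.sum1 = sum1; this.sum2 = sum2; this.sum3 = sum3;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof CG)) return false;
            CG c = (CG) o;
            return sum1 == c.sum1 && sum2 == c.sum2 && sum3 == c.sum3;
        }

        @Override public int hashCode() { return 31 * (31 * sum1 + sum2) + sum3; }

        @Override public String toString() { return "CG(" + sum1 + "," + sum2 + "," + sum3 + ")"; }
    }

    // One input record: the topic (1..3) it arrived on, its key and value.
    static final class Rec {
        final int topic;
        final String key;
        final int value;

        Rec(int topic, String key, int value) {
            this.topic = topic; this.key = key; this.value = value;
        }
    }

    int storeReads = 0; // store lookups counted by this model

    // Current API: one store per topic (V1 = V2 = V3 = Integer sums); the two
    // outer joins then look up the other two stores (missing side -> 0).
    List<CG> joinChain(List<Rec> input) {
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < 3; i++) stores.add(new HashMap<>());
        List<CG> out = new ArrayList<>();
        for (Rec r : input) {
            int own = r.topic - 1;
            storeReads++; // KStreamAggregate reads its own store
            int updated = stores.get(own).getOrDefault(r.key, 0) + r.value;
            stores.get(own).put(r.key, updated);
            int[] v = new int[3];
            v[own] = updated;
            for (int i = 0; i < 3; i++) {
                if (i == own) continue;
                storeReads++; // an outer join looks up another topic's store
                v[i] = stores.get(i).getOrDefault(r.key, 0);
            }
            // The two joiners together assemble CG from the three values.
            out.add(new CG(v[0], v[1], v[2]));
        }
        return out;
    }

    // Proposed API: a single store holding CG; each topic has its own aggregator.
    List<CG> cogroup(List<Rec> input) {
        Map<String, CG> store = new HashMap<>();
        List<CG> out = new ArrayList<>();
        for (Rec r : input) {
            storeReads++; // the only lookup per record
            CG cur = store.getOrDefault(r.key, new CG(0, 0, 0)); // initializer
            CG next;
            switch (r.topic) { // aggregator1, aggregator2, aggregator3
                case 1: next = new CG(cur.sum1 + r.value, cur.sum2, cur.sum3); break;
                case 2: next = new CG(cur.sum1, cur.sum2 + r.value, cur.sum3); break;
                default: next = new CG(cur.sum1, cur.sum2, cur.sum3 + r.value); break;
            }
            store.put(r.key, next);
            out.add(next); // KStreamCogroup just passes the new aggregate on
        }
        return out;
    }

    public static void main(String[] args) {
        // 3 topics x 3 records, interleaved in arrival order.
        List<Rec> input = List.of(
                new Rec(1, "A", 1), new Rec(2, "A", 10), new Rec(3, "A", 100),
                new Rec(1, "B", 2), new Rec(2, "B", 20), new Rec(3, "A", 200),
                new Rec(1, "A", 3), new Rec(2, "B", 30), new Rec(3, "B", 300));
        CogroupVsJoinChain current = new CogroupVsJoinChain();
        CogroupVsJoinChain proposed = new CogroupVsJoinChain();
        List<CG> a = current.joinChain(input);
        List<CG> b = proposed.cogroup(input);
        System.out.println(a.equals(b) + " " + a.get(a.size() - 1)); // true CG(2,50,300)
        System.out.println(current.storeReads + " vs " + proposed.storeReads); // 27 vs 9
    }
}
```

Under this model both approaches emit the same nine `CG` values, ending with CG(4,10,300) for key A and CG(2,50,300) for key B, while the join chain performs 27 store reads against 9 for the cogroup (three versus one per record).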