Damian Guy, could you let me know whether you plan to review this further? There is no rush, but if you don't have any additional comments, I could start the vote and finish my WIP PR.
Thanks,
Kyle

On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?
>
> I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.
>
> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>> Thanks Kyle, good arguments.
>>
>> Eno
>>
>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> *- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
>>> I have added a more concrete example to the KIP.
>>>
>>> *- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
>>> You would have to make a lot of checks to understand whether it is even possible to make this optimization:
>>> 1. Make sure they are all KTableKTableOuterJoins.
>>> 2. None of the intermediate KTables are used for anything else.
>>> 3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
>>> You would then need to make decisions during the optimization:
>>> 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
>>> 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
>>> 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
>>>
>>> The main argument I have against it is that even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
>>>
>>> In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join 2 or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
>>>
>>> *- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
>>> I would argue that cogroup is not just an optimization; it is a new way for users to approach a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
>>> Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call.
>>> This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>> Hi Kyle,
>>>>
>>>> Thanks for the KIP again. A couple of comments:
>>>>
>>>> - minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?
>>>>
>>>> - my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.
>>>>
>>>> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>
>>>>> I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc).
>>>>> Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
>>>>>
>>>>> -Jay
>>>>>
>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>> Yeah, that's a good way to look at it.
>>>>>> I have seen this type of functionality in a couple of other platforms, like Spark and Pig.
>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>>>>>
>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>>>> Hi Kyle,
>>>>>>>
>>>>>>> If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct? Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
>>>>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
>>>>>>>
>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>> I sure can.
>>>>>>>> I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
>>>>>>>>
>>>>>>>> Example with Current API:
>>>>>>>>
>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>>>>>>>>
>>>>>>>> As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
>>>>>>>>
>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2.
>>>>>>>> It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
>>>>>>>>
>>>>>>>> If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must be used.
>>>>>>>>
>>>>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
>>>>>>>>
>>>>>>>> Example with Proposed API:
>>>>>>>>
>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
>>>>>>>>         .cogroup(grouped2, aggregator2)
>>>>>>>>         .cogroup(grouped3, aggregator3)
>>>>>>>>         .aggregate();
>>>>>>>>
>>>>>>>> As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
>>>>>>>>
>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on.
>>>>>>>> This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
>>>>>>>>
>>>>>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
>>>>>>>>
>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>> Kyle,
>>>>>>>>>
>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
>>>>>>>>>
>>>>>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.
>>>>>>>>>
>>>>>>>>> Thanks a lot!
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>>>
>>>>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
>>>>>>>>>>
>>>>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions, an error is thrown.
>>>>>>>>>> Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
>>>>>>>>>>
>>>>>>>>>> Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kyle
>>>>>>>>>>
>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>>>>
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong.
>>>>>>>>>>>>> ;)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Kyle Winkelman
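Matthias asked in the thread for a concrete example showing which store calls each approach needs. The following is a minimal, dependency-free Java model written under stated assumptions. It is not the Kafka Streams API and not the code in the PR. Each "state store" is a HashMap that counts gets and puts. Each per-topic aggregator appends to a list, and the joiners merge the per-topic lists into one profile. The class names (CountingStore, JoinChain, Cogroup) and the topic names, loosely borrowed from Jay's customer-360 example, are hypothetical. In this model, a record arriving on any of N inputs costs N+1 store accesses spread over N stores in the join chain, compared with 2 accesses on a single store under cogroup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration only: plain maps stand in for Kafka Streams state stores.
public class CogroupSketch {

    /** A key-value "state store" that counts every get and put. */
    static final class CountingStore<V> {
        private final Map<String, V> data = new HashMap<>();
        int accesses = 0;

        V get(String key) { accesses++; return data.get(key); }

        void put(String key, V value) { accesses++; data.put(key, value); }
    }

    /** Current API: one aggregate store per topic, then outer joins that read every other store. */
    static final class JoinChain {
        final List<String> topics;
        final List<CountingStore<List<String>>> stores = new ArrayList<>();

        JoinChain(List<String> topics) {
            this.topics = topics;
            for (int i = 0; i < topics.size(); i++) {
                stores.add(new CountingStore<>());
            }
        }

        Map<String, List<String>> process(int input, String key, String value) {
            // Per-topic aggregate: initializer = empty list, aggregator = append.
            CountingStore<List<String>> own = stores.get(input);
            List<String> agg = own.get(key);
            if (agg == null) {
                agg = new ArrayList<>();
            }
            agg.add(value);
            own.put(key, agg);

            // Outer-join step: look up the key in every other store; "joiners" merge the non-null parts.
            Map<String, List<String>> result = new TreeMap<>();
            for (int i = 0; i < topics.size(); i++) {
                List<String> part = (i == input) ? agg : stores.get(i).get(key);
                if (part != null) {
                    result.put(topics.get(i), part);
                }
            }
            return result;
        }

        int totalAccesses() {
            int sum = 0;
            for (CountingStore<List<String>> s : stores) {
                sum += s.accesses;
            }
            return sum;
        }
    }

    /** Proposed cogroup: one store holding the final CG value, updated by each topic's aggregator. */
    static final class Cogroup {
        final List<String> topics;
        final CountingStore<Map<String, List<String>>> store = new CountingStore<>();

        Cogroup(List<String> topics) { this.topics = topics; }

        Map<String, List<String>> process(int input, String key, String value) {
            Map<String, List<String>> cg = store.get(key);
            if (cg == null) {
                cg = new TreeMap<>(); // the single initializer
            }
            // This input's aggregator: append the value under its own topic name.
            cg.computeIfAbsent(topics.get(input), t -> new ArrayList<>()).add(value);
            store.put(key, cg);
            return cg;
        }
    }

    public static void main(String[] args) {
        List<String> topics = List.of("accounts", "billing", "support");
        JoinChain chain = new JoinChain(topics);
        Cogroup cogroup = new Cogroup(topics);

        // Each row: {input index, key, value}
        String[][] records = {
            {"0", "alice", "acct-opened"},
            {"1", "alice", "invoice-42"},
            {"2", "alice", "ticket-7"},
            {"1", "alice", "invoice-43"},
        };

        Map<String, List<String>> viaJoins = null;
        Map<String, List<String>> viaCogroup = null;
        for (String[] r : records) {
            int input = Integer.parseInt(r[0]);
            viaJoins = chain.process(input, r[1], r[2]);
            viaCogroup = cogroup.process(input, r[1], r[2]);
        }

        System.out.println(viaJoins);
        System.out.println(viaJoins.equals(viaCogroup));
        System.out.println(chain.stores.size() + " stores, " + chain.totalAccesses() + " accesses");
        System.out.println("1 store, " + cogroup.store.accesses + " accesses");
    }
}
```

Running `main` prints `{accounts=[acct-opened], billing=[invoice-42, invoice-43], support=[ticket-7]}`, then `true` (both models build the same profile), then `3 stores, 16 accesses` and `1 store, 8 accesses`. These counts describe this simplified model only. The real KStreamAggregate, KTableKTableOuterJoin and KStreamCogroup processors also involve caching, serdes and changelog topics, which the sketch ignores.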