Hi Kyle,

Sorry for the delay in reviews; tomorrow is the feature freeze deadline (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0), so people are busier than usual. Stay tuned.

Eno
> On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
> Damian Guy, could you let me know if you plan to review this further? There is no rush, but if you don't have any additional comments I could start the voting and finish my WIP PR.
>
> Thanks,
> Kyle
>
> On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>
>> Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?
>>
>> I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.
>>
>> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>
>>> Thanks Kyle, good arguments.
>>>
>>> Eno
>>>
>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>
>>>> *- minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?*
>>>> I have added a more concrete example to the KIP.
>>>>
>>>> *- my main concern is that we're exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.*
>>>> You would have to make a lot of checks to understand if it is even possible to make this optimization:
>>>> 1. Make sure they are all KTableKTableOuterJoins.
>>>> 2. None of the intermediate KTables are used for anything else.
>>>> 3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
>>>> You would then need to make decisions during the optimization:
>>>> 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners (see the sketch below).
>>>> 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
>>>> 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
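(Illustrative sketch only, not from the KIP or the PR: decision 1 above would roughly amount to synthesizing a composed Initializer out of the user-supplied pieces. The names initializer1..3, joinerOneAndTwo, joinerOneTwoAndThree and CG are taken from the example further down in this thread; the composition itself is hypothetical.)

    // Hypothetical: the initializer an optimizer would have to synthesize if it
    // collapsed table1.outerJoin(table2, joinerOneAndTwo)
    //                 .outerJoin(table3, joinerOneTwoAndThree)
    // into a single store holding the final CG value.
    Initializer<CG> compositeInitializer = () ->
        joinerOneTwoAndThree.apply(
            joinerOneAndTwo.apply(initializer1.apply(), initializer2.apply()),
            initializer3.apply());
    // Decision 2 is the harder part: each per-stream aggregator produces an
    // intermediate value (V1, V2, V3), so rewriting them into a single
    // Aggregator<K, V, CG> over the final object is not a mechanical step.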
>>>> The main argument I have against it is that even if it could be done, I don't know that we would want to have this be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
>>>>
>>>> In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join two or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
>>>>
>>>> *- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
>>>> I would argue that cogroup is not just an optimization; it is a new way for the users to look at accomplishing a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
>>>> Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is to be used.)
>>>>
>>>> Thanks,
>>>> Kyle
>>>>
>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>
>>>>> Hi Kyle,
>>>>>
>>>>> Thanks for the KIP again. A couple of comments:
>>>>>
>>>>> - minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?
>>>>>
>>>>> - my main concern is that we're exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.
>>>>>
>>>>> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>
>>>>>> I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with KStreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
>>>>>>
>>>>>> -Jay
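(Illustrative sketch, not from the KIP: to tie Jay's example to the proposal, the "customer 360" case might look roughly like this under the proposed cogroup API. The topic names, the Profile/Account/Billing/Purchase types, profileSerde and the per-source aggregators are made up for illustration, and the cogroup/aggregate signatures follow the KIP as currently proposed, so they may still change.)

    // One co-partitioned profile table built from several source systems,
    // with a single store, a single initializer and a single serde.
    KGroupedStream<String, Account> accounts = builder.stream("accounts").groupByKey();
    KGroupedStream<String, Billing> billing = builder.stream("billing").groupByKey();
    KGroupedStream<String, Purchase> purchases = builder.stream("purchases").groupByKey();

    KTable<String, Profile> profiles =
        accounts.cogroup(Profile::new,
                         (customerId, account, profile) -> profile.withAccount(account),
                         profileSerde, "customer-profile-store")
                .cogroup(billing, (customerId, bill, profile) -> profile.withBilling(bill))
                .cogroup(purchases, (customerId, purchase, profile) -> profile.withPurchase(purchase))
                .aggregate();
    // Each Profile.withXxx method is assumed to return the updated profile,
    // so the lambdas satisfy Aggregator<K, V, Profile>.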
>>>>>>
>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>
>>>>>>> Yeah, that's a good way to look at it.
>>>>>>> I have seen this type of functionality in a couple of other platforms like Spark and Pig:
>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>>>>>>
>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> If I'm reading this correctly it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct? Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
>>>>>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
>>>>>>>>
>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
>>>>>>>>>
>>>>>>>>> Example with Current API:
>>>>>>>>>
>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>>>>>>>>>
>>>>>>>>> As you can see this creates 3 StateStores, requires 3 initializers, and 3 aggValueSerdes. This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
>>>>>>>>>
>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value, which gets sent through a KTableKTableOuterJoin where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
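(Illustrative sketch, not from the KIP: with the first choice described above, where V1 = V2 = V3 = CG, the user still has to write the two joiners as mergers themselves. CG and its mergeWith method are made up; the null checks are needed because an outer join can pass null on either side.)

    // Hypothetical merger-style joiners for the current-API example above.
    ValueJoiner<CG, CG, CG> joinerOneAndTwo = (left, right) -> {
        if (left == null) return right;   // only the right-hand table had a value
        if (right == null) return left;   // only the left-hand table had a value
        return left.mergeWith(right);     // combine the two partial aggregates
    };
    ValueJoiner<CG, CG, CG> joinerOneTwoAndThree = joinerOneAndTwo; // same merge logic again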
>>>>>>>>>
>>>>>>>>> If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.
>>>>>>>>>
>>>>>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
>>>>>>>>>
>>>>>>>>> Example with Proposed API:
>>>>>>>>>
>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
>>>>>>>>>     .cogroup(grouped2, aggregator2)
>>>>>>>>>     .cogroup(grouped3, aggregator3)
>>>>>>>>>     .aggregate();
>>>>>>>>>
>>>>>>>>> As you can see this creates 1 StateStore, requires 1 initializer, and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
>>>>>>>>>
>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
>>>>>>>>>
>>>>>>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
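(Illustrative sketch, not the code from the WIP PR: roughly what each KStreamAggregate in the proposed topology would do per record, as described above - look up the current aggregate, apply that stream's aggregator, write it back and forward it, with KStreamCogroup simply passing the result through.)

    import org.apache.kafka.streams.kstream.Aggregator;
    import org.apache.kafka.streams.kstream.Initializer;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical per-stream aggregate processor over the single shared store.
    public class CogroupAggregateSketch<K, V, CG> implements Processor<K, V> {
        private final Initializer<CG> initializer;     // the one initializer for CG
        private final Aggregator<K, V, CG> aggregator; // this stream's aggregator
        private final String storeName;                // the single shared store
        private ProcessorContext context;
        private KeyValueStore<K, CG> store;

        CogroupAggregateSketch(Initializer<CG> initializer, Aggregator<K, V, CG> aggregator, String storeName) {
            this.initializer = initializer;
            this.aggregator = aggregator;
            this.storeName = storeName;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<K, CG>) context.getStateStore(storeName);
        }

        @Override
        public void process(K key, V value) {
            CG aggregate = store.get(key);
            if (aggregate == null) {
                aggregate = initializer.apply();         // first record for this key
            }
            aggregate = aggregator.apply(key, value, aggregate);
            store.put(key, aggregate);
            context.forward(key, aggregate);             // KStreamCogroup just forwards this
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }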
>>>>>>>>>
>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Kyle,
>>>>>>>>>>
>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
>>>>>>>>>>
>>>>>>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.
>>>>>>>>>>
>>>>>>>>>> Thanks a lot!
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>>>>> I have made a pull request. It can be found here:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>>>>
>>>>>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
>>>>>>>>>>>
>>>>>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
>>>>>>>>>>>
>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kyle
>>>>>>>>>>>
>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'll look as well ASAP, sorry, been swamped.
>>>>>>>>>>>>
>>>>>>>>>>>> Eno
>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the Streams DSL.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Kyle Winkelman