I have added code blocks and a note about the partial results. Can I ask why you don't like KCogroupedStream? I just think that because it is created from a KGroupedStream we should keep a similar name format.
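
To make the partial-results point concrete, here is a minimal plain-Java sketch (no Kafka dependency) of the cogroup behaviour described in this thread: one shared store, one aggregator per input stream, and an updated aggregate forwarded for every input record. The class name CogroupSketch, the list-of-strings aggregate, and the sample records are assumptions made up for illustration; none of them come from the KIP or from the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the cogroup processing path; not Kafka Streams code.
public class CogroupSketch {
    // Single shared store: key -> current aggregate (stands in for the one StateStore).
    private final Map<String, List<String>> store = new HashMap<>();
    // Every aggregate forwarded downstream, in arrival order.
    private final List<String> emitted = new ArrayList<>();

    // Process one record from one input stream with that stream's aggregator.
    public void process(String key, String value,
                        BiFunction<String, List<String>, List<String>> aggregator) {
        // The "initializer" here is simply an empty list.
        List<String> current = store.getOrDefault(key, new ArrayList<>());
        List<String> updated = aggregator.apply(value, current);
        store.put(key, updated);
        // An updated aggregate is forwarded for every input record.
        emitted.add(key + "=" + updated);
    }

    public List<String> emitted() {
        return emitted;
    }

    // Copy-and-append helper so each aggregate is a fresh list.
    private static List<String> append(List<String> agg, String item) {
        List<String> copy = new ArrayList<>(agg);
        copy.add(item);
        return copy;
    }

    // One record on each of three made-up topics, all for the same key.
    public static List<String> demo() {
        CogroupSketch cg = new CogroupSketch();
        BiFunction<String, List<String>, List<String>> agg1 = (v, agg) -> append(agg, "t1:" + v);
        BiFunction<String, List<String>, List<String>> agg2 = (v, agg) -> append(agg, "t2:" + v);
        BiFunction<String, List<String>, List<String>> agg3 = (v, agg) -> append(agg, "t3:" + v);
        cg.process("k", "a", agg1);
        cg.process("k", "b", agg2);
        cg.process("k", "c", agg3);
        return cg.emitted();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints three lines for key k, one per input record: k=[t1:a], then k=[t1:a, t2:b], then k=[t1:a, t2:b, t3:c]. The first two are the partial results in question; only the last one contains a contribution from every topic.
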

On May 16, 2017 9:23 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Can you put the code examples etc. in {code} blocks to make it easier to
read?

I think this is probably a pretty common use-case and therefore a
worthwhile addition to the API.

I'd suggest dropping the K from KCogroupedStream and calling it
CogroupedStream or CogroupedKStream.
In your example, wouldn't the output potentially have more partial results?
I.e., for each input on any stream you'd potentially see a record produced?
I think it is worth mentioning this.

Thanks,
Damian

On Tue, 16 May 2017 at 12:26 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

> No problem, I just wanted to make sure people still had more to say. I will
> wait another week.
>
> Thanks,
> Kyle
>
> On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>
>> Hi Kyle,
>>
>> Sorry for the delay in reviews, tomorrow is feature freeze deadline
>> (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0)
>> so people are busier than usual. Stay tuned.
>>
>> Eno
>>
>>> On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>> Damian Guy, could you let me know if you plan to review this further?
>>> There is no rush, but if you don't have any additional comments I could
>>> start the voting and finish my WIP PR.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>
>>>> Eno, is there anyone else that is an expert in the kafka streams realm
>>>> that I should reach out to for input?
>>>>
>>>> I believe Damian Guy is still planning on reviewing this more in depth
>>>> so I will wait for his inputs before continuing.
>>>>
>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>
>>>>> Thanks Kyle, good arguments.
>>>>>
>>>>> Eno
>>>>>
>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>
>>>>>> *- minor: could you add an exact example (similar to what Jay’s example
>>>>>> is, or like your Spark/Pig pointers had) to make this super concrete?*
>>>>>> I have added a more concrete example to the KIP.
>>>>>>
>>>>>> *- my main concern is that we’re exposing this optimization to the DSL.
>>>>>> In an ideal world, an optimizer would take the existing DSL and do the
>>>>>> right thing under the covers (create just one state store, arrange the
>>>>>> nodes etc). The original DSL had a bunch of small, composable pieces
>>>>>> (group, aggregate, join) that this proposal groups together. I’d like to
>>>>>> hear your thoughts on whether it’s possible to do this optimization with
>>>>>> the current DSL, at the topology builder level.*
>>>>>> You would have to make a lot of checks to understand if it is even
>>>>>> possible to make this optimization:
>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
>>>>>> 2. None of the intermediate KTables are used for anything else.
>>>>>> 3. None of the intermediate stores are used. (This may be impossible
>>>>>> especially if they use KafkaStreams#store after the topology has already
>>>>>> been built.)
>>>>>> You would then need to make decisions during the optimization:
>>>>>> 1. Your new initializer would be the composite of all the individual
>>>>>> initializers and the valueJoiners.
>>>>>> 2. I am having a hard time thinking about how you would turn the
>>>>>> aggregators and valueJoiners into an aggregator that would work on the
>>>>>> final object, but this may be possible.
>>>>>> 3. Which state store would you use? The ones declared would be for the
>>>>>> aggregate values. None of the declared ones would be guaranteed to hold
>>>>>> the final object. This would mean you must create a new state store and
>>>>>> not create any of the declared ones.
>>>>>>
>>>>>> The main argument I have against it is even if it could be done I don't
>>>>>> know that we would want to have this be an optimization in the
>>>>>> background because the user would still be required to think about all
>>>>>> of the intermediate values that they shouldn't need to worry about if
>>>>>> they only care about the final object.
>>>>>>
>>>>>> In my opinion cogroup is a common enough case that it should be part of
>>>>>> the composable pieces (group, aggregate, join) because we want to allow
>>>>>> people to join more than 2 streams in an easy way. Right now I don't
>>>>>> think we give them ways of handling this use case easily.
>>>>>>
>>>>>> *- I think there will be scope for several such optimizations in the
>>>>>> future and perhaps at some point we need to think about decoupling the
>>>>>> 1:1 mapping from the DSL into the physical topology.*
>>>>>> I would argue that cogroup is not just an optimization; it is a new way
>>>>>> for the users to look at accomplishing a problem that requires multiple
>>>>>> streams. I may sound like a broken record but I don't think users should
>>>>>> have to build the N-1 intermediate tables and deal with their
>>>>>> initializers, serdes and stores if all they care about is the final
>>>>>> object.
>>>>>> Now if for example someone uses cogroup but doesn't supply additional
>>>>>> streams and aggregators this case is equivalent to a single grouped
>>>>>> stream making an aggregate call. This case is what I view as an
>>>>>> optimization: we could remove the KStreamCogroup and act as if there was
>>>>>> just a call to KGroupedStream#aggregate instead of calling
>>>>>> KGroupedStream#cogroup. (I would prefer to just write a warning saying
>>>>>> that this is not how cogroup is to be used.)
>>>>>>
>>>>>> Thanks,
>>>>>> Kyle
>>>>>>
>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kyle,
>>>>>>>
>>>>>>> Thanks for the KIP again. A couple of comments:
>>>>>>>
>>>>>>> - minor: could you add an exact example (similar to what Jay’s example
>>>>>>> is, or like your Spark/Pig pointers had) to make this super concrete?
>>>>>>>
>>>>>>> - my main concern is that we’re exposing this optimization to the DSL.
>>>>>>> In an ideal world, an optimizer would take the existing DSL and do the
>>>>>>> right thing under the covers (create just one state store, arrange the
>>>>>>> nodes etc). The original DSL had a bunch of small, composable pieces
>>>>>>> (group, aggregate, join) that this proposal groups together. I’d like
>>>>>>> to hear your thoughts on whether it’s possible to do this optimization
>>>>>>> with the current DSL, at the topology builder level.
>>>>>>>
>>>>>>> I think there will be scope for several such optimizations in the
>>>>>>> future and perhaps at some point we need to think about decoupling the
>>>>>>> 1:1 mapping from the DSL into the physical topology.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>
>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>>
>>>>>>>> I haven't digested the proposal but the use case is pretty common. An
>>>>>>>> example would be the "customer 360" or "unified customer profile" use
>>>>>>>> case we often use.
>>>>>>>> In that use case you have a dozen systems each of which has some
>>>>>>>> information about your customer (account details, settings, billing
>>>>>>>> info, customer service contacts, purchase history, etc.). Your goal is
>>>>>>>> to join/munge these into a single profile record for each customer
>>>>>>>> that has all the relevant info in a usable form and is up-to-date with
>>>>>>>> all the source systems. If you implement that with kstreams as a
>>>>>>>> sequence of joins I think today we'd fully materialize N-1
>>>>>>>> intermediate tables. But clearly you only need a single stage to group
>>>>>>>> all these things that are already co-partitioned. A distributed
>>>>>>>> database would do this under the covers which is arguably better (at
>>>>>>>> least when it does the right thing) and perhaps we could do the same
>>>>>>>> thing but I'm not sure we know the partitioning so we may need an
>>>>>>>> explicit cogroup command that implies they are already co-partitioned.
>>>>>>>>
>>>>>>>> -Jay
>>>>>>>>
>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yea that's a good way to look at it.
>>>>>>>>> I have seen this type of functionality in a couple other platforms
>>>>>>>>> like spark and pig.
>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>>>>>>>>
>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Kyle,
>>>>>>>>>>
>>>>>>>>>> If I'm reading this correctly it is like an N way outer join? So an
>>>>>>>>>> input on any stream will always produce a new aggregated value - is
>>>>>>>>>> that correct?
>>>>>>>>>> Effectively, each Aggregator just looks up the current value,
>>>>>>>>>> aggregates and forwards the result.
>>>>>>>>>> I need to look into it and think about it a bit more, but it seems
>>>>>>>>>> like it could be a useful optimization.
>>>>>>>>>>
>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I sure can. I have added the following description to my KIP. If
>>>>>>>>>>> this doesn't help let me know and I will take some more time to
>>>>>>>>>>> build a diagram and make more of a step by step description:
>>>>>>>>>>>
>>>>>>>>>>> Example with Current API:
>>>>>>>>>>>
>>>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
>>>>>>>>>>>     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
>>>>>>>>>>>     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
>>>>>>>>>>>     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
>>>>>>>>>>>     .outerJoin(table3, joinerOneTwoAndThree);
>>>>>>>>>>>
>>>>>>>>>>> As you can see this creates 3 StateStores, requires 3 initializers,
>>>>>>>>>>> and 3 aggValueSerdes. This also puts pressure on the user to define
>>>>>>>>>>> what the intermediate values are going to be (V1, V2, V3).
>>>>>>>>>>> They are left with a couple of choices: first, to make V1, V2, and
>>>>>>>>>>> V3 all the same as CG so that the two joiners are more like
>>>>>>>>>>> mergers, or second, to make them intermediate states such as
>>>>>>>>>>> Topic1Map, Topic2Map, and Topic3Map and have the joiners use those
>>>>>>>>>>> to build the final aggregate CG value. This is something the user
>>>>>>>>>>> could avoid thinking about with this KIP.
>>>>>>>>>>>
>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
>>>>>>>>>>> through a KStreamAggregate grabbing the current aggregate from
>>>>>>>>>>> storeName1. It will produce this in the form of the first
>>>>>>>>>>> intermediate value and get sent through a KTableKTableOuterJoin
>>>>>>>>>>> where it will look up the current value of the key in storeName2.
>>>>>>>>>>> It will use the first joiner to calculate the second intermediate
>>>>>>>>>>> value, which will go through an additional KTableKTableOuterJoin.
>>>>>>>>>>> Here it will look up the current value of the key in storeName3 and
>>>>>>>>>>> use the second joiner to build the final aggregate value.
>>>>>>>>>>>
>>>>>>>>>>> If you think through all possibilities for incoming topics you will
>>>>>>>>>>> see that no matter which topic it comes in through all three stores
>>>>>>>>>>> are queried and all of the joiners must get used.
>>>>>>>>>>>
>>>>>>>>>>> Topology wise for N incoming streams this creates N
>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
>>>>>>>>>>> KTableKTableJoinMergers.
>>>>>>>>>>>
>>>>>>>>>>> Example with Proposed API:
>>>>>>>>>>>
>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1,
>>>>>>>>>>>         aggValueSerde1, storeName1)
>>>>>>>>>>>     .cogroup(grouped2, aggregator2)
>>>>>>>>>>>     .cogroup(grouped3, aggregator3)
>>>>>>>>>>>     .aggregate();
>>>>>>>>>>>
>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1 initializer,
>>>>>>>>>>> and 1 aggValueSerde. The user no longer has to worry about the
>>>>>>>>>>> intermediate values and the joiners. All they have to think about
>>>>>>>>>>> is how each stream impacts the creation of the final CG object.
>>>>>>>>>>>
>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will first go
>>>>>>>>>>> through a KStreamAggregate and grab the current aggregate from
>>>>>>>>>>> storeName1. It will add its incoming object to the aggregate,
>>>>>>>>>>> update the store and pass the new aggregate on. This new aggregate
>>>>>>>>>>> goes through the KStreamCogroup which is pretty much just a pass
>>>>>>>>>>> through processor and you are done.
>>>>>>>>>>>
>>>>>>>>>>> Topology wise for N incoming streams the new api will only ever
>>>>>>>>>>> create N KStreamAggregates and 1 KStreamCogroup.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Kyle,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could
>>>>>>>>>>>> not follow completely.
>>>>>>>>>>>> Could you maybe add a more concrete example, like 3 streams with
>>>>>>>>>>>> 3 records each (plus expected result), and show the difference
>>>>>>>>>>>> between the current way to implement it and the proposed API?
>>>>>>>>>>>> This could also cover the internal processing to see what store
>>>>>>>>>>>> calls would be required for both approaches etc.
>>>>>>>>>>>>
>>>>>>>>>>>> I think it's pretty advanced stuff you propose, and it would
>>>>>>>>>>>> help to understand it better.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot!
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>>>>>>
>>>>>>>>>>>>> I plan to write some more unit tests for my classes and get
>>>>>>>>>>>>> around to writing documentation for the public api additions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One thing I was curious about is during the
>>>>>>>>>>>>> KCogroupedStreamImpl#aggregate method I pass null to the
>>>>>>>>>>>>> KGroupedStream#repartitionIfRequired method. I can't supply the
>>>>>>>>>>>>> store name because if more than one grouped stream repartitions
>>>>>>>>>>>>> an error is thrown. Is there some name that someone can
>>>>>>>>>>>>> recommend or should I leave the null and allow it to fall back
>>>>>>>>>>>>> to the KGroupedStream.name?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Should this be expanded to handle grouped tables? This would be
>>>>>>>>>>>>> pretty easy for a normal aggregate but one allowing session
>>>>>>>>>>>>> stores and windowed stores would require
>>>>>>>>>>>>> KTableSessionWindowAggregate and KTableWindowAggregate
>>>>>>>>>>>>> implementations.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Kyle
>>>>>>>>>>>>>
>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance
>>>>>>>>>>>>>>> to look at the KIP yet, but will schedule some time to look
>>>>>>>>>>>>>>> into it tomorrow. For the implementation, can you raise a PR
>>>>>>>>>>>>>>> against kafka trunk and mark it as WIP? It will be easier to
>>>>>>>>>>>>>>> review what you have done.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw some attention to
>>>>>>>>>>>>>>>> my KIP as I haven't heard from anyone in a couple days. This
>>>>>>>>>>>>>>>> is my first KIP and my first large contribution to the
>>>>>>>>>>>>>>>> project so I'm sure I did something wrong. ;)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding
>>>>>>>>>>>>>>>>> cogroup to the streams DSL.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Kyle Winkelman
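
For reference, the topology sizes quoted in the thread can be worked out as plain arithmetic. The sketch below only restates those counts (N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins and N-1 KTableKTableJoinMergers for the current API; N KStreamAggregates and 1 KStreamCogroup for the proposed one). The class and method names are hypothetical helpers for illustration, not Kafka Streams code.

```java
// Hypothetical helper that restates the processor and store counts from the thread.
public class TopologyNodeCount {

    // Current API: N aggregates + 2*(N-1) outer joins + (N-1) join mergers.
    public static int currentApiProcessors(int n) {
        requirePositive(n);
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API: N aggregates + 1 cogroup pass-through processor.
    public static int proposedApiProcessors(int n) {
        requirePositive(n);
        return n + 1;
    }

    // Current API materializes one store per aggregated stream.
    public static int currentApiStores(int n) {
        requirePositive(n);
        return n;
    }

    // Proposed API shares a single store across all streams.
    public static int proposedApiStores(int n) {
        requirePositive(n);
        return 1;
    }

    private static void requirePositive(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one input stream, got " + n);
        }
    }

    public static void main(String[] args) {
        int n = 3; // the three-topic example from the thread
        System.out.println("current:  " + currentApiProcessors(n) + " processors, "
                + currentApiStores(n) + " stores");
        System.out.println("proposed: " + proposedApiProcessors(n) + " processors, "
                + proposedApiStores(n) + " stores");
    }
}
```

For the three-topic example this gives 9 processors and 3 stores with the current API against 4 processors and 1 store with the proposed one.
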