Kyle, it might make sense to start a VOTE thread; it doesn’t seem there are more comments.
Thanks
Eno

On May 16, 2017, at 6:42 PM, Damian Guy <damian....@gmail.com> wrote:

Yeah - I wish we'd named KGroupedStream GroupedKStream, and similarly for KGroupedTable.

On Tue, 16 May 2017 at 17:59 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I have added code blocks and a note about the partial results.

Can I ask why you don't like KCogroupedStream? I just think that because it is created from a KGroupedStream, we should keep a similar name format.

On May 16, 2017 9:23 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Can you put the code examples etc. in {code} blocks to make them easier to read?

I think this is probably a pretty common use case and therefore a worthwhile addition to the API.

I'd suggest dropping the K from KCogroupedStream and calling it CogroupedStream or CogroupedKStream.
In your example, wouldn't the output potentially have more partial results? I.e., for each input on any stream you'd potentially see a record produced? I think it is worth mentioning this.

Thanks,
Damian

On Tue, 16 May 2017 at 12:26 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

No problem, I just wanted to make sure people still had more to say. I will wait another week.

Thanks,
Kyle

On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Hi Kyle,

Sorry for the delay in reviews; tomorrow is the feature freeze deadline (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0), so people are busier than usual. Stay tuned.

Eno

On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Damian Guy, could you let me know if you plan to review this further?
There is no rush, but if you don't have any additional comments, I could start the voting and finish my WIP PR.

Thanks,
Kyle

On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Eno, is there anyone else that is an expert in the Kafka Streams realm that I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Thanks Kyle, good arguments.

Eno

On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

*- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
I have added a more concrete example to the KIP.

*- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
You would have to make a lot of checks to understand if it is even possible to make this optimization:
1. Make sure they are all KTableKTableOuterJoins.
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used.
(This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
You would then need to make decisions during the optimization:
1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.

The main argument I have against it is that, even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.

In my opinion, cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join 2 or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
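Kyle's first decision point (a composite initializer built from the individual initializers and the valueJoiners) can be made concrete with a small sketch. It is hypothetical and uses plain Java: java.util.function's Supplier and BiFunction stand in for Kafka's Initializer and ValueJoiner, and the per-stream value types, initial values, and joiner bodies are all invented for illustration. The names only mirror joinerOneAndTwo and joinerOneTwoAndThree from the "current API" example. One real wrinkle is left open here. In a KTable outer join, a side with no value yet reaches the joiner as null, so an optimizer would also have to decide whether "not seen yet" means null or the initializer's value. This sketch simply uses the initializer values.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class CompositeInitializerSketch {

    // Hypothetical per-stream initializers (V1 = String, V2 = Integer, V3 = String).
    static final Supplier<String> initializer1 = () -> "unknown";
    static final Supplier<Integer> initializer2 = () -> 0;
    static final Supplier<String> initializer3 = () -> "none";

    // Hypothetical joiners standing in for joinerOneAndTwo and joinerOneTwoAndThree.
    static final BiFunction<String, Integer, String> joinerOneAndTwo =
        (v1, v2) -> v1 + "|" + v2;
    static final BiFunction<String, String, String> joinerOneTwoAndThree =
        (v12, v3) -> v12 + "|" + v3;

    /** Builds one initializer for the final CG value by joining the per-stream initial values. */
    static <V1, V2, V12, V3, CG> Supplier<CG> composite(
            Supplier<V1> init1, Supplier<V2> init2, Supplier<V3> init3,
            BiFunction<V1, V2, V12> joiner12, BiFunction<V12, V3, CG> joiner123) {
        return () -> joiner123.apply(joiner12.apply(init1.get(), init2.get()), init3.get());
    }

    public static void main(String[] args) {
        Supplier<String> cgInitializer = composite(
            initializer1, initializer2, initializer3, joinerOneAndTwo, joinerOneTwoAndThree);
        System.out.println(cgInitializer.get());
    }
}
```

This only covers point 1. It does not address point 2 (folding the aggregators and joiners into a single aggregator over CG), which is the step Kyle expects to be hard.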

*- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
I would argue that cogroup is not just an optimization; it is a new way for users to look at solving a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes, and stores if all they care about is the final object.
Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I would view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc).
The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:

I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc.). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with KStreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned.
A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.

-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Yeah, that's a good way to look at it.
I have seen this type of functionality in a couple of other platforms, like Spark and Pig.
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct? Effectively, each Aggregator just looks up the current value, aggregates, and forwards the result.
I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I sure can. I have added the following description to my KIP.
If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:

Example with Current API:

    KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
        .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
    KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
        .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
    KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
        .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
    KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
        .outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices. First, they can make V1, V2, and V3 all the same as CG, so the two joiners act more like mergers. Second, they can make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1.
It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

Example with Proposed API:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
    KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde.
The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store, and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Kyle,

thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.

I think it's pretty advanced stuff you propose, and it would help to understand it better.

Thanks a lot!

-Matthias

On 5/4/17 11:39 AM, Kyle Winkelman wrote:

I have made a pull request. It can be found here:

https://github.com/apache/kafka/pull/2975

I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.

One thing I was curious about: in the KCogroupedStreamImpl#aggregate method, I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?

Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.

Thanks,
Kyle

On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:

I’ll look as well asap, sorry, been swamped.

Eno

On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP.
I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against Kafka trunk and mark it as WIP? It will be easier to review what you have done.

Thanks,
Damian

On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)

On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Hello all,

I have created KIP-150 to facilitate discussion about adding cogroup to the Streams DSL.

Please find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

Please find my initial implementation here:
https://github.com/KyleWinkelman/kafka

Thanks,
Kyle Winkelman
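The cogroup semantics described in this thread can be sketched as a small in-memory model. It also shows Damian's point that every input record on any stream produces an updated, possibly partial, aggregate. This is not Kafka Streams code and not the KIP's implementation. A HashMap stands in for the single state store and a Supplier for the single initializer. The Aggregator interface, the Profile record (the "CG" type), and the account/purchase/ticket inputs are hypothetical, loosely modeled on Jay's "customer 360" example. Records require Java 16+.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class CogroupSketch {

    /** Hypothetical aggregator shape: (key, value, current aggregate) -> new aggregate. */
    interface Aggregator<V, A> {
        A apply(String key, V value, A aggregate);
    }

    /** Hypothetical final aggregate, playing the role of CG. */
    record Profile(String name, int purchases, String lastTicket) {}

    // The single state store shared by all cogrouped inputs.
    static final Map<String, Profile> store = new HashMap<>();
    // The single initializer, for the final aggregate only.
    static final Supplier<Profile> initializer = () -> new Profile("unknown", 0, "none");
    // Everything "forwarded" downstream: one record per input record.
    static final List<Profile> forwarded = new ArrayList<>();

    // One aggregator per input stream; each only says how its records change the Profile.
    static final Aggregator<String, Profile> accounts =
        (key, name, p) -> new Profile(name, p.purchases(), p.lastTicket());
    static final Aggregator<Integer, Profile> purchases =
        (key, count, p) -> new Profile(p.name(), p.purchases() + count, p.lastTicket());
    static final Aggregator<String, Profile> tickets =
        (key, ticket, p) -> new Profile(p.name(), p.purchases(), ticket);

    /** Look up the current aggregate, apply this input's aggregator, update the store, forward. */
    static <V> Profile process(Aggregator<V, Profile> aggregator, String key, V value) {
        Profile current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        Profile updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        forwarded.add(updated);
        return updated;
    }

    public static void main(String[] args) {
        // Records for key "c1" arriving interleaved from three co-partitioned inputs.
        process(accounts, "c1", "Alice");
        process(purchases, "c1", 2);
        process(tickets, "c1", "T-17");
        process(purchases, "c1", 1);
        forwarded.forEach(System.out::println);
    }
}
```

Each input touches one store entry and one aggregator. In the join-chain version, each of the N inputs has its own store and initializer, and N-1 joiners sit on top. The four forwarded values include partial results, such as a profile whose lastTicket is still "none", which is the behavior Damian asked to have documented.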