Kyle, Thanks for the explanations, my previous read on the wiki examples was wrong.
So I guess my motivation should be "reduced" to: can we move the window specs param from "KGroupedStream#cogroup(..)" to "CogroupedKStream#aggregate(..)", and my motivations are: 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to 2. 2. major: this is for extensibility of the APIs, and since we are removing the "Evolving" annotations on Streams it may be harder to change it again in the future. The extended use cases are that people wanted to have windowed running aggregates on different granularities, e.g. "give me the counts per-minute, per-hour, per-day and per-week", and today in DSL we need to specify that case in multiple aggregate operators, which gets a state store / changelog, etc. And it is possible to optimize it as well to a single state store. Its implementation would be tricky as you need to contain different lengthed windows within your window store but just from the public API point of view, it could be specified as: CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name"); table1 = stream.aggregate(/*per-minute window*/) table2 = stream.aggregate(/*per-hour window*/) table3 = stream.aggregate(/*per-day window*/) while underlying we are only using a single store "state-store-name" for it. Although this feature is out of the scope of this KIP, I'd like to discuss if we can "leave the door open" to make such changes without modifying the public APIs . Guozhang On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote: > I allow defining a single window/sessionwindow one time when you make the > cogroup call from a KGroupedStream. From then on you are using the cogroup > call from with in CogroupedKStream which doesnt accept any additional > windows/sessionwindows. > > Is this what you meant by your question or did I misunderstand? > > On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote: > > Another question that came to me is on "window alignment": from the KIP it > seems you are allowing users to specify a (potentially different) window > spec in each co-grouped input stream. So if these window specs are > different how should we "align" them with different input streams? I think > it is more natural to only specify on window spec in the > > KTable<RK, V> CogroupedKStream#aggregate(Windows); > > > And remove it from the cogroup() functions. WDYT? > > > Guozhang > > On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > > Thanks for the proposal Kyle, this is a quite common use case to support > > such multi-way table join (i.e. N source tables with N aggregate func) > with > > a single store and N+1 serdes, I have seen lots of people using the > > low-level PAPI to achieve this goal. > > > > > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman < > winkelman.k...@gmail.com > > > wrote: > > > >> I like your point about not handling other cases such as count and > reduce. > >> > >> I think that reduce may not make sense because reduce assumes that the > >> input values are the same as the output values. With cogroup there may > be > >> multiple different input types and then your output type cant be > multiple > >> different things. In the case where you have all matching value types > you > >> can do KStreamBuilder#merge followed by the reduce. > >> > >> As for count I think it is possible to call count on all the individual > >> grouped streams and then do joins. Otherwise we could maybe make a > special > >> call in groupedstream for this case. Because in this case we dont need > to > >> do type checking on the values. It could be similar to the current count > >> methods but accept a var args of additonal grouped streams as well and > >> make > >> sure they have a key type of K. > >> > >> The way I have put the kip together is to ensure that we do type > checking. > >> I don't see a way we could group them all first and then make a call to > >> count, reduce, or aggregate because with aggregate they would need to > pass > >> a list of aggregators and we would have no way of type checking that > they > >> match the grouped streams. > >> > >> Thanks, > >> Kyle > >> > >> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote: > >> > >> > Sorry to jump on this thread so late. I agree this is a very useful > >> > addition and wanted to provide an additional use-case and some more > >> > comments. > >> > > >> > This is actually a very common analytics use-case in the ad-tech > >> industry. > >> > The typical setup will have an auction stream, an impression stream, > >> and a > >> > click stream. Those three streams need to be combined to compute > >> aggregate > >> > statistics (e.g. impression statistics, and click-through rates), > since > >> > most of the attributes of interest are only present the auction > stream. > >> > > >> > A simple way to do this is to co-group all the streams by the auction > >> key, > >> > and process updates to the co-group as events for each stream come in, > >> > keeping only one value from each stream before sending downstream for > >> > further processing / aggregation. > >> > > >> > One could view the result of that co-group operation as a "KTable" > with > >> > multiple values per key. The key being the grouping key, and the > values > >> > consisting of one value per stream. > >> > > >> > What I like about Kyle's approach is that allows elegant co-grouping > of > >> > multiple streams without having to worry about the number of streams, > >> and > >> > avoids dealing with Tuple types or other generic interfaces that could > >> get > >> > messy if we wanted to preserve all the value types in the resulting > >> > co-grouped stream. > >> > > >> > My only concern is that we only allow the cogroup + aggregate combined > >> > operation. This forces the user to build their own tuple serialization > >> > format if they want to preserve the individual input stream values as > a > >> > group. It also deviates quite a bit from our approach in > KGroupedStream > >> > which offers other operations, such as count and reduce, which should > >> also > >> > be applicable to a co-grouped stream. > >> > > >> > Overall I still think this is a really useful addition, but I feel we > >> > haven't spend much time trying to explore alternative DSLs that could > >> maybe > >> > generalize better or match our existing syntax more closely. > >> > > >> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman < > winkelman.k...@gmail.com > >> > > >> > wrote: > >> > > >> > > Eno, is there anyone else that is an expert in the kafka streams > realm > >> > that > >> > > I should reach out to for input? > >> > > > >> > > I believe Damian Guy is still planning on reviewing this more in > depth > >> > so I > >> > > will wait for his inputs before continuing. > >> > > > >> > > On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> > >> wrote: > >> > > > >> > > > Thanks Kyle, good arguments. > >> > > > > >> > > > Eno > >> > > > > >> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman < > >> winkelman.k...@gmail.com > >> > > > >> > > > wrote: > >> > > > > > >> > > > > *- minor: could you add an exact example (similar to what Jay’s > >> > example > >> > > > is, > >> > > > > or like your Spark/Pig pointers had) to make this super > concrete?* > >> > > > > I have added a more concrete example to the KIP. > >> > > > > > >> > > > > *- my main concern is that we’re exposing this optimization to > the > >> > DSL. > >> > > > In > >> > > > > an ideal world, an optimizer would take the existing DSL and do > >> the > >> > > right > >> > > > > thing under the covers (create just one state store, arrange the > >> > nodes > >> > > > > etc). The original DSL had a bunch of small, composable pieces > >> > (group, > >> > > > > aggregate, join) that this proposal groups together. I’d like to > >> hear > >> > > > your > >> > > > > thoughts on whether it’s possible to do this optimization with > the > >> > > > current > >> > > > > DSL, at the topology builder level.* > >> > > > > You would have to make a lot of checks to understand if it is > even > >> > > > possible > >> > > > > to make this optimization: > >> > > > > 1. Make sure they are all KTableKTableOuterJoins > >> > > > > 2. None of the intermediate KTables are used for anything else. > >> > > > > 3. None of the intermediate stores are used. (This may be > >> impossible > >> > > > > especially if they use KafkaStreams#store after the topology has > >> > > already > >> > > > > been built.) > >> > > > > You would then need to make decisions during the optimization: > >> > > > > 1. Your new initializer would the composite of all the > individual > >> > > > > initializers and the valueJoiners. > >> > > > > 2. I am having a hard time thinking about how you would turn the > >> > > > > aggregators and valueJoiners into an aggregator that would work > on > >> > the > >> > > > > final object, but this may be possible. > >> > > > > 3. Which state store would you use? The ones declared would be > for > >> > the > >> > > > > aggregate values. None of the declared ones would be guaranteed > to > >> > hold > >> > > > the > >> > > > > final object. This would mean you must created a new state store > >> and > >> > > not > >> > > > > created any of the declared ones. > >> > > > > > >> > > > > The main argument I have against it is even if it could be done > I > >> > don't > >> > > > > know that we would want to have this be an optimization in the > >> > > background > >> > > > > because the user would still be required to think about all of > the > >> > > > > intermediate values that they shouldn't need to worry about if > >> they > >> > > only > >> > > > > care about the final object. > >> > > > > > >> > > > > In my opinion cogroup is a common enough case that it should be > >> part > >> > of > >> > > > the > >> > > > > composable pieces (group, aggregate, join) because we want to > >> allow > >> > > > people > >> > > > > to join more than 2 or more streams in an easy way. Right now I > >> don't > >> > > > think > >> > > > > we give them ways of handling this use case easily. > >> > > > > > >> > > > > *-I think there will be scope for several such optimizations in > >> the > >> > > > future > >> > > > > and perhaps at some point we need to think about decoupling the > >> 1:1 > >> > > > mapping > >> > > > > from the DSL into the physical topology.* > >> > > > > I would argue that cogroup is not just an optimization it is a > new > >> > way > >> > > > for > >> > > > > the users to look at accomplishing a problem that requires > >> multiple > >> > > > > streams. I may sound like a broken record but I don't think > users > >> > > should > >> > > > > have to build the N-1 intermediate tables and deal with their > >> > > > initializers, > >> > > > > serdes and stores if all they care about is the final object. > >> > > > > Now if for example someone uses cogroup but doesn't supply > >> additional > >> > > > > streams and aggregators this case is equivalent to a single > >> grouped > >> > > > stream > >> > > > > making an aggregate call. This case is what I view an > optimization > >> > as, > >> > > we > >> > > > > could remove the KStreamCogroup and act as if there was just a > >> call > >> > to > >> > > > > KGroupedStream#aggregate instead of calling > >> KGroupedStream#cogroup. > >> > (I > >> > > > > would prefer to just write a warning saying that this is not how > >> > > cogroup > >> > > > is > >> > > > > to be used.) > >> > > > > > >> > > > > Thanks, > >> > > > > Kyle > >> > > > > > >> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska < > >> eno.there...@gmail.com > >> > > > >> > > > wrote: > >> > > > > > >> > > > >> Hi Kyle, > >> > > > >> > >> > > > >> Thanks for the KIP again. A couple of comments: > >> > > > >> > >> > > > >> - minor: could you add an exact example (similar to what Jay’s > >> > example > >> > > > is, > >> > > > >> or like your Spark/Pig pointers had) to make this super > concrete? > >> > > > >> > >> > > > >> - my main concern is that we’re exposing this optimization to > the > >> > DSL. > >> > > > In > >> > > > >> an ideal world, an optimizer would take the existing DSL and do > >> the > >> > > > right > >> > > > >> thing under the covers (create just one state store, arrange > the > >> > nodes > >> > > > >> etc). The original DSL had a bunch of small, composable pieces > >> > (group, > >> > > > >> aggregate, join) that this proposal groups together. I’d like > to > >> > hear > >> > > > your > >> > > > >> thoughts on whether it’s possible to do this optimization with > >> the > >> > > > current > >> > > > >> DSL, at the topology builder level. > >> > > > >> > >> > > > >> I think there will be scope for several such optimizations in > the > >> > > future > >> > > > >> and perhaps at some point we need to think about decoupling the > >> 1:1 > >> > > > mapping > >> > > > >> from the DSL into the physical topology. > >> > > > >> > >> > > > >> Thanks > >> > > > >> Eno > >> > > > >> > >> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> > wrote: > >> > > > >>> > >> > > > >>> I haven't digested the proposal but the use case is pretty > >> common. > >> > An > >> > > > >>> example would be the "customer 360" or "unified customer > >> profile" > >> > use > >> > > > >> case > >> > > > >>> we often use. In that use case you have a dozen systems each > of > >> > which > >> > > > has > >> > > > >>> some information about your customer (account details, > settings, > >> > > > billing > >> > > > >>> info, customer service contacts, purchase history, etc). Your > >> goal > >> > is > >> > > > to > >> > > > >>> join/munge these into a single profile record for each > customer > >> > that > >> > > > has > >> > > > >>> all the relevant info in a usable form and is up-to-date with > >> all > >> > the > >> > > > >>> source systems. If you implement that with kstreams as a > >> sequence > >> > of > >> > > > >> joins > >> > > > >>> i think today we'd fully materialize N-1 intermediate tables. > >> But > >> > > > clearly > >> > > > >>> you only need a single stage to group all these things that > are > >> > > already > >> > > > >>> co-partitioned. A distributed database would do this under the > >> > covers > >> > > > >> which > >> > > > >>> is arguably better (at least when it does the right thing) and > >> > > perhaps > >> > > > we > >> > > > >>> could do the same thing but I'm not sure we know the > >> partitioning > >> > so > >> > > we > >> > > > >> may > >> > > > >>> need an explicit cogroup command that impllies they are > already > >> > > > >>> co-partitioned. > >> > > > >>> > >> > > > >>> -Jay > >> > > > >>> > >> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman < > >> > > > winkelman.k...@gmail.com > >> > > > >>> > >> > > > >>> wrote: > >> > > > >>> > >> > > > >>>> Yea thats a good way to look at it. > >> > > > >>>> I have seen this type of functionality in a couple other > >> platforms > >> > > > like > >> > > > >>>> spark and pig. > >> > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/ > >> > > > >> PairRDDFunctions.html > >> > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_ > >> > > > >> cogroup_operator.htm > >> > > > >>>> > >> > > > >>>> > >> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> > >> > wrote: > >> > > > >>>> > >> > > > >>>>> Hi Kyle, > >> > > > >>>>> > >> > > > >>>>> If i'm reading this correctly it is like an N way outer > join? > >> So > >> > an > >> > > > >> input > >> > > > >>>>> on any stream will always produce a new aggregated value - > is > >> > that > >> > > > >>>> correct? > >> > > > >>>>> Effectively, each Aggregator just looks up the current > value, > >> > > > >> aggregates > >> > > > >>>>> and forwards the result. > >> > > > >>>>> I need to look into it and think about it a bit more, but it > >> > seems > >> > > > like > >> > > > >>>> it > >> > > > >>>>> could be a useful optimization. > >> > > > >>>>> > >> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman < > >> > > winkelman.k...@gmail.com > >> > > > > > >> > > > >>>>> wrote: > >> > > > >>>>> > >> > > > >>>>>> I sure can. I have added the following description to my > >> KIP. If > >> > > > this > >> > > > >>>>>> doesn't help let me know and I will take some more time to > >> > build a > >> > > > >>>>> diagram > >> > > > >>>>>> and make more of a step by step description: > >> > > > >>>>>> > >> > > > >>>>>> Example with Current API: > >> > > > >>>>>> > >> > > > >>>>>> KTable<K, V1> table1 = > >> > > > >>>>>> builder.stream("topic1").groupByKey().aggregate( > initializer1 > >> , > >> > > > >>>>> aggregator1, > >> > > > >>>>>> aggValueSerde1, storeName1); > >> > > > >>>>>> KTable<K, V2> table2 = > >> > > > >>>>>> builder.stream("topic2").groupByKey().aggregate( > initializer2 > >> , > >> > > > >>>>> aggregator2, > >> > > > >>>>>> aggValueSerde2, storeName2); > >> > > > >>>>>> KTable<K, V3> table3 = > >> > > > >>>>>> builder.stream("topic3").groupByKey().aggregate( > initializer3 > >> , > >> > > > >>>>> aggregator3, > >> > > > >>>>>> aggValueSerde3, storeName3); > >> > > > >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, > >> > > > >>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree); > >> > > > >>>>>> > >> > > > >>>>>> As you can see this creates 3 StateStores, requires 3 > >> > > initializers, > >> > > > >>>> and 3 > >> > > > >>>>>> aggValueSerdes. This also adds the pressure to user to > define > >> > what > >> > > > the > >> > > > >>>>>> intermediate values are going to be (V1, V2, V3). They are > >> left > >> > > > with a > >> > > > >>>>>> couple choices, first to make V1, V2, and V3 all the same > as > >> CG > >> > > and > >> > > > >> the > >> > > > >>>>> two > >> > > > >>>>>> joiners are more like mergers, or second make them > >> intermediate > >> > > > states > >> > > > >>>>> such > >> > > > >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use > >> those > >> > > to > >> > > > >>>> build > >> > > > >>>>>> the final aggregate CG value. This is something the user > >> could > >> > > avoid > >> > > > >>>>>> thinking about with this KIP. > >> > > > >>>>>> > >> > > > >>>>>> When a new input arrives lets say at "topic1" it will first > >> go > >> > > > through > >> > > > >>>> a > >> > > > >>>>>> KStreamAggregate grabbing the current aggregate from > >> storeName1. > >> > > It > >> > > > >>>> will > >> > > > >>>>>> produce this in the form of the first intermediate value > and > >> get > >> > > > sent > >> > > > >>>>>> through a KTableKTableOuterJoin where it will look up the > >> > current > >> > > > >> value > >> > > > >>>>> of > >> > > > >>>>>> the key in storeName2. It will use the first joiner to > >> calculate > >> > > the > >> > > > >>>>> second > >> > > > >>>>>> intermediate value, which will go through an additional > >> > > > >>>>>> KTableKTableOuterJoin. Here it will look up the current > >> value of > >> > > the > >> > > > >>>> key > >> > > > >>>>> in > >> > > > >>>>>> storeName3 and use the second joiner to build the final > >> > aggregate > >> > > > >>>> value. > >> > > > >>>>>> > >> > > > >>>>>> If you think through all possibilities for incoming topics > >> you > >> > > will > >> > > > >> see > >> > > > >>>>>> that no matter which topic it comes in through all three > >> stores > >> > > are > >> > > > >>>>> queried > >> > > > >>>>>> and all of the joiners must get used. > >> > > > >>>>>> > >> > > > >>>>>> Topology wise for N incoming streams this creates N > >> > > > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 > >> > > > >>>>>> KTableKTableJoinMergers. > >> > > > >>>>>> > >> > > > >>>>>> > >> > > > >>>>>> > >> > > > >>>>>> Example with Proposed API: > >> > > > >>>>>> > >> > > > >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1"). > >> > > > >>>> groupByKey(); > >> > > > >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2"). > >> > > > >>>> groupByKey(); > >> > > > >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3"). > >> > > > >>>> groupByKey(); > >> > > > >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, > >> > > > aggregator1, > >> > > > >>>>>> aggValueSerde1, storeName1) > >> > > > >>>>>> .cogroup(grouped2, aggregator2) > >> > > > >>>>>> .cogroup(grouped3, aggregator3) > >> > > > >>>>>> .aggregate(); > >> > > > >>>>>> > >> > > > >>>>>> As you can see this creates 1 StateStore, requires 1 > >> > initializer, > >> > > > and > >> > > > >> 1 > >> > > > >>>>>> aggValueSerde. The user no longer has to worry about the > >> > > > intermediate > >> > > > >>>>>> values and the joiners. All they have to think about is how > >> each > >> > > > >> stream > >> > > > >>>>>> impacts the creation of the final CG object. > >> > > > >>>>>> > >> > > > >>>>>> When a new input arrives lets say at "topic1" it will first > >> go > >> > > > through > >> > > > >>>> a > >> > > > >>>>>> KStreamAggreagte and grab the current aggregate from > >> storeName1. > >> > > It > >> > > > >>>> will > >> > > > >>>>>> add its incoming object to the aggregate, update the store > >> and > >> > > pass > >> > > > >> the > >> > > > >>>>> new > >> > > > >>>>>> aggregate on. This new aggregate goes through the > >> KStreamCogroup > >> > > > which > >> > > > >>>> is > >> > > > >>>>>> pretty much just a pass through processor and you are done. > >> > > > >>>>>> > >> > > > >>>>>> Topology wise for N incoming streams the new api will only > >> every > >> > > > >>>> create N > >> > > > >>>>>> KStreamAggregates and 1 KStreamCogroup. > >> > > > >>>>>> > >> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax < > >> > > > >> matth...@confluent.io > >> > > > >>>>> > >> > > > >>>>>> wrote: > >> > > > >>>>>> > >> > > > >>>>>>> Kyle, > >> > > > >>>>>>> > >> > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I > >> could > >> > > not > >> > > > >>>>>>> follow completely. Could you maybe add a more concrete > >> example, > >> > > > like > >> > > > >>>> 3 > >> > > > >>>>>>> streams with 3 records each (plus expected result), and > show > >> > the > >> > > > >>>>>>> difference between current way to to implement it and the > >> > > proposed > >> > > > >>>> API? > >> > > > >>>>>>> This could also cover the internal processing to see what > >> store > >> > > > calls > >> > > > >>>>>>> would be required for both approaches etc. > >> > > > >>>>>>> > >> > > > >>>>>>> I think, it's pretty advanced stuff you propose, and it > >> would > >> > > help > >> > > > to > >> > > > >>>>>>> understand it better. > >> > > > >>>>>>> > >> > > > >>>>>>> Thanks a lot! > >> > > > >>>>>>> > >> > > > >>>>>>> > >> > > > >>>>>>> -Matthias > >> > > > >>>>>>> > >> > > > >>>>>>> > >> > > > >>>>>>> > >> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote: > >> > > > >>>>>>>> I have made a pull request. It can be found here. > >> > > > >>>>>>>> > >> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975 > >> > > > >>>>>>>> > >> > > > >>>>>>>> I plan to write some more unit tests for my classes and > get > >> > > around > >> > > > >>>> to > >> > > > >>>>>>>> writing documentation for the public api additions. > >> > > > >>>>>>>> > >> > > > >>>>>>>> One thing I was curious about is during the > >> > > > >>>>>>> KCogroupedStreamImpl#aggregate > >> > > > >>>>>>>> method I pass null to the KGroupedStream# > >> > repartitionIfRequired > >> > > > >>>>> method. > >> > > > >>>>>> I > >> > > > >>>>>>>> can't supply the store name because if more than one > >> grouped > >> > > > stream > >> > > > >>>>>>>> repartitions an error is thrown. Is there some name that > >> > someone > >> > > > >>>> can > >> > > > >>>>>>>> recommend or should I leave the null and allow it to fall > >> back > >> > > to > >> > > > >>>> the > >> > > > >>>>>>>> KGroupedStream.name? > >> > > > >>>>>>>> > >> > > > >>>>>>>> Should this be expanded to handle grouped tables? This > >> would > >> > be > >> > > > >>>>> pretty > >> > > > >>>>>>> easy > >> > > > >>>>>>>> for a normal aggregate but one allowing session stores > and > >> > > > windowed > >> > > > >>>>>>> stores > >> > > > >>>>>>>> would required KTableSessionWindowAggregate and > >> > > > >>>> KTableWindowAggregate > >> > > > >>>>>>>> implementations. > >> > > > >>>>>>>> > >> > > > >>>>>>>> Thanks, > >> > > > >>>>>>>> Kyle > >> > > > >>>>>>>> > >> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < > >> > eno.there...@gmail.com> > >> > > > >>>>> wrote: > >> > > > >>>>>>>> > >> > > > >>>>>>>>> I’ll look as well asap, sorry, been swamped. > >> > > > >>>>>>>>> > >> > > > >>>>>>>>> Eno > >> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < > >> > damian....@gmail.com> > >> > > > >>>>> wrote: > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> Hi Kyle, > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> Thanks for the KIP. I apologize that i haven't had the > >> > chance > >> > > to > >> > > > >>>>> look > >> > > > >>>>>>> at > >> > > > >>>>>>>>>> the KIP yet, but will schedule some time to look into > it > >> > > > >>>> tomorrow. > >> > > > >>>>>> For > >> > > > >>>>>>>>> the > >> > > > >>>>>>>>>> implementation, can you raise a PR against kafka trunk > >> and > >> > > mark > >> > > > >>>> it > >> > > > >>>>> as > >> > > > >>>>>>>>> WIP? > >> > > > >>>>>>>>>> It will be easier to review what you have done. > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> Thanks, > >> > > > >>>>>>>>>> Damian > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman < > >> > > > >>>>> winkelman.k...@gmail.com > >> > > > >>>>>>> > >> > > > >>>>>>>>> wrote: > >> > > > >>>>>>>>>> > >> > > > >>>>>>>>>>> I am replying to this in hopes it will draw some > >> attention > >> > to > >> > > > my > >> > > > >>>>> KIP > >> > > > >>>>>>> as > >> > > > >>>>>>>>> I > >> > > > >>>>>>>>>>> haven't heard from anyone in a couple days. This is my > >> > first > >> > > > KIP > >> > > > >>>>> and > >> > > > >>>>>>> my > >> > > > >>>>>>>>>>> first large contribution to the project so I'm sure I > >> did > >> > > > >>>>> something > >> > > > >>>>>>>>> wrong. > >> > > > >>>>>>>>>>> ;) > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" < > >> > > > >>>>> winkelman.k...@gmail.com> > >> > > > >>>>>>>>> wrote: > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>>>>> Hello all, > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about > >> > adding > >> > > > >>>>>> cogroup > >> > > > >>>>>>> to > >> > > > >>>>>>>>>>>> the streams DSL. > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Please find the KIP here: > >> > > > >>>>>>>>>>>> https://cwiki.apache.org/ > confluence/display/KAFKA/KIP- > >> > > > >>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Please find my initial implementation here: > >> > > > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>>> Thanks, > >> > > > >>>>>>>>>>>> Kyle Winkelman > >> > > > >>>>>>>>>>>> > >> > > > >>>>>>>>>>> > >> > > > >>>>>>>>> > >> > > > >>>>>>>>> > >> > > > >>>>>>>> > >> > > > >>>>>>> > >> > > > >>>>>>> > >> > > > >>>>>> > >> > > > >>>>> > >> > > > >>>> > >> > > > >> > >> > > > >> > >> > > > > >> > > > > >> > > > >> > > >> > > > > > > > > -- > > -- Guozhang > > > > > > -- > -- Guozhang > -- -- Guozhang