On second thought... This is the current proposal API:
```
<T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
                                   final Aggregator<? super K, ? super V, T> aggregator,
                                   final Serde<T> aggValueSerde)
```

If we do not have the initializer in the first cogroup() call, it might be a bit awkward for users to specify an aggregator that returns a typed <T> value. Maybe it is still better to keep these two functions in the same API call?

Guozhang

On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote: > This suggestion lgtm. I would vote for the first alternative rather than adding > it to the `KStreamBuilder` though. > > On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> > wrote: > >> I have a minor suggestion to make the API a little bit more symmetric. >> I feel it would make more sense to move the initializer and serde to the >> final aggregate statement, since the serde only applies to the state >> store, >> and the initializer doesn't bear any relation to the first group in >> particular. It would end up looking like this: >> >> KTable<K, CG> cogrouped = >> grouped1.cogroup(aggregator1) >> .cogroup(grouped2, aggregator2) >> .cogroup(grouped3, aggregator3) >> .aggregate(initializer1, aggValueSerde, storeName1); >> >> Alternatively, we could move the first cogroup() method to >> KStreamBuilder, similar to how we have .merge() >> and end up with an api that would be even more symmetric. >> >> KStreamBuilder.cogroup(grouped1, aggregator1) >> .cogroup(grouped2, aggregator2) >> .cogroup(grouped3, aggregator3) >> .aggregate(initializer1, aggValueSerde, storeName1); >> >> This doesn't have to be a blocker, but I thought it would make the API >> just >> a tad cleaner. >> >> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote: >> >> > Kyle, >> > >> > Thanks a lot for the updated KIP. It looks good to me. >> > >> > >> > Guozhang >> > >> > >> > On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote: >> > >> > > This makes much more sense to me. +1 >> > > >> > > > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman < >> winkelman.k...@gmail.com> >> > > wrote: >> > > > >> > > > I have updated the KIP and my PR. Let me know what you think. >> > > > To create a cogrouped stream just call cogroup on a KGroupedStream >> and >> > > > supply the initializer, aggValueSerde, and an aggregator. Then >> continue >> > > > adding KGroupedStreams and aggregators. Then call one of the many >> > > aggregate >> > > > calls to create a KTable. >> > > > >> > > > Thanks, >> > > > Kyle >> > > > >> > > > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote: >> > > > >> > > >> Hi Kyle, >> > > >> >> > > >> Thanks for the update. I think just one initializer makes sense as >> it >> > > >> should only be called once per key and generally it is just going >> to >> > > create >> > > >> a new instance of whatever the Aggregate class is. >> > > >> >> > > >> Cheers, >> > > >> Damian >> > > >> >> > > >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman < >> winkelman.k...@gmail.com >> > > >> > > >> wrote: >> > > >> >> > > >>> Hello all, >> > > >>> >> > > >>> I have spent some more time on this and the best alternative I >> have >> > > come >> > > >> up >> > > >>> with is: >> > > >>> KGroupedStream has a single cogroup call that takes an initializer >> > and >> > > an >> > > >>> aggregator. >> > > >>> CogroupedKStream has a cogroup call that takes additional >> > groupedStream >> > > >>> aggregator pairs. >> > > >>> CogroupedKStream has multiple aggregate methods that create the >> > > different >> > > >>> stores.
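[Editor's note: a rough, hedged sketch of the two API shapes being discussed in this thread — the current proposal with the initializer and serde on the first cogroup call, and Xavier's variant that moves them to the final aggregate call. None of this API exists yet; grouped1/grouped2/grouped3, the aggregators, serde, and store name are placeholders taken from the thread.]

```java
// Shape 1 - current proposal: initializer and serde supplied on the first cogroup().
KTable<K, CG> cogrouped =
        grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
                .cogroup(grouped2, aggregator2)
                .cogroup(grouped3, aggregator3)
                .aggregate();

// Shape 2 - Xavier's alternative: initializer and serde move to the final aggregate().
KTable<K, CG> cogroupedAlt =
        grouped1.cogroup(aggregator1)
                .cogroup(grouped2, aggregator2)
                .cogroup(grouped3, aggregator3)
                .aggregate(initializer1, aggValueSerde1, storeName1);
```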
>> > > >>> >> > > >>> I plan on updating the kip but I want people's input on if we >> should >> > > have >> > > >>> the initializer be passed in once at the beginning or if we should >> > > >> instead >> > > >>> have the initializer be required for each call to one of the >> > aggregate >> > > >>> calls. The first makes more sense to me but doesnt allow the user >> to >> > > >>> specify different initializers for different tables. >> > > >>> >> > > >>> Thanks, >> > > >>> Kyle >> > > >>> >> > > >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" < >> winkelman.k...@gmail.com> >> > > >>> wrote: >> > > >>> >> > > >>>> Yea I really like that idea I'll see what I can do to update the >> kip >> > > >> and >> > > >>>> my pr when I have some time. I'm not sure how well creating the >> > > >>>> kstreamaggregates will go though because at that point I will >> have >> > > >> thrown >> > > >>>> away the type of the values. It will be type safe I just may >> need to >> > > >> do a >> > > >>>> little forcing. >> > > >>>> >> > > >>>> Thanks, >> > > >>>> Kyle >> > > >>>> >> > > >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> >> > wrote: >> > > >>>> >> > > >>>>> Kyle, >> > > >>>>> >> > > >>>>> Thanks for the explanations, my previous read on the wiki >> examples >> > > was >> > > >>>>> wrong. >> > > >>>>> >> > > >>>>> So I guess my motivation should be "reduced" to: can we move the >> > > >> window >> > > >>>>> specs param from "KGroupedStream#cogroup(..)" to >> > > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are: >> > > >>>>> >> > > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from >> 3 >> > to >> > > >> 2. >> > > >>>>> 2. major: this is for extensibility of the APIs, and since we >> are >> > > >>> removing >> > > >>>>> the "Evolving" annotations on Streams it may be harder to >> change it >> > > >>> again >> > > >>>>> in the future. The extended use cases are that people wanted to >> > have >> > > >>>>> windowed running aggregates on different granularities, e.g. >> "give >> > me >> > > >>> the >> > > >>>>> counts per-minute, per-hour, per-day and per-week", and today in >> > DSL >> > > >> we >> > > >>>>> need to specify that case in multiple aggregate operators, which >> > gets >> > > >> a >> > > >>>>> state store / changelog, etc. And it is possible to optimize it >> as >> > > >> well >> > > >>> to >> > > >>>>> a single state store. Its implementation would be tricky as you >> > need >> > > >> to >> > > >>>>> contain different lengthed windows within your window store but >> > just >> > > >>> from >> > > >>>>> the public API point of view, it could be specified as: >> > > >>>>> >> > > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... >> > > >>>>> "state-store-name"); >> > > >>>>> >> > > >>>>> table1 = stream.aggregate(/*per-minute window*/) >> > > >>>>> table2 = stream.aggregate(/*per-hour window*/) >> > > >>>>> table3 = stream.aggregate(/*per-day window*/) >> > > >>>>> >> > > >>>>> while underlying we are only using a single store >> > "state-store-name" >> > > >> for >> > > >>>>> it. >> > > >>>>> >> > > >>>>> >> > > >>>>> Although this feature is out of the scope of this KIP, I'd like >> to >> > > >>> discuss >> > > >>>>> if we can "leave the door open" to make such changes without >> > > modifying >> > > >>> the >> > > >>>>> public APIs . 
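[Editor's note: a hypothetical sketch of the "single store, multiple window granularities" idea Guozhang describes above, assuming CogroupedKStream#aggregate(Windows) overloads that are not part of the KIP; the window sizes are only illustrative.]

```java
// Hypothetical: one cogrouped stream backed by a single store "state-store-name",
// aggregated at several window granularities. aggregate(Windows) does not exist in the KIP.
CogroupedKStream<K, CG> cogrouped =
        grouped1.cogroup(initializer, aggregator1, aggValueSerde, "state-store-name")
                .cogroup(grouped2, aggregator2);

KTable<Windowed<K>, CG> perMinute = cogrouped.aggregate(TimeWindows.of(60 * 1000L));
KTable<Windowed<K>, CG> perHour   = cogrouped.aggregate(TimeWindows.of(60 * 60 * 1000L));
KTable<Windowed<K>, CG> perDay    = cogrouped.aggregate(TimeWindows.of(24 * 60 * 60 * 1000L));
```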
>> > > >>>>> >> > > >>>>> Guozhang >> > > >>>>> >> > > >>>>> >> > > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman < >> > > >>> winkelman.k...@gmail.com >> > > >>>>>> >> > > >>>>> wrote: >> > > >>>>> >> > > >>>>>> I allow defining a single window/sessionwindow one time when >> you >> > > >> make >> > > >>>>> the >> > > >>>>>> cogroup call from a KGroupedStream. From then on you are using >> the >> > > >>>>> cogroup >> > > >>>>>> call from with in CogroupedKStream which doesnt accept any >> > > >> additional >> > > >>>>>> windows/sessionwindows. >> > > >>>>>> >> > > >>>>>> Is this what you meant by your question or did I misunderstand? >> > > >>>>>> >> > > >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> >> > > >> wrote: >> > > >>>>>> >> > > >>>>>> Another question that came to me is on "window alignment": from >> > the >> > > >>> KIP >> > > >>>>> it >> > > >>>>>> seems you are allowing users to specify a (potentially >> different) >> > > >>> window >> > > >>>>>> spec in each co-grouped input stream. So if these window specs >> are >> > > >>>>>> different how should we "align" them with different input >> > streams? I >> > > >>>>> think >> > > >>>>>> it is more natural to only specify on window spec in the >> > > >>>>>> >> > > >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows); >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> And remove it from the cogroup() functions. WDYT? >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> Guozhang >> > > >>>>>> >> > > >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang < >> > wangg...@gmail.com> >> > > >>>>> wrote: >> > > >>>>>> >> > > >>>>>>> Thanks for the proposal Kyle, this is a quite common use case >> to >> > > >>>>> support >> > > >>>>>>> such multi-way table join (i.e. N source tables with N >> aggregate >> > > >>> func) >> > > >>>>>> with >> > > >>>>>>> a single store and N+1 serdes, I have seen lots of people >> using >> > > >> the >> > > >>>>>>> low-level PAPI to achieve this goal. >> > > >>>>>>> >> > > >>>>>>> >> > > >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman < >> > > >>>>>> winkelman.k...@gmail.com >> > > >>>>>>>> wrote: >> > > >>>>>>> >> > > >>>>>>>> I like your point about not handling other cases such as >> count >> > > >> and >> > > >>>>>> reduce. >> > > >>>>>>>> >> > > >>>>>>>> I think that reduce may not make sense because reduce assumes >> > > >> that >> > > >>>>> the >> > > >>>>>>>> input values are the same as the output values. With cogroup >> > > >> there >> > > >>>>> may >> > > >>>>>> be >> > > >>>>>>>> multiple different input types and then your output type >> cant be >> > > >>>>>> multiple >> > > >>>>>>>> different things. In the case where you have all matching >> value >> > > >>> types >> > > >>>>>> you >> > > >>>>>>>> can do KStreamBuilder#merge followed by the reduce. >> > > >>>>>>>> >> > > >>>>>>>> As for count I think it is possible to call count on all the >> > > >>>>> individual >> > > >>>>>>>> grouped streams and then do joins. Otherwise we could maybe >> make >> > > >> a >> > > >>>>>> special >> > > >>>>>>>> call in groupedstream for this case. Because in this case we >> > dont >> > > >>>>> need >> > > >>>>>> to >> > > >>>>>>>> do type checking on the values. It could be similar to the >> > > >> current >> > > >>>>> count >> > > >>>>>>>> methods but accept a var args of additonal grouped streams as >> > > >> well >> > > >>>>> and >> > > >>>>>>>> make >> > > >>>>>>>> sure they have a key type of K. 
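[Editor's note: one possible signature for the varargs count idea Kyle describes above, purely to make the shape concrete; no such method exists and the KIP does not propose it.]

```java
// Hypothetical KGroupedStream method: count this stream together with other
// grouped streams that share the key type K, backed by a single store.
KTable<K, Long> count(final String storeName,
                      final KGroupedStream<K, ?>... otherGroupedStreams);
```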
>> > > >>>>>>>> >> > > >>>>>>>> The way I have put the kip together is to ensure that we do >> type >> > > >>>>>> checking. >> > > >>>>>>>> I don't see a way we could group them all first and then >> make a >> > > >>> call >> > > >>>>> to >> > > >>>>>>>> count, reduce, or aggregate because with aggregate they would >> > > >> need >> > > >>> to >> > > >>>>>> pass >> > > >>>>>>>> a list of aggregators and we would have no way of type >> checking >> > > >>> that >> > > >>>>>> they >> > > >>>>>>>> match the grouped streams. >> > > >>>>>>>> >> > > >>>>>>>> Thanks, >> > > >>>>>>>> Kyle >> > > >>>>>>>> >> > > >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" < >> xav...@confluent.io> >> > > >>>>> wrote: >> > > >>>>>>>> >> > > >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very >> > > >>> useful >> > > >>>>>>>>> addition and wanted to provide an additional use-case and >> some >> > > >>> more >> > > >>>>>>>>> comments. >> > > >>>>>>>>> >> > > >>>>>>>>> This is actually a very common analytics use-case in the >> > > >> ad-tech >> > > >>>>>>>> industry. >> > > >>>>>>>>> The typical setup will have an auction stream, an impression >> > > >>>>> stream, >> > > >>>>>>>> and a >> > > >>>>>>>>> click stream. Those three streams need to be combined to >> > > >> compute >> > > >>>>>>>> aggregate >> > > >>>>>>>>> statistics (e.g. impression statistics, and click-through >> > > >> rates), >> > > >>>>>> since >> > > >>>>>>>>> most of the attributes of interest are only present the >> auction >> > > >>>>>> stream. >> > > >>>>>>>>> >> > > >>>>>>>>> A simple way to do this is to co-group all the streams by >> the >> > > >>>>> auction >> > > >>>>>>>> key, >> > > >>>>>>>>> and process updates to the co-group as events for each >> stream >> > > >>> come >> > > >>>>> in, >> > > >>>>>>>>> keeping only one value from each stream before sending >> > > >> downstream >> > > >>>>> for >> > > >>>>>>>>> further processing / aggregation. >> > > >>>>>>>>> >> > > >>>>>>>>> One could view the result of that co-group operation as a >> > > >>> "KTable" >> > > >>>>>> with >> > > >>>>>>>>> multiple values per key. The key being the grouping key, and >> > > >> the >> > > >>>>>> values >> > > >>>>>>>>> consisting of one value per stream. >> > > >>>>>>>>> >> > > >>>>>>>>> What I like about Kyle's approach is that allows elegant >> > > >>>>> co-grouping >> > > >>>>>> of >> > > >>>>>>>>> multiple streams without having to worry about the number of >> > > >>>>> streams, >> > > >>>>>>>> and >> > > >>>>>>>>> avoids dealing with Tuple types or other generic interfaces >> > > >> that >> > > >>>>> could >> > > >>>>>>>> get >> > > >>>>>>>>> messy if we wanted to preserve all the value types in the >> > > >>> resulting >> > > >>>>>>>>> co-grouped stream. >> > > >>>>>>>>> >> > > >>>>>>>>> My only concern is that we only allow the cogroup + >> aggregate >> > > >>>>> combined >> > > >>>>>>>>> operation. This forces the user to build their own tuple >> > > >>>>> serialization >> > > >>>>>>>>> format if they want to preserve the individual input stream >> > > >>> values >> > > >>>>> as >> > > >>>>>> a >> > > >>>>>>>>> group. It also deviates quite a bit from our approach in >> > > >>>>>> KGroupedStream >> > > >>>>>>>>> which offers other operations, such as count and reduce, >> which >> > > >>>>> should >> > > >>>>>>>> also >> > > >>>>>>>>> be applicable to a co-grouped stream. 
>> > > >>>>>>>>> >> > > >>>>>>>>> Overall I still think this is a really useful addition, but >> I >> > > >>> feel >> > > >>>>> we >> > > >>>>>>>>> haven't spend much time trying to explore alternative DSLs >> that >> > > >>>>> could >> > > >>>>>>>> maybe >> > > >>>>>>>>> generalize better or match our existing syntax more closely. >> > > >>>>>>>>> >> > > >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman < >> > > >>>>>> winkelman.k...@gmail.com >> > > >>>>>>>>> >> > > >>>>>>>>> wrote: >> > > >>>>>>>>> >> > > >>>>>>>>>> Eno, is there anyone else that is an expert in the kafka >> > > >>> streams >> > > >>>>>> realm >> > > >>>>>>>>> that >> > > >>>>>>>>>> I should reach out to for input? >> > > >>>>>>>>>> >> > > >>>>>>>>>> I believe Damian Guy is still planning on reviewing this >> more >> > > >>> in >> > > >>>>>> depth >> > > >>>>>>>>> so I >> > > >>>>>>>>>> will wait for his inputs before continuing. >> > > >>>>>>>>>> >> > > >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" < >> > > >> eno.there...@gmail.com >> > > >>>> >> > > >>>>>>>> wrote: >> > > >>>>>>>>>> >> > > >>>>>>>>>>> Thanks Kyle, good arguments. >> > > >>>>>>>>>>> >> > > >>>>>>>>>>> Eno >> > > >>>>>>>>>>> >> > > >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman < >> > > >>>>>>>> winkelman.k...@gmail.com >> > > >>>>>>>>>> >> > > >>>>>>>>>>> wrote: >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> *- minor: could you add an exact example (similar to what >> > > >>>>> Jay’s >> > > >>>>>>>>> example >> > > >>>>>>>>>>> is, >> > > >>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super >> > > >>>>>> concrete?* >> > > >>>>>>>>>>>> I have added a more concrete example to the KIP. >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> *- my main concern is that we’re exposing this >> > > >> optimization >> > > >>>>> to >> > > >>>>>> the >> > > >>>>>>>>> DSL. >> > > >>>>>>>>>>> In >> > > >>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL >> > > >>> and >> > > >>>>> do >> > > >>>>>>>> the >> > > >>>>>>>>>> right >> > > >>>>>>>>>>>> thing under the covers (create just one state store, >> > > >>> arrange >> > > >>>>> the >> > > >>>>>>>>> nodes >> > > >>>>>>>>>>>> etc). The original DSL had a bunch of small, composable >> > > >>>>> pieces >> > > >>>>>>>>> (group, >> > > >>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d >> > > >>>>> like to >> > > >>>>>>>> hear >> > > >>>>>>>>>>> your >> > > >>>>>>>>>>>> thoughts on whether it’s possible to do this optimization >> > > >>>>> with >> > > >>>>>> the >> > > >>>>>>>>>>> current >> > > >>>>>>>>>>>> DSL, at the topology builder level.* >> > > >>>>>>>>>>>> You would have to make a lot of checks to understand if >> > > >> it >> > > >>> is >> > > >>>>>> even >> > > >>>>>>>>>>> possible >> > > >>>>>>>>>>>> to make this optimization: >> > > >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins >> > > >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything >> > > >>>>> else. >> > > >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be >> > > >>>>>>>> impossible >> > > >>>>>>>>>>>> especially if they use KafkaStreams#store after the >> > > >>> topology >> > > >>>>> has >> > > >>>>>>>>>> already >> > > >>>>>>>>>>>> been built.) >> > > >>>>>>>>>>>> You would then need to make decisions during the >> > > >>>>> optimization: >> > > >>>>>>>>>>>> 1. Your new initializer would the composite of all the >> > > >>>>>> individual >> > > >>>>>>>>>>>> initializers and the valueJoiners. >> > > >>>>>>>>>>>> 2. 
I am having a hard time thinking about how you would >> > > >>> turn >> > > >>>>> the >> > > >>>>>>>>>>>> aggregators and valueJoiners into an aggregator that >> > > >> would >> > > >>>>> work >> > > >>>>>> on >> > > >>>>>>>>> the >> > > >>>>>>>>>>>> final object, but this may be possible. >> > > >>>>>>>>>>>> 3. Which state store would you use? The ones declared >> > > >> would >> > > >>>>> be >> > > >>>>>> for >> > > >>>>>>>>> the >> > > >>>>>>>>>>>> aggregate values. None of the declared ones would be >> > > >>>>> guaranteed >> > > >>>>>> to >> > > >>>>>>>>> hold >> > > >>>>>>>>>>> the >> > > >>>>>>>>>>>> final object. This would mean you must created a new >> > > >> state >> > > >>>>> store >> > > >>>>>>>> and >> > > >>>>>>>>>> not >> > > >>>>>>>>>>>> created any of the declared ones. >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> The main argument I have against it is even if it could >> > > >> be >> > > >>>>> done >> > > >>>>>> I >> > > >>>>>>>>> don't >> > > >>>>>>>>>>>> know that we would want to have this be an optimization >> > > >> in >> > > >>>>> the >> > > >>>>>>>>>> background >> > > >>>>>>>>>>>> because the user would still be required to think about >> > > >> all >> > > >>>>> of >> > > >>>>>> the >> > > >>>>>>>>>>>> intermediate values that they shouldn't need to worry >> > > >> about >> > > >>>>> if >> > > >>>>>>>> they >> > > >>>>>>>>>> only >> > > >>>>>>>>>>>> care about the final object. >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> In my opinion cogroup is a common enough case that it >> > > >>> should >> > > >>>>> be >> > > >>>>>>>> part >> > > >>>>>>>>> of >> > > >>>>>>>>>>> the >> > > >>>>>>>>>>>> composable pieces (group, aggregate, join) because we >> > > >> want >> > > >>> to >> > > >>>>>>>> allow >> > > >>>>>>>>>>> people >> > > >>>>>>>>>>>> to join more than 2 or more streams in an easy way. Right >> > > >>>>> now I >> > > >>>>>>>> don't >> > > >>>>>>>>>>> think >> > > >>>>>>>>>>>> we give them ways of handling this use case easily. >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> *-I think there will be scope for several such >> > > >>> optimizations >> > > >>>>> in >> > > >>>>>>>> the >> > > >>>>>>>>>>> future >> > > >>>>>>>>>>>> and perhaps at some point we need to think about >> > > >> decoupling >> > > >>>>> the >> > > >>>>>>>> 1:1 >> > > >>>>>>>>>>> mapping >> > > >>>>>>>>>>>> from the DSL into the physical topology.* >> > > >>>>>>>>>>>> I would argue that cogroup is not just an optimization it >> > > >>> is >> > > >>>>> a >> > > >>>>>> new >> > > >>>>>>>>> way >> > > >>>>>>>>>>> for >> > > >>>>>>>>>>>> the users to look at accomplishing a problem that >> > > >> requires >> > > >>>>>>>> multiple >> > > >>>>>>>>>>>> streams. I may sound like a broken record but I don't >> > > >> think >> > > >>>>>> users >> > > >>>>>>>>>> should >> > > >>>>>>>>>>>> have to build the N-1 intermediate tables and deal with >> > > >>> their >> > > >>>>>>>>>>> initializers, >> > > >>>>>>>>>>>> serdes and stores if all they care about is the final >> > > >>> object. >> > > >>>>>>>>>>>> Now if for example someone uses cogroup but doesn't >> > > >> supply >> > > >>>>>>>> additional >> > > >>>>>>>>>>>> streams and aggregators this case is equivalent to a >> > > >> single >> > > >>>>>>>> grouped >> > > >>>>>>>>>>> stream >> > > >>>>>>>>>>>> making an aggregate call. 
This case is what I view an >> > > >>>>>> optimization >> > > >>>>>>>>> as, >> > > >>>>>>>>>> we >> > > >>>>>>>>>>>> could remove the KStreamCogroup and act as if there was >> > > >>> just >> > > >>>>> a >> > > >>>>>>>> call >> > > >>>>>>>>> to >> > > >>>>>>>>>>>> KGroupedStream#aggregate instead of calling >> > > >>>>>>>> KGroupedStream#cogroup. >> > > >>>>>>>>> (I >> > > >>>>>>>>>>>> would prefer to just write a warning saying that this is >> > > >>> not >> > > >>>>> how >> > > >>>>>>>>>> cogroup >> > > >>>>>>>>>>> is >> > > >>>>>>>>>>>> to be used.) >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> Thanks, >> > > >>>>>>>>>>>> Kyle >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska < >> > > >>>>>>>> eno.there...@gmail.com >> > > >>>>>>>>>> >> > > >>>>>>>>>>> wrote: >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>>> Hi Kyle, >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments: >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> - minor: could you add an exact example (similar to what >> > > >>>>> Jay’s >> > > >>>>>>>>> example >> > > >>>>>>>>>>> is, >> > > >>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super >> > > >>>>>> concrete? >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> - my main concern is that we’re exposing this >> > > >> optimization >> > > >>>>> to >> > > >>>>>> the >> > > >>>>>>>>> DSL. >> > > >>>>>>>>>>> In >> > > >>>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL >> > > >>>>> and do >> > > >>>>>>>> the >> > > >>>>>>>>>>> right >> > > >>>>>>>>>>>>> thing under the covers (create just one state store, >> > > >>> arrange >> > > >>>>>> the >> > > >>>>>>>>> nodes >> > > >>>>>>>>>>>>> etc). The original DSL had a bunch of small, composable >> > > >>>>> pieces >> > > >>>>>>>>> (group, >> > > >>>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d >> > > >>>>> like >> > > >>>>>> to >> > > >>>>>>>>> hear >> > > >>>>>>>>>>> your >> > > >>>>>>>>>>>>> thoughts on whether it’s possible to do this >> > > >> optimization >> > > >>>>> with >> > > >>>>>>>> the >> > > >>>>>>>>>>> current >> > > >>>>>>>>>>>>> DSL, at the topology builder level. >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> I think there will be scope for several such >> > > >> optimizations >> > > >>>>> in >> > > >>>>>> the >> > > >>>>>>>>>> future >> > > >>>>>>>>>>>>> and perhaps at some point we need to think about >> > > >>> decoupling >> > > >>>>> the >> > > >>>>>>>> 1:1 >> > > >>>>>>>>>>> mapping >> > > >>>>>>>>>>>>> from the DSL into the physical topology. >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> Thanks >> > > >>>>>>>>>>>>> Eno >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps < >> > > >> j...@confluent.io> >> > > >>>>>> wrote: >> > > >>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>> I haven't digested the proposal but the use case is >> > > >>> pretty >> > > >>>>>>>> common. >> > > >>>>>>>>> An >> > > >>>>>>>>>>>>>> example would be the "customer 360" or "unified >> > > >> customer >> > > >>>>>>>> profile" >> > > >>>>>>>>> use >> > > >>>>>>>>>>>>> case >> > > >>>>>>>>>>>>>> we often use. In that use case you have a dozen systems >> > > >>>>> each >> > > >>>>>> of >> > > >>>>>>>>> which >> > > >>>>>>>>>>> has >> > > >>>>>>>>>>>>>> some information about your customer (account details, >> > > >>>>>> settings, >> > > >>>>>>>>>>> billing >> > > >>>>>>>>>>>>>> info, customer service contacts, purchase history, >> > > >> etc). 
>> > > >>>>> Your >> > > >>>>>>>> goal >> > > >>>>>>>>> is >> > > >>>>>>>>>>> to >> > > >>>>>>>>>>>>>> join/munge these into a single profile record for each >> > > >>>>>> customer >> > > >>>>>>>>> that >> > > >>>>>>>>>>> has >> > > >>>>>>>>>>>>>> all the relevant info in a usable form and is >> > > >> up-to-date >> > > >>>>> with >> > > >>>>>>>> all >> > > >>>>>>>>> the >> > > >>>>>>>>>>>>>> source systems. If you implement that with kstreams as >> > > >> a >> > > >>>>>>>> sequence >> > > >>>>>>>>> of >> > > >>>>>>>>>>>>> joins >> > > >>>>>>>>>>>>>> i think today we'd fully materialize N-1 intermediate >> > > >>>>> tables. >> > > >>>>>>>> But >> > > >>>>>>>>>>> clearly >> > > >>>>>>>>>>>>>> you only need a single stage to group all these things >> > > >>> that >> > > >>>>>> are >> > > >>>>>>>>>> already >> > > >>>>>>>>>>>>>> co-partitioned. A distributed database would do this >> > > >>> under >> > > >>>>> the >> > > >>>>>>>>> covers >> > > >>>>>>>>>>>>> which >> > > >>>>>>>>>>>>>> is arguably better (at least when it does the right >> > > >>> thing) >> > > >>>>> and >> > > >>>>>>>>>> perhaps >> > > >>>>>>>>>>> we >> > > >>>>>>>>>>>>>> could do the same thing but I'm not sure we know the >> > > >>>>>>>> partitioning >> > > >>>>>>>>> so >> > > >>>>>>>>>> we >> > > >>>>>>>>>>>>> may >> > > >>>>>>>>>>>>>> need an explicit cogroup command that impllies they are >> > > >>>>>> already >> > > >>>>>>>>>>>>>> co-partitioned. >> > > >>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>> -Jay >> > > >>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman < >> > > >>>>>>>>>>> winkelman.k...@gmail.com >> > > >>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>> Yea thats a good way to look at it. >> > > >>>>>>>>>>>>>>> I have seen this type of functionality in a couple >> > > >> other >> > > >>>>>>>> platforms >> > > >>>>>>>>>>> like >> > > >>>>>>>>>>>>>>> spark and pig. >> > > >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/ >> > > >>>>>>>>>>>>> PairRDDFunctions.html >> > > >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_ >> > > >>>>>>>>>>>>> cogroup_operator.htm >> > > >>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" < >> > > >>>>> damian....@gmail.com> >> > > >>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>> Hi Kyle, >> > > >>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>> If i'm reading this correctly it is like an N way >> > > >> outer >> > > >>>>>> join? >> > > >>>>>>>> So >> > > >>>>>>>>> an >> > > >>>>>>>>>>>>> input >> > > >>>>>>>>>>>>>>>> on any stream will always produce a new aggregated >> > > >>> value >> > > >>>>> - >> > > >>>>>> is >> > > >>>>>>>>> that >> > > >>>>>>>>>>>>>>> correct? >> > > >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the >> > > >> current >> > > >>>>>> value, >> > > >>>>>>>>>>>>> aggregates >> > > >>>>>>>>>>>>>>>> and forwards the result. >> > > >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more, >> > > >>>>> but it >> > > >>>>>>>>> seems >> > > >>>>>>>>>>> like >> > > >>>>>>>>>>>>>>> it >> > > >>>>>>>>>>>>>>>> could be a useful optimization. >> > > >>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman < >> > > >>>>>>>>>> winkelman.k...@gmail.com >> > > >>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> I sure can. I have added the following description >> > > >> to >> > > >>> my >> > > >>>>>>>> KIP. 
If >> > > >>>>>>>>>>> this >> > > >>>>>>>>>>>>>>>>> doesn't help let me know and I will take some more >> > > >>> time >> > > >>>>> to >> > > >>>>>>>>> build a >> > > >>>>>>>>>>>>>>>> diagram >> > > >>>>>>>>>>>>>>>>> and make more of a step by step description: >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> Example with Current API: >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> KTable<K, V1> table1 = >> > > >>>>>>>>>>>>>>>>> builder.stream("topic1").groupByKey().aggregate( >> > > >>>>>> initializer1 >> > > >>>>>>>> , >> > > >>>>>>>>>>>>>>>> aggregator1, >> > > >>>>>>>>>>>>>>>>> aggValueSerde1, storeName1); >> > > >>>>>>>>>>>>>>>>> KTable<K, V2> table2 = >> > > >>>>>>>>>>>>>>>>> builder.stream("topic2").groupByKey().aggregate( >> > > >>>>>> initializer2 >> > > >>>>>>>> , >> > > >>>>>>>>>>>>>>>> aggregator2, >> > > >>>>>>>>>>>>>>>>> aggValueSerde2, storeName2); >> > > >>>>>>>>>>>>>>>>> KTable<K, V3> table3 = >> > > >>>>>>>>>>>>>>>>> builder.stream("topic3").groupByKey().aggregate( >> > > >>>>>> initializer3 >> > > >>>>>>>> , >> > > >>>>>>>>>>>>>>>> aggregator3, >> > > >>>>>>>>>>>>>>>>> aggValueSerde3, storeName3); >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, >> > > >>>>>>>>>>>>>>>>> joinerOneAndTwo).outerJoin(table3, >> > > >>>>> joinerOneTwoAndThree); >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> As you can see this creates 3 StateStores, requires >> > > >> 3 >> > > >>>>>>>>>> initializers, >> > > >>>>>>>>>>>>>>> and 3 >> > > >>>>>>>>>>>>>>>>> aggValueSerdes. This also adds the pressure to user >> > > >> to >> > > >>>>>> define >> > > >>>>>>>>> what >> > > >>>>>>>>>>> the >> > > >>>>>>>>>>>>>>>>> intermediate values are going to be (V1, V2, V3). >> > > >> They >> > > >>>>> are >> > > >>>>>>>> left >> > > >>>>>>>>>>> with a >> > > >>>>>>>>>>>>>>>>> couple choices, first to make V1, V2, and V3 all the >> > > >>>>> same >> > > >>>>>> as >> > > >>>>>>>> CG >> > > >>>>>>>>>> and >> > > >>>>>>>>>>>>> the >> > > >>>>>>>>>>>>>>>> two >> > > >>>>>>>>>>>>>>>>> joiners are more like mergers, or second make them >> > > >>>>>>>> intermediate >> > > >>>>>>>>>>> states >> > > >>>>>>>>>>>>>>>> such >> > > >>>>>>>>>>>>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the >> > > >> joiners >> > > >>>>> use >> > > >>>>>>>> those >> > > >>>>>>>>>> to >> > > >>>>>>>>>>>>>>> build >> > > >>>>>>>>>>>>>>>>> the final aggregate CG value. This is something the >> > > >>> user >> > > >>>>>>>> could >> > > >>>>>>>>>> avoid >> > > >>>>>>>>>>>>>>>>> thinking about with this KIP. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it >> > > >> will >> > > >>>>> first >> > > >>>>>>>> go >> > > >>>>>>>>>>> through >> > > >>>>>>>>>>>>>>> a >> > > >>>>>>>>>>>>>>>>> KStreamAggregate grabbing the current aggregate from >> > > >>>>>>>> storeName1. >> > > >>>>>>>>>> It >> > > >>>>>>>>>>>>>>> will >> > > >>>>>>>>>>>>>>>>> produce this in the form of the first intermediate >> > > >>> value >> > > >>>>>> and >> > > >>>>>>>> get >> > > >>>>>>>>>>> sent >> > > >>>>>>>>>>>>>>>>> through a KTableKTableOuterJoin where it will look >> > > >> up >> > > >>>>> the >> > > >>>>>>>>> current >> > > >>>>>>>>>>>>> value >> > > >>>>>>>>>>>>>>>> of >> > > >>>>>>>>>>>>>>>>> the key in storeName2. It will use the first joiner >> > > >> to >> > > >>>>>>>> calculate >> > > >>>>>>>>>> the >> > > >>>>>>>>>>>>>>>> second >> > > >>>>>>>>>>>>>>>>> intermediate value, which will go through an >> > > >>> additional >> > > >>>>>>>>>>>>>>>>> KTableKTableOuterJoin. 
Here it will look up the >> > > >>> current >> > > >>>>>>>> value of >> > > >>>>>>>>>> the >> > > >>>>>>>>>>>>>>> key >> > > >>>>>>>>>>>>>>>> in >> > > >>>>>>>>>>>>>>>>> storeName3 and use the second joiner to build the >> > > >>> final >> > > >>>>>>>>> aggregate >> > > >>>>>>>>>>>>>>> value. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming >> > > >>>>> topics >> > > >>>>>>>> you >> > > >>>>>>>>>> will >> > > >>>>>>>>>>>>> see >> > > >>>>>>>>>>>>>>>>> that no matter which topic it comes in through all >> > > >>> three >> > > >>>>>>>> stores >> > > >>>>>>>>>> are >> > > >>>>>>>>>>>>>>>> queried >> > > >>>>>>>>>>>>>>>>> and all of the joiners must get used. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> Topology wise for N incoming streams this creates N >> > > >>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, >> > > >> and >> > > >>>>> N-1 >> > > >>>>>>>>>>>>>>>>> KTableKTableJoinMergers. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> Example with Proposed API: >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = >> > > >>>>> builder.stream("topic1"). >> > > >>>>>>>>>>>>>>> groupByKey(); >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = >> > > >>>>> builder.stream("topic2"). >> > > >>>>>>>>>>>>>>> groupByKey(); >> > > >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = >> > > >>>>> builder.stream("topic3"). >> > > >>>>>>>>>>>>>>> groupByKey(); >> > > >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = >> > > >>> grouped1.cogroup(initializer1, >> > > >>>>>>>>>>> aggregator1, >> > > >>>>>>>>>>>>>>>>> aggValueSerde1, storeName1) >> > > >>>>>>>>>>>>>>>>> .cogroup(grouped2, aggregator2) >> > > >>>>>>>>>>>>>>>>> .cogroup(grouped3, aggregator3) >> > > >>>>>>>>>>>>>>>>> .aggregate(); >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1 >> > > >>>>>>>>> initializer, >> > > >>>>>>>>>>> and >> > > >>>>>>>>>>>>> 1 >> > > >>>>>>>>>>>>>>>>> aggValueSerde. The user no longer has to worry about >> > > >>> the >> > > >>>>>>>>>>> intermediate >> > > >>>>>>>>>>>>>>>>> values and the joiners. All they have to think about >> > > >>> is >> > > >>>>> how >> > > >>>>>>>> each >> > > >>>>>>>>>>>>> stream >> > > >>>>>>>>>>>>>>>>> impacts the creation of the final CG object. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it >> > > >> will >> > > >>>>> first >> > > >>>>>>>> go >> > > >>>>>>>>>>> through >> > > >>>>>>>>>>>>>>> a >> > > >>>>>>>>>>>>>>>>> KStreamAggreagte and grab the current aggregate from >> > > >>>>>>>> storeName1. >> > > >>>>>>>>>> It >> > > >>>>>>>>>>>>>>> will >> > > >>>>>>>>>>>>>>>>> add its incoming object to the aggregate, update the >> > > >>>>> store >> > > >>>>>>>> and >> > > >>>>>>>>>> pass >> > > >>>>>>>>>>>>> the >> > > >>>>>>>>>>>>>>>> new >> > > >>>>>>>>>>>>>>>>> aggregate on. This new aggregate goes through the >> > > >>>>>>>> KStreamCogroup >> > > >>>>>>>>>>> which >> > > >>>>>>>>>>>>>>> is >> > > >>>>>>>>>>>>>>>>> pretty much just a pass through processor and you >> > > >> are >> > > >>>>> done. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> Topology wise for N incoming streams the new api >> > > >> will >> > > >>>>> only >> > > >>>>>>>> every >> > > >>>>>>>>>>>>>>> create N >> > > >>>>>>>>>>>>>>>>> KStreamAggregates and 1 KStreamCogroup. >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. 
Sax < >> > > >>>>>>>>>>>>> matth...@confluent.io >> > > >>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> Kyle, >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, >> > > >>>>> but I >> > > >>>>>>>> could >> > > >>>>>>>>>> not >> > > >>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more >> > > >>> concrete >> > > >>>>>>>> example, >> > > >>>>>>>>>>> like >> > > >>>>>>>>>>>>>>> 3 >> > > >>>>>>>>>>>>>>>>>> streams with 3 records each (plus expected result), >> > > >>> and >> > > >>>>>> show >> > > >>>>>>>>> the >> > > >>>>>>>>>>>>>>>>>> difference between current way to to implement it >> > > >> and >> > > >>>>> the >> > > >>>>>>>>>> proposed >> > > >>>>>>>>>>>>>>> API? >> > > >>>>>>>>>>>>>>>>>> This could also cover the internal processing to >> > > >> see >> > > >>>>> what >> > > >>>>>>>> store >> > > >>>>>>>>>>> calls >> > > >>>>>>>>>>>>>>>>>> would be required for both approaches etc. >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose, >> > > >> and >> > > >>> it >> > > >>>>>>>> would >> > > >>>>>>>>>> help >> > > >>>>>>>>>>> to >> > > >>>>>>>>>>>>>>>>>> understand it better. >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> Thanks a lot! >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> -Matthias >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote: >> > > >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here. >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975 >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my >> > > >> classes >> > > >>>>> and >> > > >>>>>> get >> > > >>>>>>>>>> around >> > > >>>>>>>>>>>>>>> to >> > > >>>>>>>>>>>>>>>>>>> writing documentation for the public api >> > > >> additions. >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> One thing I was curious about is during the >> > > >>>>>>>>>>>>>>>>>> KCogroupedStreamImpl#aggregate >> > > >>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream# >> > > >>>>>>>>> repartitionIfRequired >> > > >>>>>>>>>>>>>>>> method. >> > > >>>>>>>>>>>>>>>>> I >> > > >>>>>>>>>>>>>>>>>>> can't supply the store name because if more than >> > > >> one >> > > >>>>>>>> grouped >> > > >>>>>>>>>>> stream >> > > >>>>>>>>>>>>>>>>>>> repartitions an error is thrown. Is there some >> > > >> name >> > > >>>>> that >> > > >>>>>>>>> someone >> > > >>>>>>>>>>>>>>> can >> > > >>>>>>>>>>>>>>>>>>> recommend or should I leave the null and allow it >> > > >> to >> > > >>>>> fall >> > > >>>>>>>> back >> > > >>>>>>>>>> to >> > > >>>>>>>>>>>>>>> the >> > > >>>>>>>>>>>>>>>>>>> KGroupedStream.name? >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables? >> > > >>> This >> > > >>>>>>>> would >> > > >>>>>>>>> be >> > > >>>>>>>>>>>>>>>> pretty >> > > >>>>>>>>>>>>>>>>>> easy >> > > >>>>>>>>>>>>>>>>>>> for a normal aggregate but one allowing session >> > > >>> stores >> > > >>>>>> and >> > > >>>>>>>>>>> windowed >> > > >>>>>>>>>>>>>>>>>> stores >> > > >>>>>>>>>>>>>>>>>>> would required KTableSessionWindowAggregate and >> > > >>>>>>>>>>>>>>> KTableWindowAggregate >> > > >>>>>>>>>>>>>>>>>>> implementations. 
>> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> Thanks, >> > > >>>>>>>>>>>>>>>>>>> Kyle >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < >> > > >>>>>>>>> eno.there...@gmail.com> >> > > >>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped. >> > > >>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>> Eno >> > > >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < >> > > >>>>>>>>> damian....@gmail.com> >> > > >>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>> Hi Kyle, >> > > >>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't >> > > >> had >> > > >>>>> the >> > > >>>>>>>>> chance >> > > >>>>>>>>>> to >> > > >>>>>>>>>>>>>>>> look >> > > >>>>>>>>>>>>>>>>>> at >> > > >>>>>>>>>>>>>>>>>>>>> the KIP yet, but will schedule some time to look >> > > >>>>> into >> > > >>>>>> it >> > > >>>>>>>>>>>>>>> tomorrow. >> > > >>>>>>>>>>>>>>>>> For >> > > >>>>>>>>>>>>>>>>>>>> the >> > > >>>>>>>>>>>>>>>>>>>>> implementation, can you raise a PR against kafka >> > > >>>>> trunk >> > > >>>>>>>> and >> > > >>>>>>>>>> mark >> > > >>>>>>>>>>>>>>> it >> > > >>>>>>>>>>>>>>>> as >> > > >>>>>>>>>>>>>>>>>>>> WIP? >> > > >>>>>>>>>>>>>>>>>>>>> It will be easier to review what you have done. >> > > >>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>> Thanks, >> > > >>>>>>>>>>>>>>>>>>>>> Damian >> > > >>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman < >> > > >>>>>>>>>>>>>>>> winkelman.k...@gmail.com >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw >> > > >> some >> > > >>>>>>>> attention >> > > >>>>>>>>> to >> > > >>>>>>>>>>> my >> > > >>>>>>>>>>>>>>>> KIP >> > > >>>>>>>>>>>>>>>>>> as >> > > >>>>>>>>>>>>>>>>>>>> I >> > > >>>>>>>>>>>>>>>>>>>>>> haven't heard from anyone in a couple days. >> > > >> This >> > > >>>>> is my >> > > >>>>>>>>> first >> > > >>>>>>>>>>> KIP >> > > >>>>>>>>>>>>>>>> and >> > > >>>>>>>>>>>>>>>>>> my >> > > >>>>>>>>>>>>>>>>>>>>>> first large contribution to the project so I'm >> > > >>>>> sure I >> > > >>>>>>>> did >> > > >>>>>>>>>>>>>>>> something >> > > >>>>>>>>>>>>>>>>>>>> wrong. >> > > >>>>>>>>>>>>>>>>>>>>>> ;) >> > > >>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" < >> > > >>>>>>>>>>>>>>>> winkelman.k...@gmail.com> >> > > >>>>>>>>>>>>>>>>>>>> wrote: >> > > >>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>>> Hello all, >> > > >>>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate >> > > >> discussion >> > > >>>>> about >> > > >>>>>>>>> adding >> > > >>>>>>>>>>>>>>>>> cogroup >> > > >>>>>>>>>>>>>>>>>> to >> > > >>>>>>>>>>>>>>>>>>>>>>> the streams DSL. 
>> > > >>>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here: >> > > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/ >> > > >>>>>> confluence/display/KAFKA/KIP- >> > > >>>>>>>>>>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup >> > > >>>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here: >> > > >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka >> > > >>>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>>> Thanks, >> > > >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman >> > > >>>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>>>> >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>>>> >> > > >>>>>>>>>>> >> > > >>>>>>>>>>> >> > > >>>>>>>>>> >> > > >>>>>>>>> >> > > >>>>>>>> >> > > >>>>>>> >> > > >>>>>>> >> > > >>>>>>> >> > > >>>>>>> -- >> > > >>>>>>> -- Guozhang >> > > >>>>>>> >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> -- >> > > >>>>>> -- Guozhang >> > > >>>>>> >> > > >>>>> >> > > >>>>> >> > > >>>>> >> > > >>>>> -- >> > > >>>>> -- Guozhang >> > > >>>>> >> > > >>>> >> > > >>> >> > > >> >> > > >> > > >> > >> > >> > -- >> > -- Guozhang >> > >> > > > > -- > -- Guozhang > -- -- Guozhang