I only allow defining a single window/session window once, when you make the initial cogroup call from a KGroupedStream. From then on you are using the cogroup call from within CogroupedKStream, which doesn't accept any additional windows/session windows.
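[Editor's note] The single-store semantics being discussed here can be sketched outside of Kafka Streams. The following is a hedged, stdlib-only Java illustration of the cogroup idea from this thread (one shared state store, one aggregator per input stream, all producing the same aggregate type); the class and method names are illustrative stand-ins, not the KIP's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative sketch (NOT the Kafka Streams API): models the proposed
// cogroup semantics with one shared "state store" (a Map) and one
// aggregator per input stream, all updating the same aggregate type CG.
public class CogroupSketch {
    // The shared aggregate type; here a running summary of two streams.
    public static class CG {
        public int clicks = 0;
        public int impressions = 0;
    }

    // A single state store shared by every co-grouped stream.
    private final Map<String, CG> store = new HashMap<>();

    // Each stream supplies its own aggregator; the initializer is shared.
    public CG apply(String key, Integer value, BiFunction<CG, Integer, CG> aggregator) {
        CG current = store.computeIfAbsent(key, k -> new CG()); // shared initializer
        CG updated = aggregator.apply(current, value);
        store.put(key, updated);
        return updated; // forwarded downstream, as the pass-through KStreamCogroup would
    }

    public static void main(String[] args) {
        CogroupSketch cogrouped = new CogroupSketch();
        BiFunction<CG, Integer, CG> clickAgg = (agg, v) -> { agg.clicks += v; return agg; };
        BiFunction<CG, Integer, CG> imprAgg = (agg, v) -> { agg.impressions += v; return agg; };

        cogrouped.apply("auction-1", 3, imprAgg);              // impression event
        CG result = cogrouped.apply("auction-1", 1, clickAgg); // click event
        System.out.println(result.clicks + " " + result.impressions); // prints "1 3"
    }
}
```

Note how no intermediate value types or joiners appear anywhere: each incoming record only needs its own stream's aggregator plus the one shared store, which is the core of Kyle's argument below.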
Is this what you meant by your question or did I misunderstand? On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote: Another question that came to me is on "window alignment": from the KIP it seems you are allowing users to specify a (potentially different) window spec in each co-grouped input stream. So if these window specs are different how should we "align" them with different input streams? I think it is more natural to only specify one window spec in the KTable<RK, V> CogroupedKStream#aggregate(Windows); And remove it from the cogroup() functions. WDYT? Guozhang On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Thanks for the proposal Kyle, this is a quite common use case to support > such multi-way table join (i.e. N source tables with N aggregate func) with > a single store and N+1 serdes, I have seen lots of people using the > low-level PAPI to achieve this goal. > > > On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com > > wrote: > >> I like your point about not handling other cases such as count and reduce. >> >> I think that reduce may not make sense because reduce assumes that the >> input values are the same as the output values. With cogroup there may be >> multiple different input types and then your output type can't be multiple >> different things. In the case where you have all matching value types you >> can do KStreamBuilder#merge followed by the reduce. >> >> As for count I think it is possible to call count on all the individual >> grouped streams and then do joins. Otherwise we could maybe make a special >> call in groupedstream for this case. Because in this case we don't need to >> do type checking on the values. It could be similar to the current count >> methods but accept varargs of additional grouped streams as well and >> make >> sure they have a key type of K. >> >> The way I have put the KIP together is to ensure that we do type checking.
>> I don't see a way we could group them all first and then make a call to >> count, reduce, or aggregate because with aggregate they would need to pass >> a list of aggregators and we would have no way of type checking that they >> match the grouped streams. >> >> Thanks, >> Kyle >> >> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io> wrote: >> >> > Sorry to jump on this thread so late. I agree this is a very useful >> > addition and wanted to provide an additional use-case and some more >> > comments. >> > >> > This is actually a very common analytics use-case in the ad-tech >> industry. >> > The typical setup will have an auction stream, an impression stream, >> and a >> > click stream. Those three streams need to be combined to compute >> aggregate >> > statistics (e.g. impression statistics, and click-through rates), since >> > most of the attributes of interest are only present in the auction stream. >> > >> > A simple way to do this is to co-group all the streams by the auction >> key, >> > and process updates to the co-group as events for each stream come in, >> > keeping only one value from each stream before sending downstream for >> > further processing / aggregation. >> > >> > One could view the result of that co-group operation as a "KTable" with >> > multiple values per key. The key being the grouping key, and the values >> > consisting of one value per stream. >> > >> > What I like about Kyle's approach is that it allows elegant co-grouping of >> > multiple streams without having to worry about the number of streams, >> and >> > avoids dealing with Tuple types or other generic interfaces that could >> get >> > messy if we wanted to preserve all the value types in the resulting >> > co-grouped stream. >> > >> > My only concern is that we only allow the cogroup + aggregate combined >> > operation. This forces the user to build their own tuple serialization >> > format if they want to preserve the individual input stream values as a >> > group.
It also deviates quite a bit from our approach in KGroupedStream >> > which offers other operations, such as count and reduce, which should >> also >> > be applicable to a co-grouped stream. >> > >> > Overall I still think this is a really useful addition, but I feel we >> > haven't spent much time trying to explore alternative DSLs that could >> maybe >> > generalize better or match our existing syntax more closely. >> > >> > On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <winkelman.k...@gmail.com >> > >> > wrote: >> > >> > > Eno, is there anyone else that is an expert in the kafka streams realm >> > that >> > > I should reach out to for input? >> > > >> > > I believe Damian Guy is still planning on reviewing this more in depth >> > so I >> > > will wait for his inputs before continuing. >> > > >> > > On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> >> wrote: >> > > >> > > > Thanks Kyle, good arguments. >> > > > >> > > > Eno >> > > > >> > > > > On May 7, 2017, at 5:06 PM, Kyle Winkelman < >> winkelman.k...@gmail.com >> > > >> > > > wrote: >> > > > > >> > > > > *- minor: could you add an exact example (similar to what Jay’s >> > example >> > > > is, >> > > > > or like your Spark/Pig pointers had) to make this super concrete?* >> > > > > I have added a more concrete example to the KIP. >> > > > > >> > > > > *- my main concern is that we’re exposing this optimization to the >> > DSL. >> > > > In >> > > > > an ideal world, an optimizer would take the existing DSL and do >> the >> > > right >> > > > > thing under the covers (create just one state store, arrange the >> > nodes >> > > > > etc). The original DSL had a bunch of small, composable pieces >> > (group, >> > > > > aggregate, join) that this proposal groups together.
I’d like to >> hear >> > > > your >> > > > > thoughts on whether it’s possible to do this optimization with the >> > > > current >> > > > > DSL, at the topology builder level.* >> > > > > You would have to make a lot of checks to understand if it is even >> > > > possible >> > > > > to make this optimization: >> > > > > 1. Make sure they are all KTableKTableOuterJoins >> > > > > 2. None of the intermediate KTables are used for anything else. >> > > > > 3. None of the intermediate stores are used. (This may be >> impossible >> > > > > especially if they use KafkaStreams#store after the topology has >> > > already >> > > > > been built.) >> > > > > You would then need to make decisions during the optimization: >> > > > > 1. Your new initializer would be the composite of all the individual >> > > > > initializers and the valueJoiners. >> > > > > 2. I am having a hard time thinking about how you would turn the >> > > > > aggregators and valueJoiners into an aggregator that would work on >> > the >> > > > > final object, but this may be possible. >> > > > > 3. Which state store would you use? The ones declared would be for >> > the >> > > > > aggregate values. None of the declared ones would be guaranteed to >> > hold >> > > > the >> > > > > final object. This would mean you must create a new state store >> and >> > > not >> > > > > create any of the declared ones. >> > > > > >> > > > > The main argument I have against it is even if it could be done I >> > don't >> > > > > know that we would want to have this be an optimization in the >> > > background >> > > > > because the user would still be required to think about all of the >> > > > > intermediate values that they shouldn't need to worry about if >> they >> > > only >> > > > > care about the final object.
>> > > > > >> > > > > In my opinion cogroup is a common enough case that it should be part >> of >> > > > the >> > > > > composable pieces (group, aggregate, join) because we want to >> allow >> > > > people >> > > > > to join two or more streams in an easy way. Right now I don't >> > > > think >> > > > > we give them ways of handling this use case easily. >> > > > > >> > > > > *-I think there will be scope for several such optimizations in >> the >> > > > future >> > > > > and perhaps at some point we need to think about decoupling the >> 1:1 >> > > > mapping >> > > > > from the DSL into the physical topology.* >> > > > > I would argue that cogroup is not just an optimization it is a new >> way >> > > > for >> > > > > the users to look at accomplishing a problem that requires >> multiple >> > > > > streams. I may sound like a broken record but I don't think users >> > > should >> > > > > have to build the N-1 intermediate tables and deal with their >> > > > initializers, >> > > > > serdes and stores if all they care about is the final object. >> > > > > Now if for example someone uses cogroup but doesn't supply >> additional >> > > > > streams and aggregators this case is equivalent to a single >> grouped >> > > > stream >> > > > > making an aggregate call. This case is what I view an optimization >> > as, >> > > we >> > > > > could remove the KStreamCogroup and act as if there was just a >> call >> > to >> > > > > KGroupedStream#aggregate instead of calling >> KGroupedStream#cogroup. >> > (I >> > > > > would prefer to just write a warning saying that this is not how >> > > cogroup >> > > > is >> > > > > to be used.) >> > > > > >> > > > > Thanks, >> > > > > Kyle >> > > > > >> > > > > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska < >> eno.there...@gmail.com >> > > >> > > > wrote: >> > > > > >> > > > >> Hi Kyle, >> > > > >> >> > > > >> Thanks for the KIP again.
A couple of comments: >> > > > >> >> > > > >> - minor: could you add an exact example (similar to what Jay’s >> > example >> > > > is, >> > > > >> or like your Spark/Pig pointers had) to make this super concrete? >> > > > >> >> > > > >> - my main concern is that we’re exposing this optimization to the >> > DSL. >> > > > In >> > > > >> an ideal world, an optimizer would take the existing DSL and do >> the >> > > > right >> > > > >> thing under the covers (create just one state store, arrange the >> > nodes >> > > > >> etc). The original DSL had a bunch of small, composable pieces >> > (group, >> > > > >> aggregate, join) that this proposal groups together. I’d like to >> > hear >> > > > your >> > > > >> thoughts on whether it’s possible to do this optimization with >> the >> > > > current >> > > > >> DSL, at the topology builder level. >> > > > >> >> > > > >> I think there will be scope for several such optimizations in the >> > > future >> > > > >> and perhaps at some point we need to think about decoupling the >> 1:1 >> > > > mapping >> > > > >> from the DSL into the physical topology. >> > > > >> >> > > > >> Thanks >> > > > >> Eno >> > > > >> >> > > > >>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote: >> > > > >>> >> > > > >>> I haven't digested the proposal but the use case is pretty >> common. >> > An >> > > > >>> example would be the "customer 360" or "unified customer >> profile" >> > use >> > > > >> case >> > > > >>> we often use. In that use case you have a dozen systems each of >> > which >> > > > has >> > > > >>> some information about your customer (account details, settings, >> > > > billing >> > > > >>> info, customer service contacts, purchase history, etc). Your >> goal >> > is >> > > > to >> > > > >>> join/munge these into a single profile record for each customer >> > that >> > > > has >> > > > >>> all the relevant info in a usable form and is up-to-date with >> all >> > the >> > > > >>> source systems. 
If you implement that with kstreams as a >> sequence >> > of >> > > > >> joins >> > > > >>> I think today we'd fully materialize N-1 intermediate tables. >> But >> > > > clearly >> > > > >>> you only need a single stage to group all these things that are >> > > already >> > > > >>> co-partitioned. A distributed database would do this under the >> > covers >> > > > >> which >> > > > >>> is arguably better (at least when it does the right thing) and >> > > perhaps >> > > > we >> > > > >>> could do the same thing but I'm not sure we know the >> partitioning >> > so >> > > we >> > > > >> may >> > > > >>> need an explicit cogroup command that implies they are already >> > > > >>> co-partitioned. >> > > > >>> >> > > > >>> -Jay >> > > > >>> >> > > > >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman < >> > > > winkelman.k...@gmail.com >> > > > >>> >> > > > >>> wrote: >> > > > >>> >> > > > >>>> Yeah, that's a good way to look at it. >> > > > >>>> I have seen this type of functionality in a couple other >> platforms >> > > > like >> > > > >>>> spark and pig. >> > > > >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html >> > > > >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm >> > > > >>>> >> > > > >>>> >> > > > >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> >> > wrote: >> > > > >>>> >> > > > >>>>> Hi Kyle, >> > > > >>>>> >> > > > >>>>> If I'm reading this correctly it is like an N way outer join? >> So >> > an >> > > > >> input >> > > > >>>>> on any stream will always produce a new aggregated value - is >> > that >> > > > >>>> correct? >> > > > >>>>> Effectively, each Aggregator just looks up the current value, >> > > > >> aggregates >> > > > >>>>> and forwards the result. >> > > > >>>>> I need to look into it and think about it a bit more, but it >> > seems >> > > > like >> > > > >>>> it >> > > > >>>>> could be a useful optimization.
>> > > > >>>>> >> > > > >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman < >> > > winkelman.k...@gmail.com >> > > > > >> > > > >>>>> wrote: >> > > > >>>>> >> > > > >>>>>> I sure can. I have added the following description to my >> KIP. If >> > > > this >> > > > >>>>>> doesn't help let me know and I will take some more time to >> > build a >> > > > >>>>> diagram >> > > > >>>>>> and make more of a step by step description: >> > > > >>>>>> >> > > > >>>>>> Example with Current API: >> > > > >>>>>> >> > > > >>>>>> KTable<K, V1> table1 = >> > > > >>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1, >> > > > >>>>> aggregator1, >> > > > >>>>>> aggValueSerde1, storeName1); >> > > > >>>>>> KTable<K, V2> table2 = >> > > > >>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2, >> > > > >>>>> aggregator2, >> > > > >>>>>> aggValueSerde2, storeName2); >> > > > >>>>>> KTable<K, V3> table3 = >> > > > >>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3, >> > > > >>>>> aggregator3, >> > > > >>>>>> aggValueSerde3, storeName3); >> > > > >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, >> > > > >>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree); >> > > > >>>>>> >> > > > >>>>>> As you can see this creates 3 StateStores, requires 3 >> > > initializers, >> > > > >>>> and 3 >> > > > >>>>>> aggValueSerdes. This also adds pressure on the user to define >> > what >> > > > the >> > > > >>>>>> intermediate values are going to be (V1, V2, V3). They are >> left >> > > > with a >> > > > >>>>>> couple of choices: first, to make V1, V2, and V3 all the same as >> CG >> > > and >> > > > >> the >> > > > >>>>> two >> > > > >>>>>> joiners are more like mergers, or second, make them >> intermediate >> > > > states >> > > > >>>>> such >> > > > >>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use >> those >> > > to >> > > > >>>> build >> > > > >>>>>> the final aggregate CG value.
This is something the user >> could >> > > avoid >> > > > >>>>>> thinking about with this KIP. >> > > > >>>>>> >> > > > >>>>>> When a new input arrives, let's say at "topic1", it will first >> go >> > > > through >> > > > >>>> a >> > > > >>>>>> KStreamAggregate grabbing the current aggregate from >> storeName1. >> > > It >> > > > >>>> will >> > > > >>>>>> produce this in the form of the first intermediate value and >> get >> > > > sent >> > > > >>>>>> through a KTableKTableOuterJoin where it will look up the >> > current >> > > > >> value >> > > > >>>>> of >> > > > >>>>>> the key in storeName2. It will use the first joiner to >> calculate >> > > the >> > > > >>>>> second >> > > > >>>>>> intermediate value, which will go through an additional >> > > > >>>>>> KTableKTableOuterJoin. Here it will look up the current >> value of >> > > the >> > > > >>>> key >> > > > >>>>> in >> > > > >>>>>> storeName3 and use the second joiner to build the final >> > aggregate >> > > > >>>> value. >> > > > >>>>>> >> > > > >>>>>> If you think through all possibilities for incoming topics >> you >> > > will >> > > > >> see >> > > > >>>>>> that no matter which topic it comes in through all three >> stores >> > > are >> > > > >>>>> queried >> > > > >>>>>> and all of the joiners must get used. >> > > > >>>>>> >> > > > >>>>>> Topology wise for N incoming streams this creates N >> > > > >>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 >> > > > >>>>>> KTableKTableJoinMergers. >> > > > >>>>>> >> > > > >>>>>> >> > > > >>>>>> >> > > > >>>>>> Example with Proposed API: >> > > > >>>>>> >> > > > >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1"). >> > > > >>>> groupByKey(); >> > > > >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2"). >> > > > >>>> groupByKey(); >> > > > >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
>> > > > >>>> groupByKey(); >> > > > >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, >> > > > aggregator1, >> > > > >>>>>> aggValueSerde1, storeName1) >> > > > >>>>>> .cogroup(grouped2, aggregator2) >> > > > >>>>>> .cogroup(grouped3, aggregator3) >> > > > >>>>>> .aggregate(); >> > > > >>>>>> >> > > > >>>>>> As you can see this creates 1 StateStore, requires 1 >> > initializer, >> > > > and >> > > > >> 1 >> > > > >>>>>> aggValueSerde. The user no longer has to worry about the >> > > > intermediate >> > > > >>>>>> values and the joiners. All they have to think about is how >> each >> > > > >> stream >> > > > >>>>>> impacts the creation of the final CG object. >> > > > >>>>>> >> > > > >>>>>> When a new input arrives, let's say at "topic1", it will first >> go >> > > > through >> > > > >>>> a >> > > > >>>>>> KStreamAggregate and grab the current aggregate from >> storeName1. >> > > It >> > > > >>>> will >> > > > >>>>>> add its incoming object to the aggregate, update the store >> and >> > > pass >> > > > >> the >> > > > >>>>> new >> > > > >>>>>> aggregate on. This new aggregate goes through the >> KStreamCogroup >> > > > which >> > > > >>>> is >> > > > >>>>>> pretty much just a pass through processor and you are done. >> > > > >>>>>> >> > > > >>>>>> Topology wise for N incoming streams the new api will only >> ever >> > > > >>>> create N >> > > > >>>>>> KStreamAggregates and 1 KStreamCogroup. >> > > > >>>>>> >> > > > >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax < >> > > > >> matth...@confluent.io >> > > > >>>>> >> > > > >>>>>> wrote: >> > > > >>>>>> >> > > > >>>>>>> Kyle, >> > > > >>>>>>> >> > > > >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I >> could >> > > not >> > > > >>>>>>> follow completely.
Could you maybe add a more concrete >> example, >> > > > like >> > > > >>>> 3 >> > > > >>>>>>> streams with 3 records each (plus expected result), and show >> > the >> > > > >>>>>>> difference between the current way to implement it and the >> > > proposed >> > > > >>>> API? >> > > > >>>>>>> This could also cover the internal processing to see what >> store >> > > > calls >> > > > >>>>>>> would be required for both approaches etc. >> > > > >>>>>>> >> > > > >>>>>>> I think, it's pretty advanced stuff you propose, and it >> would >> > > help >> > > > to >> > > > >>>>>>> understand it better. >> > > > >>>>>>> >> > > > >>>>>>> Thanks a lot! >> > > > >>>>>>> >> > > > >>>>>>> >> > > > >>>>>>> -Matthias >> > > > >>>>>>> >> > > > >>>>>>> >> > > > >>>>>>> >> > > > >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote: >> > > > >>>>>>>> I have made a pull request. It can be found here. >> > > > >>>>>>>> >> > > > >>>>>>>> https://github.com/apache/kafka/pull/2975 >> > > > >>>>>>>> >> > > > >>>>>>>> I plan to write some more unit tests for my classes and get >> > > around >> > > > >>>> to >> > > > >>>>>>>> writing documentation for the public api additions. >> > > > >>>>>>>> >> > > > >>>>>>>> One thing I was curious about is during the >> > > > >>>>>>> KCogroupedStreamImpl#aggregate >> > > > >>>>>>>> method I pass null to the KGroupedStream# >> > repartitionIfRequired >> > > > >>>>> method. >> > > > >>>>>> I >> > > > >>>>>>>> can't supply the store name because if more than one >> grouped >> > > > stream >> > > > >>>>>>>> repartitions an error is thrown. Is there some name that >> > someone >> > > > >>>> can >> > > > >>>>>>>> recommend or should I leave the null and allow it to fall >> back >> > > to >> > > > >>>> the >> > > > >>>>>>>> KGroupedStream.name? >> > > > >>>>>>>> >> > > > >>>>>>>> Should this be expanded to handle grouped tables?
This >> would >> > be >> > > > >>>>> pretty >> > > > >>>>>>> easy >> > > > >>>>>>>> for a normal aggregate but one allowing session stores and >> > > > windowed >> > > > >>>>>>> stores >> > > > >>>>>>>> would require KTableSessionWindowAggregate and >> > > > >>>> KTableWindowAggregate >> > > > >>>>>>>> implementations. >> > > > >>>>>>>> >> > > > >>>>>>>> Thanks, >> > > > >>>>>>>> Kyle >> > > > >>>>>>>> >> > > > >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < >> > eno.there...@gmail.com> >> > > > >>>>> wrote: >> > > > >>>>>>>> >> > > > >>>>>>>>> I’ll look as well asap, sorry, been swamped. >> > > > >>>>>>>>> >> > > > >>>>>>>>> Eno >> > > > >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < >> > damian....@gmail.com> >> > > > >>>>> wrote: >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> Hi Kyle, >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the >> > chance >> > > to >> > > > >>>>> look >> > > > >>>>>>> at >> > > > >>>>>>>>>> the KIP yet, but will schedule some time to look into it >> > > > >>>> tomorrow. >> > > > >>>>>> For >> > > > >>>>>>>>> the >> > > > >>>>>>>>>> implementation, can you raise a PR against kafka trunk >> and >> > > mark >> > > > >>>> it >> > > > >>>>> as >> > > > >>>>>>>>> WIP? >> > > > >>>>>>>>>> It will be easier to review what you have done. >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> Thanks, >> > > > >>>>>>>>>> Damian >> > > > >>>>>>>>>> >> > > > >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman < >> > > > >>>>> winkelman.k...@gmail.com >> > > > >>>>>>> >> > > > >>>>>>>>> wrote: >> > > > >>>>>>>>>> >> > > > >>>>>>>>>>> I am replying to this in hopes it will draw some >> attention >> > to >> > > > my >> > > > >>>>> KIP >> > > > >>>>>>> as >> > > > >>>>>>>>> I >> > > > >>>>>>>>>>> haven't heard from anyone in a couple days.
This is my >> > first >> > > > KIP >> > > > >>>>> and >> > > > >>>>>>> my >> > > > >>>>>>>>>>> first large contribution to the project so I'm sure I >> did >> > > > >>>>> something >> > > > >>>>>>>>> wrong. >> > > > >>>>>>>>>>> ;) >> > > > >>>>>>>>>>> >> > > > >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" < >> > > > >>>>> winkelman.k...@gmail.com> >> > > > >>>>>>>>> wrote: >> > > > >>>>>>>>>>> >> > > > >>>>>>>>>>>> Hello all, >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about >> > adding >> > > > >>>>>> cogroup >> > > > >>>>>>> to >> > > > >>>>>>>>>>>> the streams DSL. >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> Please find the KIP here: >> > > > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > > >>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> Please find my initial implementation here: >> > > > >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>>> Thanks, >> > > > >>>>>>>>>>>> Kyle Winkelman >> > > > >>>>>>>>>>>> >> > > > >>>>>>>>>>> >> > > > >>>>>>>>> >> > > > >>>>>>>>> >> > > > >>>>>>>> >> > > > >>>>>>> >> > > > >>>>>>> >> > > > >>>>>> >> > > > >>>>> >> > > > >>>> >> > > > >> >> > > > >> >> > > > >> > > > >> > > >> > >> > > > > -- > -- Guozhang > -- -- Guozhang
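[Editor's note] Kyle's type-checking argument earlier in the thread (a fluent chain of cogroup() calls lets the compiler check each aggregator against its own stream's value type, whereas a single call taking a list of aggregators could not be checked) can be illustrated with plain Java generics. This is a hypothetical sketch; Source and CogroupedStream here are stand-ins invented for illustration, not the KIP's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative sketch of why the fluent cogroup(...) chain type-checks.
// Each cogroup call introduces its own type variable V, so every
// aggregator is checked against its source's value type, while all
// calls share the single output type CG.
public class TypeCheckSketch {
    // Stand-in for a grouped stream holding values of type V.
    public static class Source<V> {
        final List<V> values = new ArrayList<>();
        public Source<V> add(V v) { values.add(v); return this; }
    }

    public static class CogroupedStream<CG> {
        private CG aggregate;
        public CogroupedStream(CG init) { this.aggregate = init; }

        // The compiler rejects an aggregator whose input type does not
        // match this source's value type -- the check a flat list of
        // aggregators could not provide.
        public <V> CogroupedStream<CG> cogroup(Source<V> source, BiFunction<CG, V, CG> aggregator) {
            for (V v : source.values) {
                aggregate = aggregator.apply(aggregate, v);
            }
            return this; // fluent: each stream may have a different V, same CG
        }

        public CG aggregate() { return aggregate; }
    }

    public static void main(String[] args) {
        Source<Integer> counts = new Source<Integer>().add(2).add(3);
        Source<String> names = new Source<String>().add("a").add("b");

        String result = new CogroupedStream<>("")
                .cogroup(counts, (agg, v) -> agg + v)  // Integer aggregator
                .cogroup(names, (agg, v) -> agg + v)   // String aggregator
                .aggregate();
        System.out.println(result); // prints "23ab"
    }
}
```

Swapping the two aggregators in main (passing the String aggregator to the Integer source) fails to compile, which is the guarantee Kyle says would be lost with "a list of aggregators".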