On a first pass, the code has a small bug in getConsumedOffset(). You need to close the channel after getting the coordinator (the line broker = cmr.coordinator();) and then reconnect to the coordinator as follows:

    broker = cmr.coordinator();
    // If the coordinator is on a different host than the channel above, reconnect.
    channel.disconnect();
    channel = new BlockingChannel(broker.host(), broker.port(),
                                  BlockingChannel.UseDefaultBufferSize(),
                                  BlockingChannel.UseDefaultBufferSize(),
                                  5000 /* read timeout in millis */);
    channel.connect();
    ......

The same goes for commitOffsets().

Thanks,

Mayuresh

On Tue, Mar 31, 2015 at 11:52 AM, Madhukar Bharti <bhartimadhu...@gmail.com> wrote:

> Hi Mayuresh,
>
> Have you gone through that code? Please help me with it. If it works
> for the high-level consumer, then I will plan accordingly to store the
> offsets of the low-level consumers in this offsets topic too. Will that
> work if I give the low-level consumers a group name as well, but process
> partition-wise, since they are SimpleConsumers?
>
> Thanks, and awaiting your response!
>
> On Mon, Mar 30, 2015 at 9:45 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>
>> Cool. Will try reviewing it today and get back :)
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, Mar 30, 2015 at 2:53 AM, Madhukar Bharti <bhartimadhu...@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Thanks for your quick response!
>>>
>>> I have tried to use the offset manager for fetch/commit requests. I
>>> have set "dual.commit.enabled" to false and am using
>>> offsets.storage=kafka.
>>>
>>> But I am not able to commit the offset. My code is here:
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/tree/master/src/com/bharti/kafka/offset>
>>>
>>> Kindly check this. Am I missing anything? I am running with a single
>>> broker.
>>>
>>> Thanks!
>>>
>>> On Fri, Mar 27, 2015 at 10:06 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>>>
>>>> In your case you are trying to issue an offsetRequest and not a
>>>> fetchOffsetRequest. I know this is a little confusing.
>>>>
>>>> Let me point you to a Scala patch which has a client for doing fetch
>>>> offset and commit offset.
>>>>
>>>> I am going to rewrite that in Java. Here is the Kafka ticket:
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1013
>>>>
>>>> You can look at the RB and see how it is done. If you have any further
>>>> questions I will be happy to answer them.
>>>>
>>>> Thanks,
>>>>
>>>> Mayuresh
>>>>
>>>> On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>>>>
>>>>> The other thing is, if you are using SimpleConsumer, it is up to your
>>>>> app to do the offset management. The ZK-based or Kafka-based offsets
>>>>> will work if you are using the high-level consumer.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mayuresh
>>>>>
>>>>> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>>>>>
>>>>>> Hi Madhukar,
>>>>>>
>>>>>> I am going through your code now. Let me see what I can find.
>>>>>>
>>>>>> Where were you storing your offsets before?
>>>>>> Was it always Zookeeper or was it Kafka?
>>>>>> If it was Zookeeper, the correct way to migrate from Zookeeper-based
>>>>>> to Kafka-based offsets is this:
>>>>>>
>>>>>> 1) Config change:
>>>>>>    - offsets.storage = kafka
>>>>>>    - dual.commit.enabled = true
>>>>>> 2) Rolling bounce
>>>>>> 3) Config change:
>>>>>>    - dual.commit.enabled = false
>>>>>> 4) Rolling bounce
>>>>>>
>>>>>> For more info on offset management, you can also refer to these
>>>>>> slides from the Kafka Meetup:
>>>>>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>>>>>
>>>>>> Apart from that, to do a fetchOffsetRequest or a commit offset
>>>>>> request with Kafka-based offsets you don't need a consumer. You need
>>>>>> to know the groupId. You need to connect to Kafka and issue a
>>>>>> ConsumerMetadata request. This will fetch you the OffsetManager for
>>>>>> that groupId. You can then issue the fetch or commit request to that
>>>>>> OffsetManager.
>>>>>>
>>>>>> BTW, we are coming up with an offsetClient soon.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mayuresh
>>>>>>
>>>>>> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <bhartimadhu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Mayuresh,
>>>>>>>
>>>>>>> Please check this program:
>>>>>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java>
>>>>>>> Am I making a mistake?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <bhartimadhu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Mayuresh,
>>>>>>>>
>>>>>>>> I have tried to fetch the offset using OffsetFetchRequest as given
>>>>>>>> in this wiki:
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>>>>>>
>>>>>>>> But it only works if we set "dual.commit.enabled" to "true" and
>>>>>>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>>>>>>
>>>>>>>> Do I need to change anything?
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>
>>>>>> --
>>>>>> -Regards,
>>>>>> Mayuresh R. Gharat
>>>>>> (862) 250-7125

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
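
For reference, the reconnect step discussed at the top of this thread can be sketched in isolation. This is only a rough, self-contained illustration and not the code from the repository or from Kafka: FakeChannel and channelFor are hypothetical stand-ins invented for this sketch. In the real code the channel would be a BlockingChannel and the host and port would come from cmr.coordinator(). The sketch also assumes that reusing the existing channel is acceptable when the coordinator turns out to be the broker already connected to.

```java
/** Sketch only: hypothetical stand-in types, not Kafka classes. */
final class CoordinatorReconnect {

    /** Hypothetical stand-in for a blocking channel; it only tracks its state. */
    static final class FakeChannel {
        final String host;
        final int port;
        boolean connected;

        FakeChannel(String host, int port) {
            this.host = host;
            this.port = port;
        }

        void connect() { connected = true; }

        void disconnect() { connected = false; }
    }

    /**
     * Returns a channel connected to the coordinator. The current channel is
     * reused when it is already connected to that host and port; otherwise it
     * is closed and a new channel to the coordinator is opened.
     */
    static FakeChannel channelFor(FakeChannel current, String coordHost, int coordPort) {
        boolean sameBroker = current.connected
                && current.host.equals(coordHost)
                && current.port == coordPort;
        if (sameBroker) {
            return current;
        }
        current.disconnect();                       // close the old channel first
        FakeChannel fresh = new FakeChannel(coordHost, coordPort);
        fresh.connect();                            // then connect to the coordinator
        return fresh;
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel("broker1", 9092);
        channel.connect();
        // Pretend the metadata response named a different broker as coordinator.
        channel = channelFor(channel, "broker2", 9093);
        System.out.println("now connected to " + channel.host + ":" + channel.port);
    }
}
```

The same helper would be called from both the fetch path and the commit path, so that getConsumedOffset() and commitOffsets() talk to the same coordinator.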