On a first pass, the code has a small bug in getConsumedOffset().

You need to close the channel after getting the coordinator   : // broker =
cmr.coordinator();
 and then reconnect to coordinator as follows :

      broker = cmr.coordinator();
// if the coordinator is different, from the above channel's host then
reconnect
    *channel.disconnect();*
*    channel = new BlockingChannel(broker.host(), broker.port(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*                                          5000 /* read timeout in millis
*/);*
    *channel.connect();*
......


Same goes for commitOffsets().



Thanks,

Mayuresh


On Tue, Mar 31, 2015 at 11:52 AM, Madhukar Bharti <bhartimadhu...@gmail.com>
wrote:

> Hi Mayuresh,
>
> Have you gone through that code? Please help me in that. If it will work
> for high level consumer then will plan accordingly to store offset of
> low-level consumer too in this offset topic. Will that work if I will give
> some group name for low-level consumers also but process it partition wise
> as Simple consumers are there.
>
> Thanks and awaiting for your response!
>
> On Mon, Mar 30, 2015 at 9:45 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Cool. Will try reviewing it today and get back :)
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, Mar 30, 2015 at 2:53 AM, Madhukar Bharti <
>> bhartimadhu...@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Thanks for your quick response!
>>>
>>> I have tried to use offset manager for fetch/commit request. I have set
>>> "dua.commit.enable" to false and using offset.storage=kafka.
>>>
>>> But I am not able to commit the offset. My code is here
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/tree/master/src/com/bharti/kafka/offset>
>>> .
>>>
>>> Kindly check this. Am I missing anything. I am running with single
>>> broker.
>>>
>>> Thanks!
>>>
>>> On Fri, Mar 27, 2015 at 10:06 PM, Mayuresh Gharat <
>>> gharatmayures...@gmail.com> wrote:
>>>
>>>> In your case you are trying to issue an offsetRequest and not a
>>>> fetchOffsetRequest. I know this is little confusing.
>>>>
>>>> Let me point you to a scala patch which has a client for doing fetch
>>>> offset and commit offset.
>>>>
>>>> I am going to rewrite that in java. Here is the Kafka ticket :
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1013
>>>>
>>>> You can look at the RB and see how it is done. If you have any further
>>>> questions I will be happy to answer them.
>>>>
>>>> Thanks,
>>>>
>>>> Mayuresh
>>>>
>>>>
>>>> On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat <
>>>> gharatmayures...@gmail.com> wrote:
>>>>
>>>>> Other thing is if you are using SimpleConsumer, it is up to your app
>>>>> to do the offsetManagement. The ZK based offsets or Kafka based offsets
>>>>> will work if you are using the HighLevel Consumer.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mayuresh
>>>>>
>>>>> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <
>>>>> gharatmayures...@gmail.com> wrote:
>>>>>
>>>>>> Hi Madhukar,
>>>>>>
>>>>>> I am going through your code now. Let me see what I can find.
>>>>>>
>>>>>> Where were you storing your offsets before?
>>>>>> Was it always Zookeeper or was it Kafka?
>>>>>> If it was Zookeeper, the correct way to migrate from zookeeper to
>>>>>> kafka based offsets is this :
>>>>>>
>>>>>> 1) Config Change :
>>>>>>      - offsets.storage = kafka
>>>>>>      - dual.commit.enabled = true
>>>>>> 2) Rolling Bounce
>>>>>> 3) Config Change :
>>>>>>      - dual.commit.enabled=false
>>>>>> 4) Rolling Bounce.
>>>>>>
>>>>>> For more info on Offset Management, you can also refer these slides
>>>>>> from Kafka Meetup:
>>>>>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>>>>>
>>>>>>
>>>>>> Apart from that for using Kafka based offsets, to do a
>>>>>> fetchOffsetRequest or commit offset request you don't need a consumer. 
>>>>>> You
>>>>>> need to know the groupId. You need to connect to kafka, issue a
>>>>>> consumerMetaData Request. This will fetch you the OffsetManager for that
>>>>>> groupId. You can then issue the fetch or commit request to that
>>>>>> OffsetManager.
>>>>>>
>>>>>> BTW, we are coming up with an offsetClient soon.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mayuresh
>>>>>>
>>>>>> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <
>>>>>> bhartimadhu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Mayuresh,
>>>>>>>
>>>>>>> Please check this
>>>>>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java>
>>>>>>>  program.
>>>>>>> Am I doing any mistake?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>>>>>>> bhartimadhu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Mayuresh,
>>>>>>>>
>>>>>>>> I have tried to fetch the offset using OffsetFetchRequest as given
>>>>>>>> in this wiki
>>>>>>>>
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>>>>>>
>>>>>>>> But It only works if we set "dual.commit.enabled" to "true" and
>>>>>>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>>>>>>
>>>>>>>> Do I need to change anything?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -Regards,
>>>>>> Mayuresh R. Gharat
>>>>>> (862) 250-7125
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -Regards,
>>>>> Mayuresh R. Gharat
>>>>> (862) 250-7125
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -Regards,
>>>> Mayuresh R. Gharat
>>>> (862) 250-7125
>>>>
>>>
>>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Reply via email to