I think this will help.

Thanks.




On 8/12/16, 6:16 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

>Hello Sean,
>
>We are adding a background thread doing heartbeats as part of the adopted
>KIP-62:
>https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
>
>Does this resolve your issue?
>
>
>Guozhang
>
>On Thu, Aug 11, 2016 at 2:47 PM, Sean Morris (semorris) <semor...@cisco.com>
>wrote:
>
>> I am occasionally seeing commits of offsets taking up to 30 seconds which
>> is leading to a rebalance because the consumer hasn't called poll() to
>> heartbeat. I currently have a "heartbeat" routine that runs periodically
>> because my data processing takes a long time, so I don't go back to
>> Kafka to poll for new data often enough. I have synchronized all of this
>> logic so that a commit() blocks the heartbeat method because the Kafka
>> documentation says the consumer class is not thread safe. Does anyone know
>> if it is safe for me to allow the heartbeats while a commit is going on,
>> meaning can I remove the "synchronized lock"? My heartbeat logic looks like
>> this and is run every 500ms as a TimerTask.
>>
>>
>> public void run() {
>>     synchronized (lock) {
>>         long timeDiff = System.currentTimeMillis() - lastPollTime;
>>         if (timeDiff >= 2000) {
>>             logger.info("Heartbeating, last was " + timeDiff);
>>             consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
>>             lastPollTime = System.currentTimeMillis();
>>             consumer.poll(1);
>>             consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
>>         }
>>     }
>> }
>>
>>
>>
>
>
>-- 
>-- Guozhang
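
For readers following up on the KIP-62 pointer above: once the consumer sends heartbeats from its own background thread, the manual pause/poll/resume timer should no longer be needed, and the time allowed between poll() calls is bounded by a separate setting, max.poll.interval.ms. The following is only a rough sketch of what such a configuration might look like. It assumes a client version that includes KIP-62 (0.10.1.0 or later). The class name, broker address, group id, and all timeout values are illustrative placeholders, not recommendations from this thread.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the consumer settings
// that matter once heartbeats run on a background thread (KIP-62).
class Kip62ConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: offsets are committed manually, as in the original question.
        props.setProperty("enable.auto.commit", "false");
        // Liveness of the consumer process, checked via background heartbeats.
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        // Upper bound on the time between poll() calls; size this to cover the
        // slowest batch of processing plus the slowest expected commit.
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "example-group");
        // Print the settings in a stable order for inspection.
        props.stringPropertyNames().stream().sorted()
             .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor together with the usual deserializer settings, which are omitted here. The consumer itself is still not safe for concurrent use from multiple application threads, so calls such as poll() and commitSync() should stay on a single thread or remain guarded by a lock.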
