I think this will help. Thanks.
On 8/12/16, 6:16 PM, "Guozhang Wang" <wangg...@gmail.com> wrote: >Hello Sean, > >We are adding a background thread doing heartbeats as part of the adopted >KIP-62: >https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread > >Does this resolve your issue? > > >Guozhang > >On Thu, Aug 11, 2016 at 2:47 PM, Sean Morris (semorris) <semor...@cisco.com> >wrote: > >> I am occasionally seeing commits of offsets taking up to 30 seconds which >> is leading to a rebalance because the consumer hasn't called poll() to >> heartbeat. I currently have a "heartbeat" routine that runs periodically to >> handle that I have long processing times of data, and thus don't go back to >> Kafka to poll for new data often enough. I have synchronized all of this >> logic so that a commit() blocks the heartbeat method because the Kafka >> documentation says the consumer class is not thread safe. Does anyone know >> if it is safe for me to allow the heartbeats while a commit is going on, >> meaning can I remove the "synchronized lock"? My heartbeat logic looks like >> this and is run every 500ms as a TimerTask. >> >> >> public void run() { >> >> synchronized (lock) { >> >> long timeDiff = System.currentTimeMillis() - lastPollTime; >> >> if (timeDiff >= 2000) { >> >> logger.info("Heartbeating, last was " + timeDiff); >> >> consumer.pause(consumer.assignment().toArray(new TopicPartition[0])); >> >> lastPollTime = System.currentTimeMillis(); >> >> consumer.poll(1); >> >> consumer.resume(consumer.assignment().toArray(new TopicPartition[0])); >> >> } >> >> } >> >> } >> >> >> > > >-- >-- Guozhang