Hello Sean,

We are adding a background thread that sends heartbeats as part of KIP-62,
which has been adopted:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
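
For reference, here is a minimal sketch of how a consumer might be configured
once that change is available. It assumes a client version that includes
KIP-62, where max.poll.interval.ms bounds the time between poll() calls and
session.timeout.ms only covers the background heartbeats. The class name,
broker address, group id, and all timeout values are hypothetical
placeholders, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings for a client that includes KIP-62.
public class Kip62ConsumerConfig {

    // maxProcessingMs is an assumed upper bound on the time spent processing
    // one batch between two poll() calls.
    static Properties consumerProps(int maxProcessingMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder group id
        props.put("enable.auto.commit", "false");         // offsets are committed by the application

        // Liveness of the process, checked through background-thread heartbeats.
        props.put("session.timeout.ms", "10000");
        props.put("heartbeat.interval.ms", "3000");

        // Liveness of the processing loop: the longest allowed gap between poll() calls.
        props.put("max.poll.interval.ms", Integer.toString(maxProcessingMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(60000);
        System.out.println("max.poll.interval.ms = " + props.getProperty("max.poll.interval.ms"));
    }
}
```

With settings along these lines, long processing between polls should no
longer require a separate pause()/poll()/resume() heartbeat task, as long as
each batch finishes within max.poll.interval.ms.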

Does this resolve your issue?


Guozhang

On Thu, Aug 11, 2016 at 2:47 PM, Sean Morris (semorris) <semor...@cisco.com>
wrote:

> I am occasionally seeing offset commits take up to 30 seconds, which
> leads to a rebalance because the consumer hasn't called poll() to
> heartbeat. Because my data processing times are long, I don't go back to
> Kafka to poll for new data often enough, so I currently run a periodic
> "heartbeat" routine to compensate. I have synchronized all of this
> logic so that a commit() blocks the heartbeat method, because the Kafka
> documentation says the consumer class is not thread safe. Does anyone know
> if it is safe for me to allow heartbeats while a commit is in progress,
> meaning can I remove the "synchronized lock"? My heartbeat logic looks like
> this and is run every 500ms as a TimerTask.
>
>
> public void run() {
>     synchronized (lock) {
>         long timeDiff = System.currentTimeMillis() - lastPollTime;
>         if (timeDiff >= 2000) {
>             logger.info("Heartbeating, last was " + timeDiff);
>             consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
>             lastPollTime = System.currentTimeMillis();
>             consumer.poll(1);
>             consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
>         }
>     }
> }
>
>
>


-- 
-- Guozhang
