I am occasionally seeing commits of offsets taking up to 30 seconds which is 
leading to a rebalance because the consumer hasn't called poll() to heartbeat. 
I currently have a "heartbeat" routine that runs periodically to handle that I 
have long processing times of data, and thus don't go back to Kafka to poll for 
new data often enough. I have synchronized all of this logic so that a commit() 
blocks the heartbeat method because the Kafka documentation says the consumer 
class is not thread safe. Does anyone know if it is safe for me to allow the 
heartbeats while a commit is going on, meaning can I remove the "synchronized 
lock"? My heartbeat logic looks like this and is run every 500ms as a TimerTask.


public void run() {

synchronized (lock) {

long timeDiff = System.currentTimeMillis() - lastPollTime;

if (timeDiff >= 2000) {

logger.info("Heartbeating, last was " + timeDiff);

consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));

lastPollTime = System.currentTimeMillis();

consumer.poll(1);

consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));

}

}

}


Reply via email to