I am occasionally seeing offset commits take up to 30 seconds, which leads to a rebalance because the consumer hasn't called poll() to heartbeat in that time. My data takes a long time to process, so I don't go back to Kafka to poll for new data often enough. To cover that gap I currently have a "heartbeat" routine that runs periodically. I have synchronized all of this logic so that a commit() blocks the heartbeat method, because the Kafka documentation says the consumer class is not thread safe.

Does anyone know if it is safe for me to allow the heartbeats while a commit is in progress, meaning can I remove the synchronized lock? My heartbeat logic looks like this and is run every 500ms as a TimerTask.
public void run() {
    synchronized (lock) {
        long timeDiff = System.currentTimeMillis() - lastPollTime;
        if (timeDiff >= 2000) {
            logger.info("Heartbeating, last was " + timeDiff);
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            lastPollTime = System.currentTimeMillis();
            consumer.poll(1);
            consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
        }
    }
}
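For context, here is a minimal sketch of how the heartbeat task and the commit path share the same lock in my setup. It is a simplified illustration, not my production code: ConsumerLike is a hypothetical stand-in for the few KafkaConsumer calls involved (so the sketch runs without a broker), and the class name, the maxIdleMs parameter, and the helper method names are made up for this example.

```java
import java.util.TimerTask;

class HeartbeatSketch {
    /** Hypothetical stand-in for the KafkaConsumer calls used here. */
    interface ConsumerLike {
        void pauseAll();          // pause every assigned partition
        void resumeAll();         // resume every assigned partition
        void poll(long timeoutMs);
        void commitSync();
    }

    private final Object lock = new Object();
    private final ConsumerLike consumer;
    private final long maxIdleMs;
    private long lastPollTime;

    HeartbeatSketch(ConsumerLike consumer, long maxIdleMs, long startTimeMs) {
        this.consumer = consumer;
        this.maxIdleMs = maxIdleMs;
        this.lastPollTime = startTimeMs;
    }

    /** Polls with all partitions paused if the last poll is too old. */
    boolean heartbeatIfDue(long nowMs) {
        synchronized (lock) {
            if (nowMs - lastPollTime < maxIdleMs) {
                return false;
            }
            consumer.pauseAll();
            lastPollTime = nowMs;
            consumer.poll(1);
            consumer.resumeAll();
            return true;
        }
    }

    /** Commit takes the same lock, so a slow commit delays the heartbeat. */
    void commit() {
        synchronized (lock) {
            consumer.commitSync();
        }
    }

    /** Wraps the heartbeat check so it can be scheduled on a java.util.Timer. */
    TimerTask asTimerTask() {
        return new TimerTask() {
            @Override
            public void run() {
                heartbeatIfDue(System.currentTimeMillis());
            }
        };
    }
}
```

Scheduling it would look something like new java.util.Timer(true).schedule(sketch.asTimerTask(), 0, 500). The point of the sketch is that commit() and heartbeatIfDue() contend for one lock, so a 30 second commit holds off every heartbeat attempt for that whole time.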