We have been trying to narrow down an issue with Kafka 0.10.1 in our
setups where our consumers are frequently marked as dead, causing
rebalances every few seconds. The consumers (Java new API) then start
seeing exceptions like:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
Our session and heartbeat timeouts are the defaults that ship with Kafka
0.10.1 (i.e. we don't set any specific values). Every few seconds, we
see messages in the broker logs indicating that these consumers are
considered dead:
[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to restabilize group foo-bar with old generation 1034 (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
....
These messages keep repeating for almost every other consumer we have
(in different groups).
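Since the exception message suggests either increasing the session timeout or
reducing max.poll.records, for reference here is a quick sketch of the consumer
settings involved, with what I believe are the 0.10.1 defaults (as mentioned,
we don't override any of these; the bootstrap servers and group id below are
just placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-bar");
// liveness settings used by the group coordinator
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");     // default, I believe
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");   // default, I believe
// knobs the exception message suggests tuning for slow poll loops
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");  // default, I believe
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");         // default, I believe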
There's no real logic in our consumers: they just pick up a message from
a partition, commit the offset, hand the message off immediately to a
different thread for processing, and go back to polling:
while (!stopped) {
    try {
        final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
        for (final TopicPartition topicPartition : consumerRecords.partitions()) {
            if (stopped) {
                break;
            }
            for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                final long previousOffset = consumerRecord.offset();
                // commit the offset and then pass on the message for processing (in a separate thread)
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(previousOffset + 1)));
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // process the ConsumerRecord
                    }
                });
            }
        }
    } catch (Exception e) {
        // log the error and continue
        continue;
    }
}
We haven't been able to figure out why the heartbeats wouldn't be sent
by the consumer within the expected time period. From my understanding
of the docs, the heartbeats are sent by a background thread in the
consumer, so there should be no real reason why they wouldn't be sent.
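To rule out our own loop, one thing we considered is instrumenting the gap
between successive poll() calls, since that gap is what the exception message
blames. This is just a debugging sketch along the lines of our loop above (the
threshold is arbitrary), not code we actually run:

long lastPollAt = System.currentTimeMillis();
while (!stopped) {
    final long now = System.currentTimeMillis();
    final long gapMs = now - lastPollAt;
    if (gapMs > 10000) { // roughly the default 10s session.timeout.ms, purely illustrative
        System.out.println("Gap between poll() calls was " + gapMs + " ms");
    }
    lastPollAt = now;
    final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
    // ... same per-record commitSync and hand-off to the executor as in the loop above ...
}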
We debugged this a bit further and took some thread dumps from the
consumer JVMs; here's what we see:
"*kafka-coordinator-heartbeat-thread* | foo-bar #28 daemon prio=5
os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry
[0x00007f0dd54c7000]
java.lang.Thread.State: *BLOCKED* (on object monitor)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
- *waiting to lock <0x00000000c0962bb0>* (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
- locked <0x00000000c0962578> (a
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
So it looks like the heartbeat thread is *blocked* waiting for an object
lock, and that lock is held by:
"thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27
runnable [0x00007f0dcdffc000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
- locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- *locked* <*0x00000000c0962bb0*> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
at
org.apache.kafka.clients.consumer.*KafkaConsumer*.*poll*(KafkaConsumer.java:979)
at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
So it looks like the consumer code which invokes the
*KafkaConsumer.poll*(...) API to fetch messages is blocking the
heartbeat sender thread? Is this intentional? If so, wouldn't this delay
the heartbeats being sent and cause the heartbeat task on the
coordinator to expire, as per this logic I see in the coordinator:
private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
  member.awaitingJoinCallback != null ||
    member.awaitingSyncCallback != null ||
    member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
From what I see, and with my limited understanding of this code, this
would mark the member dead (as seen in the logs).
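As a rough worked example with the default 10 second session timeout (my
numbers, and assuming the member isn't awaiting a join or sync callback), the
check reads to me like this:

// Illustrative numbers only, assuming the 0.10.1 default session timeout.
long latestHeartbeat   = 0;       // last heartbeat the coordinator saw, at t = 0
long sessionTimeoutMs  = 10000;   // 10s default
long heartbeatDeadline = latestHeartbeat + sessionTimeoutMs;  // when the expiration task fires, as far as I can tell
// If no new heartbeat arrived in the meantime, latestHeartbeat is unchanged and:
boolean keepAlive = latestHeartbeat + sessionTimeoutMs > heartbeatDeadline;  // 10000 > 10000 -> false, member marked dead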
Is it expected that the background heartbeat sender thread would be
blocked by poll on the consumer (*our poll timeout is 2 minutes*)? Or
did I misread these logs and stack traces? Let me know if more
logs/details are needed and I can get them.
-Jaikiran