Thanks Ismael. I just checked; that one doesn't look like the same
issue, though it could be a related one. In that JIRA the problem
appears to have been addressed for the commitSync call. In this
specific instance, however, the KafkaConsumer.poll(...) call itself ends
up holding the object monitor of the ConsumerNetworkClient, and the
background heartbeat thread appears to be waiting to get hold of that
monitor and blocks on it.
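To illustrate what I think is going on, here's a minimal, self-contained
sketch of the locking pattern - this is not the actual Kafka client code,
and the monitor object below just stands in for the ConsumerNetworkClient:

public class MonitorContentionSketch {
    // stands in for the ConsumerNetworkClient instance whose monitor both threads need
    private static final Object networkClientMonitor = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread heartbeat = new Thread(new Runnable() {
            @Override
            public void run() {
                // BLOCKED here until the foreground "poll" releases the monitor
                synchronized (networkClientMonitor) {
                    System.out.println("heartbeat finally sent");
                }
            }
        }, "heartbeat-thread");

        // stands in for KafkaConsumer.poll(...) holding the monitor for its whole timeout
        synchronized (networkClientMonitor) {
            heartbeat.start();
            Thread.sleep(5_000); // stands in for our 2-minute poll timeout
        }
        heartbeat.join(); // the heartbeat only goes out after the "poll" returns
    }
}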
Setting the implementation details aside, what are the expected
semantics of the background heartbeat thread - would it fail to send a
heartbeat for a consumer that is currently busy in poll(),
commitSync() or a similar call? If so, would that lack of heartbeats
(for a while) cause the member to be considered dead by the
coordinator? My reading of the logs and my limited knowledge of the Kafka
code suggest that this is what's happening, either as per the
expected semantics or due to a possible bug.
-Jaikiran
On Wednesday 02 November 2016 08:39 PM, Ismael Juma wrote:
Maybe https://issues.apache.org/jira/browse/KAFKA-4303?
On 2 Nov 2016 10:15 am, "Jaikiran Pai" <jai.forums2...@gmail.com> wrote:
We have been trying to narrow down an issue with Kafka 0.10.1 in our
setups where our consumers are marked as dead very frequently, causing
rebalances almost every few seconds. The consumer (new Java API) then
starts seeing exceptions like:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms,
which typically implies that the poll loop is spending too much time
message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll() with
max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
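For reference, the tuning knobs the exception message refers to map to these
consumer configs. This is only an illustrative sketch with made-up values (we
currently run with the defaults, which I believe are session.timeout.ms=10000,
heartbeat.interval.ms=3000 and max.poll.interval.ms=300000 in 0.10.1):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "foo-bar");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("session.timeout.ms", "30000");     // more slack before the coordinator marks the member dead
props.put("heartbeat.interval.ms", "10000");  // usually kept at roughly a third of the session timeout
props.put("max.poll.interval.ms", "600000");  // upper bound on the gap between successive poll() calls
props.put("max.poll.records", "100");         // smaller batches, less time spent per poll loop iteration
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);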
Our session and heartbeat timeouts are the defaults that ship with Kafka 0.10.1
(i.e. we don't set any specific values). Every few seconds, we see messages
in the broker logs indicating that these consumers are considered dead:
[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member
consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has
failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to
restabilize group foo-bar with old generation 1034
(kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with
generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
....
These messages keep repeating for almost every other consumer we have (in
different groups).
There's no real logic in our consumers: they just pick up messages from the
partitions, commit the offset, and immediately hand each message to a different
thread for processing before going back to polling:
while (!stopped) {
    try {
        final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
        for (final TopicPartition topicPartition : consumerRecords.partitions()) {
            if (stopped) {
                break;
            }
            for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                final long previousOffset = consumerRecord.offset();
                // commit the offset and then pass on the message for processing (in a separate thread)
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(previousOffset + 1)));
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // process the ConsumerRecord
                    }
                });
            }
        }
    } catch (Exception e) {
        // log the error and continue
        continue;
    }
}
We haven't been able to figure out why the heartbeats wouldn't be sent by
the consumer within the expected time period. From my understanding of the
docs, heartbeats are sent from a background thread in the consumer, so
there should be no real reason why they wouldn't be sent.
We debugged this a bit further and took some thread dumps from the JVM of
the consumers, and here's what we see:
"*kafka-coordinator-heartbeat-thread* | foo-bar #28 daemon prio=5
os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry
[0x00007f0dd54c7000]
java.lang.Thread.State: *BLOCKED* (on object monitor)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.disableWakeups(ConsumerNetworkClient.java:409)
- *waiting to lock <0x00000000c0962bb0>* (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.pollNoWakeup(ConsumerNetworkClient.java:264)
at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$HeartbeatThread.run(AbstractCoordinator.java:864)
- locked <0x00000000c0962578> (a org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator)
So it looks like the heartbeat thread is *blocked* waiting for an object
lock, and that lock is held by:
"thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27
runnable [0x00007f0dcdffc000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
- locked <0x00000000c063b810> (a java.util.Collections$Unmodifi
ableSet)
- locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:232)
- *locked* <*0x00000000c0962bb0*> (a org.apache.kafka.clients.consu
mer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
KafkaConsumer.java:1031)
at org.apache.kafka.clients.consumer.*KafkaConsumer*.*poll*(
KafkaConsumer.java:979)
at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
So it looks like the consumer code which invokes the
*KafkaConsumer.poll*(...) API to fetch messages is blocking the
heartbeat sender thread? Is this intentional? If so, wouldn't this delay
the heartbeats being sent and cause the heartbeat task on the coordinator
to expire, as per this logic I see on the coordinator:
private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
    member.awaitingJoinCallback != null ||
      member.awaitingSyncCallback != null ||
      *member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline*
From what I see, and with my limited understanding of this code, this would
mark the member dead (as seen in the logs).
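To put rough numbers on it (my own back-of-the-envelope check, assuming the
0.10.1 default session.timeout.ms of 10000 and our 2-minute poll timeout,
with no heartbeat getting through while poll() holds the monitor):

long latestHeartbeat   = 0L;       // time of the member's last successful heartbeat
long sessionTimeoutMs  = 10_000L;  // assumed default session.timeout.ms
long heartbeatDeadline = 120_000L; // some point during our 2-minute poll(), monitor still held

boolean keepAlive = latestHeartbeat + sessionTimeoutMs > heartbeatDeadline; // 10000 > 120000 -> false
// keepAlive is false, so the coordinator would mark the member dead well before poll() returns,
// which matches the broker logs we see.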
Is it expected that the background heartbeat sender thread would be
blocked by poll on the consumer (*our poll timeout is 2 minutes*)? Or did I
misread these logs and stack traces? Let me know if more logs/details are
needed and I can get them.
-Jaikiran