We have been trying to narrow down an issue with Kafka 0.10.1 in our setups where our consumers are marked as dead very frequently, causing rebalances every few seconds. The consumers (using the new Java consumer API) then start seeing exceptions like:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]


Our session and heartbeat timeouts are the defaults that ship in Kafka 0.10.1 (i.e. we don't set any specific values; a sketch of those defaults is included further below). Every few seconds, we see messages in the broker logs indicating that these consumers are considered dead:

[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to restabilize group foo-bar with old generation 1034 (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
....

These messages keep repeating for almost every other consumer we have (in different groups).
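
For reference, since we don't override anything, here's a sketch of the values I believe we are implicitly running with (the Kafka 0.10.1 consumer defaults, as far as I can tell from the documentation):

    import java.util.Properties;

    // The consumer config values we are implicitly running with -- the 0.10.1 defaults
    // as far as I can tell from the docs; we don't set any of these explicitly.
    final Properties props = new Properties();
    props.put("session.timeout.ms", "10000");     // member considered dead if no heartbeat within this
    props.put("heartbeat.interval.ms", "3000");   // how often the background thread is expected to heartbeat
    props.put("max.poll.interval.ms", "300000");  // maximum allowed time between calls to poll()
    props.put("max.poll.records", "500");         // maximum records returned by a single poll()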

There's no real logic in our consumers: they just pick up messages from the partitions, commit the offset, hand each message off immediately to a different thread for processing, and go back to polling:

    while (!stopped) {
        try {
            final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
            for (final TopicPartition topicPartition : consumerRecords.partitions()) {
                if (stopped) {
                    break;
                }
                for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                    final long previousOffset = consumerRecord.offset();
                    // commit the offset and then pass on the message for processing (in a separate thread)
                    consumer.commitSync(Collections.singletonMap(topicPartition,
                            new OffsetAndMetadata(previousOffset + 1)));
                    this.executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            // process the ConsumerRecord
                        }
                    });
                }
            }
        } catch (Exception e) {
            // log the error and continue
            continue;
        }
    }



We haven't been able to figure out why the heartbeats wouldn't be sent by the consumer within the expected time period. From my understanding of the docs, in 0.10.1 the heartbeats are sent from a background thread in the consumer, so there should be no real reason why they wouldn't be sent.
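
One thing we are planning to try, to double-check whether heartbeats are actually going out, is to dump the consumer's coordinator metrics from inside the process. Rough sketch (the metric names are taken from the consumer monitoring docs as I understand them, so treat them as an assumption):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Print the heartbeat-related metrics of a consumer instance
    static void dumpHeartbeatMetrics(final KafkaConsumer<?, ?> consumer) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            final String name = entry.getKey().name();
            if ("heartbeat-rate".equals(name) || "last-heartbeat-seconds-ago".equals(name)) {
                System.out.println(name + " = " + entry.getValue().value());
            }
        }
    }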


We debugged this a bit further and took some thread dumps from the consumer JVMs, and here's what we see:

"kafka-coordinator-heartbeat-thread | foo-bar" #28 daemon prio=5 os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry [0x00007f0dd54c7000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
    - waiting to lock <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
    - locked <0x00000000c0962578> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

So it looks like the heartbeat thread is *blocked* waiting on an object monitor, and that lock is held by:


"thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27 runnable [0x00007f0dcdffc000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
    - locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:470)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    - locked <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)


So it looks like the consumer code which invokes the KafkaConsumer.poll(...) API to fetch messages is blocking the heartbeat sender thread. Is this intentional? If so, wouldn't it delay the heartbeats being sent and cause the heartbeat task on the coordinator to expire, as per this logic I see in the coordinator:

private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
    member.awaitingJoinCallback != null ||
      member.awaitingSyncCallback != null ||
      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
From what I see (with my limited understanding of this code), once the heartbeats are delayed past the session timeout this check would fail and the member would be marked dead (as seen in the logs).
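
To make sure I'm reading that check right, here's the arithmetic with rough numbers plugged in (assuming the 10-second default session.timeout.ms, and assuming the heartbeat really is held up for the whole 2-minute poll; the awaitingJoinCallback/awaitingSyncCallback clauses are ignored):

    // Hypothetical numbers, just to illustrate the coordinator check quoted above
    long latestHeartbeat   = 0L;       // time of the last heartbeat the coordinator saw
    long sessionTimeoutMs  = 10_000L;  // default session.timeout.ms in 0.10.1, as far as I can tell
    long heartbeatDeadline = 120_000L; // a deadline evaluated while our poll(120000) is still blocking

    // 0 + 10000 > 120000 is false, so the member would not be kept alive
    boolean keepAlive = latestHeartbeat + sessionTimeoutMs > heartbeatDeadline;
    System.out.println("keepAlive = " + keepAlive); // prints: keepAlive = false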


Is it expected that the background heartbeat sender thread would be blocked by poll() on the consumer (*our poll timeout is 2 minutes*)? Or did I misread these logs and stack traces? Let me know if more logs/details are needed and I can get them.
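
In case it is relevant: one workaround we were considering (just a sketch, and it assumes the 2-minute poll timeout is what keeps the ConsumerNetworkClient lock held) is to poll with a much shorter timeout in a loop, so that the lock is released frequently and the heartbeat thread gets a chance to run:

    // Same receive loop as above, but with a short (hypothetical 1 second) poll timeout
    // instead of our current 2-minute timeout
    while (!stopped) {
        // returns quickly even when there is nothing to fetch, releasing the client lock often
        final ConsumerRecords<K, V> consumerRecords = consumer.poll(1000);
        if (consumerRecords.isEmpty()) {
            continue;
        }
        // ... same per-partition commitSync(...) and executor hand-off as in the loop above ...
    }

Would that be a reasonable way to keep the heartbeat thread from being starved, or should the long poll timeout be fine as-is?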


-Jaikiran
