Maybe https://issues.apache.org/jira/browse/KAFKA-4303?
On 2 Nov 2016 10:15 am, "Jaikiran Pai" <jai.forums2...@gmail.com> wrote:

> We have been trying to narrow down an issue with Kafka 0.10.1 in our
> setups where our consumers are marked as dead very frequently, causing
> rebalances almost every few seconds. The consumers (new Java API) then
> start seeing exceptions like:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
>
> Our session and heartbeat timeouts are the defaults that ship with Kafka
> 0.10.1 (i.e. we don't set any specific values). Every few seconds, we see
> messages in the broker logs which indicate these consumers are considered
> dead:
>
> [2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has failed (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to restabilize group foo-bar with old generation 1034 (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
> ....
>
> These messages keep repeating for almost every other consumer we have (in
> different groups).
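The knobs that exception message points at are ordinary consumer properties.
For reference, a minimal sketch of where they would be set, pinned to what I
believe are the 0.10.1 defaults; the values and the bootstrap address here are
illustrative, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // illustrative address
    props.put("group.id", "foo-bar");
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.poll.interval.ms", "300000"); // max gap between poll() calls; 5 min default
    props.put("max.poll.records", "500");        // cap on records per poll(); 500 default in 0.10.1
    props.put("session.timeout.ms", "10000");    // window for heartbeats; 10 s default
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Note that with KIP-62 in 0.10.1, session.timeout.ms bounds the background
heartbeats while max.poll.interval.ms bounds the gap between poll() calls, so
the two failure modes are meant to be separate.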
> There's no real logic in our consumers: they just pick up messages from
> the partitions, commit the offset, and immediately hand each message to a
> different thread to process it, then go back to polling:
>
> while (!stopped) {
>     try {
>         final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
>         for (final TopicPartition topicPartition : consumerRecords.partitions()) {
>             if (stopped) {
>                 break;
>             }
>             for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
>                 final long previousOffset = consumerRecord.offset();
>                 // commit the offset and then pass on the message for
>                 // processing (in a separate thread)
>                 consumer.commitSync(Collections.singletonMap(topicPartition,
>                         new OffsetAndMetadata(previousOffset + 1)));
>                 this.executor.execute(new Runnable() {
>                     @Override
>                     public void run() {
>                         // process the ConsumerRecord
>                     }
>                 });
>             }
>         }
>     } catch (Exception e) {
>         // log the error and continue
>         continue;
>     }
> }
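If the thread dumps below are read at face value, two things stand out in
that loop: poll() can hold the ConsumerNetworkClient lock for up to its full
timeout, and there is one blocking commitSync() round trip per record. Here is
a minimal sketch of the same hand-off pattern with a short poll timeout and a
single commit per batch; the 100 ms value is arbitrary and untested, and this
is a workaround idea, not a confirmed fix:

    // Keep poll() short so its lock is released often, and commit once per
    // returned batch instead of once per record. Semantics stay at-most-once,
    // as in the quoted code (offsets are committed before processing finishes).
    while (!stopped) {
        try {
            final ConsumerRecords<K, V> records = consumer.poll(100);
            for (final ConsumerRecord<K, V> record : records) {
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // process 'record' here
                    }
                });
            }
            if (!records.isEmpty()) {
                // one synchronous commit for everything this poll() returned
                consumer.commitSync();
            }
        } catch (Exception e) {
            // log the error and continue
        }
    }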
> We haven't been able to figure out why the heartbeats wouldn't be sent by
> the consumer within the expected time period. From my understanding of the
> docs, the heartbeats are sent from a background thread in the consumer, so
> there should be no real reason why they wouldn't be sent.
>
> We debugged this a bit further, took some thread dumps from the JVMs of
> the consumers, and here's what we see:
>
> "kafka-coordinator-heartbeat-thread | foo-bar" #28 daemon prio=5 os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry [0x00007f0dd54c7000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
>         - waiting to lock <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
>         - locked <0x00000000c0962578> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>
> So it looks like the heartbeat thread is *blocked*, waiting for an object
> lock, and that lock is held by:
>
> "thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27 runnable [0x00007f0dcdffc000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
>         - locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>         - *locked* <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>         at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
>
> So it looks like the consumer code which invokes the KafkaConsumer.poll(...)
> API to fetch messages is blocking the heartbeat sender thread? Is this
> intentional? If so, wouldn't it delay the heartbeats being sent and cause
> the heartbeat task on the coordinator to expire, per this logic I see in
> the coordinator:
>
> private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
>   member.awaitingJoinCallback != null ||
>     member.awaitingSyncCallback != null ||
>     member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
>
> From what I see, and my limited understanding of this code, that would
> mark the member dead (as seen in the logs).
>
> Is it expected that the background heartbeat sender thread can be blocked
> by poll() on the consumer (*our poll timeout is 2 minutes*)? Or did I
> misread these logs and stack traces? Let me know if more logs/details are
> needed and I can get them.
>
> -Jaikiran
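If that reading of the dumps is right, the arithmetic lines up with what the
broker logs show. A rough worked example of the quoted check, assuming the
stock 0.10.1 session.timeout.ms of 10000 ms, no pending join/sync callback,
and made-up timestamps:

    // Toy model of shouldKeepMemberAlive; all values are assumptions for
    // illustration, not taken from a real run.
    public class HeartbeatExpiryExample {
        public static void main(String[] args) {
            long sessionTimeoutMs  = 10_000L; // session.timeout.ms (0.10.1 default)
            long latestHeartbeat   = 0L;      // last heartbeat, just before poll() took the lock
            long heartbeatDeadline = 15_000L; // coordinator checks 15 s into the 2-minute poll

            // awaitingJoinCallback and awaitingSyncCallback are null, so only
            // the last term of the quoted condition matters:
            boolean keepAlive = latestHeartbeat + sessionTimeoutMs > heartbeatDeadline;
            System.out.println(keepAlive); // false -> member expired, group rebalances
        }
    }

So with default timeouts, a heartbeat thread blocked for anything much over
10 s would be enough for the coordinator to expire the member, which would
explain rebalances every few seconds as members rejoin and get expired again.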