Maybe https://issues.apache.org/jira/browse/KAFKA-4303?

On 2 Nov 2016 10:15 am, "Jaikiran Pai" <jai.forums2...@gmail.com> wrote:

> We have been trying to narrow down an issue in Kafka 0.10.1 in our
> setups where our consumers are marked as dead very frequently, causing
> rebalances almost every few seconds. The consumers (Java new-consumer API)
> then start seeing exceptions like:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
>
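> For reference, the settings that exception message refers to would be set
> like this on the new consumer -- just a minimal sketch, where the broker
> address and deserializers are placeholders and the values are the 0.10.1
> defaults as I understand them (we don't override any of these):
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // placeholder
>         props.put("group.id", "foo-bar");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // placeholder
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // placeholder
>         props.put("session.timeout.ms", "10000");      // default session timeout
>         props.put("heartbeat.interval.ms", "3000");    // default heartbeat interval
>         props.put("max.poll.interval.ms", "300000");   // default max poll interval (5 minutes)
>         props.put("max.poll.records", "500");          // default max records returned per poll()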
>
> Our session and heartbeat timeouts are the defaults that ship with Kafka
> 0.10.1 (i.e. we don't set any specific values). Every few seconds, we see
> messages in the broker logs which indicate these consumers are considered dead:
>
> [2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has failed (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to restabilize group foo-bar with old generation 1034 (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
> ....
>
> These messages keep repeating for almost every other consumer we have (in
> different groups).
>
> There's no real logic in our consumers: they just pick up messages from the
> partitions, commit the offset, hand each message off to a different thread
> for processing, and go back to polling:
>
>         while (!stopped) {
>             try {
>                 final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
>                 for (final TopicPartition topicPartition : consumerRecords.partitions()) {
>                     if (stopped) {
>                         break;
>                     }
>                     for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
>                         final long previousOffset = consumerRecord.offset();
>                         // commit the offset and then pass on the message for processing (in a separate thread)
>                         consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(previousOffset + 1)));
>
>                         this.executor.execute(new Runnable() {
>                             @Override
>                             public void run() {
>                                 // process the ConsumerRecord
>                             }
>                         });
>                     }
>                 }
>             } catch (Exception e) {
>                 // log the error and continue
>                 continue;
>             }
>         }
>
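> For completeness, the consumer around that loop is created in the usual way.
> A rough sketch (the topic name, key/value types and pool size here are
> placeholders, not our actual values; someValue is the poll timeout, which as
> I mention below is 2 minutes):
>
>         final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(Collections.singletonList("some-topic")); // placeholder topic
>         final ExecutorService executor = Executors.newFixedThreadPool(4); // placeholder pool size
>         final long someValue = 2 * 60 * 1000L; // poll timeout: 2 minutes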
>
>
> We haven't been able to figure out why the heartbeats wouldn't be sent by
> the consumer in the expected time period. From my understanding of the
> docs, the heartbeats are sent in the background thread for the consumer, so
> there should be no real reason why these wouldn't be sent.
>
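> (That background thread is the one named kafka-coordinator-heartbeat-thread
> in the dumps below, and as far as I understand it is supposed to send a
> heartbeat every heartbeat.interval.ms, which is 3 seconds by default.)
>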
>
> We debugged this a bit further and took some thread dumps from the consumer
> JVMs, and here's what we see:
>
> "kafka-coordinator-heartbeat-thread | foo-bar" #28 daemon prio=5 os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry [0x00007f0dd54c7000]
>    java.lang.Thread.State: *BLOCKED* (on object monitor)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
>     - *waiting to lock <0x00000000c0962bb0>* (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
>     - locked <0x00000000c0962578> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>
> So it looks like the heartbeat thread is *blocked*, waiting for an object
> lock, and that lock is held by:
>
>
> "thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27 runnable [0x00007f0dcdffc000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>     - locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
>     - locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
>     - locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>     at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>     - *locked <0x00000000c0962bb0>* (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
>     at org.apache.kafka.clients.consumer.*KafkaConsumer.poll*(KafkaConsumer.java:979)
>     at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
>
>
> So it looks like the consumer code which invokes the *KafkaConsumer.poll*(...)
> API to fetch messages is blocking the heartbeat sender thread? Is this
> intentional? If so, wouldn't this delay the heartbeats being sent and cause
> the heartbeat task on the coordinator to expire, as per this logic I see in
> the coordinator:
>
>   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
>     member.awaitingJoinCallback != null ||
>       member.awaitingSyncCallback != null ||
>       *member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline*
> From what I see, with my limited understanding of this code, this would mark
> the member dead (as seen in the logs).
>
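> To put rough numbers on that (assuming the default session.timeout.ms of
> 10000 ms, since we don't override it): if the heartbeat thread stays blocked
> behind our 2 minute poll() for more than about 10 seconds, then
> member.latestHeartbeat + member.sessionTimeoutMs falls below the
> heartbeatDeadline and the member gets expired -- which would explain the
> rebalances we see every few seconds.
>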
>
> Is it expected that the background heartbeat sender thread would be blocked
> by poll() on the consumer (*our poll timeout is 2 minutes*)? Or did I misread
> these logs and stack traces? Let me know if more logs/details are needed and
> I can get them.
>
>
> -Jaikiran
>
>
