[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356289#comment-14356289 ]
Ashwin Jayaprakash commented on KAFKA-1716:
-------------------------------------------

We upgraded to Kafka 0.8.2 last week and can now reproduce this issue every time on our Kafka consumer JVMs.

Our setup is as follows. We start {{ConsumerConnector}} instances dynamically based on a configurable property. Each {{ConsumerConnector}} instance creates a {{ConsumerIterator}}. Right now we have 4 such instances in each JVM, and therefore 4 separate threads consuming from those 4 iterators in parallel.

All of this worked fine until recently, when we ran into problems with consumer rebalancing and an overloaded ZK subtree; see http://markmail.org/thread/gnodacjjya6r573m. While trying to address that, we overrode the defaults with {{rebalance.max.retries 16}} and {{rebalance.backoff.ms 10000}}. Note that we also upgraded to 0.8.2.

Every time we shut down the JVM, we first try to shut down the consumers one by one before exiting. With these recent changes, the JVM exit gets stuck because:

# The shutdown thread is different from the 4 consumer threads (in addition to the background threads that ZK and Kafka create)
# The shutdown thread shuts down the first consumer, and that consumer exits quickly and gracefully
# In the meantime, the second, third and fourth consumers are trying to rebalance the partitions
# The shutdown thread proceeds to call shutdown on the second consumer
## The shutdown thread appears to make some progress in shutting down the second consumer but then gets stuck on a monitor that has been acquired by the {{xx_watcher_executor}}
## This appears to be a deadlock because the {{xx_watcher_executor}} thread has acquired the monitor lock and gone to sleep
# The shutdown then takes a long time because all 3 remaining consumers retry {{16}} times and then give up

The thread dumps below should make this clear.
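Before the raw dumps, the pattern described in steps 4 and 5 can be sketched in a few lines of Java. This is only an illustration, not Kafka's code: the class, the lock object, and the scaled-down retry and backoff constants are all hypothetical. It relies on one documented fact, namely that {{Thread.sleep}} does not release any monitor the sleeping thread holds, so a caller that needs the same monitor stays BLOCKED for the whole retry loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a "watcher" thread sleeps between retries while holding
// a monitor, and a "shutdown" caller has to wait for that same monitor.
class RebalanceLockSketch {
    // Stand-in for the java.lang.Object monitor both threads contend on.
    private static final Object REBALANCE_LOCK = new Object();

    // Assumed, scaled-down values (the comment uses 16 retries and 10000 ms).
    static final int MAX_RETRIES = 4;
    static final long BACKOFF_MS = 100;

    /** Returns how many milliseconds the shutdown caller waited for the monitor. */
    static long measureShutdownWaitMs() {
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread watcher = new Thread(() -> {
            synchronized (REBALANCE_LOCK) {
                lockHeld.countDown();
                for (int i = 0; i < MAX_RETRIES; i++) {
                    try {
                        Thread.sleep(BACKOFF_MS); // sleeps while still owning the monitor
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "watcher_executor_sketch");
        watcher.setDaemon(true);
        watcher.start();

        try {
            lockHeld.await(); // make sure the watcher owns the monitor first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted before the watcher took the lock", e);
        }

        long start = System.nanoTime();
        synchronized (REBALANCE_LOCK) {
            // Shutdown work would go here; we only get in once all retries are done.
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("shutdown waited ~" + measureShutdownWaitMs() + " ms");
    }
}
```

With the settings mentioned above, and assuming every retry sleeps for the full backoff, the same pattern would keep the monitor held for up to roughly 16 x 10 s = 160 s per consumer, which would be consistent with the long shutdown we observe.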
{code}
"Thread-13@8222" prio=5 tid=0x53 nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
     waiting for indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160 to release lock on <0x2b5e> (a java.lang.Object)
      at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:191)
      at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:119)
      at xx.yy.zz.processor.kafka.consumer.KafkaMessageSource.close(KafkaMessageSource.java:239)
      at xx.yy.zz.pipeline.source.MessageSourceStage.stop(MessageSourceStage.java:162)
      at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
      at xx.yy.zz.pipeline.framework.Pipeline.stop(Pipeline.java:205)
      at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
      at xx.yy.zz.pipeline.framework.Pipelines.stop(Pipelines.java:225)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at xx.yy.AppLifecycle.callAnnotatedMethods(AppLifecycle.java:163)
      at xx.yy.AppLifecycle.stop(AppLifecycle.java:144)
      - locked <0x2b31> (a xx.yy.AppLifecycle)
      at xx.yy.AppLifecycle$6.stop(AppLifecycle.java:247)
      at io.dropwizard.lifecycle.JettyManaged.doStop(JettyManaged.java:32)
      at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
      - locked <0x2b83> (a java.lang.Object)
      at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:129)
      at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:148)
      at org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:71)
      at org.eclipse.jetty.server.Server.doStop(Server.java:410)
      at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
      - locked <0x2b84> (a java.lang.Object)
      at org.eclipse.jetty.util.thread.ShutdownThread.run(ShutdownThread.java:133)

"indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160" daemon prio=5 tid=0x74 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
     blocks Thread-13@8222
      at java.lang.Thread.sleep(Thread.java:-1)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:627)
      at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
      - locked <0x2b5e> (a java.lang.Object)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
{code}

Log snippet attached.

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to
> wedge in the case that some consumer fetcher threads receive messages during
> the shutdown process.
> Shutdown thread:
> {code} -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code} -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
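
For readers of the quoted issue description, the two traces there show a shutdown thread parked in {{CountDownLatch.await}} while a fetcher thread is parked in {{LinkedBlockingQueue.put}}. The following is a minimal Java sketch of that wait pattern, not Kafka's implementation. The class name, the queue capacity of 1, and the {{interruptFetcher}} toggle are hypothetical; in particular, the traces do not establish whether Kafka interrupts the fetcher in the affected version.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a producer thread parks in put() on a full bounded queue
// that nobody drains, while the caller waits on a latch the producer would
// only count down after put() returns.
class BlockedFetcherSketch {
    /**
     * Returns true if the "fetcher" thread finished within timeoutMs.
     * interruptFetcher is an illustrative toggle, not a Kafka setting.
     */
    static boolean shutdownCompletes(boolean interruptFetcher, long timeoutMs) {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(1);
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        chunks.offer("already queued"); // the queue is now full and has no consumer

        Thread fetcher = new Thread(() -> {
            try {
                chunks.put("next chunk"); // parks until space frees up or an interrupt arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // woken up by the interrupt
            } finally {
                shutdownLatch.countDown(); // signals that the fetcher has stopped
            }
        }, "fetcher_sketch");
        fetcher.setDaemon(true); // a stuck fetcher must not keep the JVM alive
        fetcher.start();

        if (interruptFetcher) {
            fetcher.interrupt();
        }
        try {
            // A timed await keeps this sketch from hanging the way the dump shows.
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("with interrupt:    " + shutdownCompletes(true, 2000));
        System.out.println("without interrupt: " + shutdownCompletes(false, 200));
    }
}
```

Without the interrupt, nothing ever frees space in the queue, so the latch is never counted down and only the timeout lets the caller return. An untimed {{await}}, as in the shutdown trace, would wait indefinitely in that situation.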