[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384762#comment-14384762 ]
David Chu commented on KAFKA-1716:
----------------------------------

[~becket_qin] I was able to reproduce the same issue as [~ashwin.jayaprakash] by starting a single Kafka 0.8.2.1 broker (with Zookeeper 3.4.6) along with an application that had 8 {{ZookeeperConsumerConnector}} instances, and then shutting the application down. I did not create any Kafka topics in this test (so the 8 consumer connectors were not consuming from anything), and looking at the created threads I could see 8 \*-leader-finder-thread instances but no consumer fetcher threads, similar to the thread dumps provided by [~ashwin.jayaprakash].

From what I can tell, there is a race condition between the {{ZookeeperConsumerConnector}} shutdown logic and its rebalance logic that can leave the consumer shutdown process hanging for a long time. For example, when I shut down my application with the 8 consumer connectors, the following occurred:

# The first {{ZookeeperConsumerConnector}} is shut down properly, as shown by the following logs:
{code}
[2015-03-27T00:44:00,512Z] [INFO ] [Thread-13] [k.c.ZookeeperConsumerConnector] [elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-845], ZKConsumerConnector shutting down
[2015-03-27T00:44:00,523Z] [INFO ] [Thread-13] [k.c.ZookeeperConsumerConnector] [elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-845], ZKConsumerConnector shutdown completed in 11 ms
{code}
# This triggers a rebalance in the other consumers, as shown in the following log:
{code}
[2015-03-27T00:44:00,524Z] [INFO ] [elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844_watcher_executor] [k.c.ZookeeperConsumerConnector] [elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844], begin rebalancing consumer elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844 try #0
{code}
While the rebalance is going on, the thread performing it holds {{ZookeeperConsumerConnector.rebalanceLock}}.
# While the {{ZookeeperConsumerConnector}} is performing the rebalance, a shutdown request is triggered, as shown by the following log:
{code}
[2015-03-27T00:52:01,952Z] [INFO ] [Thread-13] [k.c.ZookeeperConsumerConnector] [elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844], ZKConsumerConnector shutting down
{code}
When {{ZookeeperConsumerConnector.shutdown()}} is called it sets the boolean property {{ZookeeperConsumerConnector.isShuttingDown}} to {{true}}. The shutdown thread then waits to acquire {{ZookeeperConsumerConnector.rebalanceLock}}, which is currently held by the thread performing the rebalance described in step #2.
# The {{ZookeeperConsumerConnector.rebalance()}} method checks {{ZookeeperConsumerConnector.isShuttingDown}} and, if it is {{true}}, returns {{false}}, as shown by the following snippet (line 669 in ZookeeperConsumerConnector):
{code}
if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
  false
{code}
Since {{isShuttingDown}} was set to {{true}} in step #3, {{rebalance()}} will always return {{false}}, which in turn causes {{ZookeeperConsumerConnector.syncedRebalance()}} to keep retrying the rebalance until the configured {{rebalance.max.retries}} attempts have been exhausted.
# While these retries are going on, the shutdown process cannot proceed, since it is still trying to acquire the {{rebalanceLock}} held by the rebalancing thread.
# Therefore, if an application has a large number of {{ZookeeperConsumerConnector}} instances and is configured with large values for {{rebalance.max.retries}} and {{rebalance.backoff.ms}}, it can take a very long time for the application to shut down cleanly.

It seems like one solution to this problem would be to move the {{if(isShuttingDown.get())}} check inside the {{for}} loop on line 602 of the {{ZookeeperConsumerConnector.syncedRebalance}} method.
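To make that concrete, below is a simplified, self-contained sketch of the interaction (not the actual {{ZookeeperConsumerConnector}} source; the class name, constructor parameters, and the {{rebalance()}} body are stand-ins). Because the {{isShuttingDown}} check only happens inside {{rebalance()}}, the retry loop burns through every attempt, sleeping {{rebalance.backoff.ms}} between tries, so each remaining connector can block its own shutdown for roughly {{rebalance.max.retries * rebalance.backoff.ms}} ms; checking the flag in the loop itself lets shutdown proceed after at most the attempt already in flight:
{code}
import java.util.concurrent.atomic.AtomicBoolean

// Simplified model of the shutdown/rebalance race described above.
// The field names mirror ZookeeperConsumerConnector; everything else is a stand-in.
class ConnectorSketch(maxRetries: Int, backoffMs: Long) {
  private val isShuttingDown = new AtomicBoolean(false)
  private val rebalanceLock = new Object

  // Corresponds to rebalance(): once shutdown has been requested it can only
  // report failure (the line-669 check quoted above).
  private def rebalance(): Boolean = !isShuttingDown.get

  // Corresponds to syncedRebalance(): retries while holding rebalanceLock.
  def syncedRebalance(): Unit = rebalanceLock.synchronized {
    var done = false
    var attempt = 0
    // Proposed fix: also test isShuttingDown in the loop, so it exits as soon as
    // shutdown() is requested instead of exhausting rebalance.max.retries.
    while (!done && attempt < maxRetries && !isShuttingDown.get) {
      done = rebalance()
      attempt += 1
      if (!done && !isShuttingDown.get)
        Thread.sleep(backoffMs) // rebalance.backoff.ms between attempts
    }
  }

  // Corresponds to shutdown(): flips the flag, then blocks on rebalanceLock,
  // which the watcher-executor thread holds for the whole retry loop.
  def shutdown(): Unit = {
    isShuttingDown.set(true)
    rebalanceLock.synchronized {
      // stop fetchers, commit offsets, close the ZK client, ...
    }
  }
}
{code}
Without the in-loop check, {{shutdown()}} in this sketch sits on {{rebalanceLock}} for the full {{maxRetries * backoffMs}}; with it, the rebalancing thread gives up almost immediately.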
> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>         Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.
> Shutdown thread:
> {code}
> -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
> {code}
> ConsumerFetcherThread:
> {code}
> -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
> {code}
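Stepping back from the specific consumer code, the two quoted stack traces show a generic blocking pattern: the shutdown thread awaits the fetcher thread's {{CountDownLatch}} while the fetcher thread is parked in {{LinkedBlockingQueue.put()}} on a full chunk queue, so neither can make progress until the queue is drained or the blocked thread is interrupted. The following is a minimal, self-contained sketch of that pattern only; the object name, queue capacity, sleep, and thread name are illustrative and are not Kafka's actual values:
{code}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Minimal model of the wedge: a "fetcher" blocked in put() on a full queue
// while a "shutdown" caller waits on the thread's completion latch.
object ShutdownWedgeSketch extends App {
  val chunkQueue = new LinkedBlockingQueue[Int](1) // bounded, like the consumer's chunk queue
  val shutdownLatch = new CountDownLatch(1)        // like ShutdownableThread's latch

  val fetcher = new Thread(new Runnable {
    def run(): Unit =
      try {
        var i = 0
        while (true) {
          chunkQueue.put(i) // parks here once the queue is full and nothing drains it
          i += 1
        }
      } catch {
        case _: InterruptedException => () // interruption is the only way out of put()
      } finally {
        shutdownLatch.countDown() // counted down only when run() exits
      }
  }, "consumer-fetcher-sketch")
  fetcher.start()

  Thread.sleep(200) // give the fetcher time to fill the queue and park in put()

  // Stand-in for the shutdown path in the stack trace: awaiting the latch while
  // the fetcher is parked in put() never completes; a timeout makes that visible.
  println("wedged: " + !shutdownLatch.await(1, TimeUnit.SECONDS)) // prints "wedged: true"

  // Unblocking the fetcher (draining the queue or interrupting it) is what lets
  // the latch count down and the shutdown finish in this model.
  fetcher.interrupt()
  shutdownLatch.await()
  println("fetcher exited; shutdown can now complete")
}
{code}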