[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14384762#comment-14384762
 ] 

David Chu commented on KAFKA-1716:
----------------------------------

[~becket_qin] I was able to reproduce the same issue as [~ashwin.jayaprakash] 
by simply starting up a single Kafka 0.8.2.1 broker (in conjunction with 
Zookeeper 3.4.6) along with an application that had 8 
{{ZookeeperConsumerConnector}} instances and then shutting down the application 
with the 8 consumer connectors.  Also, I did not create any Kafka topics in 
this test (so the 8 consumer connectors were not consuming from anything) and 
looking at the created threads I could see that I had 8 \*-leader-finder-thread 
instances but no consumer fetcher threads similar to the thread dumps provided 
by [~ashwin.jayaprakash].  

>From what I can tell, it appears that there is a race condition between the 
>{{ZookeeperConsumerConnector}} shutdown logic and its rebalance logic that 
>could result in the consumer shutdown process hanging for a long time.  For 
>example,  I noticed that when I shutdown my application with the 8 consumer 
>connectors that the following occurred:

# The first {{ZookeeperConsumerConnector}} is shutdown properly as shown from 
the following logs:
{code}
[2015-03-27T00:44:00,512Z]  [INFO ]  [Thread-13]  
[k.c.ZookeeperConsumerConnector]  
[elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-845],
 ZKConsumerConnector shutting down
[2015-03-27T00:44:00,523Z]  [INFO ]  [Thread-13]  
[k.c.ZookeeperConsumerConnector]  
[elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-845],
 ZKConsumerConnector shutdown completed in 11 ms
{code}
# This then triggers a rebalance in the other consumers as shown in the 
following log:
{code}
[2015-03-27T00:44:00,524Z]  [INFO ]  
[elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844_watcher_executor]
  [k.c.ZookeeperConsumerConnector]  
[elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844],
 begin rebalancing consumer 
elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844 
try #0
{code}
Also, while the rebalance is going on, the thread performing the rebalance is 
holding the {{ZookeeperConsumerConnector.rebalanceLock}}
# While the {{ZookeeperConsumerConnector}} is performing a rebalance a shutdown 
request is triggered as shown by the following log:
{code}
[2015-03-27T00:52:01,952Z]  [INFO ]  [Thread-13]  
[k.c.ZookeeperConsumerConnector]  
[elasticsearch-indexer-group_on-cc.aa.com-pid-33836-kafka-message-source-id-844],
 ZKConsumerConnector shutting down
{code}
When the {{ZookeeperConsumerConnector.shutdown()}} method is called it sets the 
boolean property {{ZookeeperConsumerConnector.isShuttingDown}} to {{true}}.  
The thread then waits for the {{ZookeeperConsumerConnector.rebalanceLock}} to 
be released which is currently being held by the thread performing the 
rebalance as described in step #2.
# In the {{ZookeeperConsumerConnector.rebalance()}} method it looks at the 
{{ZookeeperConsumerConnector.isShuttingDown}} property and if it's {{true}} the 
method will return {{false}} as shown from the following code snippet (line 669 
in ZookeeperConsumerConnector):
{code}
if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
          false
{code}
Since {{isShuttingDown}} was set to true in step #3 this will always cause the 
{{ZookeeperConsumerConnector.rebalance()}} method to return {{false}} which in 
turn will cause the {{ZookeeperConsumerConnector.syncedRebalance()}} method to 
keep retrying the rebalance operation until the configured 
{{rebalance.max.retries}} have been exhausted.
# While the rebalance is going on the shutdown process can't proceed since it's 
trying to acquire the {{rebalanceLock}} held by the rebalance process.
# Therefore, if your application has a large number of 
{{ZookeeperConsumerConnector}} instances and it's configured with large values 
for {{rebalance.max.retries}} and {{rebalance.backoff.ms}}, it could take a  
very long time for the application to shut down cleanly.

It seems like one solution to this problem would be to move the 
{{if(isShuttingDown.get())}} check inside the {{for}} loop on line 602 in the 
{{ZookeeperConsumerConnector.syncedRebalance}} method.

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>         Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}    -- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}    -- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at 
> kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at 
> kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to