[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386022#comment-14386022 ]

Jiangjie Qin commented on KAFKA-1716:
-------------------------------------

[~dchu] Do you mean that the fetchers were never created? That's a good 
point, but I still do not fully understand the cause.
The first rebalance of ZookeeperConsumerConnector occurs when the KafkaStreams 
are created, which means you need to specify a topic count map and create the 
streams. The leader finder thread then sends a TopicMetadataRequest to the 
brokers to fetch the topic metadata for the topic. By default, auto topic 
creation is enabled on Kafka brokers, so when a broker sees a 
TopicMetadataRequest asking for a topic that does not exist yet, it will 
create the topic and return its metadata. The consumer fetcher thread will 
then be created for the topic on the ZookeeperConsumerConnector. However, if 
auto topic creation is turned off, your description looks possible.
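For reference, a minimal sketch of that flow against the 0.8.x high-level consumer API (the ZooKeeper address, group id, and topic name below are placeholders for illustration, not taken from this ticket): creating the streams from the topic count map is what kicks off the first rebalance and the TopicMetadataRequest from the leader finder thread.
{code}
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object ConsumerFlowSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder ZK address
    props.put("group.id", "example-group")           // placeholder group id

    // Creating the connector alone does not start any fetcher threads yet.
    val connector = Consumer.create(new ConsumerConfig(props))

    // The topic count map + createMessageStreams call triggers the first
    // rebalance; the leader finder thread then sends a TopicMetadataRequest
    // for "example-topic" (auto-created by the broker if it does not exist
    // and auto topic creation is enabled), and fetcher threads are started.
    val streams = connector.createMessageStreams(Map("example-topic" -> 1))

    streams("example-topic").headOption.foreach { stream =>
      val it = stream.iterator()
      // hasNext() blocks until a message arrives on the stream.
      if (it.hasNext()) println(new String(it.next().message()))
    }

    // Shutdown stops the leader finder and fetcher threads; the hang
    // described in this ticket happens inside this call.
    connector.shutdown()
  }
}
{code}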
Regarding the shutdown issue: you are right, that is an issue that has been 
fixed in KAFKA-1848, but it does not seem to be included in 0.8.2. I just 
changed the fix version from 0.9.0 to 0.8.3.

> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>         Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}    -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}    -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}


