You can take a look at this: https://issues.apache.org/jira/browse/KAFKA-1848
Thanks,
Mayuresh

On Sun, Mar 29, 2015 at 4:45 PM, Jiangjie Qin (JIRA) <j...@apache.org> wrote:

>
> [ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
>
> Jiangjie Qin updated KAFKA-1716:
> --------------------------------
> Comment: was deleted
>
> (was: [~dchu] Do you mean that the fetchers have never been created?
> That's a good point, but I still do not totally understand the cause.
> The first rebalance of ZookeeperConsumerConnector occurs when KafkaStreams
> are created. That means you need to specify a topic count map and create
> streams. So the leader finder thread will send a TopicMetadataRequest to
> the brokers to get back the topic metadata for the topic. By default, auto
> topic creation is enabled on Kafka brokers. That means when a broker sees a
> TopicMetadataRequest asking for a topic that does not exist yet, it will
> create it and return the topic metadata. So the consumer fetcher thread
> will be created for the topic on ZookeeperConsumerConnector. However, if
> auto topic creation is turned off, your description looks possible.
> About the shutdown issue. You are right, that is an issue that has been
> fixed in KAFKA-1848, but it seems not to be included in 0.8.2. I just
> changed the fix version from 0.9.0 to 0.8.3 instead.)
>
> > hang during shutdown of ZookeeperConsumerConnector
> > --------------------------------------------------
> >
> > Key: KAFKA-1716
> > URL: https://issues.apache.org/jira/browse/KAFKA-1716
> > Project: Kafka
> > Issue Type: Bug
> > Components: consumer
> > Affects Versions: 0.8.1.1
> > Reporter: Sean Fay
> > Assignee: Neha Narkhede
> > Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log
> >
> >
> > It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}}
> > to wedge in the case that some consumer fetcher threads receive messages
> > during the shutdown process.
> > Shutdown thread:
> > {code}
> > -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> >     at jrockit/vm/Locks.park0(J)V(Native Method)
> >     at jrockit/vm/Locks.park(Locks.java:2230)
> >     at sun/misc/Unsafe.park(ZJ)V(Native Method)
> >     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> >     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> >     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> >     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> >     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> >     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> >     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> >     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> >     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> >     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> >     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> >     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> >     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> >     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> >     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> >     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> >     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
> > {code}
> > ConsumerFetcherThread:
> > {code}
> > -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> >     at jrockit/vm/Locks.park0(J)V(Native Method)
> >     at jrockit/vm/Locks.park(Locks.java:2230)
> >     at sun/misc/Unsafe.park(ZJ)V(Native Method)
> >     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> >     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> >     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka/utils/Utils$.inLock(Utils.scala:538)
> >     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
> >     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
> > {code}
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
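
For anyone reading the two traces above, here is a minimal Java sketch of the same shape: one thread waits on a CountDownLatch for a worker to finish, while the worker is parked in LinkedBlockingQueue.put() on a full queue. This is not Kafka code and not the KAFKA-1848 patch. The class ShutdownWedgeSketch, the Fetcher thread, and the tryShutdown helper are all hypothetical names made up for illustration, and interrupting the worker is just one generic way to unblock put(), assumed here for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ShutdownWedgeSketch {

    /** Hypothetical stand-in for a fetcher thread; not Kafka's class. */
    static final class Fetcher extends Thread {
        private final LinkedBlockingQueue<String> queue;
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private volatile boolean running = true;

        Fetcher(LinkedBlockingQueue<String> queue) {
            this.queue = queue;
            setDaemon(true); // let the JVM exit even if this thread stays blocked
        }

        @Override
        public void run() {
            try {
                while (running) {
                    queue.put("message"); // parks once the bounded queue is full
                }
            } catch (InterruptedException e) {
                // Interrupted while parked in put(): fall through and exit.
            } finally {
                shutdownLatch.countDown();
            }
        }

        /** Returns true if the thread finished within the timeout. */
        boolean shutdown(boolean interrupt, long timeoutMs) throws InterruptedException {
            running = false; // never observed while the thread is parked in put()
            if (interrupt) {
                interrupt();
            }
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Starts a fetcher, waits until it is blocked in put(), then tries to shut it down. */
    public static boolean tryShutdown(boolean interrupt) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        Fetcher fetcher = new Fetcher(queue);
        fetcher.start();
        // Nobody consumes from the queue, so the second put() blocks.
        while (queue.remainingCapacity() > 0
                || fetcher.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        return fetcher.shutdown(interrupt, 500);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished without interrupt: " + tryShutdown(false));
        System.out.println("finished with interrupt:    " + tryShutdown(true));
    }
}
```

The sketch uses a timed await so it cannot hang; with an untimed await, as in the shutdown trace, the first case would wait indefinitely because the flag is only checked between put() calls.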