You can take a look at this :

https://issues.apache.org/jira/browse/KAFKA-1848

Thanks,

Mayuresh

On Sun, Mar 29, 2015 at 4:45 PM, Jiangjie Qin (JIRA) <j...@apache.org>
wrote:

>
>      [
> https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
> ]
>
> Jiangjie Qin updated KAFKA-1716:
> --------------------------------
>     Comment: was deleted
>
> (was: [~dchu] Do you mean that the fetchers have never been created?
> That's a good point, but I still do not totally understand the cause.
> The first rebalance of ZookeeperConsumeConnector occurs when KafkaStreams
> are created. That means you need to specify a topic count map and create
> streams. So leader finder thread will send TopicMetadataRequest to brokers
> to get back the topic metadata for the topic. By default auto topic
> creation is enabled on Kafka brokers. That means when broker saw a
> TopicMetadataRequest asking for a topic that does not exist yet, it will
> created it and return the topic metadata. So the consumer fetcher thread
> will be created for the topic on ZookeeperConsumerConnector. However, if
> auto topic creation is turned off, your description looks possible.
> About the shutdown issue. You are right, that is an issue that has been
> fixed in KAFKA-1848, but seems not included in 0.8.2. I just changed the
> fix version from 0.9.0 to 0.8.3 instead.)
>
> > hang during shutdown of ZookeeperConsumerConnector
> > --------------------------------------------------
> >
> >                 Key: KAFKA-1716
> >                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
> >             Project: Kafka
> >          Issue Type: Bug
> >          Components: consumer
> >    Affects Versions: 0.8.1.1
> >            Reporter: Sean Fay
> >            Assignee: Neha Narkhede
> >         Attachments: after-shutdown.log, before-shutdown.log,
> kafka-shutdown-stuck.log
> >
> >
> > It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}}
> to wedge in the case that some consumer fetcher threads receive messages
> during the shutdown process.
> > Shutdown thread:
> > {code}    -- Parking to wait for:
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> >     at jrockit/vm/Locks.park0(J)V(Native Method)
> >     at jrockit/vm/Locks.park(Locks.java:2230)
> >     at sun/misc/Unsafe.park(ZJ)V(Native Method)
> >     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> >     at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> >     at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> >     at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> >     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> >     at
> kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> >     at
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> >     at
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> >     at
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> >     at
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >     at
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >     at
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >     at
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> >     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> >     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> >     at
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >     at
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> >     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> >     at
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> >     at
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> >     at
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> > ConsumerFetcherThread:
> > {code}    -- Parking to wait for:
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> >     at jrockit/vm/Locks.park0(J)V(Native Method)
> >     at jrockit/vm/Locks.park(Locks.java:2230)
> >     at sun/misc/Unsafe.park(ZJ)V(Native Method)
> >     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> >     at
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >     at
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >     at
> kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >     at
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >     at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >     at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >     at
> scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> >     at
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >     at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka/utils/Utils$.inLock(Utils.scala:538)
> >     at
> kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >     at
> kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
> >     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Reply via email to