[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145519#comment-15145519 ]
Rekha Joshi edited comment on KAFKA-914 at 2/12/16 11:16 PM:
-------------------------------------------------------------

Hi, we have been seeing a consistent issue mirroring between our data centers, and the same issue seems to resurface. Details are below. Is this concern really resolved? Thanks, Rekha

{code}
Source: AWS (13 brokers)
Destination: OTHER-DC (20 brokers)
Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
Connectivity: AWS Direct Connect (max 6 Gbps)
Data details: source is receiving 40,000 msg/sec, each message is around 5 KB

Mirroring
---------
Node: at our OTHER-DC (24 cores, 48 GB memory)
JVM details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m
  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
Launch script: kafka.tools.MirrorMaker --consumer.config consumer.properties
  --producer.config producer.properties --num.producers 1 --whitelist mirrortest
  --num.streams 1 --queue.size 100000

consumer.properties
-------------------
zookeeper.connect=<host:port>
group.id=KafkaMirror
auto.offset.reset=smallest
fetch.message.max.bytes=9000000
zookeeper.connection.timeout.ms=60000
rebalance.max.retries=4
rebalance.backoff.ms=5000

producer.properties
-------------------
metadata.broker.list=<host:port>
partitioner.class=<our custom round robin partitioner>
producer.type=async
{code}

When we start the mirroring job everything works fine as expected. Eventually we hit an issue where the job stops consuming altogether. At this stage:

1. No errors are seen in the MirrorMaker logs.

2. The consumer threads are not fetching any messages, and we see thread dumps as follows (see the sketch after this list):
{code}
"ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread t@73
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <79b6d3ce> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

   Locked ownable synchronizers:
        - locked <199dc92d> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
{code}
3. The producer stops producing; in trace mode we see it handling 0 events, with a thread dump as follows:
{code}
"ProducerSendThread--0" - Thread t@53
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.write(IOUtil.java:148)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
        - locked <5ae2fc40> (a java.lang.Object)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        - locked <8489cd8> (a java.lang.Object)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
{code}
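The fetcher stack in item 2 above is parked inside LinkedBlockingQueue.put, called from PartitionTopicInfo.enqueue: the data chunk queue (bounded here via --queue.size 100000) is full and nothing is draining it. Below is a minimal, self-contained sketch of that blocked-enqueue state in isolation; it uses a plain LinkedBlockingQueue rather than Kafka's actual classes, and every name in it is illustrative only.

{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Illustrative only: a tiny bounded queue standing in for the consumer's chunk queue.
object FullChunkQueueSketch {
  def main(args: Array[String]): Unit = {
    val chunkQueue = new LinkedBlockingQueue[String](2) // small capacity so it fills quickly

    val fetcher = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (true) {
          chunkQueue.put(s"chunk-$i") // parks forever once the queue is full and undrained
          i += 1
        }
      }
    }, "fetcher-sketch")
    fetcher.setDaemon(true)
    fetcher.start()

    TimeUnit.SECONDS.sleep(1)
    // With no consumer iterator taking from the queue, the thread ends up WAITING
    // inside LinkedBlockingQueue.put, matching the ConsumerFetcherThread dump above.
    println(s"fetcher state: ${fetcher.getState}")
  }
}
{code}

Draining the queue (for example with chunkQueue.take()) immediately unblocks the put, which is why the hang points at the consumer iterators never draining the queue rather than at the fetch path itself.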
Deadlock between initial rebalance and watcher-triggered rebalances
-------------------------------------------------------------------

             Key: KAFKA-914
             URL: https://issues.apache.org/jira/browse/KAFKA-914
         Project: Kafka
      Issue Type: Bug
Affects Versions: 0.8.0
        Reporter: Joel Koshy
         Fix For: 0.8.0
     Attachments: KAFKA-914-v1.patch

The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is very complex, so it's a bit hard to articulate the following very clearly. I will try to describe the sequence that results in a deadlock when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch may trigger (due to some other consumers starting up) and initiate a rebalance. This happens successfully, so fetchers start and begin filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without a leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock, or ClosedByInterruptException if performing an offset request). If we get an InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have multiple partitions without a leader that it is currently processing. So if the interrupted flag is cleared and the leader-finder-thread tries to add a fetcher for another partition, it does not receive an InterruptedException when it tries to acquire the partitionMapLock. It can end up blocking indefinitely at that point (see the sketch after this description).
- The problem is that until now, createMessageStream's initial inline rebalance has not yet returned: it is blocked on the rebalance lock, waiting on the second watch-triggered rebalance to complete. That is, the consumer iterators have not been created yet, and thus the fetcher queues fill up. [td1]
- As a result, processPartitionData (which holds on to the partitionMapLock) in one or more fetchers will be blocked trying to enqueue into a full chunk queue. [td2]
- So the leader-finder-thread cannot finish adding fetchers for the remaining partitions without a leader, and thus cannot shut down.

One way to fix this would be to let the exception from the leader-finder-thread propagate outside if the leader-finder-thread is currently shutting down, and avoid the subsequent (unnecessary) attempt to add a fetcher and lock partitionMapLock (a rough sketch of this approach follows the thread dumps below). There may be more elegant fixes (such as rewriting the whole consumer manager logic), but obviously we want to avoid extensive changes at this point in 0.8.
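To make the interrupt-clearing step above concrete, here is a self-contained sketch of the pattern; the lock, queue, and thread names are stand-ins rather than the actual Kafka fields. A "fetcher" holds the lock while parked on a full queue, and a "leader finder" swallows the first InterruptedException, so its next lockInterruptibly() call no longer throws and simply blocks forever.

{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

// Illustrative only: reproduces the interrupt-clearing hang in isolation.
object InterruptClearingDeadlockSketch {
  def main(args: Array[String]): Unit = {
    val partitionMapLock = new ReentrantLock()       // stand-in for partitionMapLock
    val chunkQueue = new LinkedBlockingQueue[Int](1) // stand-in for the full chunk queue
    chunkQueue.put(0)                                // already full; nothing will drain it

    // Stand-in for processPartitionData: takes the lock, then parks on put() [td2].
    val fetcher = new Thread(new Runnable {
      def run(): Unit = {
        partitionMapLock.lock()
        try {
          chunkQueue.put(1) // blocks forever while holding the lock
        } finally {
          partitionMapLock.unlock()
        }
      }
    }, "fetcher")
    fetcher.setDaemon(true)

    // Stand-in for the leader-finder-thread handling two leaderless partitions.
    val leaderFinder = new Thread(new Runnable {
      def run(): Unit = {
        for (partition <- Seq(0, 1)) {
          try {
            partitionMapLock.lockInterruptibly()
            try {
              println(s"added fetcher for partition $partition")
            } finally {
              partitionMapLock.unlock()
            }
          } catch {
            case _: InterruptedException =>
              // Catching here clears the interrupted flag, so the next iteration's
              // lockInterruptibly() no longer throws; it just blocks on the lock
              // held by the parked fetcher [td4].
              println(s"interrupted while handling partition $partition")
          }
        }
      }
    }, "leader-finder")
    leaderFinder.setDaemon(true)

    fetcher.start()
    TimeUnit.MILLISECONDS.sleep(200) // let the fetcher take the lock and park
    leaderFinder.start()
    leaderFinder.interrupt()         // the shutdown path interrupts and then waits
    TimeUnit.SECONDS.sleep(2)
    println(s"leader-finder state: ${leaderFinder.getState}") // WAITING: it never exits
  }
}
{code}

Running this prints a leader-finder state of WAITING after the interrupt, the same shape as [td2] and [td4] below.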
Relevant portions of the thread dump are here:

[td1] createMessageStream's initial inline rebalance (blocked on the ongoing watch-triggered rebalance)
{code}
2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
2013-05-20_17:50:13.04848    java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-20_17:50:13.04848         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04849         - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04849         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-20_17:50:13.04850         at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-20_17:50:13.04850         at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-20_17:50:13.04850         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850         at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-20_17:50:13.04851         at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-20_17:50:13.04851         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-20_17:50:13.04852         at scala.collection.immutable.List.map(List.scala:45)
2013-05-20_17:50:13.04852         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-20_17:50:13.04852         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
{code}

[td2] A consumer fetcher thread blocked on the full queue
{code}
2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
2013-05-20_17:50:13.04703    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04703         at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04704         - parking to wait for <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-20_17:50:13.04704         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04704         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-20_17:50:13.04704         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
2013-05-20_17:50:13.04704         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
2013-05-20_17:50:13.04705         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
2013-05-20_17:50:13.04706         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
2013-05-20_17:50:13.04707         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04708         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
2013-05-20_17:50:13.04709         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04709         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-20_17:50:13.04709         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}

[td3] Second watch-triggered rebalance
{code}
2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
2013-05-20_17:50:13.04725    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04726         at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04726         - parking to wait for <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-20_17:50:13.04726         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04727         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04727         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-20_17:50:13.04728         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-20_17:50:13.04728         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-20_17:50:13.04729         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-20_17:50:13.04729         at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-20_17:50:13.04730         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-20_17:50:13.04730         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-20_17:50:13.04731         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-20_17:50:13.04731         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-20_17:50:13.04732         at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-20_17:50:13.04733         at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-20_17:50:13.04733         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04733         - locked <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04734         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)
{code}

[td4] leader-finder-thread still trying to process partitions without leader, blocked on the partitionMapLock held by processPartitionData in td2
{code}
2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
2013-05-20_17:50:13.04712    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04713         at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04713         - parking to wait for <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
2013-05-20_17:50:13.04713         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04714         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04714         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
2013-05-20_17:50:13.04717         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
2013-05-20_17:50:13.04718         at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
2013-05-20_17:50:13.04718         at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
2013-05-20_17:50:13.04719         at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
2013-05-20_17:50:13.04719         - locked <0x00007f586374b040> (a java.lang.Object)
2013-05-20_17:50:13.04719         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
2013-05-20_17:50:13.04720         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04721         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-20_17:50:13.04722         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-20_17:50:13.04723         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-20_17:50:13.04723         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-20_17:50:13.04723         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-20_17:50:13.04724         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04724         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}
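For reference, the fix direction described in the issue (abort the per-partition loop once shutdown has been requested, instead of swallowing the interrupt and re-acquiring partitionMapLock for the next partition) could look roughly like the sketch below. This is only an illustration with assumed names (LeaderFinderSketch, isRunning, noLeaderPartitions); the actual change is whatever is in the attached KAFKA-914-v1.patch.

{code}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Illustrative stand-in for the leader-finder-thread's partition loop; the real
// logic lives in ConsumerFetcherManager.LeaderFinderThread / AbstractFetcherThread.
class LeaderFinderSketch(partitionMapLock: ReentrantLock) extends Thread("leader-finder-sketch") {
  private val isRunning = new AtomicBoolean(true)

  def shutdown(): Unit = {
    isRunning.set(false)
    interrupt() // existing shutdown path: interrupt, then wait for the thread to exit
  }

  override def run(): Unit = {
    val noLeaderPartitions = Seq("mirrortest-0", "mirrortest-1", "mirrortest-2")
    try {
      for (tp <- noLeaderPartitions) {
        if (!isRunning.get) // do not even try to add a fetcher while shutting down
          throw new InterruptedException(s"shutting down, skipping $tp")
        partitionMapLock.lockInterruptibly() // throws instead of blocking if already interrupted
        try {
          println(s"would add fetcher for $tp") // addFetcher(tp) would go here in the real code
        } finally {
          partitionMapLock.unlock()
        }
      }
    } catch {
      case _: InterruptedException if !isRunning.get =>
        // Handled once, outside the per-partition loop, so the interrupted state is
        // never cleared and then ignored between partitions.
        println("leader finder exiting due to shutdown")
    }
  }
}

object LeaderFinderSketch {
  def main(args: Array[String]): Unit = {
    val partitionMapLock = new ReentrantLock()
    partitionMapLock.lock() // simulate a fetcher holding the lock, as in [td2]
    val finder = new LeaderFinderSketch(partitionMapLock)
    finder.start()
    Thread.sleep(200)       // finder is now parked in lockInterruptibly(), as in [td4]
    finder.shutdown()       // with this structure the interrupt actually ends the loop
    finder.join(2000)
    println(s"finder still alive after shutdown: ${finder.isAlive}") // expect: false
  }
}
{code}

The key difference from the hang above is that a cleared interrupt flag can no longer lead to an uninterruptible lock acquisition on the next leaderless partition.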