Hi all,

We run Kafka 0.8.1.1. We’re tracking down a problem where consumer groups stop 
pulling from their partitions a few minutes or hours into execution. When this 
happens, every ConsumerFetcherThread associated with the affected consumer is 
blocked in LinkedBlockingQueue.put(), waiting to write data to the queue. Each 
thread waits on a ConditionObject with a different object ID, and none of those 
object IDs appears anywhere else in our thread dump. As far as we can tell, the 
threads never make progress once they enter this waiting state.
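
To illustrate the state the fetcher threads are in, here is a minimal Java 
sketch. It is not Kafka code: the class name BlockedPutDemo, the method 
producerStateWhenFull, the thread name "demo-fetcher" and the capacity of 2 are 
all made up for the example. It assumes only that the queue is bounded and that 
nothing drains it, in which case put() parks the calling thread until space 
becomes available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedPutDemo {

    /**
     * Fills a bounded queue from a producer thread while nothing drains it,
     * and returns the producer's thread state once it is stuck in put().
     * (Hypothetical helper, written only for this illustration.)
     */
    static Thread.State producerStateWhenFull(int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                // One more put() than the queue can hold: the last call parks.
                for (int i = 0; i <= capacity; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-fetcher");
        producer.setDaemon(true);
        producer.start();

        // Poll for up to about five seconds until the producer has parked.
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (producer.getState() != Thread.State.WAITING
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State blocked = producer.getState();

        // Taking one element frees a slot, so the parked put() can complete.
        queue.take();
        producer.join(5_000);
        return blocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("producer state with full queue: "
                + producerStateWhenFull(2));
    }
}
```

In this sketch the producer only resumes because the main thread calls take(). 
If the thread that is supposed to read from the queue has stopped doing so, a 
thread parked in put() stays parked indefinitely, which would be consistent 
with what we see.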

KAFKA-937 describes a very similar symptom:
https://issues.apache.org/jira/browse/KAFKA-937
According to Jun Rao’s comments on that issue, a ConsumerFetcherThread should 
never be blocked. Is that still the case?

Here’s the thread dump for the relevant threads. I can provide more information 
if needed.

"ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0" 
prio=10 tid=0x00007f1954a9c800 nid=0xbf0 waiting on condition [0x00007f19339f8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008ac24dd8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.utils.Utils$.inLock(Utils.scala:538)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

"ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1" 
prio=10 tid=0x00007f1955657000 nid=0xbf3 waiting on condition [0x00007f19321e0000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008ad280e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.utils.Utils$.inLock(Utils.scala:538)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

"ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2" 
prio=10 tid=0x00007f1954001000 nid=0xbf1 waiting on condition [0x00007f19326e5000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008abf72c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka.utils.Utils$.inLock(Utils.scala:538)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
Jack Foy <j...@whitepages.com>


