The thread dump shows that the consumer thread (from your executor) is waiting on the internal fetcher queue. This means the internal fetcher queue is empty, which in turn suggests that the fetcher might not be running. Could you please file a bug and share the entire thread dump there?
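
In the meantime, on the "at least restart on hang" question: the dump shows the thread parked in LinkedBlockingQueue.take(), which waits indefinitely on an empty queue. Below is a minimal sketch, using only java.util.concurrent, of the difference a timed poll() makes. The class and method names (StallAwareDrain, drainUntilQuiet) are made up for illustration and are not part of Kafka. The high-level consumer also has a consumer.timeout.ms property that is meant to make the iterator throw ConsumerTimeoutException instead of blocking forever; please verify that against the version you are running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Kafka class: shows poll(timeout) versus take().
final class StallAwareDrain {

    /**
     * Pulls items until the queue stays empty for timeoutMs, then returns
     * everything consumed so far. take() would park forever at that point;
     * poll(timeout) returns null, so the caller gets control back and can
     * log, alert, or rebuild the consumer.
     */
    static List<String> drainUntilQuiet(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        List<String> consumed = new ArrayList<>();
        while (true) {
            String next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                // Nothing arrived within the timeout: treat it as a possible stall.
                return consumed;
            }
            consumed.add(next);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        List<String> consumed = drainUntilQuiet(queue, 100);
        System.out.println("consumed " + consumed + " before the queue went quiet");
    }
}
```

A timeout only tells you the queue went quiet; it does not say why. It is a way to notice the hang and react, not a fix for a fetcher that has stopped.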
Thanks,
Neha

On Thu, Nov 29, 2012 at 1:04 PM, Matan Amir <matan.a...@voxer.com> wrote:

> A popular case is that the thread isn't catching thrown Exceptions in run()
> - which causes the thread to be restarted in a typical Executor
> configuration - and does not pull any more items.
>
> Is that the case?
>
>
> On Thu, Nov 29, 2012 at 11:48 AM, Juan Valencia
> <jvalen...@sharethis.com> wrote:
>
>> ---------- Forwarded message ----------
>> From: Juan Valencia <jvalen...@sharethis.com>
>> Date: Thu, Nov 29, 2012 at 11:45 AM
>> Subject: Consumer Hanging (yes, read the FAQ)
>> To: users@kafka.apache.org
>>
>>
>> My consumers randomly hang. They essentially end up waiting on a kafka
>> stream:
>> There is plenty of data on the brokers and a ratio of about 1 consumer for
>> 5 partitions.
>> Any thoughts on how to keep the stream going? Or at least restart on hang?
>> e.g.
>>
>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> ConsumerConnector consumerConnector =
>>     Consumer.createJavaConsumerConnector(consumerConfig);
>> Map<String, List<KafkaStream<Message>>> topicMessageStreams =
>>     consumerConnector.createMessageStreams(ImmutableMap.of(topic, 10));
>> List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);
>> ExecutorService executor = Executors.newFixedThreadPool(10);
>>
>> for (final KafkaStream<Message> stream : streams) {
>>   executor.submit(new Runnable() {
>>     @Override
>>     public void run() {
>>       try {
>> ******  for (MessageAndMetadata<Message> msgAndMetadata : stream) {
>>           try { ....
>>
>> The offset checker shows that the offsets stop moving, but the partitions
>> still have owners.
>>
>> A jstack trace shows the following (*'s below correspond to *'s above):
>>
>> "pool-1-thread-10" prio=10 tid=0x00007effa4203000 nid=0x50f5 waiting on condition [0x00007eff9965b000]
>>    java.lang.Thread.State: WAITING (parking)
>>         at sun.misc.Unsafe.park(Native Method)
>>         - parking to wait for <0x000000008b822548> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:386)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> ******  at com.mydomain.kafka.consumer.myConsumer$1.run(myConsumer.java:101)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:679)
>>
>> --
>>
>> Learn More: SQI (Social Quality Index) - A Universal Measure of Social
>> Quality
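
On the uncaught-exception theory quoted above: the dump shows pool-1-thread-10 still inside run(), so that particular thread has not died this way, but the failure mode is worth ruling out for the other streams. With executor.submit(), an exception thrown from run() is captured in the returned Future and nothing is logged unless someone calls get(). The pool thread survives, but the consume loop is gone. A minimal sketch with plain java.util.concurrent follows; handle() and the class name are hypothetical stand-ins for the per-message work, not Kafka code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: compares an unguarded and a guarded consume loop.
final class SilentTaskDeath {

    /** Stand-in for per-message processing; fails on the third message. */
    static void handle(int messageNumber) {
        if (messageNumber == 3) {
            throw new IllegalStateException("bad message " + messageNumber);
        }
    }

    /** Loop without a per-message catch: the first failure ends the task. */
    static int runUnguarded(int total) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicInteger handled = new AtomicInteger();
        Future<?> future = executor.submit(() -> {
            for (int i = 1; i <= total; i++) {
                handle(i); // an exception here escapes run() and stops the loop
                handled.incrementAndGet();
            }
        });
        try {
            future.get(); // the failure only becomes visible here
        } catch (ExecutionException e) {
            System.err.println("task died: " + e.getCause());
        }
        executor.shutdown();
        return handled.get();
    }

    /** Loop with a per-message catch: a bad message is logged and skipped. */
    static int runGuarded(int total) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicInteger handled = new AtomicInteger();
        Future<?> future = executor.submit(() -> {
            for (int i = 1; i <= total; i++) {
                try {
                    handle(i);
                    handled.incrementAndGet();
                } catch (RuntimeException e) {
                    System.err.println("skipping message " + i + ": " + e);
                }
            }
        });
        future.get(); // completes normally because nothing escapes the loop
        executor.shutdown();
        return handled.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("unguarded handled " + runUnguarded(5) + " of 5");
        System.out.println("guarded handled " + runGuarded(5) + " of 5");
    }
}
```

If the outer try in your run() already catches and logs everything, this is probably not your problem. If it catches and silently returns, the symptom would look much the same from the offset checker.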