---------- Forwarded message ----------
From: Juan Valencia <jvalen...@sharethis.com>
Date: Thu, Nov 29, 2012 at 11:45 AM
Subject: Consumer Hanging (yes, read the FAQ)
To: users@kafka.apache.org
My consumers randomly hang. They essentially end up waiting on a Kafka stream. There is plenty of data on the brokers, and the ratio is about 1 consumer for 5 partitions.

Any thoughts on how to keep the stream going? Or at least how to restart on a hang?

e.g.:

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector =
        Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of(topic, 10));
    List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
    ******          for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                        try {
                            ....

The offset checker shows that the offsets stop moving, but the partitions still have owners. A jstack trace shows the following (the *'s below correspond to the *'s above):

    "pool-1-thread-10" prio=10 tid=0x00007effa4203000 nid=0x50f5 waiting on condition [0x00007eff9965b000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000008b822548> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:386)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    ******  at com.mydomain.kafka.consumer.myConsumer$1.run(myConsumer.java:101)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
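The trace shows the worker parked in LinkedBlockingQueue.take() under kafka.consumer.ConsumerIterator.makeNext, reached from the stream iterator's hasNext(). In other words, the for loop blocks with no time limit until a fetched chunk lands in that queue. The sketch below is a self-contained model with no Kafka dependency. It contrasts take() with poll(timeout) so that an idle stream produces an exception the worker can act on instead of a silent hang. TimedIterator, StreamTimeoutException and consumeUntilIdle are hypothetical names, not Kafka's API. As far as I know, the high-level consumer of this era exposes the same idea through the consumer.timeout.ms property. When that property is non-negative, hasNext() throws kafka.consumer.ConsumerTimeoutException instead of blocking, but check the ConsumerConfig of the version you run.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model (not Kafka code): a stream iterator that waits with a
// timeout instead of parking forever in LinkedBlockingQueue.take().
class TimedStreamSketch {

    // Hypothetical exception, standing in for a consumer-timeout signal.
    static class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(String message) {
            super(message);
        }
    }

    // Iterator over a BlockingQueue whose hasNext() gives up after timeoutMs.
    static class TimedIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue;
        private final long timeoutMs;
        private T buffered; // element fetched by hasNext(), handed out by next()

        TimedIterator(BlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        @Override
        public boolean hasNext() {
            if (buffered != null) {
                return true;
            }
            try {
                // take() would block indefinitely; poll() returns null once the timeout expires.
                buffered = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamTimeoutException("interrupted while waiting for a message");
            }
            if (buffered == null) {
                throw new StreamTimeoutException("no message within " + timeoutMs + " ms");
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = buffered;
            buffered = null;
            return result;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    // Handles messages until the queue stays empty for timeoutMs, then returns
    // the count; a real worker could log at that point and rebuild its streams.
    static int consumeUntilIdle(BlockingQueue<String> queue, long timeoutMs) {
        Iterator<String> it = new TimedIterator<String>(queue, timeoutMs);
        int handled = 0;
        try {
            while (it.hasNext()) {
                it.next(); // process the message here
                handled++;
            }
        } catch (StreamTimeoutException e) {
            System.out.println("stream idle: " + e.getMessage());
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("m1");
        queue.add("m2");
        queue.add("m3");
        int handled = consumeUntilIdle(queue, 200);
        // prints "stream idle: no message within 200 ms", then:
        System.out.println("handled " + handled + " messages, then returned instead of hanging");
    }
}
```

A timeout does not explain why the fetchers stopped filling the queue while the offset checker still shows owners. It only turns the silent hang into an event the worker can log and react to. One option is to call shutdown() on the ConsumerConnector and create a new one.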