---------- Forwarded message ----------
From: Juan Valencia <jvalen...@sharethis.com>
Date: Thu, Nov 29, 2012 at 11:45 AM
Subject: Consumer Hanging (yes, read the FAQ)
To: users@kafka.apache.org



My consumers randomly hang. They essentially end up blocked waiting on a
Kafka stream, even though there is plenty of data on the brokers and a
ratio of about 1 consumer per 5 partitions.
Any thoughts on how to keep the stream going, or at least restart it on a
hang? The consumer code looks like this, e.g.:

    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector =
        Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of(topic, 10));
    List<KafkaStream<Message>> streams = topicMessageStreams.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(10);

    for (final KafkaStream<Message> stream : streams) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
******              for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                        try { ....

The offset checker shows that the offsets stop moving, but the partitions
still have owners.

A jstack trace shows the following (*'s below correspond to *'s above):

"pool-1-thread-10" prio=10 tid=0x00007effa4203000 nid=0x50f5 waiting on condition [0x00007eff9965b000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000008b822548> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:386)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
 ****** at com.mydomain.kafka.consumer.myConsumer$1.run(myConsumer.java:101)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
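
A note on the trace: the thread is parked in LinkedBlockingQueue.take(),
which blocks indefinitely while the queue is empty, so the iterator never
hands control back to the worker. Below is a minimal, Kafka-free sketch of
the alternative, a timed poll() that lets the loop notice a stall. The
class name StallDetector, the method consumeUntilStalled, and the 200 ms
timeout are hypothetical and only illustrate the pattern. In the high-level
consumer the comparable setting is believed to be the consumer.timeout.ms
property, which should make the iterator throw ConsumerTimeoutException
instead of blocking; treat that as an assumption to verify against the
documentation for your Kafka version.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StallDetector {

    /**
     * Consumes from the queue until no element arrives within timeoutMs.
     * Returns the number of elements consumed before the stall was seen.
     */
    public static int consumeUntilStalled(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        int consumed = 0;
        while (true) {
            // Unlike take(), poll() returns null once the timeout elapses
            // instead of parking the thread forever.
            String msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) {
                // Stall detected: the caller can log, alert, or reconnect.
                return consumed;
            }
            consumed++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("a");
        queue.add("b");
        int n = consumeUntilStalled(queue, 200);
        System.out.println("consumed " + n + " messages before stall");
    }
}
```

With a timeout in place, the worker regains control when nothing arrives
and can decide whether to keep waiting or to shut down and recreate the
connector, rather than staying parked as in the trace above.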
