Can you post some logs from the consumer? That should tell us what it's busy doing while hanging. You may have to enable DEBUG level.
-Jason On Thu, Mar 3, 2016 at 5:02 PM, Muthukumaran K <muthukumara...@ericsson.com> wrote: > Hi Jason, > > I am using 0.9 broker. > > One more observation. I had written producer code with 0.9 - Even with > Producer code, I had hanging issue where send method was hanging requesting > metadata. Thread-dump below > > "main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() > [0x00000000025bf000] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x00000007aecacea0> (a > org.apache.kafka.clients.Metadata) > at > org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121) > - locked <0x00000007aecacea0> (a > org.apache.kafka.clients.Metadata) > at > org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412) > at > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339) > at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41) > > Then I included metadata.fetch.timeout.ms=1 and then producer started > working. But when I poll the same topic using kafka-console-consumer.sh, > console-consumer also hangs. > > > > Regards > Muthu > > > -----Original Message----- > From: Jason Gustafson [mailto:ja...@confluent.io] > Sent: Friday, March 04, 2016 5:33 AM > To: users@kafka.apache.org > Subject: Re: Consumer deadlock > > Hi there, > > Just to clarify, is the broker still on 0.8? Unfortunately, the new > consumer needs 0.9. That probably would explain the hanging. > > -Jason > > On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K < > muthukumara...@ericsson.com> > wrote: > > > Ewen, > > > > By new Consumer API, you mean KafkaConsumer ? I have an issue with a > > poll in 0.9.0.1. poll hangs indefinitely even with the timeout > > > > Following is the consumer code which I am using. Any pointers would be > > helpful > > > > public class ConsumerLoop implements Runnable { > > > > > > private final KafkaConsumer<String, String> consumer; > > private final List<String> topics; > > private final int id; > > > > public ConsumerLoop(int id, > > String groupId, > > List<String> topics) { > > this.id = id; > > this.topics = topics; > > Properties props = new Properties(); > > props.put("bootstrap.servers", "192.168.56.101:9092"); > > props.put("group.id", groupId); > > props.put("auto.offset.reset", "earliest"); > > props.put("key.deserializer", > > "org.apache.kafka.common.serialization.StringDeserializer"); > > props.put("value.deserializer", > > "org.apache.kafka.common.serialization.StringDeserializer"); > > props.put("metadata.fetch.timeout.ms", 1); > > > > this.consumer = new KafkaConsumer<>(props); > > > > } > > > > @Override > > public void run() { > > try { > > > > System.out.println("Starting consumer ID : " + id + > > " Thread : " + Thread.currentThread().getName() + > > " Topic : " + topics.toString() + > > " ... "); > > long startTime = System.currentTimeMillis(); > > int recordCount = 0; > > > > consumer.subscribe(topics); > > > > System.out.println("Consumer-ID " + id + " after > > subscribe..."); > > > > while (true) { > > ConsumerRecords<String, String> records = > > consumer.poll(10000); > > > > System.out.println("Consumer-ID " + id + " after > > poll..."); > > > > > > for (ConsumerRecord<String, String> record : records) { > > Map<String, Object> data = new HashMap<>(); > > data.put("partition", record.partition()); > > data.put("offset", record.offset()); > > data.put("value", record.value()); > > System.out.println( > > "Consumer-ID : " + this.id + > > ": " + data + > > " Thread_name : " + > > Thread.currentThread().getName()); > > recordCount++; > > > > } > > long endTime = System.currentTimeMillis(); > > long duration = (endTime - startTime)/1000; > > System.out.println("###### rate : " + recordCount/duration + > " > > msgs/sec on Consumer ID " + id); > > > > } > > } catch (WakeupException e) { > > // ignore for shutdown > > } finally { > > consumer.close(); > > } > > } > > > > public void shutdown() { > > > > consumer.wakeup(); > > } > > > > Regards > > Muthu > > > > > > -----Original Message----- > > From: Ewen Cheslack-Postava [mailto:e...@confluent.io] > > Sent: Thursday, March 03, 2016 2:21 PM > > To: users@kafka.apache.org > > Subject: Re: Consumer deadlock > > > > Take a look at the consumer.timeout.ms setting if you don't want the > > iterator to block indefinitely. > > > > And a better long term solution is to switch to the new consumer, but > > that obviously requires much more significant code changes. The new > > consumer API is a single-threaded poll-based API where you can always > > specify timeouts to the consumer's poll() method (though it currently > > has some limitations to how it enforces that timeout). > > > > -Ewen > > > > On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky < > > ozhurakou...@hortonworks.com> wrote: > > > > > Guys > > > > > > We have a consumer deadlock and here is the relevant dump: > > > > > > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on > > > > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@ > > 39775787 > > > at sun.misc.Unsafe.park(Native Method) > > > at > > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > > > at > > > > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject. > > awaitNanos(AbstractQueuedSynchronizer.java:2078) > > > at > > > > > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java > > :467) > > > at > > > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65) > > > at > > > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) > > > at > > > > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) > > > at > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) > > > . . . . . > > > > > > What worries me is the fact that ‘hasNext’ is essentially a blocking > > > operation. I can’t seem to find a single reason when it would be > > > useful, hence I am calling it a bug, but hopefully someone can clarify. > > > Kafka version is 0.8.* > > > > > > Cheers > > > Oleg > > > > > > > > > > > > -- > > Thanks, > > Ewen > > >