Ewen, By new Consumer API, you mean KafkaConsumer ? I have an issue with a poll in poll hangs indefinitely even with the timeout
Following is the consumer code which I am using. Any pointers would be helpful public class ConsumerLoop implements Runnable { private final KafkaConsumer<String, String> consumer; private final List<String> topics; private final int id; public ConsumerLoop(int id, String groupId, List<String> topics) { this.id = id; this.topics = topics; Properties props = new Properties(); props.put("bootstrap.servers", ""); props.put("group.id", groupId); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("metadata.fetch.timeout.ms", 1); this.consumer = new KafkaConsumer<>(props); } @Override public void run() { try { System.out.println("Starting consumer ID : " + id + " Thread : " + Thread.currentThread().getName() + " Topic : " + topics.toString() + " ... "); long startTime = System.currentTimeMillis(); int recordCount = 0; consumer.subscribe(topics); System.out.println("Consumer-ID " + id + " after subscribe..."); while (true) { ConsumerRecords<String, String> records = consumer.poll(10000); System.out.println("Consumer-ID " + id + " after poll..."); for (ConsumerRecord<String, String> record : records) { Map<String, Object> data = new HashMap<>(); data.put("partition", record.partition()); data.put("offset", record.offset()); data.put("value", record.value()); System.out.println( "Consumer-ID : " + this.id + ": " + data + " Thread_name : " + Thread.currentThread().getName()); recordCount++; } long endTime = System.currentTimeMillis(); long duration = (endTime - startTime)/1000; System.out.println("###### rate : " + recordCount/duration + " msgs/sec on Consumer ID " + id); } } catch (WakeupException e) { // ignore for shutdown } finally { consumer.close(); } } public void shutdown() { consumer.wakeup(); } Regards Muthu -----Original Message----- From: Ewen Cheslack-Postava [mailto:e...@confluent.io] Sent: Thursday, March 03, 2016 2:21 PM To: users@kafka.apache.org Subject: Re: Consumer deadlock Take a look at the consumer.timeout.ms setting if you don't want the iterator to block indefinitely. And a better long term solution is to switch to the new consumer, but that obviously requires much more significant code changes. The new consumer API is a single-threaded poll-based API where you can always specify timeouts to the consumer's poll() method (though it currently has some limitations to how it enforces that timeout). -Ewen On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky < ozhurakou...@hortonworks.com> wrote: > Guys > > We have a consumer deadlock and here is the relevant dump: > > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787 > at sun.misc.Unsafe.park(Native Method) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) > at > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65) > at > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) > at > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) > . . . . . > > What worries me is the fact that ‘hasNext’ is essentially a blocking > operation. I can’t seem to find a single reason when it would be > useful, hence I am calling it a bug, but hopefully someone can clarify. > Kafka version is 0.8.* > > Cheers > Oleg > > -- Thanks, Ewen