Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new consumer requires a 0.9 broker. That would probably explain the hanging.

-Jason

On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <muthukumara...@ericsson.com> wrote:

> Ewen,
>
> By new Consumer API, you mean KafkaConsumer ? I have an issue with a poll
> in 0.9.0.1. poll hangs indefinitely even with the timeout
>
> Following is the consumer code which I am using. Any pointers would be
> helpful
>
> public class ConsumerLoop implements Runnable {
>
>     private final KafkaConsumer<String, String> consumer;
>     private final List<String> topics;
>     private final int id;
>
>     public ConsumerLoop(int id,
>                         String groupId,
>                         List<String> topics) {
>         this.id = id;
>         this.topics = topics;
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "192.168.56.101:9092");
>         props.put("group.id", groupId);
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.fetch.timeout.ms", 1);
>
>         this.consumer = new KafkaConsumer<>(props);
>     }
>
>     @Override
>     public void run() {
>         try {
>             System.out.println("Starting consumer ID : " + id +
>                 " Thread : " + Thread.currentThread().getName() +
>                 " Topic : " + topics.toString() +
>                 " ... \n");
>             long startTime = System.currentTimeMillis();
>             int recordCount = 0;
>
>             consumer.subscribe(topics);
>
>             System.out.println("Consumer-ID " + id + " after subscribe...");
>
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(10000);
>
>                 System.out.println("Consumer-ID " + id + " after poll...");
>
>                 for (ConsumerRecord<String, String> record : records) {
>                     Map<String, Object> data = new HashMap<>();
>                     data.put("partition", record.partition());
>                     data.put("offset", record.offset());
>                     data.put("value", record.value());
>                     System.out.println(
>                         "Consumer-ID : " + this.id +
>                         ": " + data +
>                         " Thread_name : " +
>                         Thread.currentThread().getName());
>                     recordCount++;
>                 }
>                 long endTime = System.currentTimeMillis();
>                 long duration = (endTime - startTime)/1000;
>                 System.out.println("###### rate : " + recordCount/duration +
>                     " msgs/sec on Consumer ID " + id);
>             }
>         } catch (WakeupException e) {
>             // ignore for shutdown
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public void shutdown() {
>         consumer.wakeup();
>     }
>
> Regards
> Muthu
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Thursday, March 03, 2016 2:21 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer deadlock
>
> Take a look at the consumer.timeout.ms setting if you don't want the
> iterator to block indefinitely.
>
> And a better long term solution is to switch to the new consumer, but that
> obviously requires much more significant code changes. The new consumer API
> is a single-threaded poll-based API where you can always specify timeouts
> to the consumer's poll() method (though it currently has some limitations
> to how it enforces that timeout).
>
> -Ewen
>
> On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com> wrote:
>
> > Guys
> >
> > We have a consumer deadlock and here is the relevant dump:
> >
> > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> >     at sun.misc.Unsafe.park(Native Method)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >     at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> >     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> >     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >     . . . . .
> >
> > What worries me is the fact that ‘hasNext’ is essentially a blocking
> > operation. I can’t seem to find a single reason when it would be
> > useful, hence I am calling it a bug, but hopefully someone can clarify.
> > Kafka version is 0.8.*
> >
> > Cheers
> > Oleg
>
> --
> Thanks,
> Ewen
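
For reference, the effect of the consumer.timeout.ms setting mentioned in the thread can be sketched with plain JDK classes. The stack trace in the thread shows hasNext ending in LinkedBlockingQueue.poll, so the sketch below puts a bounded wait on a queue-backed iterator. TimedQueueIterator and its fields are hypothetical names made up for illustration, not Kafka classes, and the behaviour is an assumption-laden simplification: the old consumer signals an elapsed timeout by throwing ConsumerTimeoutException, whereas this sketch just returns false from hasNext.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a queue-backed consumer iterator; not Kafka code.
final class TimedQueueIterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs; // negative: wait forever, like an unset timeout
    private T buffered;           // element fetched by hasNext() but not yet returned

    TimedQueueIterator(BlockingQueue<T> queue, long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    // Blocks until an element arrives or the timeout elapses.
    boolean hasNext() {
        if (buffered != null) {
            return true;
        }
        try {
            buffered = (timeoutMs < 0)
                    ? queue.take()
                    : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        return buffered != null;
    }

    // Returns the next element, or throws if none arrived within the timeout.
    T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("no element within " + timeoutMs + " ms");
        }
        T result = buffered;
        buffered = null;
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        TimedQueueIterator<String> it = new TimedQueueIterator<>(queue, 200);
        System.out.println(it.hasNext()); // empty queue: gives up after the timeout
        queue.add("m1");
        System.out.println(it.next());
    }
}
```

With a negative timeout the sketch falls back to take(), which is the indefinitely blocking behaviour Oleg describes; a non-negative timeout turns the same call into a bounded wait.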