Ewen,
By the new Consumer API, do you mean KafkaConsumer? I have an issue with poll()
in 0.9.0.1: it hangs indefinitely even when a timeout is given.
Below is the consumer code I am using. Any pointers would be helpful.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id,
                        String groupId,
                        List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("metadata.fetch.timeout.ms", 1);
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            System.out.println("Starting consumer ID : " + id +
                    " Thread : " + Thread.currentThread().getName() +
                    " Topic : " + topics.toString() +
                    " ... ");
            long startTime = System.currentTimeMillis();
            int recordCount = 0;
            consumer.subscribe(topics);
            System.out.println("Consumer-ID " + id + " after subscribe...");
            while (true) {
                // Wait up to 10 seconds for records
                ConsumerRecords<String, String> records = consumer.poll(10000);
                System.out.println("Consumer-ID " + id + " after poll...");
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(
                            "Consumer-ID : " + this.id +
                            ": " + data +
                            " Thread_name : " +
                            Thread.currentThread().getName());
                    recordCount++;
                }
                long endTime = System.currentTimeMillis();
                // Clamp to 1 second so the first pass cannot divide by zero
                long duration = Math.max(1, (endTime - startTime) / 1000);
                System.out.println("###### rate : " + recordCount / duration +
                        " msgs/sec on Consumer ID " + id);
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

    // Called from another thread to break out of a blocked poll()
    public void shutdown() {
        consumer.wakeup();
    }
}
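
One thing that might be worth trying while this is being diagnosed is a watchdog that calls wakeup() if poll() has not returned by a deadline. The following is only a sketch using the JDK, so it runs without a broker: callWithWatchdog and PollWatchdogSketch are hypothetical names, and the sleeping lambda in main is a stand-in for a hung poll, not Kafka code. Whether wakeup() actually breaks the hang in 0.9.0.1 is an assumption I have not verified.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollWatchdogSketch {

    /**
     * Runs blockingCall on the current thread. If it has not returned within
     * deadlineMs, a timer thread invokes wakeup once. (Hypothetical helper.)
     */
    static <T> T callWithWatchdog(Supplier<T> blockingCall, Runnable wakeup, long deadlineMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> pending = timer.schedule(wakeup, deadlineMs, TimeUnit.MILLISECONDS);
        try {
            return blockingCall.get();
        } finally {
            // The call returned (or threw): make sure the wakeup does not fire later
            pending.cancel(false);
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Thread caller = Thread.currentThread();
        // Stand-in for a hung poll: sleeps until the watchdog interrupts it
        String result = callWithWatchdog(() -> {
            try {
                Thread.sleep(60_000);
                return "returned";
            } catch (InterruptedException e) {
                return "woken";
            }
        }, caller::interrupt, 200);
        System.out.println(result);
    }
}
```

With the consumer above, the call would presumably look like callWithWatchdog(() -> consumer.poll(10000), consumer::wakeup, 15000), with the resulting WakeupException handled by the existing catch block.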
Regards
Muthu
-----Original Message-----
From: Ewen Cheslack-Postava [mailto:[email protected]]
Sent: Thursday, March 03, 2016 2:21 PM
To: [email protected]
Subject: Re: Consumer deadlock
Take a look at the consumer.timeout.ms setting if you don't want the iterator
to block indefinitely.
And a better long term solution is to switch to the new consumer, but that
obviously requires much more significant code changes. The new consumer API is
a single-threaded poll-based API where you can always specify timeouts to the
consumer's poll() method (though it currently has some limitations in how it
enforces that timeout).
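
To illustrate the blocking behavior and what a timeout changes, here is a simplified model built on the JDK's LinkedBlockingQueue, the class that appears in the thread dump. QueueIterator and TimeoutIteratorSketch are hypothetical names, the IllegalStateException stands in for whatever the real iterator throws on timeout, and none of this is the actual ConsumerIterator code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutIteratorSketch {

    /** Hypothetical queue-backed iterator; a model only, not Kafka code. */
    static final class QueueIterator<T> {
        private final BlockingQueue<T> queue;
        private final long timeoutMs; // negative means "block until data arrives"
        private T next;

        QueueIterator(BlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        /** Blocks until an element is available; throws if the timeout elapses first. */
        boolean hasNext() {
            if (next != null) {
                return true;
            }
            try {
                next = timeoutMs < 0
                        ? queue.take()
                        : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (next == null) {
                throw new IllegalStateException("timed out after " + timeoutMs + " ms");
            }
            return true;
        }

        /** Returns the buffered element, waiting for one if necessary. */
        T next() {
            hasNext();
            T value = next;
            next = null;
            return value;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        QueueIterator<String> it = new QueueIterator<>(queue, 200);
        System.out.println(it.next());
        try {
            it.hasNext(); // queue is empty, so this waits 200 ms and then throws
        } catch (IllegalStateException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

With a negative timeout the same hasNext() call would wait until another thread puts something on the queue, which is the indefinite blocking described in the original report.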
-Ewen
On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky <
[email protected]> wrote:
> Guys
>
> We have a consumer deadlock and here is the relevant dump:
>
> "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> at sun.misc.Unsafe.park(Native Method)
> at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at
> java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> . . . . .
>
> What worries me is the fact that ‘hasNext’ is essentially a blocking
> operation. I can’t seem to find a single reason when it would be
> useful, hence I am calling it a bug, but hopefully someone can clarify.
> Kafka version is 0.8.*
>
> Cheers
> Oleg
>
>
--
Thanks,
Ewen