Ewen, 

By the new consumer API, do you mean KafkaConsumer? I have an issue with poll() in 
0.9.0.1: poll() hangs indefinitely even when a timeout is given.

Below is the consumer code I am using. Any pointers would be helpful.

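import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable {
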
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id,
                      String groupId,
                      List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("metadata.fetch.timeout.ms", 1);

        this.consumer = new KafkaConsumer<>(props);

    }

    @Override
    public void run() {
        try {
            
            System.out.println("Starting consumer ID : " + id + 
                    " Thread : " + Thread.currentThread().getName() + 
                    " Topic : " + topics.toString() +
                    " ... ");
            long startTime = System.currentTimeMillis();
            int recordCount = 0;

          consumer.subscribe(topics);

          System.out.println("Consumer-ID " + id + " after subscribe...");

          while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000);

            System.out.println("Consumer-ID " + id + " after poll...");


            for (ConsumerRecord<String, String> record : records) {
              Map<String, Object> data = new HashMap<>();
              data.put("partition", record.partition());
              data.put("offset", record.offset());
              data.put("value", record.value());
              System.out.println(
                      "Consumer-ID : " + this.id +
                              ": " + data +
                              " Thread_name : " +
                              Thread.currentThread().getName());
              recordCount++;

            }
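            long endTime = System.currentTimeMillis();
            // Elapsed whole seconds, clamped to 1 to avoid dividing by zero in the first second.
            long duration = Math.max(1, (endTime - startTime) / 1000);
            System.out.println("###### rate : " + recordCount / duration
                    + " msgs/sec on Consumer ID " + id);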

          }
        } catch (WakeupException e) {
          // ignore for shutdown
        } finally {
          consumer.close();
        }
    }

    public void shutdown() {

        consumer.wakeup();
    }
}

Regards
Muthu


-----Original Message-----
From: Ewen Cheslack-Postava [mailto:e...@confluent.io] 
Sent: Thursday, March 03, 2016 2:21 PM
To: users@kafka.apache.org
Subject: Re: Consumer deadlock

Take a look at the consumer.timeout.ms setting if you don't want the iterator 
to block indefinitely.
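
To illustrate what that setting changes, here is a minimal sketch that models a queue-backed iterator using only the JDK. It is not Kafka's code: the class name TimeoutIterator is hypothetical, and it signals a timeout with NoSuchElementException, whereas the old consumer's iterator throws ConsumerTimeoutException once consumer.timeout.ms elapses.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a queue-backed iterator; not Kafka's implementation.
class TimeoutIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs; // negative means "wait until an item arrives"
    private T next;               // item fetched by hasNext() but not yet returned

    TimeoutIterator(BlockingQueue<T> queue, long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        try {
            if (timeoutMs < 0) {
                // No timeout configured: block until something is queued.
                next = queue.take();
            } else {
                // Timeout configured: wait at most timeoutMs, then give up.
                next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (next == null) {
                    throw new NoSuchElementException(
                            "timed out after " + timeoutMs + " ms");
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting.
            Thread.currentThread().interrupt();
            throw new NoSuchElementException("interrupted while waiting");
        }
        return true;
    }

    @Override
    public T next() {
        hasNext(); // ensures an item is buffered, or throws
        T item = next;
        next = null;
        return item;
    }
}
```

In this model a non-negative timeout makes hasNext() throw once the wait expires on an empty queue, while a negative timeout makes it wait until an item is queued. The caller is expected to catch the timeout exception and decide whether to retry or shut down.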

And a better long term solution is to switch to the new consumer, but that 
obviously requires much more significant code changes. The new consumer API is 
a single-threaded poll-based API where you can always specify timeouts to the 
consumer's poll() method (though it currently has some limitations to how it 
enforces that timeout).

-Ewen

On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky < 
ozhurakou...@hortonworks.com> wrote:

> Guys
>
> We have a consumer deadlock and here is the relevant dump:
>
> "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING  on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>         . . . . .
>
> What worries me is the fact that ‘hasNext’ is essentially a blocking 
> operation. I can’t seem to find a single reason when it would be 
> useful, hence I am calling it a bug, but hopefully someone can clarify.
> Kafka version is 0.8.*
>
> Cheers
> Oleg
>
>


--
Thanks,
Ewen
