Can you post some logs from the consumer? That should tell us what it's
busy doing while hanging. You may have to enable DEBUG-level logging.
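
(If it's a Java client, a minimal log4j.properties along these lines should
get you DEBUG output from the consumer internals; for
kafka-console-consumer.sh, I believe raising the level in
config/tools-log4j.properties does the same:)

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# DEBUG on the clients package covers the consumer and its network layer
log4j.logger.org.apache.kafka=DEBUG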

-Jason

On Thu, Mar 3, 2016 at 5:02 PM, Muthukumaran K <muthukumara...@ericsson.com>
wrote:

> Hi Jason,
>
> I am using a 0.9 broker.
>
> One more observation: I had also written producer code against 0.9, and
> even the producer hung, with send() blocked waiting for metadata.
> Thread dump below:
>
> "main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() [0x00000000025bf000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
>         at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
>         - locked <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
>         at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
>         at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)
>
> Then I set metadata.fetch.timeout.ms=1 and the producer started
> working. But when I poll the same topic using kafka-console-consumer.sh,
> the console consumer also hangs.
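>
> For reference, the producer setup looks roughly like this (a sketch, not
> my exact code; on 0.9 I believe max.block.ms is the newer way to bound
> how long send() can block, with metadata.fetch.timeout.ms deprecated):
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "192.168.56.101:9092");
> props.put("key.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
> // Bound how long send() may block waiting for metadata
> props.put("max.block.ms", "10000");
> KafkaProducer<String, String> producer = new KafkaProducer<>(props);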
>
>
>
> Regards
> Muthu
>
>
> -----Original Message-----
> From: Jason Gustafson [mailto:ja...@confluent.io]
> Sent: Friday, March 04, 2016 5:33 AM
> To: users@kafka.apache.org
> Subject: Re: Consumer deadlock
>
> Hi there,
>
> Just to clarify, is the broker still on 0.8? Unfortunately, the new
> consumer needs 0.9. That would probably explain the hanging.
>
> -Jason
>
> On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <
> muthukumara...@ericsson.com>
> wrote:
>
> > Ewen,
> >
> > By the new consumer API, do you mean KafkaConsumer? I have an issue
> > with poll in 0.9.0.1: poll hangs indefinitely even with a timeout.
> >
> > Below is the consumer code I am using. Any pointers would be helpful.
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> >
> > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > import org.apache.kafka.common.errors.WakeupException;
> >
> > public class ConsumerLoop implements Runnable {
> >
> >     private final KafkaConsumer<String, String> consumer;
> >     private final List<String> topics;
> >     private final int id;
> >
> >     public ConsumerLoop(int id,
> >                         String groupId,
> >                         List<String> topics) {
> >         this.id = id;
> >         this.topics = topics;
> >         Properties props = new Properties();
> >         props.put("bootstrap.servers", "192.168.56.101:9092");
> >         props.put("group.id", groupId);
> >         props.put("auto.offset.reset", "earliest");
> >         props.put("key.deserializer",
> >                 "org.apache.kafka.common.serialization.StringDeserializer");
> >         props.put("value.deserializer",
> >                 "org.apache.kafka.common.serialization.StringDeserializer");
> >         // Carried over from the producer experiment; this is a producer
> >         // setting, so the consumer just warns about it and ignores it.
> >         props.put("metadata.fetch.timeout.ms", 1);
> >
> >         this.consumer = new KafkaConsumer<>(props);
> >     }
> >
> >     @Override
> >     public void run() {
> >         try {
> >             System.out.println("Starting consumer ID : " + id +
> >                     " Thread : " + Thread.currentThread().getName() +
> >                     " Topic : " + topics.toString() +
> >                     " ... ");
> >             long startTime = System.currentTimeMillis();
> >             int recordCount = 0;
> >
> >             consumer.subscribe(topics);
> >
> >             System.out.println("Consumer-ID " + id + " after subscribe...");
> >
> >             while (true) {
> >                 ConsumerRecords<String, String> records = consumer.poll(10000);
> >
> >                 System.out.println("Consumer-ID " + id + " after poll...");
> >
> >                 for (ConsumerRecord<String, String> record : records) {
> >                     Map<String, Object> data = new HashMap<>();
> >                     data.put("partition", record.partition());
> >                     data.put("offset", record.offset());
> >                     data.put("value", record.value());
> >                     System.out.println("Consumer-ID : " + this.id +
> >                             ": " + data +
> >                             " Thread_name : " + Thread.currentThread().getName());
> >                     recordCount++;
> >                 }
> >                 long endTime = System.currentTimeMillis();
> >                 long duration = (endTime - startTime) / 1000;
> >                 if (duration > 0) { // avoid divide-by-zero in the first second
> >                     System.out.println("###### rate : " + recordCount / duration +
> >                             " msgs/sec on Consumer ID " + id);
> >                 }
> >             }
> >         } catch (WakeupException e) {
> >             // ignore for shutdown
> >         } finally {
> >             consumer.close();
> >         }
> >     }
> >
> >     public void shutdown() {
> >         consumer.wakeup();
> >     }
> > }
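> >
> > For completeness, this is roughly how I drive the loop (a sketch; the
> > thread count, group, and topic names are illustrative):
> >
> > ExecutorService executor = Executors.newFixedThreadPool(3);
> > final List<ConsumerLoop> consumers = new ArrayList<>();
> > for (int i = 0; i < 3; i++) {
> >     ConsumerLoop consumer = new ConsumerLoop(i, "test-group",
> >             Collections.singletonList("test-topic"));
> >     consumers.add(consumer);
> >     executor.submit(consumer);
> > }
> > // wakeup() makes a blocked poll() throw WakeupException,
> > // so each loop falls through to consumer.close()
> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >     for (ConsumerLoop c : consumers) {
> >         c.shutdown();
> >     }
> >     executor.shutdown();
> > }));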
> >
> > Regards
> > Muthu
> >
> >
> > -----Original Message-----
> > From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> > Sent: Thursday, March 03, 2016 2:21 PM
> > To: users@kafka.apache.org
> > Subject: Re: Consumer deadlock
> >
> > Take a look at the consumer.timeout.ms setting if you don't want the
> > iterator to block indefinitely.
> >
> > And a better long-term solution is to switch to the new consumer, but
> > that obviously requires much more significant code changes. The new
> > consumer API is a single-threaded, poll-based API where you can always
> > pass a timeout to the consumer's poll() method (though it currently
> > has some limitations in how it enforces that timeout).
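> >
> > Roughly, with the old high-level consumer (a sketch; the topic, group,
> > and ZooKeeper address are placeholders):
> >
> > import java.util.Collections;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.ConsumerTimeoutException;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> >
> > Properties props = new Properties();
> > props.put("zookeeper.connect", "localhost:2181");
> > props.put("group.id", "my-group");
> > // Make the iterator throw ConsumerTimeoutException instead of
> > // blocking forever in hasNext()
> > props.put("consumer.timeout.ms", "5000");
> >
> > ConsumerConnector connector =
> >     kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
> > Map<String, List<KafkaStream<byte[], byte[]>>> streams =
> >     connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
> > ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
> > try {
> >     while (it.hasNext()) {
> >         byte[] message = it.next().message();
> >         // process message
> >     }
> > } catch (ConsumerTimeoutException e) {
> >     // no message arrived within consumer.timeout.ms
> > } finally {
> >     connector.shutdown();
> > }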
> >
> > -Ewen
> >
> > On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky <
> > ozhurakou...@hortonworks.com> wrote:
> >
> > > Guys
> > >
> > > We have a consumer deadlock and here is the relevant dump:
> > >
> > > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> > >         at sun.misc.Unsafe.park(Native Method)
> > >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> > >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> > >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> > >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > >         . . . . .
> > >
> > > What worries me is the fact that ‘hasNext’ is essentially a blocking
> > > operation. I can’t see a single case where that would be useful, hence
> > > I am calling it a bug, but hopefully someone can clarify.
> > > Kafka version is 0.8.*
> > >
> > > Cheers
> > > Oleg
> > >
> > >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>
