Hi all,

I have created 6 Kafka topics each with 2 partitions, and now I want to
consume from all partitions in a Java app.

I have created a Executors.newScheduledThreadPool(12) for this, and then
submit my KafkaConsumer implementations to this thread pool.

In KafkaConsumer, I do:
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> current = it.next();
    ... process record ...
}

When I run this Java app, everything appears to work at first, but after
about 30 seconds, the threads grind to a halt (I think because it.hasNext()
is blocking indefinitely) ... and then no progress is made.

Messages are constantly being published to Kafka, and there is a lot left
to process, so why would the iterator block indefinitely? (I have to
restart the Java app to consume more messages)

Thanks for any help with this!
Josh

Reply via email to