This is how I am doing it.
First, I get a map from topic to stream list:
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
From that, I get the stream list for my topic:
List<KafkaStream<byte[], byte[]>> stream = consumerMap.get(topic);
I iterate over the list; for each stream I create a new thread and run the
consumer loop in that thread.
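A minimal sketch of the thread-per-stream step, using only the JDK so it runs without a broker. Each Iterator<String> here is a hypothetical stand-in for one KafkaStream, and StreamWorkers and consumeAll are illustrative names, not part of the Kafka API. A real stream iterator blocks instead of running out, so the workers would normally run until the consumer is shut down.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: each Iterator<String> plays the role of one KafkaStream.
class StreamWorkers {

    // Runs one worker per stream and returns the total number of messages handled.
    static int consumeAll(List<Iterator<String>> streams) {
        final AtomicInteger handled = new AtomicInteger();
        // One thread per stream, as described in the text above.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (final Iterator<String> stream : streams) {
            pool.submit(() -> {
                // Each worker owns exactly one stream and drains it in order.
                while (stream.hasNext()) {
                    stream.next();               // the message would be processed here
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();                         // accept no new tasks, let workers finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
        }
        return handled.get();
    }
}
```

Giving each stream to exactly one worker keeps messages within a stream in order; only the shared counter needs to be thread-safe.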
In each thread, I get the iterator for the stream:
ConsumerIterator<byte[], byte[]> it = stream.iterator();
Then I iterate over that stream:
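while (it.hasNext()) { // Blocking call.
    MessageAndMetadata<byte[], byte[]> message = it.next();
    try {
        handleMessage(message);
        // Commit only after the message has been handled successfully.
        consumer.commitOffsets();
    } catch (Throwable e) {
        // No commit on failure; the loop still moves on to the next message.
        logger.error("Failed to process", e);
    }
}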
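One possible way to avoid moving past a failed message is to retry it in place and stop the loop if it keeps failing. The sketch below is an assumption-laden illustration, not a Kafka API: RetryingLoop and run are hypothetical names, the Iterator<String> stands in for the stream iterator, and adding to the committed list stands in for consumer.commitOffsets().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: retries a failing message instead of advancing past it.
class RetryingLoop {

    // Processes messages in order and returns the ones that were acknowledged.
    // Stops at the first message that still fails after maxAttempts, so
    // nothing after it is ever acknowledged.
    static List<String> run(Iterator<String> it, Consumer<String> handler, int maxAttempts) {
        List<String> committed = new ArrayList<>();
        while (it.hasNext()) {
            String message = it.next();
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                try {
                    handler.accept(message);
                    ok = true;
                } catch (RuntimeException e) {
                    // A real loop would log the failure and perhaps back off here.
                }
            }
            if (!ok) {
                break;                  // stop without acknowledging the failed message
            }
            committed.add(message);     // stands in for consumer.commitOffsets()
        }
        return committed;
    }
}
```

Because the loop stops instead of calling next() again, no later commit can cover the failed message, which is closer to the "stuck until fixed and restarted" behaviour described for 0.7.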
In 0.7, this used to get stuck when there was an error; we would fix the
problem and restart the app, and processing then went on fine. Is this no
longer the case? As explained earlier, while testing we found that the
consumer consumes the next message, and if that next message is processed
successfully, all the previous messages are gone. They are still in the
queue, but we don't even know the message offsets needed to fetch them, and
I just read that we can't fetch them even if we have the offsets.
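The behaviour described above is what one would expect if a commit records the consumer's current position rather than acknowledging one particular message. The toy model below illustrates that assumption only; ToyConsumer and CommitDemo are hypothetical classes, not Kafka code, and messages are numbered from 1 to match the example in this thread.

```java
// Toy model: a commit stores the current read position, not a per-message ack.
class ToyConsumer {
    private final int totalMessages;
    private int position = 1;    // number of the message that next() returns next
    private int committed = 1;   // position to resume from after a restart

    ToyConsumer(int totalMessages) {
        this.totalMessages = totalMessages;
    }

    boolean hasNext() { return position <= totalMessages; }

    // Advances the position whether or not the caller processes the message successfully.
    int next() { return position++; }

    // Records the current position, which covers every message returned so far.
    void commit() { committed = position; }

    // Simulates a restart: reading resumes from the last committed position.
    void restart() { position = committed; }
}

class CommitDemo {
    // Consumes up to stopAfterMessage, skipping the commit for failingMessage,
    // then restarts and returns the first message delivered afterwards (-1 if none).
    static int firstMessageAfterRestart(int totalMessages, int failingMessage, int stopAfterMessage) {
        ToyConsumer consumer = new ToyConsumer(totalMessages);
        while (consumer.hasNext()) {
            int message = consumer.next();
            if (message != failingMessage) {
                consumer.commit();   // success path: commit after processing
            }
            if (message == stopAfterMessage) {
                break;
            }
        }
        consumer.restart();
        return consumer.hasNext() ? consumer.next() : -1;
    }
}
```

In this model, firstMessageAfterRestart(100, 91, 92) returns 93: the commit made after message 92 also covers the failed message 91. Stopping right after the failure, firstMessageAfterRestart(100, 91, 91), returns 91, so the failed message is redelivered only if nothing later was committed.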
Please let me know whether my understanding is correct.
Thanks
Arjun Narasimha Kota
On Friday 22 August 2014 04:48 PM, Arjun wrote:
Hi,
I have a Kafka 0.8.1 setup with 3 producers and 3 consumers. I use the
high-level consumer. My question is this: I read messages from the
consumer iterator and then process them. If the processing throws an
exception, I do not commit the offset; otherwise, I commit the offset.
Example: let's say I have 100 messages in the queue.
After 90 messages, the 91st message is read, but while processing it (my
code) an exception is thrown and I haven't committed the offset.
In this case, if I call next() on the consumer iterator, will it give me
the next message, i.e. the 92nd message, or not?
And if the 92nd message is served, its processing goes smoothly, and I
commit the offset, can I get the 91st message again?
Thanks
Arjun Narasimha Kota