This is how I am doing it:

First I get a map from topic to stream list:
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

From that I get the stream list for my topic:
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

I iterate over the list; for each stream I create a new thread and consume the stream in that thread.

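In each thread I get the iterator for the stream:
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

and iterate on that stream:
    while (it.hasNext()) { // Blocking call.
        MessageAndMetadata<byte[], byte[]> message = it.next();
        try {
            handleMessage(message);
            // Commit only after the message was processed successfully.
            consumer.commitOffsets();
        } catch (Throwable e) {
            // No commit on failure; the loop still moves on to the next message.
            logger.error("Failed to process", e);
        }
    }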
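
To show the thread-per-stream part in one piece, here is a rough, self-contained sketch. It is not my actual code: each List<String> stands in for one KafkaStream<byte[], byte[]>, the class name StreamWorkers and the method consumeAll are made up for illustration, and unlike a real stream these stand-ins end instead of blocking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: each List<String> stands in for one KafkaStream<byte[], byte[]>.
class StreamWorkers {

    // Runs one worker per stream and returns every message that was handled.
    static List<String> consumeAll(List<List<String>> streams) {
        ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();
        // One thread per stream; at least one thread so an empty list is allowed.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (List<String> stream : streams) {
            pool.submit(() -> {
                for (String message : stream) {
                    try {
                        handled.add(message); // stand-in for handleMessage(message)
                    } catch (RuntimeException e) {
                        System.err.println("Failed to process " + message + ": " + e);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for the workers; a real Kafka stream would block indefinitely.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<List<String>> streams = Arrays.asList(
                Arrays.asList("a1", "a2"),
                Arrays.asList("b1", "b2", "b3"));
        System.out.println(consumeAll(streams).size() + " messages handled");
    }
}
```

The order in which messages from different streams are handled is not fixed, since each stream runs on its own thread.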

In 0.7, the consumer used to get stuck when there was an error; we would fix the problem and restart the app, and processing then continued fine. Is this no longer the case? As explained earlier, while testing we found that the consumer goes on to consume the next message, and if that message is processed successfully, all the previous messages are lost. They are still in the queue, but we don't know the offsets needed to fetch them, and I just read that we can't fetch them even if we have the offsets.
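
To make the behaviour we observed concrete, here is a small sketch in plain Java with no Kafka classes. It rests on an assumption I would like confirmed: that a commit stores the consumer's current position rather than acknowledging individual messages. The class OffsetCommitDemo and the method skippedAfterRestart are names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models a consumer whose commit stores its current position,
// not an acknowledgement of individual messages. This is not Kafka API.
class OffsetCommitDemo {

    // Returns the offsets whose processing failed and which a restart from
    // the committed position would no longer deliver.
    static List<Integer> skippedAfterRestart(int totalMessages, int failingOffset) {
        int committed = 0; // next offset to read after a restart
        List<Integer> failed = new ArrayList<>();
        for (int offset = 0; offset < totalMessages; offset++) {
            int position = offset + 1; // the iterator has moved past this message
            if (offset == failingOffset) {
                failed.add(offset); // handleMessage threw, so nothing is committed
            } else {
                committed = position; // assumed: commit stores the current position
            }
        }
        List<Integer> skipped = new ArrayList<>();
        for (int offset : failed) {
            if (offset < committed) {
                skipped.add(offset); // below the committed position: not re-read
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Messages 1..100 map to offsets 0..99, so the 91st message is offset 90.
        System.out.println(skippedAfterRestart(100, 90));
    }
}
```

Under that assumption, a failed message is skipped as soon as any later message is committed, and it is re-read after a restart only if nothing after it was committed.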

Please let me know whether my understanding is correct.

Thanks
Arjun Narasimha Kota





On Friday 22 August 2014 04:48 PM, Arjun wrote:
Hi,

I have a Kafka 0.8.1 setup with 3 producers and 3 consumers, and I use the high-level consumer. My question is this: I read messages from the consumer iterator and then process them. If the processing throws an exception, I do not commit the offset; otherwise I commit it.

Example: let's say I have 100 messages in the queue.

After 90 messages, the 91st message is read, but my processing code throws an exception, so I do not commit the offset. In this case, if I call next() on the consumer iterator, will it give me the next message, i.e. the 92nd, or not? And if the 92nd message is served, its processing succeeds, and I commit the offset, can I still get the 91st message again?


Thanks
Arjun Narasimha Kota
