It seems like you're not explicitly controlling the offsets. Is that correct?
If so, the moment you pull a message from the stream, the client framework considers it processed. So if your app subsequently crashes before the message is fully processed, and "auto-commit" updates the offsets in ZooKeeper, you will drop that message. The solution to this is to call commitOffsets() explicitly.

Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Hi,
>
> I'm working through a production-level High Level Consumer app and have a
> couple of error/shutdown questions to understand how the offset storage
> is handled.
>
> Test case - simulate an error writing to the destination application (for
> example, a database), so the offset is 'lost'.
>
> Scenario:
> - write 500 messages for each topic/partition
> - use the example High Level Consumer code I wrote for the Wiki
> - change the code so that every 10th read from the 'hasNext()'
>   ConsumerIterator breaks out of the loop and returns from the thread,
>   simulating a hard error; I write the offset to System.out to see what
>   was provided
> - start up again and look to see what offset was first emitted for a
>   partition
>
> Issue: Kafka treats the offset for the message read that caused me to
> break out of the loop as processed (as expected), but I really failed.
> How do I tell Kafka that I didn't really consume that offset?
>
> Here is the example code in the 'business logic':
>
> public void run() {
>     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>     int counter = 0;
>     while (it.hasNext()) {
>         MessageAndMetadata<byte[], byte[]> msg = it.next();
>         if (counter == 10) {
>             System.out.println("Stopping Thread " + m_threadNumber +
>                 ": Partition: " + msg.partition() +
>                 ": Offset: " + msg.offset() + " :" +
>                 new String(msg.message()));
>             break;
>         }
>         System.out.println("Thread " + m_threadNumber +
>             ": Partition: " + msg.partition() +
>             ": Offset: " + msg.offset() + " :" +
>             new String(msg.message()));
>         counter++;
>     }
>
>     System.out.println("Shutting down Thread: " + m_threadNumber);
> }
>
> I understand that handling 'hard' errors like JVM crashes, kill -9, etc.
> may leave the offsets in ZooKeeper incorrect, but I'm trying to understand
> what happens in a clean shutdown where Kafka and the Consumer are behaving
> correctly but I can't process what I read.
>
> This also feels like I'm blurring SimpleConsumer theory into this, but
> except for the exception/shutdown case the High Level Consumer does
> everything I want.
>
> Thanks,
>
> Chris
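The "commit only after processing" pattern Philip describes can be sketched without any Kafka dependency. This is an illustrative, library-free model: `commit()` stands in for `ConsumerConnector.commitOffsets()` (with `auto.commit.enable=false` in the real consumer config), `committedOffset` stands in for the offset stored in ZooKeeper, and all class/method names here are made up for the sketch:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative model of at-least-once consumption: commit an offset only
// AFTER the message at that offset has been fully processed, so a crash
// re-delivers the failed message instead of dropping it.
public class CommitAfterProcess {
    // Stand-in for the offset auto-commit would store in ZooKeeper.
    static long committedOffset = -1;

    // Stand-in for ConsumerConnector.commitOffsets() in the real API.
    static void commit(long offset) {
        committedOffset = offset;
    }

    // Consumes the stream; returns the offset where processing "crashed",
    // or -1 if every message was handled. failAtOffset simulates Chris's
    // "break out of the loop" hard error.
    static long consume(Queue<String> stream, long startOffset, long failAtOffset) {
        long offset = startOffset;
        for (String msg : stream) {
            if (offset == failAtOffset) {
                return offset; // simulated error: this offset is NOT committed
            }
            // process(msg) would go here; commit only once it succeeds
            commit(offset);
            offset++;
        }
        return -1;
    }

    public static void main(String[] args) {
        Queue<String> stream = new ArrayDeque<>();
        for (int i = 0; i < 5; i++) stream.add("msg-" + i);

        long crashedAt = consume(stream, 0, 3); // fail while handling offset 3
        // Offsets 0-2 were committed; a restart resumes at committedOffset + 1,
        // i.e. offset 3, so the failed message is seen again.
        System.out.println("crashedAt=" + crashedAt + " committed=" + committedOffset);
    }
}
```

With auto-commit, the stand-in `commit()` would effectively run as soon as `it.next()` returned, which is exactly why the broken-out-of message is counted as consumed in the scenario above.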