Hi Paul,

This will work if you care about not losing any unprocessed messages, but not about messages being processed twice.
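To make the trade-off concrete, here is a minimal, self-contained Java simulation of the peek()/process/next() loop from your mail. FakeStream, run() and the crashAfter hook are hypothetical stand-ins, not the Kafka ConsumerIterator API. The sketch also assumes the consumed position at the moment of the crash is exactly what recovery resumes from (the best case). With periodic offset commits the replay window would only get larger.

```java
import java.util.ArrayList;
import java.util.List;

public class PeekThenNextDemo {
    // Hypothetical stand-in for a consumer stream: an in-memory log plus a consumed position.
    // This is NOT the Kafka API; it only mimics the peek()/next() contract discussed here.
    static final class FakeStream {
        private final List<String> log;
        private int position; // offset of the next unconsumed message

        FakeStream(List<String> log, int position) {
            this.log = log;
            this.position = position;
        }

        boolean hasNext() { return position < log.size(); }
        String peek() { return log.get(position); }    // look at the message without consuming it
        String next() { return log.get(position++); }  // consume it and advance the position
        int position() { return position; }
    }

    // Runs the peek/process/next loop, recording every processed message.
    // If crashAfter is non-null, simulates a crash right after processing that message
    // but before it.next() is called, and returns the position recovery would resume from.
    static int run(FakeStream it, List<String> processed, String crashAfter) {
        while (it.hasNext()) {
            String msg = it.peek();
            processed.add(msg);              // process(msg) succeeded
            if (msg.equals(crashAfter)) {
                return it.position();        // crash before it.next(): msg not yet consumed
            }
            it.next();                       // processed successfully, now consume it
        }
        return it.position();
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2");
        List<String> processed = new ArrayList<>();

        // First run dies after processing m1 but before consuming it.
        int resumeFrom = run(new FakeStream(log, 0), processed, "m1");
        // Recovery resumes from the last consumed position, so m1 is processed again.
        run(new FakeStream(log, resumeFrom), processed, null);

        System.out.println(processed);
    }
}
```

Running main prints [m0, m1, m1, m2]: nothing is lost, but m1 is processed twice, so in this scheme process(...) has to tolerate duplicates.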
Say you have a failure after process(...) and before it.next(): on recovery, the same message will be processed again.

I agree that the cross-thread commit() in the 0.8 consumer makes its usage a bit tricky. In fact, in 0.9 we are rewriting our consumer client as a single-threaded client, and hence its commit() call will only commit offsets for messages that this thread itself has consumed so far. Please take a look at our current design

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

and the API java doc

http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

if you are interested. Any comments are more than welcome.

Guozhang


On Mon, Apr 14, 2014 at 2:29 PM, Paul Mogren <pmog...@commercehub.com> wrote:

> Hello,
>
> I am excited to be trying out Kafka. It sounds like everything I ever
> wanted in a messaging system, but didn't have.
>
> I am interested in using the High-Level Consumer without losing
> (consuming) messages that were read from the broker, but not processed by
> user code (exception thrown, database down, unclean consumer shutdown,
> whatever). I have read everything I can find on the subject, and the
> official advice has been to disable auto-commit of the offset, and commit
> explicitly at a suitable time. This is fair, but not entirely trivial: it
> applies across all threads using the consumer, and it is not clear what
> action, if any, should be taken in regards to rebalancing. A good reference
> discussion can be found at
> http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3CCAA+BczSRjgR=lffnl+ihsxx6p6snovwqjfhrebuldwwdom+...@mail.gmail.com%3E
>
> I also understand that there are a few improvements in the works. I
> already went ahead and voted for them in Jira.
>
> What never seems to be discussed publicly is the fact that there IS a
> peek() method on ConsumerIterator, and it appears to be just the ticket. Is
> there anything wrong with doing something like this?
>
> while (it.hasNext()) {
>     MessageAndMetadata<byte[], byte[]> messageAndMetadata =
>             it.peek();
>     try {
>         process(messageAndMetadata);
>         it.next(); // processed successfully, now consume it
>     } catch { ... }
> }
>
> In case it matters, I'm dealing with a low message rate, and tracking the
> latest official release.
>
> Thank you,
> Paul
>

--
-- Guozhang