Hi Paul,

This will work if you care about not losing any unprocessed messages,
but not if you also care about messages being processed twice.

Say a failure happens after process(...) but before it.next(): because
it.next() never ran, the consumer's position was not advanced past that
message, so on recovery the same message will be processed again.
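To make that failure window concrete, here is a toy model in Java. It is NOT the Kafka API: `ToyIterator`, `run` and `crashAfterProcessing` are hypothetical names, and for simplicity it assumes the position that survives a crash is exactly the iterator's position at that moment (as if it were committed right after every next()). It shows that a crash between processing and next() leads to a duplicate, never a loss.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not the Kafka API) of the peek()/next() pattern: the position
 * only advances when next() is called, so a crash between processing and
 * next() causes the same message to be processed again after recovery.
 */
public class PeekThenNextDemo {

    /** Hypothetical stand-in for a partition log plus a consumed position. */
    static final class ToyIterator {
        private final List<String> log;
        private int position; // offset of the next message to hand out

        ToyIterator(List<String> log, int startPosition) {
            this.log = log;
            this.position = startPosition;
        }

        boolean hasNext() { return position < log.size(); }

        String peek() { return log.get(position); }   // does not advance

        String next() { return log.get(position++); } // advances the position

        int position() { return position; }
    }

    /**
     * Processes messages from startPosition. If crashAfterProcessing is
     * non-negative, simulates a failure right after processing that offset
     * but before next() is called. Returns the position that survives.
     */
    public static int run(List<String> log, int startPosition,
                          int crashAfterProcessing, List<String> processed) {
        ToyIterator it = new ToyIterator(log, startPosition);
        while (it.hasNext()) {
            int offset = it.position();
            processed.add(it.peek()); // "process" the message
            if (offset == crashAfterProcessing) {
                return it.position(); // crash: next() never ran for this offset
            }
            it.next(); // processed successfully, now consume it
        }
        return it.position();
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2");
        List<String> processed = new ArrayList<>();
        int recovered = run(log, 0, 1, processed); // crash after processing m1
        run(log, recovered, -1, processed);        // restart from surviving position
        System.out.println(processed);             // prints [m0, m1, m1, m2]
    }
}
```

Nothing is lost, but m1 is processed twice, which is why this pattern gives at-least-once rather than exactly-once behavior.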

I agree that the cross-thread commit() in the 0.8 consumer makes it a bit
tricky to use. In fact, in 0.9 we are rewriting the consumer client as a
single-threaded client, so its commit() call will only commit offsets that
have been consumed by that thread itself. Please take a look at our current
design

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

and the API Javadoc

http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

if you are interested. Any comments are more than welcome.
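The difference in commit scope can be sketched with another toy model in Java. Again this is NOT the 0.8 or 0.9 API: `ToyConsumer`, `markConsumed`, `commit` and `commitAll` are hypothetical names, and the offset store is just a map from partition to the next offset to read. A connector-wide commit writes every thread's position, including one whose message was handed out but not yet processed; a per-consumer commit writes only the caller's own positions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not the Kafka API): contrasts a connector-wide commit, which
 * writes every consumer's position, with a per-consumer commit that only
 * writes what the calling consumer itself has consumed.
 */
public class CommitScopeDemo {

    /** Hypothetical single-threaded consumer tracking its own positions. */
    public static final class ToyConsumer {
        private final Map<Integer, Long> consumed = new HashMap<>(); // partition -> next offset

        /** Advances this consumer's position past the given offset. */
        public void markConsumed(int partition, long offset) {
            consumed.put(partition, offset + 1);
        }

        /** Per-consumer commit: only this consumer's positions are written. */
        public void commit(Map<Integer, Long> store) {
            store.putAll(consumed);
        }
    }

    /** Connector-wide commit: every consumer's positions are written at once. */
    public static void commitAll(List<ToyConsumer> consumers, Map<Integer, Long> store) {
        for (ToyConsumer c : consumers) {
            store.putAll(c.consumed);
        }
    }

    public static void main(String[] args) {
        ToyConsumer a = new ToyConsumer(); // owns partition 0
        ToyConsumer b = new ToyConsumer(); // owns partition 1
        a.markConsumed(0, 41); // a has finished processing offset 41
        b.markConsumed(1, 7);  // b has taken offset 7 but not processed it yet

        Map<Integer, Long> sharedStore = new HashMap<>();
        commitAll(List.of(a, b), sharedStore);
        // Partition 1 is committed past offset 7: if b crashes now, 7 is lost.
        System.out.println(sharedStore);

        Map<Integer, Long> perConsumerStore = new HashMap<>();
        a.commit(perConsumerStore);
        // Only partition 0 is committed; b's in-flight message is untouched.
        System.out.println(perConsumerStore);
    }
}
```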

Guozhang


On Mon, Apr 14, 2014 at 2:29 PM, Paul Mogren <pmog...@commercehub.com> wrote:

> Hello,
>
> I am excited to be trying out Kafka. It sounds like everything I ever
> wanted in a messaging system, but didn't have.
>
> I am interested in using the High-Level Consumer without losing
> (consuming) messages that were read from the broker, but not processed by
> user code (exception thrown, database down, unclean consumer shutdown,
> whatever). I have read everything I can find on the subject, and the
> official advice has been to disable auto-commit of the offset, and commit
> explicitly at a suitable time. This is fair, but not entirely trivial: it
> applies across all threads using the consumer, and it is not clear what
> action, if any, should be taken with regard to rebalancing. A good reference
> discussion can be found at
> http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3CCAA+BczSRjgR=lffnl+ihsxx6p6snovwqjfhrebuldwwdom+...@mail.gmail.com%3E
>
> I also understand that there are a few improvements in the works. I
> already went ahead and voted for them in Jira.
>
> What never seems to be discussed publicly is the fact that there IS a
> peek() method on ConsumerIterator, and it appears to be just the ticket. Is
> there anything wrong with doing something like this?
>
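>         while (it.hasNext()) {
>             // peek() returns the next message without advancing the iterator
>             MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.peek();
>             try {
>                 process(messageAndMetadata);
>                 it.next();  // processed successfully, now consume it
>             } catch (Exception e) {
>                 // handle the failure (log, back off, ...); since next() was
>                 // not called, the next peek() returns the same message again
>             }
>         }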
>
> In case it matters, I'm dealing with a low message rate, and tracking the
> latest official release.
>
> Thank you,
> Paul
>



-- 
-- Guozhang
