I think a duplicate message is the right behavior for both patterns:

iter.next(); process(message); CRASH; consumer.commit();
iter.peek(); process(message); CRASH; iter.next(); CRASH; consumer.commit();

The only difference is that the first pattern needs fewer lines of code.
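
A rough sketch of the two crash windows, for reference (0.8 high-level
consumer; process() is a placeholder, and the concrete commit call in
the 0.8 API is consumer.commitOffsets()):

// Pattern 1: consume, process, commit (at-least-once)
MessageAndMetadata<byte[], byte[]> message = iter.next();
process(message);           // crash here => message is replayed on restart
consumer.commitOffsets();

// Pattern 2: peek, process, consume, commit -- same guarantee
message = iter.peek();      // look at the message without consuming it
process(message);           // crash here => replayed on restart
iter.next();                // crash here => still replayed on restart
consumer.commitOffsets();
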
Jagbir

> Date: Mon, 23 Jun 2014 13:49:26 -0700
> Subject: Re: Reliable Message Commits
> From: wangg...@gmail.com
> To: users@kafka.apache.org
> 
> If there is a crash between
> 
> process(message);
> 
> and
> 
> iter.next(); //consumes the message now
> 
> you will still have duplicates upon restart.
> 
> Guozhang
> 
> 
> On Mon, Jun 23, 2014 at 12:37 PM, Achanta Vamsi Subhash
> <achanta.va...@flipkart.com> wrote:
> 
> > What about this pattern:
> >
> > message = iter.peek(); //gets the message
> > process(message);
> > iter.next(); //consumes the message now
> > consumer.commit();
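> >
> > In a loop, that would look roughly like this (process() is a
> > placeholder, and the concrete 0.8 commit method is commitOffsets()):
> >
> > ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> > while (iter.hasNext()) {
> >     MessageAndMetadata<byte[], byte[]> message = iter.peek(); // don't consume yet
> >     process(message);         // crash here => message is replayed on restart
> >     iter.next();              // now mark the message consumed
> >     consumer.commitOffsets();
> > }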
> >
> >
> >
> > On Fri, Jun 20, 2014 at 11:53 AM, Kyle Banker <kyleban...@gmail.com>
> > wrote:
> >
> > > Thanks for the advice, Guozhang.
> > >
> > > Jagbir: I'll report back on my progress. I intend to have quite a few
> > > threads across many machines. We'll see how well it performs with a whole
> > > high-level consumer per thread.
> > >
> > >
> > > On Thu, Jun 19, 2014 at 9:30 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hello Kyle,
> > > >
> > > > For your first question, the first option would be preferable: it may
> > > > use a little more memory and generate more ZK writes. In 0.9, though,
> > > > offsets will be stored on the Kafka brokers instead of in ZK, so you
> > > > will no longer bombard ZK.
> > > >
> > > > For the third question, the intended usage pattern for manual commits
> > > > is:
> > > >
> > > > message = iter.next();
> > > > process(message);
> > > > consumer.commit();
> > > >
> > > > Thus, if the process crashes between process(message) and
> > > > consumer.commit(), you do incur duplicates, but you will not lose any
> > > > data. If you are more tolerant of data loss than of duplicates, you
> > > > can do:
> > > >
> > > > message = iter.next();
> > > > consumer.commit();
> > > > process(message);
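> > > >
> > > > For reference, a fuller sketch of the first (at-least-once) ordering
> > > > with the 0.8 high-level consumer. The topic name and process() are
> > > > placeholders, and the concrete commit method in the 0.8 API is
> > > > commitOffsets() rather than the pseudocode commit():
> > > >
> > > > import java.util.Collections;
> > > > import java.util.Properties;
> > > > import kafka.consumer.*;
> > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > import kafka.message.MessageAndMetadata;
> > > >
> > > > Properties props = new Properties();
> > > > props.put("zookeeper.connect", "localhost:2181");
> > > > props.put("group.id", "my-group");
> > > > props.put("auto.commit.enable", "false"); // commit manually
> > > > ConsumerConnector consumer =
> > > >     Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
> > > > KafkaStream<byte[], byte[]> stream = consumer
> > > >     .createMessageStreams(Collections.singletonMap("events", 1))
> > > >     .get("events").get(0);
> > > >
> > > > ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> > > > while (iter.hasNext()) {
> > > >     MessageAndMetadata<byte[], byte[]> message = iter.next();
> > > >     process(message);         // crash here => duplicate on restart
> > > >     consumer.commitOffsets();
> > > >     // For at-most-once, commit first and then process; a crash after
> > > >     // the commit loses the message instead of duplicating it.
> > > > }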
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jun 18, 2014 at 9:43 PM, Jagbir <jsho...@hotmail.com> wrote:
> > > >
> > > > > Hi Kyle,
> > > > >
> > > > > Thanks for the update. Wondering if you found an answer to your N-1
> > > > > commit question? If auto-commit happens only at iterator.next(), and
> > > > > only for the N-1 message, then client code can be much simpler and
> > > > > more reliable, as you mentioned. I'm also looking forward to any post
> > > > > in this regard.
> > > > >
> > > > > Jagbir
> > > > >
> > > > > On June 18, 2014 3:17:25 PM PDT, Kyle Banker <kyleban...@gmail.com>
> > > > > wrote:
> > > > > >I think I've discovered the answer to my second question: according
> > > > > >to the code in ZookeeperConsumerConnector.scala, a rebalance derives
> > > > > >its offsets from what's already in Zookeeper. Therefore, uncommitted
> > > > > >but consumed messages from a given partition will be replayed when
> > > > > >the partition is reassigned.
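> > > > > >
> > > > > >Given that, one way to make those replays harmless is to de-duplicate
> > > > > >by partition and offset. A rough sketch (the in-memory map here is
> > > > > >only illustrative; in practice the "last applied" offset would live in
> > > > > >the downstream system so it survives restarts):
> > > > > >
> > > > > >Map<TopicAndPartition, Long> applied =
> > > > > >    new HashMap<TopicAndPartition, Long>();
> > > > > >while (iter.hasNext()) {
> > > > > >    MessageAndMetadata<byte[], byte[]> m = iter.next();
> > > > > >    TopicAndPartition tp = new TopicAndPartition(m.topic(), m.partition());
> > > > > >    Long last = applied.get(tp);
> > > > > >    if (last == null || m.offset() > last) { // skip replayed messages
> > > > > >        process(m);
> > > > > >        applied.put(tp, m.offset());
> > > > > >    }
> > > > > >    consumer.commitOffsets();
> > > > > >}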
> > > > > >
> > > > > >
> > > > > >On Fri, Jun 13, 2014 at 3:01 PM, Kyle Banker <kyleban...@gmail.com>
> > > > > >wrote:
> > > > > >
> > > > > >> I'm using Kafka 0.8.1.1.
> > > > > >>
> > > > > >> I have a simple goal: use the high-level consumer to consume a
> > > > > >> message from Kafka, publish the message to a different system, and
> > > > > >> then commit the message in Kafka. Based on my reading of the docs
> > > > > >> and the mailing list, it seems like this isn't so easy to achieve.
> > > > > >> Here is my current understanding:
> > > > > >>
> > > > > >> First, I have to disable auto-commit. If the consumer
> > > > > >> automatically commits, then I may lose messages if, for example,
> > > > > >> my process dies after consuming but before publishing my message.
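> > > > > >>
> > > > > >> For reference, a sketch of the consumer property I believe controls
> > > > > >> this in 0.8:
> > > > > >>
> > > > > >> Properties props = new Properties();
> > > > > >> props.put("auto.commit.enable", "false"); // commit only via commitOffsets()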
> > > > > >>
> > > > > >> Next, if my app is multi-threaded, I need to either
> > > > > >>
> > > > > >> a) use a separate consumer per thread (memory-intensive, hard on
> > > > > >> Zookeeper), or
> > > > > >> b) use a single consumer and assign a KafkaStream to each thread;
> > > > > >> then, when I want to commit, first synchronize all threads using a
> > > > > >> barrier (see the sketch below).
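> > > > > >>
> > > > > >> A rough sketch of option (b), assuming a topic named "events", one
> > > > > >> stream per thread, and a placeholder process(); every thread parks
> > > > > >> at the barrier, and the barrier action commits once for all of them:
> > > > > >>
> > > > > >> final int numThreads = 2;
> > > > > >> final CyclicBarrier barrier = new CyclicBarrier(numThreads,
> > > > > >>     new Runnable() {
> > > > > >>         public void run() { consumer.commitOffsets(); } // all paused here
> > > > > >>     });
> > > > > >> List<KafkaStream<byte[], byte[]>> streams = consumer
> > > > > >>     .createMessageStreams(Collections.singletonMap("events", numThreads))
> > > > > >>     .get("events");
> > > > > >> ExecutorService pool = Executors.newFixedThreadPool(numThreads);
> > > > > >> for (final KafkaStream<byte[], byte[]> stream : streams) {
> > > > > >>     pool.submit(new Runnable() {
> > > > > >>         public void run() {
> > > > > >>             ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > > >>             while (it.hasNext()) {
> > > > > >>                 process(it.next());
> > > > > >>                 // Committing per message for clarity; a real
> > > > > >>                 // consumer would batch before hitting the barrier.
> > > > > >>                 try { barrier.await(); }
> > > > > >>                 catch (Exception e) { throw new RuntimeException(e); }
> > > > > >>             }
> > > > > >>         }
> > > > > >>     });
> > > > > >> }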
> > > > > >>
> > > > > >> First question: is this correct so far?
> > > > > >>
> > > > > >>
> > > > > >> Still, it appears that rebalancing may be a problem. In
> > > > > >> particular, consider this sequence of events:
> > > > > >>
> > > > > >> 1. I'm consuming from a stream tied to two partitions, A and B.
> > > > > >> 2. I consume a message, M, from partition A.
> > > > > >> 3. Partition A gets assigned to a different consumer.
> > > > > >> 4. I choose not to commit M, or my process fails.
> > > > > >>
> > > > > >> Second question: When the partition is reassigned, will the
> > > > > >> message that I consumed be automatically committed? If so, then
> > > > > >> there's no way to get the reliability I want.
> > > > > >>
> > > > > >>
> > > > > >> Third question: How do the folks at LinkedIn handle this overall
> > > > > >> use case? What about other users?
> > > > > >>
> > > > > >> It seems to me that a lot of the complexity here could be easily
> > > > > >> addressed by changing the way in which a partition's message
> > > > > >> pointer is advanced. That is, when I consume message M, advance
> > > > > >> the pointer to message (M - 1) rather than to M. In other words,
> > > > > >> calling iterator.next() would imply that the previously consumed
> > > > > >> message may be safely committed. If this were the case, I could
> > > > > >> simply enable auto-commit and be happy.
> > > > > >>
> > > > >
> > > > > --
> > > > > Sent from my Android phone with K-9 Mail. Please excuse my brevity.
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > Regards
> > Vamsi Subhash
> >
> 
> 
> 
> -- 
> -- Guozhang