Yes, this is an issue and has been fixed in 0.8.

Thanks,
Jun

On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <i...@flurry.com> wrote:

> Hey guys,
>
> I designed my consumer app (running on 0.7) to run with autocommit off and
> commit manually once it was done processing a record. The intent was so
> that if a consumer died while processing a message, the offset would not be
> committed, and another box would pick up the partition and reprocess the
> message. This seemed to work fine with small numbers of consumers (~10).
> But now that I'm scaling it out, I'm running into a problem where it looks
> like messages that consumers picked up and then errored on are not getting
> processed on another machine.
>
> After investigating the logs and the partition offsets in zookeeper, I
> found that in ZookeeperConsumerConnector.scala closeFetchersForQueues,
> called during the rebalance process, will commit the offset regardless of
> the autocommit status. So it looks like even if my consumer is in the
> middle of processing a message, the offset will be committed, and even if
> the processing fails, it will never be picked up again. Now that I have a
> lot of consumer nodes, the rebalancer is going off a lot more often and I'm
> running into this constantly.
>
> Were my assumptions faulty? Did I design this wrong? After reading the
> comment in the code I understand that if it didn't commit the offset there,
> the message would just get immediately consumed by whoever ended up owning
> the partition, even if we were in the middle of consuming it elsewhere, and
> we'd get unintentional duplicate delivery. How can I make it work the way
> I've described? Is there any way?
>
> Thanks in advance,
>
> --
> Ian Friedman
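
For anyone reading the thread later, here is a rough toy model of the behaviour described in the quoted message. It is plain in-memory Python, not Kafka code: `Partition`, `Consumer`, and `run` are made-up names for illustration, and the "rebalance" is just a flag that forces a commit while a message is still being processed.

```python
# Toy in-memory model of commit-after-processing. None of these names are
# Kafka APIs; they are hypothetical and exist only for this illustration.

class Partition:
    """A list of messages plus the committed offset stored for the group."""
    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the next message a new owner will read


class Consumer:
    def __init__(self, partition):
        self.partition = partition
        self.position = partition.committed  # resume from the committed offset

    def fetch(self):
        """Hand out the next message and advance the in-memory position."""
        msg = self.partition.messages[self.position]
        self.position += 1
        return msg

    def commit(self):
        """Persist the in-memory position as the committed offset."""
        self.partition.committed = self.position


def run(commit_on_rebalance):
    """Return the messages that end up fully processed across two consumers."""
    partition = Partition(["m0", "m1", "m2"])
    processed = []

    a = Consumer(partition)
    processed.append(a.fetch())  # m0 is processed successfully
    a.commit()                   # manual commit after processing

    a.fetch()                    # m1 is fetched; processing is in flight
    if commit_on_rebalance:
        a.commit()               # a rebalance commits the position anyway
    # Consumer a now fails on m1 and never issues its own commit.

    b = Consumer(partition)      # the new owner resumes from the committed offset
    while b.position < len(partition.messages):
        processed.append(b.fetch())
        b.commit()
    return processed


print(run(commit_on_rebalance=True))   # m1 is skipped
print(run(commit_on_rebalance=False))  # m1 is reprocessed by the new owner
```

With the forced commit, the new owner starts after `m1`, so the failed message is never retried. Without it, `m1` is delivered again, which is the reprocessing the original design was counting on.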