You likely need a custom offset solution if you plan on committing after every message. With many partitions this puts a large burden on ZooKeeper; you also end up needing to roll over your ZK transaction logs quickly, or you risk filling up the disk.
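Roughly what I mean by a custom offset solution -- a sketch only, where OffsetStore is a made-up interface (backed by, say, a local DB or key-value store), process() is your handler, and MessageAndMetadata is assumed to expose topic/partition/offset as it does in 0.8.x:

    import kafka.consumer.KafkaStream
    import kafka.message.MessageAndMetadata

    // Hypothetical store -- not a Kafka API. Could be a local DB, HBase, etc.
    trait OffsetStore {
      def save(topic: String, partition: Int, offset: Long): Unit
    }

    def consume(stream: KafkaStream[Array[Byte], Array[Byte]],
                store: OffsetStore,
                process: MessageAndMetadata[Array[Byte], Array[Byte]] => Unit): Unit = {
      val it = stream.iterator()
      while (it.hasNext()) {
        val msg = it.next()
        process(msg)
        // one cheap write to your own store per message, instead of one
        // ZooKeeper write per message via commitOffsets
        store.save(msg.topic, msg.partition, msg.offset)
      }
    }

On restart you would read the saved offsets back and start fetching from them (e.g. with the SimpleConsumer), which is what makes it a custom solution rather than a config change.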
On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Imran,
>
> The offset will only be updated when the next() function is called:
>
>     override def next(): MessageAndMetadata[K, V] = {
>       ...
>       *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>       ...
>       item
>     }
>
> instead of in makeNext(), which will just update consumedOffset, but that
> is not the value that will be committed using the commitOffset call. So as
> long as you turn off auto commit and only call commitOffset after the
> process(msg) call, not after the
>
>     b = iter.next()
>
> is called, at-least-once is guaranteed.
>
> Does that make sense?
>
> Guozhang
>
>
> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
> >
> > sorry to keep bugging the list, but I feel like I am either missing
> > something important, or I'm finding something wrong w/ the standard
> > consumer api (or maybe just the docs need some clarification).
> >
> > I started to think that I should probably just accept at-least-once
> > semantics ... but I eventually realized that I'm not even sure we
> > really get an at-least-once guarantee. I think it really might be
> > zero-or-more. Or rather, messages will get pulled off the kafka queue
> > at least once, but that doesn't mean your app will actually *process*
> > those messages at least once -- there might be messages it never
> > processes.
> >
> > Consider a really basic reader of a kafka queue:
> >
> >     while (it.hasNext()) {
> >       val msg = it.next()
> >       doSomething(msg)
> >     }
> >
> > the question is, do I have any guarantees on how many times
> > doSomething() is called on everything in the queue? I think the
> > "guarantee" is:
> > 1) most messages will get processed exactly once
> > 2) around a restart, a chunk of msgs will get processed at least once,
> >    but probably more than once
> > 3) around a restart, it is possible that one message will get
> >    processed ZERO times
> >
> > (1) & (2) are probably clear, so let me explain how I think (3) could
> > happen. Let's imagine messages a, b, c, ... and two threads, one reading
> > from the stream, and one thread that periodically commits the offsets.
> > Imagine this sequence of events:
> >
> >
> > ==Reader==
> > -initializes w/ offset pointing to "a"
> >
> > -hasNext()
> > ---> makeNext() will read "a"
> >      and update the local offset to "b"
> >
> > -msg = "a"
> >
> > -doSomething("a")
> >
> > -hasNext()
> > ----> makeNext() will read "b"
> >       and update the local offset to "c"
> >
> > ==Committer==
> >
> > -commitOffsets stores the current offset as "c"
> >
> >
> > =====PROCESS DIES=====
> > ===== RESTARTS =====
> >
> > ==Reader==
> > -initializes w/ offset pointing to "c"
> >
> > -hasNext()
> > --> makeNext() will read "c"
> >     and update the local offset to "d"
> > -msg = "c"
> > -doSomething("c")
> > ...
> >
> >
> > note that in this scenario, doSomething("b") was never called.
> > Probably for a lot of applications this doesn't matter, but it seems
> > like this could be terrible for some apps. I can't think of any way
> > of preventing it from user code. unless, maybe, when the offsets
> > get committed, it is always *before* the last thing read? e.g., in my
> > example, it would store the next offset as "b" or earlier?
> >
> > Is there a flaw in my logic? Do committed offsets always "undershoot"
> > to prevent this?
> >
> > thanks,
> > Imran
> >
>
>
> --
> -- Guozhang
>
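For reference, the at-least-once pattern Guozhang describes would look roughly like this with the 0.8 high-level consumer (connection settings, topic/group names and process() are placeholders; error handling omitted):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.message.MessageAndMetadata

    def process(msg: MessageAndMetadata[Array[Byte], Array[Byte]]): Unit = {
      // your handler -- placeholder
    }

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // placeholder
    props.put("group.id", "my-group")                 // placeholder
    props.put("auto.commit.enable", "false")          // commit manually

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head
    val it = stream.iterator()

    while (it.hasNext()) {
      val msg = it.next()       // this is where the consumed offset advances
      process(msg)              // do the work first ...
      connector.commitOffsets   // ... then commit, so a crash replays rather than skips
    }

Committing after process(msg) means a crash can only replay messages that were already processed (at-least-once), never skip one, at the cost of a ZooKeeper write per message -- hence the comment above about needing a custom offset store if you really commit every message.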