1) Yes. The worst case is that you may lose one message, but you may also see
more than one duplicate per partition, since the commit thread calls
commitOffsets periodically.

2) I think locking is also fine, since it just puts the waiting thread to
sleep on the lock, which is better than a spin-lock, since spinning still
takes CPU.
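To make the "turn off auto commit and commit after process(msg)" pattern from
the thread below concrete, here is a minimal sketch against the 0.8
high-level consumer API. The ZooKeeper address, group id, topic name, and
process() body are placeholders, and exact signatures may vary slightly
between 0.8.x releases. Note that committing after every single message is
exactly the ZooKeeper load Edward warns about below, so in practice you would
usually commit every N messages, trading more duplicates (never losses) for
less ZooKeeper traffic:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object AtLeastOnceConsumer extends App {
      val props = new Properties()
      props.put("zookeeper.connect", "localhost:2181") // placeholder ZK address
      props.put("group.id", "my-group")                // placeholder group id
      props.put("auto.commit.enable", "false")         // no background commit thread

      val connector = Consumer.create(new ConsumerConfig(props))

      // one stream for a single-partition topic; topic name is a placeholder
      val streams = connector.createMessageStreams(Map("my-topic" -> 1))
      val it = streams("my-topic").head.iterator()

      // stand-in for the doSomething(msg) in the thread below
      def process(msg: Array[Byte]): Unit =
        println("got " + msg.length + " bytes")

      while (it.hasNext()) {
        val msg = it.next().message  // next() is what advances the consumed offset
        process(msg)                 // process FIRST ...
        connector.commitOffsets      // ... then commit, so a crash replays, never skips
      }
    }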
Guozhang


On Thu, Nov 21, 2013 at 4:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:

> You likely need to use a custom offset solution if you plan on committing
> every message. With many partitions this puts a large burden on ZooKeeper;
> you also end up needing to roll over your zk transaction logs quickly, or
> risk filling up the disk.
>
>
> On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Imran,
> >
> > The offset will only be updated when the next() function is called:
> >
> >   override def next(): MessageAndMetadata[K, V] = {
> >     ...
> >     currentTopicInfo.resetConsumeOffset(consumedOffset)
> >     ...
> >     item
> >   }
> >
> > instead of in makeNext(), which will just update consumedOffset, but that
> > is not the value that will be committed by the commitOffsets call. So as
> > long as you turn off auto commit and only call commitOffsets after the
> > process(msg) call, not right after
> >
> >   b = iter.next()
> >
> > is called, at-least-once is guaranteed.
> >
> > Does that make sense?
> >
> > Guozhang
> >
> >
> > On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
> > wrote:
> >
> > > sorry to keep bugging the list, but I feel like I am either missing
> > > something important, or I'm finding something wrong w/ the standard
> > > consumer api (or maybe the docs just need some clarification).
> > >
> > > I started to think that I should probably just accept at-least-once
> > > semantics ... but I eventually realized that I'm not even sure we
> > > really get an at-least-once guarantee. I think it really might be
> > > zero-or-more. Or rather: messages will get pulled off the kafka queue
> > > at least once, but that doesn't mean your app will actually *process*
> > > those messages at least once -- there might be messages it never
> > > processes.
> > >
> > > Consider a really basic reader of a kafka queue:
> > >
> > >   while (it.hasNext()) {
> > >     val msg = it.next()
> > >     doSomething(msg)
> > >   }
> > >
> > > the question is: do I have any guarantees on how many times
> > > doSomething() is called on everything in the queue? I think the
> > > "guarantee" is:
> > > 1) most messages will get processed exactly once
> > > 2) around a restart, a chunk of msgs will get processed at least
> > >    once, but probably more than once
> > > 3) around a restart, it is possible that one message will get
> > >    processed ZERO times
> > >
> > > (1) & (2) are probably clear, so lemme explain how I think (3) could
> > > happen. Let's imagine messages a, b, c, ... and two threads: one
> > > reading from the stream, and one that periodically commits the
> > > offsets. Imagine this sequence of events:
> > >
> > > ==Reader==
> > > - initializes w/ offset pointing to "a"
> > >
> > > - hasNext()
> > >   ---> makeNext() will read "a"
> > >        and update the local offset to "b"
> > >
> > > - msg = "a"
> > >
> > > - doSomething("a")
> > >
> > > - hasNext()
> > >   ---> makeNext() will read "b"
> > >        and update the local offset to "c"
> > >
> > > ==Committer==
> > > - commitOffsets stores the current offset as "c"
> > >
> > > ===== PROCESS DIES =====
> > > =====  RESTARTS   =====
> > >
> > > ==Reader==
> > > - initializes w/ offset pointing to "c"
> > >
> > > - hasNext()
> > >   ---> makeNext() will read "c"
> > >        and update the local offset to "d"
> > > - msg = "c"
> > > - doSomething("c")
> > > ...
> > >
> > > note that in this scenario, doSomething("b") was never called.
> > > Probably for a lot of applications this doesn't matter, but it seems
> > > like this could be terrible for some apps. I can't think of any way
> > > of preventing it from user code. Unless, maybe, when the offsets get
> > > committed, it is always *before* the last thing read? e.g., in my
> > > example, it would store the next offset as "b" or earlier?
> > >
> > > Is there a flaw in my logic? Do committed offsets always "undershoot"
> > > to prevent this?
> > >
> > > thanks,
> > > Imran
> > >
> >
> >
> > --
> > -- Guozhang
>


--
-- Guozhang
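Imran's interleaving can also be reproduced as a small, self-contained
simulation. This is only a sketch in plain Scala, with made-up names (log,
consumedOffset, committedOffset) standing in for the consumer internals:

    object SkippedMessageDemo extends App {
      val log = Vector("a", "b", "c", "d")   // a toy partition

      var consumedOffset = 0                 // next position the reader will fetch
      var committedOffset = 0                // last offset the committer stored

      // mimics the iterator: the offset advances BEFORE the message is processed
      def next(): String = {
        val msg = log(consumedOffset)
        consumedOffset += 1
        msg
      }

      println("processed " + next())         // doSomething("a")
      val b = next()                         // "b" fetched; offset now points at "c"
      committedOffset = consumedOffset       // committer thread fires and stores "c"
      // ===== the process dies here, before doSomething(b) ever runs =====

      // after the restart, the reader resumes from the committed offset: "c"
      var pos = committedOffset
      while (pos < log.length) {
        println("processed " + log(pos))
        pos += 1
      }
      // prints: a, c, d  ("b" was fetched but never processed)
    }

Running it prints "a", "c", then "d": the offset for "b" was committed after
the fetch but before the processing, so "b" is skipped across the restart.
Turning off auto commit and committing only after doSomething() returns, as
suggested at the top of the thread, turns the skipped "b" into, at worst, a
reprocessed "b", which is what makes the loop at-least-once.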