yes, that makes sense, thank you very much! phew, I was worried there ... so, is it fair to say that:

1) if you use auto-commit, it is possible for a msg to get completely skipped by your app on restarts? (worst case, one per partition)

2) if you are committing offsets from some thread other than the worker (which is almost certainly the case), then you need to ensure that a commit is not in progress between hasNext() and next()? eg.

while (it.hasNext()) {
  someMagicMethodThatEnsuresNoCommit {
    val msg = it.next()
    doSomething(msg)
  }
}

and given that the while is a tight loop, but that commits are rare, you'd want "someMagicMethodThatEnsuresNoCommit" to avoid locking, eg. perhaps a spin-lock around an AtomicInteger that gets incremented on every commit? need to think a bit more about the details ...
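maybe something along these lines -- a rough sketch of what "someMagicMethodThatEnsuresNoCommit" could look like, using a plain ReentrantReadWriteLock instead of a hand-rolled spin-lock (an uncontended read lock is cheap, so the tight loop shouldn't suffer). All the names here are made up for illustration, and this hasn't been checked against the actual consumer internals:

import java.util.concurrent.locks.ReentrantReadWriteLock

// Shared between the worker thread and the thread that commits offsets.
object CommitGuard {
  private val lock = new ReentrantReadWriteLock()

  // Worker: wraps next() + doSomething(msg), so a commit can never land
  // between reading a message and finishing its processing.
  def noCommitDuring[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  // Committer: waits until no message is in flight, then commits.
  def whileNoMessageInFlight(commit: => Unit): Unit = {
    lock.writeLock().lock()
    try commit finally lock.writeLock().unlock()
  }
}

// worker loop:
//   while (it.hasNext()) {
//     CommitGuard.noCommitDuring {
//       val msg = it.next()
//       doSomething(msg)
//     }
//   }
//
// commit thread, periodically:
//   CommitGuard.whileNoMessageInFlight { connector.commitOffsets }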
On Thu, Nov 21, 2013 at 5:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Imran,
>
> The offset will only be updated when the next() function is called:
>
>   override def next(): MessageAndMetadata[K, V] = {
>     ...
>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>     ...
>     item
>   }
>
> instead of in makeNext(), which will just update consumedOffset, but that
> is not the value that will be committed using the commitOffset call. So as
> long as you turn off auto commit and only call commitOffset after the
> process(msg) call, not after the
>
>   b = iter.next()
>
> is called, at-least-once is guaranteed.
>
> Does that make sense?
>
> Guozhang
>
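just to make sure I follow, the pattern you describe would look roughly like this: auto commit turned off, and commitOffsets called only after the message has been processed. This is only a sketch from memory against the 0.8 high-level consumer API (Consumer.create / createMessageStreams / commitOffsets); "my-topic", the connection settings, and doSomething() are placeholders:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.message.MessageAndMetadata

object AtLeastOnceLoop {
  // stand-in for the real per-message handler
  def doSomething(msg: MessageAndMetadata[Array[Byte], Array[Byte]]): Unit =
    println(msg)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("group.id", "my-group")
    props.put("auto.commit.enable", "false")   // turn off auto commit

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head
    val it = stream.iterator()

    while (it.hasNext()) {
      val msg = it.next()       // next() is what advances the offset that will be committed
      doSomething(msg)          // process first ...
      connector.commitOffsets   // ... commit only afterwards, so a crash re-delivers rather than skips
    }
  }
}

committing from the worker thread itself, like above, would also sidestep the cross-thread race I was worrying about at the top of this mail.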
> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
>
>> sorry to keep bugging the list, but I feel like I am either missing
>> something important, or I'm finding something wrong w/ the standard
>> consumer api (or maybe just the docs need some clarification).
>>
>> I started to think that I should probably just accept at least once
>> semantics ... but I eventually realized that I'm not even sure we
>> really get an at least once guarantee. I think it really might be
>> zero-or-more. Or rather, messages will get pulled off the kafka queue
>> at least once, but that doesn't mean your app will actually *process*
>> those messages at least once -- there might be messages it never
>> processes.
>>
>> Consider a really basic reader of a kafka queue:
>>
>>   while (it.hasNext()) {
>>     val msg = it.next()
>>     doSomething(msg)
>>   }
>>
>> the question is, do I have any guarantees on how many times
>> doSomething() is called on everything in the queue? I think the
>> "guarantee" is:
>>
>> 1) most messages will get processed exactly once
>> 2) around a restart, a chunk of msgs will get processed at least once,
>>    but probably more than once
>> 3) around a restart, it is possible that one message will get
>>    processed ZERO times
>>
>> (1) & (2) are probably clear, so lemme explain how I think (3) could
>> happen. Let's imagine messages a, b, c, ... and two threads, one reading
>> from the stream, and one thread that periodically commits the offsets.
>> Imagine this sequence of events:
>>
>> ==Reader==
>> - initializes w/ offset pointing to "a"
>> - hasNext()
>>     ---> makeNext() will read "a"
>>          and update the local offset to "b"
>> - msg = "a"
>> - doSomething("a")
>> - hasNext()
>>     ---> makeNext() will read "b"
>>          and update the local offset to "c"
>>
>> ==Committer==
>> - commitOffsets stores the current offset as "c"
>>
>> ===== PROCESS DIES =====
>> ===== RESTARTS =====
>>
>> ==Reader==
>> - initializes w/ offset pointing to "c"
>> - hasNext()
>>     ---> makeNext() will read "c"
>>          and update the local offset to "d"
>> - msg = "c"
>> - doSomething("c")
>> ...
>>
>> note that in this scenario, doSomething("b") was never called.
>>
>> Probably for a lot of applications this doesn't matter. But it seems
>> like this could be terrible for some apps. I can't think of any
>> way of preventing it from user code. unless, maybe when the offsets
>> get committed, it is always *before* the last thing read? eg., in my
>> example, it would store the next offset as "b" or earlier?
>>
>> Is there a flaw in my logic? Do committed offsets always "undershoot"
>> to prevent this?
>>
>> thanks,
>> Imran
>>
>
>
> --
> -- Guozhang