Hi Edward,

I think you misunderstand ... I definitely do *not* want to commit every message. That would be much too expensive; committing every few minutes is plenty for me.
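(For context, this is roughly how I have the consumer configured -- a minimal sketch assuming the 0.8 high-level consumer; the connect string, group id, and the five-minute interval are just placeholders:)

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // placeholder
    props.put("group.id", "my-group")                 // placeholder
    // auto-commit stays on, but only fires every 5 minutes
    props.put("auto.commit.enable", "true")
    props.put("auto.commit.interval.ms", "300000")
    val connector = Consumer.create(new ConsumerConfig(props))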
I want to guarantee that if I commit a message, my message has been processed by *my application* at least once. (In another thread I mentioned something about reading exactly once, but that is not what I am going for here -- more than once is OK, as long as it's at least once.) That requires me to make sure that a commit never happens between a call to next() and doSomething(). Kafka guarantees that messages get read off the queue at least once, but as I outlined above, a naive solution allows messages to get read off the queue and still never make it into my app.

Not only do I not want to commit every message -- I'd really like to have the above guarantee without even acquiring a lock (most of the time). That is what I was getting at with the comment about the spin-lock, just as an idea of how to prevent a commit between next() and doSomething(). I don't know if the spin-lock was really the right idea, that was just a random thought, but the point is: we want a check that is very cheap during the 99.999% of the time when a commit isn't happening, but can still guarantee proper ordering during the 0.001% of the time when a commit is happening. Otherwise, messages might never make it to my app. (And this is all just to prevent a message getting lost during the 10^-6% of the time when a commit happens between next() and doSomething() and the app dies before doSomething() completes!)

Actually, after a bit more thought -- the best way to guarantee this would be a small API change that would do away with the need for locks completely. The iterator should have a method applyToNext:

    def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
      // all the stuff in next(), *except* currentTopicInfo.resetConsumeOffset(consumedOffset)
      val item = ...
      f(item)
      // only after we've applied the user's function do we update the offset that will get committed
      currentTopicInfo.resetConsumeOffset(consumedOffset)
    }

I think this way you could avoid any problems even with auto-commit. Or, if you don't want to add a new method -- so we can stick to the Iterator API -- maybe the iterator could let you register a preCommitFunction, so next() would change to:

    override def next(): MessageAndMetadata[K, V] = {
      ...
      val item = ...
      preCommitFunctions.foreach { f => f(item) }
      currentTopicInfo.resetConsumeOffset(consumedOffset)
      ...
      item
    }
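For comparison, here is what I understand the manual workaround with the current API to be -- a minimal sketch, assuming auto-commit is disabled in the config and a single stream; the topic name, the commit batch size, and doSomething() are placeholders:

    import kafka.consumer.ConsumerConnector

    // sketch: with auto-commit off, commit only *after* processing, so a crash
    // can cause re-processing (at least once) but never a skipped message
    def consume(connector: ConsumerConnector, doSomething: Array[Byte] => Unit) {
      val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head
      var uncommitted = 0
      for (msgAndMeta <- stream) {
        doSomething(msgAndMeta.message)  // process first ...
        uncommitted += 1
        if (uncommitted >= 1000) {       // ... then commit periodically
          connector.commitOffsets        // commits offsets of everything returned so far
          uncommitted = 0
        }
      }
    }

Committing from the processing thread like this avoids the race entirely; my complaint is just that auto-commit (or any separate committer thread) can't give the same guarantee without some coordination.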
thanks,
Imran

On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:

> You likely need to use a custom offset solution if you plan on committing
> every message. With many partitions this puts a large burden on ZooKeeper;
> you end up needing to roll over your zk transaction logs quickly as well,
> or risk filling up the disk.
>
>
> On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Imran,
>>
>> The offset will only be updated when the next() function is called:
>>
>> override def next(): MessageAndMetadata[K, V] = {
>>   ...
>>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>>   ...
>>   item
>> }
>>
>> instead of in makeNext(), which just updates consumedOffset -- and that
>> is not the value that will be committed by the commitOffsets call. So as
>> long as you turn off auto-commit and only call commitOffsets after the
>> process(msg) call, not right after
>>
>> b = iter.next()
>>
>> is called, at-least-once is guaranteed.
>>
>> Does that make sense?
>>
>> Guozhang
>>
>>
>> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
>>
>> > sorry to keep bugging the list, but I feel like I am either missing
>> > something important, or I'm finding something wrong with the standard
>> > consumer API (or maybe the docs just need some clarification).
>> >
>> > I started to think that I should probably just accept at-least-once
>> > semantics ... but I eventually realized that I'm not even sure we
>> > really get an at-least-once guarantee. I think it really might be
>> > zero-or-more. Or rather: messages will get pulled off the kafka queue
>> > at least once, but that doesn't mean your app will actually *process*
>> > those messages at least once -- there might be messages it never
>> > processes.
>> >
>> > Consider a really basic reader of a kafka queue:
>> >
>> > while (it.hasNext()) {
>> >   val msg = it.next()
>> >   doSomething(msg)
>> > }
>> >
>> > the question is: do I have any guarantees on how many times
>> > doSomething() is called on everything in the queue? I think the
>> > "guarantee" is:
>> > 1) most messages will get processed exactly once
>> > 2) around a restart, a chunk of msgs will get processed at least once,
>> >    but probably more than once
>> > 3) around a restart, it is possible that one message will get
>> >    processed ZERO times
>> >
>> > (1) & (2) are probably clear, so let me explain how I think (3) could
>> > happen. Imagine messages a, b, c, ... and two threads: one reading
>> > from the stream, and one that periodically commits the offsets.
>> > Imagine this sequence of events:
>> >
>> > ==Reader==
>> > - initializes w/ offset pointing to "a"
>> > - hasNext()
>> >   ---> makeNext() reads "a" and updates the local offset to "b"
>> > - msg = "a"
>> > - doSomething("a")
>> > - hasNext()
>> >   ---> makeNext() reads "b" and updates the local offset to "c"
>> >
>> > ==Committer==
>> > - commitOffsets stores the current offset as "c"
>> >
>> > ===== PROCESS DIES =====
>> > ===== RESTARTS =====
>> >
>> > ==Reader==
>> > - initializes w/ offset pointing to "c"
>> > - hasNext()
>> >   ---> makeNext() reads "c" and updates the local offset to "d"
>> > - msg = "c"
>> > - doSomething("c")
>> > ...
>> >
>> > note that in this scenario, doSomething("b") was never called.
>> > For a lot of applications this probably doesn't matter, but it
>> > could be terrible for some apps. I can't think of any way of
>> > preventing it from user code -- unless, maybe, when the offsets
>> > get committed, it is always *before* the last thing read? e.g., in
>> > my example, it would store the next offset as "b" or earlier?
>> >
>> > Is there a flaw in my logic? Do committed offsets always "undershoot"
>> > to prevent this?
>> >
>> > thanks,
>> > Imran
>>
>>
>>
>> --
>> -- Guozhang