Why not just disable autocommit and only call commitOffsets() after you've processed a batch? It isn't obvious to me how doing so would allow a message to be processed zero times.

On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
> Hi Edward,
>
> I think you misunderstand ... I definitely do *not* want to commit every
> message. That would be much too expensive; every few minutes is plenty for
> me.
>
> I want to guarantee that if I commit a message, my message has been
> processed by *my application* at least once. (In another thread I
> mentioned something about reading exactly once, but that is not what I am
> going for here -- more than once is OK, as long as it's at least once.)
> That requires me to make sure that a commit never happens between a call
> to next() and doSomething(). Kafka guarantees that messages get read off
> the queue at least once, but as I outlined above, a naive solution allows
> for messages to get read off the queue but never make it into my app.
>
> Not only do I not want to commit every message -- I'd really like to have
> the above guarantee without even acquiring a lock (most of the time). That
> is what I was getting at with the comment about the spin-lock, just as an
> idea of how to prevent a commit between next() and doSomething(). I dunno
> if the spin-lock was really the right idea, that was just a random
> thought, but the point is: we want some check that is very cheap for the
> 99.999% of the time when a commit isn't happening, but can still guarantee
> proper ordering during the 0.001% of the time when a commit is happening.
> Otherwise, messages might never make it to my app. (And this is all just
> to prevent a message getting lost during the 10^-6% of the time when a
> commit happens between next() and doSomething() and the app dies before
> doSomething() completes!)
>
> Actually, after a bit more thought -- the best way to guarantee this would
> be with a small API change that would do away with the need for locks
> completely.
> The iterator should have a method applyToNext:
>
>   def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
>     // all the stuff in next(), *except*
>     // currentTopicInfo.resetConsumeOffset(consumedOffset)
>     val item = ...
>     f(item)
>     // after we've applied the user's function, now we can update the
>     // offset that should get committed
>     currentTopicInfo.resetConsumeOffset(consumedOffset)
>   }
>
> I think this way, you could avoid any problems even with auto-commit. Or,
> if you don't want to add a new method, so we can stick to the Iterator
> API, then maybe the iterator could let you register a preCommitFunction,
> so next() would change to:
>
>   override def next(): MessageAndMetadata[K, V] = {
>     ...
>     val item = ...
>     preCommitFunctions.foreach { f => f(item) }
>     currentTopicInfo.resetConsumeOffset(consumedOffset)
>     ...
>     item
>   }
>
> thanks,
> Imran
>
> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
>
> > You likely need a custom offset solution if you plan on committing every
> > message. With many partitions this puts a large burden on ZooKeeper; you
> > end up needing to roll over your zk transaction logs fast as well, or
> > risk filling up disk.
> >
> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hello Imran,
> >>
> >> The offset will only be updated when the next() function is called:
> >>
> >>   override def next(): MessageAndMetadata[K, V] = {
> >>     ...
> >>     currentTopicInfo.resetConsumeOffset(consumedOffset)
> >>     ...
> >>     item
> >>   }
> >>
> >> instead of in makeNext(), which just updates consumedOffset -- and that
> >> is not the value that will be committed by the commitOffsets() call. So
> >> as long as you turn off auto-commit and only call commitOffsets() after
> >> the process(msg) call, not right after
> >>
> >>   b = iter.next()
> >>
> >> is called, at-least-once is guaranteed.
> >>
> >> Does that make sense?
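Imran's proposed applyToNext and Guozhang's advice come down to the same discipline: the offset that a commit would store only advances after the user's function has finished. A minimal toy sketch of that semantics in Java (ToyIterator and its fields are illustrative stand-ins, not the real Kafka ConsumerIterator):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the applyToNext idea: the committable offset is only
// advanced AFTER the user's function has run. All names are hypothetical;
// this is not the real Kafka consumer.
class ToyIterator {
    private final List<String> log;
    private int position;      // index of the next record to hand out
    int committableOffset;     // what a commit call would store

    ToyIterator(List<String> log, int start) {
        this.log = log;
        this.position = start;
        this.committableOffset = start;
    }

    boolean hasNext() { return position < log.size(); }

    // Read one record, apply f, and only then advance the committable offset.
    void applyToNext(Consumer<String> f) {
        String item = log.get(position);
        f.accept(item);                // the process may die in here...
        position++;
        committableOffset = position;  // ...in which case this never runs
    }
}

public class ApplyToNextDemo {
    public static void main(String[] args) {
        ToyIterator it = new ToyIterator(List.of("a", "b", "c"), 0);
        List<String> processed = new ArrayList<>();

        it.applyToNext(processed::add);  // processes "a"; offset advances to 1

        // Simulate a crash while processing "b": the committable offset
        // stays at 1, so a restarted consumer re-reads "b" rather than
        // skipping it (at-least-once, never zero times).
        try {
            it.applyToNext(msg -> { throw new RuntimeException("crash"); });
        } catch (RuntimeException e) {
            // the process "died" mid-message
        }

        System.out.println(processed);            // prints: [a]
        System.out.println(it.committableOffset); // prints: 1
    }
}
```

With this ordering, a commit that races with processing can only undershoot: the worst case after a restart is re-processing "b", never losing it.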
> >>
> >> Guozhang
> >>
> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
> >>
> >> > sorry to keep bugging the list, but I feel like I am either missing
> >> > something important, or I'm finding something wrong w/ the standard
> >> > consumer api (or maybe the docs just need some clarification).
> >> >
> >> > I started to think that I should probably just accept at-least-once
> >> > semantics ... but I eventually realized that I'm not even sure we
> >> > really get an at-least-once guarantee. I think it really might be
> >> > zero-or-more. Or rather, messages will get pulled off the kafka queue
> >> > at least once, but that doesn't mean your app will actually *process*
> >> > those messages at least once -- there might be messages it never
> >> > processes.
> >> >
> >> > Consider a really basic reader of a kafka queue:
> >> >
> >> >   while (it.hasNext()) {
> >> >     val msg = it.next()
> >> >     doSomething(msg)
> >> >   }
> >> >
> >> > the question is, do I have any guarantees on how many times
> >> > doSomething() is called on everything in the queue? I think the
> >> > "guarantee" is:
> >> >
> >> > 1) most messages will get processed exactly once
> >> > 2) around a restart, a chunk of msgs will get processed at least once,
> >> >    but probably more than once
> >> > 3) around a restart, it is possible that one message will get
> >> >    processed ZERO times
> >> >
> >> > (1) & (2) are probably clear, so lemme explain how I think (3) could
> >> > happen. Let's imagine messages a, b, c, ... and two threads: one
> >> > reading from the stream, and one that periodically commits the offsets.
> >> > Imagine this sequence of events:
> >> >
> >> > ==Reader==
> >> > - initializes w/ offset pointing to "a"
> >> > - hasNext() ---> makeNext() reads "a" and updates the local offset to "b"
> >> > - msg = "a"
> >> > - doSomething("a")
> >> > - hasNext() ---> makeNext() reads "b" and updates the local offset to "c"
> >> >
> >> > ==Committer==
> >> > - commitOffsets stores the current offset as "c"
> >> >
> >> > ===== PROCESS DIES =====
> >> > ===== RESTARTS =====
> >> >
> >> > ==Reader==
> >> > - initializes w/ offset pointing to "c"
> >> > - hasNext() ---> makeNext() reads "c" and updates the local offset to "d"
> >> > - msg = "c"
> >> > - doSomething("c")
> >> > ...
> >> >
> >> > note that in this scenario, doSomething("b") was never called. Probably
> >> > for a lot of applications this doesn't matter, but it seems like this
> >> > could be terrible for some apps. I can't think of any way of preventing
> >> > it from user code -- unless, maybe, when the offsets get committed, it
> >> > is always *before* the last thing read? e.g., in my example, it would
> >> > store the next offset as "b" or earlier?
> >> >
> >> > Is there a flaw in my logic? Do committed offsets always "undershoot"
> >> > to prevent this?
> >> >
> >> > thanks,
> >> > Imran
> >>
> >> --
> >> -- Guozhang
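Imran's interleaving can be replayed as a single-threaded toy script to confirm the lost message (all names are illustrative; this mimics the sequence of events above, not the real consumer internals):

```java
import java.util.ArrayList;
import java.util.List;

// Replay of the interleaving described in the thread: the committer stores
// the reader's read-ahead offset, the process dies before doSomething("b"),
// and "b" ends up processed zero times.
public class LostMessageDemo {
    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c", "d");
        List<String> processed = new ArrayList<>();

        int localOffset = 0;  // advanced by hasNext()/makeNext()
        int committed;

        // Reader: makeNext() reads "a", local offset -> 1 (pointing at "b")
        String msg = log.get(localOffset);
        localOffset = 1;
        processed.add(msg);               // doSomething("a")

        // Reader: makeNext() prefetches "b", local offset -> 2 (pointing at "c")
        localOffset = 2;

        // Committer thread fires now and stores the read-ahead offset
        committed = localOffset;

        // ===== PROCESS DIES, then RESTARTS from the committed offset =====
        for (int i = committed; i < log.size(); i++) {
            processed.add(log.get(i));    // doSomething("c"), doSomething("d")
        }

        // "b" was never processed
        System.out.println(processed);    // prints: [a, c, d]
    }
}
```

If the committer instead stored only offsets the handler had finished (here 1, pointing at "b"), the restart would re-process "b": a duplicate rather than a gap, which is exactly the "undershoot" Imran asks about.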