Again, remember that if you have multiple threads processing, they are consuming from different partitions, so the offset of one partition is not relevant to another. So you really wouldn't have a use case for committing specific offsets asynchronously. Furthermore, if your consumer is using a filter to select multiple topics, then even a single thread is likely consuming multiple topics, so again, a single offset commit wouldn't make much sense. (A sketch of the one-stream-per-thread setup is below.)
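For concreteness, a minimal sketch of that setup, assuming the 0.8-era high-level consumer (the ZK address, group id, topic name, and process() are placeholders):

  import java.util.Properties
  import java.util.concurrent.Executors
  import kafka.consumer.{Consumer, ConsumerConfig}

  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")  // placeholder ZK address
  props.put("group.id", "my-group")                 // placeholder group id
  val connector = Consumer.create(new ConsumerConfig(props))

  // Ask for numThreads streams: each stream owns a disjoint set of
  // partitions, so an offset advanced by one thread never describes
  // another thread's data.
  val numThreads = 4
  val streams = connector.createMessageStreams(Map("my-topic" -> numThreads))("my-topic")

  val pool = Executors.newFixedThreadPool(numThreads)
  streams.foreach { stream =>
    pool.submit(new Runnable {
      override def run(): Unit = {
        val itr = stream.iterator()
        while (itr.hasNext()) {
          process(itr.next())  // placeholder per-message handler
        }
      }
    })
  }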
Jason

On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid <im...@therashids.com> wrote:
> Hi Jason,
>
> thank you so much!
>
> I was missing the consumer.timeout.ms property. In fact I had actually
> written a full solution using cyclic barriers to make sure all my
> readers had stopped consuming ... but threw it out b/c I thought I
> couldn't deal w/ iterators blocked in hasNext(). That property is just
> what I was missing, I never would have found it w/out your help.
>
> It's also great to get some confirmation that this is a real issue -- I
> feel like this should be mentioned in the docs somewhere and ideally
> hidden behind a clean api.
>
> I completely agree I can do this w/ the synchronous "shouldCommit" given
> the current api. But I'm not sure that is the "right" way. If I just
> had a one-arg commitOffsets(someOffsets) method, then I think there is a
> way to do this async that is both cleaner and should have higher
> throughput (no need to have a full barrier that stops all the workers
> simultaneously). I've been getting influenced by the akka way of
> thinking. I think I will be able to share some example code tomorrow,
> maybe you and others can take a look and see if you think it's an
> improvement or not.
>
> thanks again!
> imran
>
> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > Hi Imran,
> >
> > The thing to do is not have an asynchronous background thread for
> > committing. Instead, have a time-based "shouldCommit()" function, and
> > commit periodically, synchronously.
> >
> > If you have a bursty producer, then you can set a timeout
> > (consumer.timeout.ms), so that your iter.hasNext() call will
> > periodically wake up and throw a ConsumerTimeoutException. You can
> > catch this exception silently, and check your "shouldCommit()"
> > predicate.
> >
> > If you have multiple threads, you can use a cyclic barrier, or some
> > such, to make sure all threads have paused before doing the
> > synchronous commit. In this case, you should set your timeout pretty
> > low, say 100ms, so that threads with work to do aren't waiting for
> > quiescent threads to wake up and time out.
> >
> > I have a configuration like this. I use a default commit cycle of 60
> > seconds, and use a 100ms timeout on the consumer iterator.
> >
> > The auto-commit mode built in has the fatal flaw, as you have pointed
> > out, of possibly dropping a single message on a restart.
> >
> > Jason
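A minimal single-threaded sketch of the loop Jason describes, assuming the 0.8-era high-level consumer (running, doSomething, connector, and stream are placeholders; property names are as of 0.8; the multi-threaded variant would add the barrier before the commit):

  // in the ConsumerConfig:
  //   auto.commit.enable=false, consumer.timeout.ms=100
  import kafka.consumer.ConsumerTimeoutException

  val commitIntervalMs = 60 * 1000L  // Jason's 60-second commit cycle
  var lastCommit = System.currentTimeMillis()

  val itr = stream.iterator()
  while (running) {
    try {
      doSomething(itr.next())  // blocks at most ~100ms, then throws
    } catch {
      case _: ConsumerTimeoutException =>  // no message right now; fall through
    }
    if (System.currentTimeMillis() - lastCommit >= commitIntervalMs) {
      connector.commitOffsets  // synchronous; covers only processed messages
      lastCommit = System.currentTimeMillis()
    }
  }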
> >
> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
> >> Hi Benjamin,
> >>
> >> great question, I thought that for a while too. I think that won't
> >> work in my use case, but I could certainly use somebody else checking
> >> my logic.
> >>
> >> I think that would work, *if* I was willing to allow arbitrary time
> >> to pass between my commits. Then I could call commitOffsets within
> >> the while loop like so:
> >>
> >> while(itr.hasNext()) {
> >>   val msg = itr.next()
> >>   doSomething(msg)
> >>   if(shouldICommit) {
> >>     consumer.commitOffsets
> >>   }
> >> }
> >>
> >> (this is what you meant, right?)
> >>
> >> however, itr.hasNext() might block an arbitrary amount of time. The
> >> producers for my queues are very bursty -- they will create a ton of
> >> messages, and then go silent for a long time (perhaps days). Since I
> >> do want to ensure commits are happening regularly, that means it's
> >> got to be triggered by another thread (something that can't get
> >> blocked at hasNext()). Which is why I've got to make sure that
> >> trigger doesn't happen between next() and doSomething().
> >>
> >> a second point (much more minor) is that calling
> >> consumer.commitOffsets commits the offsets for all partitions that
> >> consumer is reading. Which means I've got to create a separate
> >> consumer per thread. (But this is really minor, just a nuisance,
> >> doesn't affect correctness.)
> >>
> >> You might say that even my first point is just a nuisance, and I just
> >> need to live with it. Then (1) I'd just like to make sure my
> >> understanding is correct -- I think this behavior is really not
> >> obvious, so it's worth clarifying, (2) I'd like to confirm that I
> >> have a workaround, and (3) if this problem does exist, and the
> >> workaround is tricky, then maybe the api change I'm proposing would
> >> be a good idea?
> >>
> >> thanks for reading about this and helping me work through it!
> >>
> >> imran
> >>
> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
> >> > why not just disable autocommit and only call commitOffsets() after
> >> > you've processed a batch? it isn't obvious to me how doing so would
> >> > allow a message to be processed zero times.
> >> >
> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
> >> >> Hi Edward,
> >> >>
> >> >> I think you misunderstand ... I definitely do *not* want to commit
> >> >> every message. That would be much too expensive, every few
> >> >> minutes is plenty for me.
> >> >>
> >> >> I want to guarantee that if I commit a message, my message has
> >> >> been processed by *my application* at least once. (In another
> >> >> thread I mentioned something about reading exactly once, but that
> >> >> is not what I am going for here -- more than once is OK, as long
> >> >> as it's at least once.) That requires me to make sure that a
> >> >> commit never happens between a call to next() and doSomething().
> >> >> Kafka guarantees that messages get read off the queue at least
> >> >> once, but as I outlined above, a naive solution allows for
> >> >> messages to get read off the queue but not make it into my app.
> >> >>
> >> >> Not only do I not want to commit every message -- I'd really like
> >> >> to have the above guarantee without even acquiring a lock (most of
> >> >> the time). That is what I was getting at with the comment about
> >> >> the spin-lock, just as an idea of how to prevent a commit between
> >> >> next() and doSomething(). I dunno if the spin-lock was really the
> >> >> right idea, that was just a random thought, but the point is, we
> >> >> want some check that is very cheap for the 99.999% of the time
> >> >> when a commit isn't happening, but can still guarantee proper
> >> >> ordering during the 0.001% of the time when a commit is happening.
> >> >> Otherwise, messages might never make it to my app. (And this is
> >> >> all just to prevent a message getting lost during the 10^-6% of
> >> >> the time when a commit might happen between next() and
> >> >> doSomething(), and the app dies before doSomething completes!)
> >> >>
> >> >> Actually, after a bit more thought -- the best way to guarantee
> >> >> this would be with a small api change, that would do away with the
> >> >> need for locks completely. The iterator should have a method
> >> >> applyToNext:
> >> >>
> >> >> def applyToNext(f: MessageAndData[K,V] => Unit) {
> >> >>   //all the stuff in next, *except*
> >> >>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >>   val item = ...
> >> >>   f(item)
> >> >>   //after we've applied the user's function, now we can update
> >> >>   //the offset that should get committed
> >> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> }
> >> >>
> >> >> I think this way, you could avoid any problems even with
> >> >> auto-commit. Or, if you don't want to add a new method, so we can
> >> >> stick to the Iterator api, then maybe the iterator could let you
> >> >> register a preCommitFunction, so next() would change to:
> >> >>
> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >>   ...
> >> >>   val item = ...
> >> >>   preCommitFunctions.foreach{f => f(item)}
> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >>   ...
> >> >>   item
> >> >> }
> >> >>
> >> >> thanks,
> >> >> Imran
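As a self-contained illustration of the semantics Imran is proposing (this is not Kafka's API -- the wrapper class, the (offset, message) pairs, and the handler are all hypothetical):

  // A toy iterator over (offset, message) pairs. A committer thread only
  // ever reads committableOffset, which is advanced *after* the handler
  // returns, so a commit can never cover a message the application hasn't
  // finished processing.
  class ApplyToNextIterator[A](underlying: Iterator[(Long, A)]) {
    @volatile var committableOffset: Long = -1L  // what a committer may persist

    def applyToNext(f: A => Unit): Unit = {
      val (offset, item) = underlying.next()  // may block, like Kafka's iterator
      f(item)                                 // process first ...
      committableOffset = offset              // ... then expose the offset
    }
  }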
> >> >>
> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> >> >> > You likely need to use a custom offset solution if you plan on
> >> >> > committing every message. With many partitions this puts a large
> >> >> > burden on ZooKeeper; you end up needing to roll over your zk
> >> >> > transaction logs fast as well, or risk filling up the disk.
> >> >> >
> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> Hello Imran,
> >> >> >>
> >> >> >> The offset will only be updated when the next() function is
> >> >> >> called:
> >> >> >>
> >> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >> >>   ...
> >> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >> >>   ...
> >> >> >>   item
> >> >> >> }
> >> >> >>
> >> >> >> instead of in makeNext(), which will just update consumedOffset,
> >> >> >> but that is not the value that will be committed using the
> >> >> >> commitOffset call. So as long as you turn off auto-commit and
> >> >> >> only call commitOffset after the process(msg) call, not after the
> >> >> >>
> >> >> >> b = iter.next()
> >> >> >>
> >> >> >> is called, at-least-once is guaranteed.
> >> >> >>
> >> >> >> Does that make sense?
> >> >> >>
> >> >> >> Guozhang
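In loop form, the ordering Guozhang describes looks like the sketch below. The per-message commit is shown only to make the ordering explicit -- as Edward notes above, committing every message is expensive in practice (connector, stream, and process are placeholders):

  // auto.commit.enable=false in the ConsumerConfig
  val itr = stream.iterator()
  while (itr.hasNext()) {
    val msg = itr.next()     // advances only the in-memory consumed offset
    process(msg)             // a crash here redelivers msg on restart ...
    connector.commitOffsets  // ... because the offset is persisted only now
  }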
> >> >> >>
> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
> >> >> >> > sorry to keep bugging the list, but I feel like I am either
> >> >> >> > missing something important, or I'm finding something wrong
> >> >> >> > w/ the standard consumer api (or maybe just the docs need
> >> >> >> > some clarification).
> >> >> >> >
> >> >> >> > I started to think that I should probably just accept
> >> >> >> > at-least-once semantics ... but I eventually realized that
> >> >> >> > I'm not even sure we really get an at-least-once guarantee.
> >> >> >> > I think it really might be zero-or-more. Or rather, messages
> >> >> >> > will get pulled off the kafka queue at least once, but that
> >> >> >> > doesn't mean your app will actually *process* those messages
> >> >> >> > at least once -- there might be messages it never processes.
> >> >> >> >
> >> >> >> > Consider a really basic reader of a kafka queue:
> >> >> >> >
> >> >> >> > while(it.hasNext()){
> >> >> >> >   val msg = it.next()
> >> >> >> >   doSomething(msg)
> >> >> >> > }
> >> >> >> >
> >> >> >> > the question is, do I have any guarantees on how many times
> >> >> >> > doSomething() is called on everything in the queue? I think
> >> >> >> > the "guarantee" is:
> >> >> >> > 1) most messages will get processed exactly once
> >> >> >> > 2) around a restart, a chunk of msgs will get processed at
> >> >> >> > least once, but probably more than once
> >> >> >> > 3) around a restart, it is possible that one message will get
> >> >> >> > processed ZERO times
> >> >> >> >
> >> >> >> > (1) & (2) are probably clear, so lemme explain how I think
> >> >> >> > (3) could happen. Let's imagine messages a,b,c,... and two
> >> >> >> > threads, one reading from the stream, and one thread that
> >> >> >> > periodically commits the offsets. Imagine this sequence of
> >> >> >> > events:
> >> >> >> >
> >> >> >> > ==Reader==
> >> >> >> > -initializes w/ offset pointing to "a"
> >> >> >> > -hasNext()
> >> >> >> >   ---> makeNext() will read "a" and update the local offset to "b"
> >> >> >> > -msg = "a"
> >> >> >> > -doSomething("a")
> >> >> >> > -hasNext()
> >> >> >> >   ---> makeNext() will read "b" and update the local offset to "c"
> >> >> >> >
> >> >> >> > ==Committer==
> >> >> >> > -commitOffsets stores the current offset as "c"
> >> >> >> >
> >> >> >> > ===== PROCESS DIES =====
> >> >> >> > ===== RESTARTS =====
> >> >> >> >
> >> >> >> > ==Reader==
> >> >> >> > -initializes w/ offset pointing to "c"
> >> >> >> > -hasNext()
> >> >> >> >   ---> makeNext() will read "c" and update the local offset to "d"
> >> >> >> > -msg = "c"
> >> >> >> > -doSomething("c")
> >> >> >> > ...
> >> >> >> >
> >> >> >> > note that in this scenario, doSomething("b") was never
> >> >> >> > called. Probably for a lot of applications this doesn't
> >> >> >> > matter. But it seems like this could be terrible for some
> >> >> >> > apps. I can't think of any way of preventing it from user
> >> >> >> > code. unless, maybe, when the offsets get committed, it is
> >> >> >> > always *before* the last thing read? e.g., in my example, it
> >> >> >> > would store the next offset as "b" or earlier?
> >> >> >> >
> >> >> >> > Is there a flaw in my logic? Do committed offsets always
> >> >> >> > "undershoot" to prevent this?
> >> >> >> >
> >> >> >> > thanks,
> >> >> >> > Imran
> >> >> >>
> >> >> >> --
> >> >> >> -- Guozhang
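For reference, a minimal sketch of the hazardous shape Imran's timeline describes -- an independent committer thread racing the reader. This is the pattern to avoid, not a recommendation (connector, stream, running, and doSomething are placeholders):

  // Committer: periodically persists whatever offset the iterator reached.
  val committer = new Thread(new Runnable {
    override def run(): Unit = while (running) {
      Thread.sleep(60 * 1000L)
      connector.commitOffsets  // may run between next() and doSomething() below
    }
  })
  committer.setDaemon(true)
  committer.start()

  // Reader: next() advances the in-memory offset *past* msg before
  // doSomething runs, so a commit landing in that gap, followed by a crash,
  // records msg as consumed even though it was never processed (Imran's
  // case 3).
  val itr = stream.iterator()
  while (itr.hasNext()) {
    val msg = itr.next()
    // <-- a commit plus a crash here loses msg
    doSomething(msg)
  }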