Hi Jason, thank you so much!

I was missing the consumer.timeout.ms property. In fact I had actually written a full solution using cyclic barriers to make sure all my readers had stopped consuming ... but threw it out b/c I thought I couldn't deal w/ iterators blocked in hasNext(). That property is just what I was missing, I never would have found it w/out your help. It's also great to get some confirmation that this is a real issue -- I feel like this should be mentioned in the docs somewhere and ideally hidden behind a clean api.

I completely agree I can do this w/ the synchronous "shouldCommit" given the current api. But I'm not sure that is the "right" way. If I just had a one-arg commitOffsets(someOffsets) method, then I think there is a way to do this async that is both cleaner and should have higher throughput (no need for a full barrier that stops all the workers simultaneously). I've been getting influenced by the akka way of thinking. I think I will be able to share some example code tomorrow; maybe you and others can take a look and see if you think it's an improvement or not.
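In the meantime, just to make sure I've got the synchronous pattern right, here is roughly what I understand you to be describing, single-threaded for simplicity. This is only a rough sketch and not tested -- the topic name, group id, zookeeper address, and the doSomething body are placeholders I made up, and I may have some API details slightly off:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}

object SyncCommitSketch {

  def doSomething(msg: Array[Byte]): Unit = ()   // stand-in for my real processing

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")   // placeholder
    props.put("group.id", "my-group")                  // placeholder
    props.put("auto.commit.enable", "false")           // we commit ourselves
    props.put("consumer.timeout.ms", "100")            // so hasNext() wakes up periodically

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head
    val itr = stream.iterator()

    val commitIntervalMs = 60 * 1000L
    var lastCommit = System.currentTimeMillis()

    // no shutdown handling here, just the commit logic
    while (true) {
      try {
        if (itr.hasNext())                   // throws ConsumerTimeoutException after 100ms of silence
          doSomething(itr.next().message)
      } catch {
        case _: ConsumerTimeoutException =>  // nothing arrived in time; fall through to the commit check
      }
      if (System.currentTimeMillis() - lastCommit > commitIntervalMs) {
        // synchronous and in the consuming thread, so it can never land between next() and doSomething()
        connector.commitOffsets
        lastCommit = System.currentTimeMillis()
      }
    }
  }
}

With multiple threads I'd wrap the commitOffsets call in the cyclic-barrier step you described; the async version built on a one-arg commitOffsets(someOffsets) (which doesn't exist in the current api -- that's the change I'm proposing) is the part I still need to write up.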
thanks again!

imran

On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Hi Imran,
>
> The thing to do is not to have an asynchronous background thread for committing. Instead, have a time-based "shouldCommit()" function, and commit periodically, synchronously.
>
> If you have a bursty producer, then you can set a timeout (consumer.timeout.ms), so that your iter.hasNext() call will periodically wake up and throw a ConsumerTimeoutException. You can catch this exception silently, and check your "shouldCommit()" predicate.
>
> If you have multiple threads, you can use a cyclic barrier, or some such, to make sure all threads have paused before doing the synchronous commit. In this case, you should set your timeout pretty low, say 100ms, so that threads with work to do aren't left waiting for quiescent threads to wake up and time out.
>
> I have a configuration like this. I use a default commit cycle of 60 seconds, and a 100ms timeout on the consumer iterator.
>
> The built-in auto-commit mode has the fatal flaw, as you have pointed out, of possibly dropping a single message on a restart.
>
> Jason
>
> On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
>
>> Hi Benjamin,
>>
>> great question, I thought that for a while too. I think that won't work in my use case, but I could certainly use somebody else checking my logic.
>>
>> I think that would work, *if* I was willing to allow arbitrary time to pass between my commits. Then I could call commitOffsets within the while loop like so:
>>
>> while(itr.hasNext()) {
>>   val msg = itr.next()
>>   doSomething(msg)
>>   if(shouldICommit) {
>>     consumer.commitOffsets
>>   }
>> }
>>
>> (this is what you meant, right?)
>>
>> however, itr.hasNext() might block an arbitrary amount of time. The producers for my queues are very bursty -- they will create a ton of messages, and then go silent for a long time (perhaps days). Since I do want to ensure commits are happening regularly, that means it's got to be triggered by another thread (something that can't get blocked at hasNext()). Which is why I've got to make sure that trigger doesn't happen between next() and doSomething().
>>
>> a second point (much more minor) is that calling consumer.commitOffsets commits the offsets for all partitions that consumer is reading. Which means I've got to create a separate consumer per thread. (But this is really minor, just a nuisance, doesn't affect correctness.)
>>
>> You might say that even my first point is just a nuisance, and I just need to live with it. Then (1) I'd just like to make sure my understanding is correct -- I think this behavior is really not obvious, so it's worth clarifying, (2) I'd like to confirm that I have a workaround, and (3) if this problem does exist, and the workaround is tricky, then maybe the api change I'm proposing would be a good idea?
>>
>> thanks for reading about this and helping me work through it!
>>
>> imran
>>
>> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
>> > why not just disable autocommit and only call commitOffsets() after you've processed a batch? it isn't obvious to me how doing so would allow a message to be processed zero times.
>> >
>> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
>> >
>> >> Hi Edward,
>> >>
>> >> I think you misunderstand ... I definitely do *not* want to commit every message. That would be much too expensive; every few minutes is plenty for me.
>> >>
>> >> I want to guarantee that if I commit a message, that my message has been processed by *my application* at least once. (In another thread I mentioned something about reading exactly once, but that is not what I am going for here -- more than once is OK, as long as it's at least once.) That requires me to make sure that a commit never happens between a call to next() and doSomething(). Kafka guarantees that messages get read off the queue at least once, but as I outlined above, a naive solution allows for messages to get read off the queue but not make it into my app.
>> >>
>> >> Not only do I not want to commit every message -- I'd really like to have the above guarantee without even acquiring a lock (most of the time). That is what I was getting at with the comment about the spin-lock, just as an idea of how to prevent a commit between next() and doSomething(). I dunno if the spin-lock was really the right idea, that was just a random thought, but the point is, we want some check that is very cheap for the 99.999% of the time when a commit isn't happening, but can still guarantee proper ordering during the 0.001% of the time when a commit is happening. Otherwise, messages might never make it to my app. (And this is all just to prevent a message getting lost during the 10^-6% of the time when a commit might happen between next() and doSomething(), and the app dies before doSomething completes!)
>> >>
>> >> Actually, after a bit more thought -- the best way to guarantee this would be with a small api change, one that would do away with the need for locks completely. The iterator should have a method applyToNext:
>> >>
>> >> def applyToNext(f: MessageAndMetadata[K,V] => Unit) {
>> >>   // all the stuff in next(), *except* currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >>   val item = ...
>> >>   f(item)
>> >>   // after we've applied the user's function, now we can update the offset that should get committed
>> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> }
>> >>
>> >> I think this way, you could avoid any problems even with auto-commit.
>> >>
>> >> Or, if you don't want to add a new method, so we can stick to the Iterator api, then maybe the iterator could let you register a preCommitFunction, so next() would change to:
>> >>
>> >> override def next(): MessageAndMetadata[K, V] = {
>> >>   ...
>> >>   val item = ...
>> >>   preCommitFunctions.foreach{f => f(item)}
>> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >>   ...
>> >>   item
>> >> }
>> >>
>> >> thanks,
>> >> Imran
>> >>
>> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
>> >> > You likely need to use a custom offset solution if you plan on committing every message. With many partitions this puts a large burden on Zookeeper; you end up needing to roll over your zk transaction logs fast as well, or risk filling up the disk.
>> >> >
>> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >
>> >> >> Hello Imran,
>> >> >>
>> >> >> The offset will only be updated when the next() function is called:
>> >> >>
>> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >>   ...
>> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >>   ...
>> >> >>   item
>> >> >> }
>> >> >>
>> >> >> instead of in makeNext(), which will just update consumedOffset, but that is not the value that will be committed using the commitOffset call. So as long as you turn off auto-commit and only call commitOffset after the process(msg) call, not after the
>> >> >>
>> >> >> b = iter.next()
>> >> >>
>> >> >> is called, at-least-once is guaranteed.
>> >> >>
>> >> >> Does that make sense?
>> >> >>
>> >> >> Guozhang
>> >> >>
>> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
>> >> >>
>> >> >> > sorry to keep bugging the list, but I feel like I am either missing something important, or I'm finding something wrong w/ the standard consumer api (or maybe just the docs need some clarification).
>> >> >> >
>> >> >> > I started to think that I should probably just accept at-least-once semantics ... but I eventually realized that I'm not even sure we really get an at-least-once guarantee. I think it really might be zero-or-more. Or rather, messages will get pulled off the kafka queue at least once, but that doesn't mean your app will actually *process* those messages at least once -- there might be messages it never processes.
>> >> >> >
>> >> >> > Consider a really basic reader of a kafka queue:
>> >> >> >
>> >> >> > while(it.hasNext()){
>> >> >> >   val msg = it.next()
>> >> >> >   doSomething(msg)
>> >> >> > }
>> >> >> >
>> >> >> > the question is, do I have any guarantees on how many times doSomething() is called on everything in the queue? I think the "guarantee" is:
>> >> >> > 1) most messages will get processed exactly once
>> >> >> > 2) around a restart, a chunk of msgs will get processed at least once, but probably more than once
>> >> >> > 3) around a restart, it is possible that one message will get processed ZERO times
>> >> >> >
>> >> >> > (1) & (2) are probably clear, so lemme explain how I think (3) could happen. Let's imagine messages a,b,c,... and two threads, one reading from the stream, and one thread that periodically commits the offsets.
>> >> >> >
>> >> >> > Imagine this sequence of events:
>> >> >> >
>> >> >> > ==Reader==
>> >> >> > - initializes w/ offset pointing to "a"
>> >> >> > - hasNext()
>> >> >> >     ---> makeNext() will read "a" and update the local offset to "b"
>> >> >> > - msg = "a"
>> >> >> > - doSomething("a")
>> >> >> > - hasNext()
>> >> >> >     ---> makeNext() will read "b" and update the local offset to "c"
>> >> >> >
>> >> >> > ==Committer==
>> >> >> > - commitOffsets stores the current offset as "c"
>> >> >> >
>> >> >> > ===== PROCESS DIES =====
>> >> >> > ===== RESTARTS =====
>> >> >> >
>> >> >> > ==Reader==
>> >> >> > - initializes w/ offset pointing to "c"
>> >> >> > - hasNext()
>> >> >> >     ---> makeNext() will read "c" and update the local offset to "d"
>> >> >> > - msg = "c"
>> >> >> > - doSomething("c")
>> >> >> > ...
>> >> >> >
>> >> >> > note that in this scenario, doSomething("b") was never called. Probably for a lot of applications this doesn't matter. But it seems like this could be terrible for some apps. I can't think of any way of preventing it from user code, unless maybe when the offsets get committed, it is always *before* the last thing read? e.g., in my example, it would store the next offset as "b" or earlier?
>> >> >> >
>> >> >> > Is there a flaw in my logic? Do committed offsets always "undershoot" to prevent this?
>> >> >> >
>> >> >> > thanks,
>> >> >> > Imran
>> >> >> >
>> >> >>
>> >> >> --
>> >> >> -- Guozhang