I don't think I need control over which partitions are processed by a thread -- however, I do need the partition --> thread assignment to be consistent. I didn't realize that assignment could change; I guess that could mean my solution might not work. It depends a bit on what happens during reassignment.

Here's an example: initially Thread A is processing a partition. It commits its read up to offset 50, but keeps reading up to offset 60. At that point, the partition is reassigned to Thread B (which I guess can be in a completely different process). Where will Thread B start reading? It seems like it definitely can't start reading at 60, b/c then you can't guarantee that messages 50-60 ever get processed. (Not to mention that it probably doesn't even know A got to 60.) But if B starts at 50, then I don't see how each thread committing its own offsets is a problem. We just need some rule that you can't move an offset backwards on a commit (at least, not without some force option). Without that check, I suppose you could have B read up to 100, commit, and then A commit its read up to 60 (which wouldn't even violate "at least once", but it would certainly be undesirable). The fact that A & B could also be reading from other partitions at the same time doesn't seem like a problem to me; each thread would commit its positions in all of its partitions.
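
To make that rule concrete, here's the kind of guard I have in mind (a rough sketch with made-up names -- commitOffsets doesn't take per-partition offsets today, so none of this is the real consumer api):

    // sketch: refuse to move a partition's committed offset backwards
    object OffsetGuard {
      // partition -> highest offset committed so far
      private val committed = scala.collection.mutable.Map[Int, Long]()

      def safeCommit(partition: Int, offset: Long, force: Boolean = false): Boolean =
        synchronized {
          val ok = force || committed.get(partition).forall(_ < offset)
          if (ok) {
            committed(partition) = offset
            // ... actually persist the offset (e.g. to zookeeper) here ...
          }
          ok // false = rejected, e.g. A trying to commit 60 after B committed 100
        }
    }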

what am I overlooking? thanks again for taking the time to work through this with me, I hope this is helpful for a broader audience :)

imran

On Fri, Nov 22, 2013 at 2:37 PM, Jason Rosenberg <j...@squareup.com> wrote:
> With the high-level consumer, the design is that you really don't have programmatic control over which threads will process which partitions (and usually multiple per thread). Furthermore, the high-level consumer can rebalance and redistribute which threads (and which other processes) are consuming which partitions, at any time.
>
> So the notion of tracking specific offsets for commit, which only have meaning per partition, doesn't really scale with this design.
>
> Jason
>
> On Fri, Nov 22, 2013 at 9:06 AM, Imran Rashid <im...@therashids.com> wrote:
>
>> hmm, ok, after some more thought I see what you mean. I was thinking async because of a more complicated use case -- my app actually doesn't process messages one at a time; it builds a batch of messages, merges the batches from all threads, processes those batches, and that's when it should commit.
>>
>> But let's not worry about batches for the moment -- first let's just make sure we can get the behavior correct for an app which processes messages one at a time.
>>
>> 1) I agree that synchronous makes more sense in that case. But wouldn't that also benefit from commitOffsets(offsets)? That way each thread could commit just its own offsets, completely avoiding the need for locks & stopping other workers. So the code would look like this:
>>
>> while (itr.hasNext()) {
>>   val msg = itr.next()
>>   updateOffsets(msg) // updates some worker state w/ the current position
>>                      // of this worker in all its partitions
>>   doSomething(msg)
>>   if (shouldCommit) {
>>     consumer.commitOffsets(myOffsets) // myOffsets would be the set of
>>                                       // offsets for *only* this worker
>>   }
>> }
>>
>> (the actual code would be a bit more complicated b/c of handling TimeoutExceptions in hasNext(), but still lock-free)
>>
>> 2) Though async doesn't have any advantages in this case, it's not *incorrect*, is it? I just want to make sure, for building up to the batch case. So instead of directly committing its offsets, the worker would just put its offsets in a queue and keep going. (You can probably see where this is going for batches ...)
>>
>> thanks a lot, this discussion has been fascinating!
>>
>> Imran
>>
>> On Fri, Nov 22, 2013 at 1:06 AM, Jason Rosenberg <j...@squareup.com> wrote:
>> > Again,
>> >
>> > Remember that if you have multiple threads processing, then it means they are consuming from different partitions, so the offset of one partition will not be relevant to another. So, you really wouldn't have a use case to be committing specific offsets async. Furthermore, if your consumer is using a filter to select multiple topics, then even a single thread is likely going to be consuming multiple topics, so again, a single offset commit wouldn't make much sense.
>> >
>> > Jason
>> >
>> > On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid <im...@therashids.com> wrote:
>> >
>> >> Hi Jason,
>> >>
>> >> thank you so much!
>> >>
>> >> I was missing the consumer.timeout.ms property. In fact I had actually written a full solution using cyclic barriers to make sure all my readers had stopped consuming ... but threw it out b/c I thought I couldn't deal w/ iterators blocked in hasNext(). That property is just what I was missing; I never would have found it w/out your help.
>> >>
>> >> It's also great to get some confirmation that this is a real issue -- I feel like this should be mentioned in the docs somewhere and ideally hidden behind a clean api.
>> >>
>> >> I completely agree I can do this w/ the synchronous "shouldCommit" given the current api. But I'm not sure that is the "right" way. If I just had a one-arg commitOffsets(someOffsets) method, then I think there is a way to do this async that is both cleaner and should have higher throughput (no need for a full barrier that stops all the workers simultaneously). I've been getting influenced by the akka way of thinking. I think I will be able to share some example code tomorrow; maybe you and others can take a look and see if you think it's an improvement or not.
>> >>
>> >> thanks again!
>> >> imran
>> >>
>> >> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> >> > Hi Imran,
>> >> >
>> >> > The thing to do is not to have an asynchronous background thread for committing. Instead, have a time-based "shouldCommit()" function, and commit periodically, synchronously.
>> >> >
>> >> > If you have a bursty producer, then you can set a timeout (consumer.timeout.ms), so that your iter.hasNext() call will periodically wake up and throw a ConsumerTimeoutException. You can catch this exception silently, and check your "shouldCommit()" predicate.
>> >> >
>> >> > If you have multiple threads, you can use a cyclic barrier, or some such, to make sure all threads have paused before doing the synchronous commit. In this case, you should set your timeout pretty low, say 100ms, so that threads with work to do aren't left waiting for quiescent threads to wake up and time out.
>> >> >
>> >> > I have a configuration like this. I use a default commit cycle of 60 seconds, and a 100ms timeout on the consumer iterator.
>> >> >
>> >> > The built-in auto-commit mode has the fatal flaw, as you have pointed out, of possibly dropping a single message on a restart.
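>> >> >
>> >> > Roughly, each thread's consume loop then looks like this (a sketch; doSomething() and shouldCommit() stand in for your own code, stream is the KafkaStream the thread consumes, and consumer is its ConsumerConnector):
>> >> >
>> >> > import kafka.consumer.ConsumerTimeoutException
>> >> >
>> >> > // assumes consumer.timeout.ms is set (say, to 100) in the ConsumerConfig
>> >> > val itr = stream.iterator()
>> >> > var running = true // flipped elsewhere on shutdown
>> >> > while (running) {
>> >> >   try {
>> >> >     if (itr.hasNext()) { // blocks at most ~100ms, then throws
>> >> >       doSomething(itr.next())
>> >> >     }
>> >> >   } catch {
>> >> >     case e: ConsumerTimeoutException => () // quiet period; fall through
>> >> >   }
>> >> >   if (shouldCommit()) {
>> >> >     consumer.commitOffsets // synchronous commit for this connector
>> >> >   }
>> >> > }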
>> >> >
>> >> > Jason
>> >> >
>> >> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
>> >> >
>> >> >> Hi Benjamin,
>> >> >>
>> >> >> great question, I thought that for a while too. I think that won't work in my use case, but I could certainly use somebody else checking my logic.
>> >> >>
>> >> >> I think that would work *if* I was willing to allow arbitrary time to pass between my commits. Then I could call commitOffsets within the while loop like so:
>> >> >>
>> >> >> while (itr.hasNext()) {
>> >> >>   val msg = itr.next()
>> >> >>   doSomething(msg)
>> >> >>   if (shouldICommit) {
>> >> >>     consumer.commitOffsets
>> >> >>   }
>> >> >> }
>> >> >>
>> >> >> (this is what you meant, right?)
>> >> >>
>> >> >> however, itr.hasNext() might block an arbitrary amount of time. The producers for my queues are very bursty -- they will create a ton of messages, and then go silent for a long time (perhaps days). Since I do want to ensure commits are happening regularly, the commit has got to be triggered by another thread (something that can't get blocked at hasNext()). Which is why I've got to make sure that trigger doesn't fire between next() and doSomething().
>> >> >>
>> >> >> a second point (much more minor) is that calling consumer.commitOffsets commits the offsets for all partitions that consumer is reading. Which means I've got to create a separate consumer per thread. (But this is really minor, just a nuisance; it doesn't affect correctness.)
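>> >> >>
>> >> >> Concretely, each worker would have to do something like the following (just a sketch; props and topic stand in for my actual config):
>> >> >>
>> >> >> import kafka.consumer.{Consumer, ConsumerConfig}
>> >> >>
>> >> >> // one ConsumerConnector per worker thread, so that commitOffsets
>> >> >> // only covers the partitions this thread is consuming
>> >> >> val connector = Consumer.create(new ConsumerConfig(props))
>> >> >> val stream = connector.createMessageStreams(Map(topic -> 1))(topic).head
>> >> >> // ... consume from stream, and periodically:
>> >> >> connector.commitOffsets // commits only this connector's offsets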
>> >> >>
>> >> >> You might say that even my first point is just a nuisance, and I just need to live with it. Then (1) I'd just like to make sure my understanding is correct -- I think this behavior is really not obvious, so it's worth clarifying; (2) I'd like to confirm that I have a workaround; and (3) if this problem does exist, and the workaround is tricky, then maybe the api change I'm proposing would be a good idea?
>> >> >>
>> >> >> thanks for reading about this and helping me work through it!
>> >> >>
>> >> >> imran
>> >> >>
>> >> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
>> >> >> > why not just disable autocommit and only call commitOffsets() after you've processed a batch? it isn't obvious to me how doing so would allow a message to be processed zero times.
>> >> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
>> >> >> >
>> >> >> >> Hi Edward,
>> >> >> >>
>> >> >> >> I think you misunderstand ... I definitely do *not* want to commit every message. That would be much too expensive; every few minutes is plenty for me.
>> >> >> >>
>> >> >> >> I want to guarantee that if I commit a message, my message has been processed by *my application* at least once. (In another thread I mentioned something about reading exactly once, but that is not what I am going for here -- more than once is OK, as long as it's at least once.) That requires me to make sure that a commit never happens between a call to next() and doSomething(). Kafka guarantees that messages get read off the queue at least once, but as I outlined above, a naive solution allows for messages to get read off the queue but never make it into my app.
>> >> >> >>
>> >> >> >> Not only do I not want to commit every message -- I'd really like to have the above guarantee without even acquiring a lock (most of the time). That is what I was getting at with the comment about the spin-lock, just as an idea of how to prevent a commit between next() and doSomething(). I dunno if the spin-lock was really the right idea, that was just a random thought, but the point is, we want some check that is very cheap for the 99.999% of the time when a commit isn't happening, but can still guarantee proper ordering during the 0.001% of the time when a commit is happening. otherwise, messages might never make it to my app. (And this is all just to prevent a message getting lost during the 10^-6% of the time when a commit might happen between next() and doSomething(), and the app dies before doSomething() completes!)
>> >> >> >>
>> >> >> >> Actually, after a bit more thought -- the best way to guarantee this would be with a small api change, which would do away with the need for locks completely. The iterator should have a method applyToNext:
>> >> >> >>
>> >> >> >> def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
>> >> >> >>   // all the stuff in next(), *except*
>> >> >> >>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >> >>   val item = ...
>> >> >> >>   f(item)
>> >> >> >>   // after we've applied the user's function, now we can update
>> >> >> >>   // the offset that should get committed
>> >> >> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >> >> }
>> >> >> >>
>> >> >> >> I think this way, you could avoid any problems even with auto-commit. Or, if you don't want to add a new method, so we can stick to the Iterator api, then maybe the iterator could let you register a preCommitFunction, so next() would change to:
>> >> >> >>
>> >> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >> >>   ...
>> >> >> >>   val item = ...
>> >> >> >>   preCommitFunctions.foreach{ f => f(item) }
>> >> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >> >>   ...
>> >> >> >>   item
>> >> >> >> }
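>> >> >> >>
>> >> >> >> With applyToNext (again, a proposed method, not the current api), user code would stay a simple, lock-free loop, something like:
>> >> >> >>
>> >> >> >> while (itr.hasNext()) {
>> >> >> >>   // the consume offset only advances *after* doSomething returns,
>> >> >> >>   // so a concurrent commit can never cover an unprocessed message
>> >> >> >>   itr.applyToNext { msg => doSomething(msg) }
>> >> >> >> }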
>> >> >> >>
>> >> >> >> thanks,
>> >> >> >> Imran
>> >> >> >>
>> >> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
>> >> >> >> > You likely need to use a custom offset solution if you plan on committing every message. With many partitions this puts a large burden on Zookeeper; you end up needing to roll over your zk transaction logs fast as well, or risk filling up the disk.
>> >> >> >> >
>> >> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> >> >
>> >> >> >> >> Hello Imran,
>> >> >> >> >>
>> >> >> >> >> The offset will only be updated when the next() function is called:
>> >> >> >> >>
>> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >> >> >>   ...
>> >> >> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >> >> >>   ...
>> >> >> >> >>   item
>> >> >> >> >> }
>> >> >> >> >>
>> >> >> >> >> instead of in makeNext(), which will just update consumedOffset, but that is not the value that will be committed by the commitOffsets call. So as long as you turn off auto commit and only call commitOffsets after the process(msg) call, not right after
>> >> >> >> >>
>> >> >> >> >> b = iter.next()
>> >> >> >> >>
>> >> >> >> >> is called, at-least-once is guaranteed.
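>> >> >> >> >>
>> >> >> >> >> In other words, the safe ordering is just this (a sketch, with auto commit turned off and process() standing in for your own code; in practice you would commit periodically rather than per message):
>> >> >> >> >>
>> >> >> >> >> while (iter.hasNext()) {
>> >> >> >> >>   val b = iter.next() // next() advances the committable offset past b
>> >> >> >> >>   process(b) // process first ...
>> >> >> >> >>   consumer.commitOffsets // ... then commit, so b can never be skipped
>> >> >> >> >> }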
>> >> >> >> >>
>> >> >> >> >> Does that make sense?
>> >> >> >> >>
>> >> >> >> >> Guozhang
>> >> >> >> >>
>> >> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
>> >> >> >> >>
>> >> >> >> >> > sorry to keep bugging the list, but I feel like I am either missing something important, or I'm finding something wrong w/ the standard consumer api (or maybe the docs just need some clarification).
>> >> >> >> >> >
>> >> >> >> >> > I started to think that I should probably just accept at-least-once semantics ... but I eventually realized that I'm not even sure we really get an at-least-once guarantee. I think it really might be zero-or-more. Or rather, messages will get pulled off the kafka queue at least once, but that doesn't mean your app will actually *process* those messages at least once -- there might be messages it never processes.
>> >> >> >> >> >
>> >> >> >> >> > Consider a really basic reader of a kafka queue:
>> >> >> >> >> >
>> >> >> >> >> > while (it.hasNext()) {
>> >> >> >> >> >   val msg = it.next()
>> >> >> >> >> >   doSomething(msg)
>> >> >> >> >> > }
>> >> >> >> >> >
>> >> >> >> >> > the question is, do I have any guarantees on how many times doSomething() is called on everything in the queue? I think the "guarantee" is:
>> >> >> >> >> > 1) most messages will get processed exactly once
>> >> >> >> >> > 2) around a restart, a chunk of msgs will get processed at least once, but probably more than once
>> >> >> >> >> > 3) around a restart, it is possible that one message will get processed ZERO times
>> >> >> >> >> >
>> >> >> >> >> > (1) & (2) are probably clear, so lemme explain how I think (3) could happen. Let's imagine messages a, b, c, ... and two threads: one reading from the stream, and one that periodically commits the offsets. Imagine this sequence of events:
>> >> >> >> >> >
>> >> >> >> >> > ==Reader==
>> >> >> >> >> > - initializes w/ offset pointing to "a"
>> >> >> >> >> > - hasNext()
>> >> >> >> >> >   ---> makeNext() will read "a" and update the local offset to "b"
>> >> >> >> >> > - msg = "a"
>> >> >> >> >> > - doSomething("a")
>> >> >> >> >> > - hasNext()
>> >> >> >> >> >   ---> makeNext() will read "b" and update the local offset to "c"
>> >> >> >> >> >
>> >> >> >> >> > ==Committer==
>> >> >> >> >> > - commitOffsets stores the current offset as "c"
>> >> >> >> >> >
>> >> >> >> >> > ===== PROCESS DIES =====
>> >> >> >> >> > ===== RESTARTS =====
>> >> >> >> >> >
>> >> >> >> >> > ==Reader==
>> >> >> >> >> > - initializes w/ offset pointing to "c"
>> >> >> >> >> > - hasNext()
>> >> >> >> >> >   ---> makeNext() will read "c" and update the local offset to "d"
>> >> >> >> >> > - msg = "c"
>> >> >> >> >> > - doSomething("c")
>> >> >> >> >> > ...
>> >> >> >> >> >
>> >> >> >> >> > note that in this scenario, doSomething("b") was never called. Probably for a lot of applications this doesn't matter. But it seems like this could be terrible for some apps. I can't think of any way of preventing it from user code. unless, maybe, when the offsets get committed, it is always *before* the last thing read? e.g., in my example, it would store the next offset as "b" or earlier?
>> >> >> >> >> >
>> >> >> >> >> > Is there a flaw in my logic? Do committed offsets always "undershoot" to prevent this?
>> >> >> >> >> >
>> >> >> >> >> > thanks,
>> >> >> >> >> > Imran
>> >> >> >> >>
>> >> >> >> >> --
>> >> >> >> >> -- Guozhang