I think the problem is that you don't know which partitions a thread currently 'owns', and therefore you don't know which partitions you can safely commit offsets for.
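For concreteness, here is a rough sketch of the kind of per-partition bookkeeping being discussed below -- track the highest offset committed per partition, and reject any commit that would move a partition backwards. This is purely hypothetical (MonotonicOffsets and tryAdvance are made-up names, and the 0.8 high-level consumer exposes nothing like a per-partition commit); it only illustrates the "never move an offset backwards" rule Imran proposes:

    import scala.collection.mutable

    // Tracks the highest committed offset per (topic, partition) and refuses to
    // move backwards, e.g. thread A committing 60 after thread B already
    // committed 100 for the same partition.
    class MonotonicOffsets {
      private val committed = mutable.Map.empty[(String, Int), Long]

      def tryAdvance(topic: String, partition: Int, offset: Long): Boolean = synchronized {
        val key = (topic, partition)
        committed.get(key) match {
          case Some(prev) if offset <= prev => false // would move backwards; reject
          case _                            => committed(key) = offset; true
        }
      }
    }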
On Fri, Nov 22, 2013 at 5:28 PM, Imran Rashid <im...@therashids.com> wrote:
> I don't think I need control over which partitions are processed by a thread -- however, I do need the partition --> thread assignment to be consistent. I didn't realize that assignment could change; I guess that could mean my solution might not work. It depends a bit on what happens during reassignment.
>
> Here's an example: initially Thread A is processing a partition. It commits its read up to offset 50, but keeps reading up till offset 60. At that point, the partition is re-assigned to Thread B (which I guess could be in a completely different process). Where will thread B start reading? It seems like it definitely can't start reading at 60, b/c then you can't guarantee that messages 50-60 ever get processed. (Not to mention that it probably doesn't even know A got to 60.)
>
> But if B starts at 50, then I don't see how each thread committing its own offsets is a problem. We just need some rule that you can't move an offset backwards on a commit (at least, not without some force option). Without that check, I suppose you could have B read up to 100, commit, and then A commit its read up to 60 (which wouldn't even violate "at least once", but it would certainly be undesirable).
>
> The fact that A & B could also be reading from other partitions at the same time doesn't seem like a problem to me; each thread would commit its positions in all of its partitions.
>
> What am I overlooking?
>
> Thanks again for taking the time to work through this with me, I hope this is helpful for a broader audience :)
>
> imran
>
> On Fri, Nov 22, 2013 at 2:37 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > With the high-level consumer, the design is that you really don't have programmatic control over which threads will process which partitions (and usually multiple per thread). Furthermore, the high-level consumer can re-balance and redistribute which threads (and which other processes) are consuming which partitions, at any time, etc.
> >
> > So, the notion of tracking specific offsets for commit, which only have meaning per partition, doesn't really scale with this design.
> >
> > Jason
> >
> > On Fri, Nov 22, 2013 at 9:06 AM, Imran Rashid <im...@therashids.com> wrote:
> >> hmm, ok, after some more thought I see what you mean. I was thinking async because of a more complicated use case -- my app actually doesn't process messages one at a time; it builds a batch of messages, merges the batches from all threads, and processes those batches, and that's when it should commit.
> >>
> >> But let's not worry about batches for the moment -- first let's just make sure we can get the behavior correct for an app which processes messages one at a time.
> >>
> >> 1) I agree that synchronous makes more sense in that case. But wouldn't that also benefit from commitOffsets(offsets)? That way each thread could commit just its own offsets, completely avoiding the need for locks & stopping other workers. So the code would look like this:
> >>
> >> while (itr.hasNext()) {
> >>   val msg = itr.next()
> >>   updateOffsets(msg)  // updates some worker state w/ the current position of this worker in all its partitions
> >>   doSomething(msg)
> >>   if (shouldCommit) {
> >>     consumer.commitOffsets(myOffsets)  // myOffsets would be the set of offsets for *only* this worker
> >>   }
> >> }
> >>
> >> (The actual code would be a bit more complicated b/c of handling TimeoutExceptions in hasNext(), but still lock-free.)
> >>
> >> 2) Though async doesn't have any advantages in this case, it's not *incorrect*, is it? I just want to make sure, for building up to the batch case. So instead of directly committing its offsets, the worker would just put its offsets in a queue and keep going. (You can probably see where this is going for batches ...)
> >>
> >> thanks a lot, this discussion has been fascinating!
> >>
> >> Imran
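For what it's worth, the async variant in (2) could be sketched roughly like this -- each worker enqueues the offsets it has finished processing, and a single committer thread merges and commits them on its own schedule. It assumes the hypothetical per-partition commit discussed above (commitPartitionOffsets below is a made-up name; the 0.8 high-level consumer only exposes an all-partition commitOffsets):

    import java.util.concurrent.LinkedBlockingQueue

    case class WorkerOffsets(workerId: Int, offsets: Map[(String, Int), Long])

    val pending = new LinkedBlockingQueue[WorkerOffsets]()

    // Worker side: after doSomething(msg), enqueue instead of committing directly:
    //   pending.put(WorkerOffsets(myId, myOffsets))

    // Committer side, invoked periodically (e.g. every 60 seconds):
    def drainAndCommit(commitPartitionOffsets: Map[(String, Int), Long] => Unit): Unit = {
      var merged = Map.empty[(String, Int), Long]
      var next = pending.poll()
      while (next != null) {
        // keep the highest offset seen per partition; workers don't share partitions
        merged = next.offsets.foldLeft(merged) { case (acc, (tp, off)) =>
          acc.updated(tp, math.max(off, acc.getOrElse(tp, off)))
        }
        next = pending.poll()
      }
      if (merged.nonEmpty) commitPartitionOffsets(merged)
    }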
> >> On Fri, Nov 22, 2013 at 1:06 AM, Jason Rosenberg <j...@squareup.com> wrote:
> >> > Again,
> >> >
> >> > Remember that if you have multiple threads processing, then it means they are consuming from different partitions, so the offset of one partition will not be relevant to another. So, you really wouldn't have a use case to be committing specific offsets async. Furthermore, if your consumer is using a filter to select multiple topics, then even a single thread is likely going to be consuming multiple topics, so again, a single offset commit wouldn't make much sense.
> >> >
> >> > Jason
> >> >
> >> > On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid <im...@therashids.com> wrote:
> >> >> Hi Jason,
> >> >>
> >> >> thank you so much!
> >> >>
> >> >> I was missing the consumer.timeout.ms property. In fact I had actually written a full solution using cyclic barriers to make sure all my readers had stopped consuming ... but threw it out b/c I thought I couldn't deal w/ iterators blocked in hasNext(). That property is just what I was missing; I never would have found it w/out your help.
> >> >>
> >> >> It's also great to get some confirmation that this is a real issue -- I feel like this should be mentioned in the docs somewhere and ideally hidden behind a clean api.
> >> >>
> >> >> I completely agree I can do this w/ the synchronous "shouldCommit" given the current api. But I'm not sure that is the "right" way. If I just had a one-arg commitOffsets(someOffsets) method, then I think there is a way to do this async that is both cleaner and should have higher throughput (no need to have a full barrier that stops all the workers simultaneously). I've been getting influenced by the akka way of thinking. I think I will be able to share some example code tomorrow; maybe you and others can take a look and see if you think it's an improvement or not.
> >> >>
> >> >> thanks again!
> >> >> imran
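As an aside, the cyclic-barrier coordination mentioned above (and recommended by Jason below) might look roughly like this. It is only a loose sketch under stated assumptions: BarrierCommitter and maybeAwaitCommit are made-up names, the workers share one ConsumerConnector, and every worker's hasNext() times out quickly via consumer.timeout.ms so everyone actually reaches the barrier:

    import java.util.concurrent.CyclicBarrier
    import java.util.concurrent.atomic.AtomicBoolean
    import kafka.consumer.ConsumerConnector

    class BarrierCommitter(connector: ConsumerConnector, numWorkers: Int) {
      // A timer elsewhere flips this to true once per commit interval (e.g. every 60s).
      val commitRequested = new AtomicBoolean(false)

      // The last worker to arrive runs this action while all the others are paused.
      private val barrier = new CyclicBarrier(numWorkers, new Runnable {
        def run(): Unit = {
          connector.commitOffsets   // nobody is between next() and processing at this point
          commitRequested.set(false)
        }
      })

      // Each worker calls this once per loop iteration, after processing its current message.
      def maybeAwaitCommit(): Unit = {
        if (commitRequested.get()) barrier.await()
      }
    }

Note this only works if every worker is guaranteed to reach the barrier reasonably quickly, which is exactly why the low consumer.timeout.ms matters.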
> >> >> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >> >> > Hi Imran,
> >> >> >
> >> >> > The thing to do is not have an asynchronous background thread for committing. Instead, have a time-based "shouldCommit()" function, and commit periodically, synchronously.
> >> >> >
> >> >> > If you have a bursty producer, then you can set a timeout (consumer.timeout.ms), so that your iter.hasNext() call will periodically wake up and throw a ConsumerTimeoutException. You can catch this exception silently, and check your "shouldCommit()" predicate.
> >> >> >
> >> >> > If you have multiple threads, you can use a cyclic barrier, or some such, to make sure all threads have paused before doing the synchronous commit. In this case, you should set your timeout pretty low, say 100ms, so that threads with work to do aren't waiting for quiescent threads to wake up and time out.
> >> >> >
> >> >> > I have a configuration like this. I use a default commit cycle of 60 seconds, and a 100ms timeout on the consumer iterator.
> >> >> >
> >> >> > The built-in auto-commit mode has the fatal flaw, as you have pointed out, of possibly dropping a single message on a restart.
> >> >> >
> >> >> > Jason
> >> >> >
> >> >> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
> >> >> >> Hi Benjamin,
> >> >> >>
> >> >> >> great question, I thought that for a while too. I think that won't work in my use case, but I could certainly use somebody else checking my logic.
> >> >> >>
> >> >> >> I think that would work, *if* I was willing to allow arbitrary time to pass between my commits. Then I could call commitOffsets within the while loop like so:
> >> >> >>
> >> >> >> while (itr.hasNext()) {
> >> >> >>   val msg = itr.next()
> >> >> >>   doSomething(msg)
> >> >> >>   if (shouldICommit) {
> >> >> >>     consumer.commitOffsets
> >> >> >>   }
> >> >> >> }
> >> >> >>
> >> >> >> (this is what you meant, right?)
> >> >> >>
> >> >> >> However, itr.hasNext() might block an arbitrary amount of time. The producers for my queues are very bursty -- they will create a ton of messages, and then go silent for a long time (perhaps days). Since I do want to ensure commits are happening regularly, that means the commit has to be triggered by another thread (something that can't get blocked at hasNext()). Which is why I've got to make sure that trigger doesn't happen between next() and doSomething().
> >> >> >>
> >> >> >> A second point (much more minor) is that calling consumer.commitOffsets commits the offsets for all partitions that consumer is reading. Which means I've got to create a separate consumer per thread. (But this is really minor, just a nuisance, doesn't affect correctness.)
> >> >> >>
> >> >> >> You might say that even my first point is just a nuisance, and I just need to live with it. Then (1) I'd just like to make sure my understanding is correct -- I think this behavior is really not obvious, so it's worth clarifying; (2) I'd like to confirm that I have a workaround; and (3) if this problem does exist, and the workaround is tricky, then maybe the api change I'm proposing would be a good idea?
> >> >> >>
> >> >> >> thanks for reading about this and helping me work through it!
> >> >> >>
> >> >> >> imran
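Putting Jason's suggestion next to the blocking-hasNext() concern above, a minimal single-threaded sketch might look like the following. It assumes the 0.8 high-level consumer API, consumer.timeout.ms set to something small like 100ms, and doSomething standing in for whatever the application does with each message -- a sketch, not the list's recommended implementation:

    import kafka.consumer.{ConsumerConnector, ConsumerTimeoutException, KafkaStream}
    import kafka.message.MessageAndMetadata

    def consumeLoop(connector: ConsumerConnector,
                    stream: KafkaStream[Array[Byte], Array[Byte]],
                    doSomething: MessageAndMetadata[Array[Byte], Array[Byte]] => Unit,
                    commitIntervalMs: Long = 60000L): Unit = {
      val itr = stream.iterator()
      var lastCommit = System.currentTimeMillis()

      def shouldCommit: Boolean = System.currentTimeMillis() - lastCommit >= commitIntervalMs

      while (true) {
        try {
          if (itr.hasNext()) {
            val msg = itr.next()
            doSomething(msg)  // process fully before the offset can ever be committed
          }
        } catch {
          // bursty producer went quiet; fall through to the commit check
          case _: ConsumerTimeoutException =>
        }
        if (shouldCommit) {
          connector.commitOffsets  // synchronous, covering only fully-processed messages
          lastCommit = System.currentTimeMillis()
        }
      }
    }

The key property is that commitOffsets is only ever called from the same thread, after doSomething has returned, so a commit can never cover a message that was pulled off the iterator but not yet processed.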
> >> >> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
> >> >> >> > why not just disable autocommit and only call commitOffsets() after you've processed a batch? it isn't obvious to me how doing so would allow a message to be processed zero times.
> >> >> >> >
> >> >> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
> >> >> >> >> Hi Edward,
> >> >> >> >>
> >> >> >> >> I think you misunderstand ... I definitely do *not* want to commit every message. That would be much too expensive; every few minutes is plenty for me.
> >> >> >> >>
> >> >> >> >> I want to guarantee that if I commit a message, my message has been processed by *my application* at least once. (In another thread I mentioned something about reading exactly once, but that is not what I am going for here -- more than once is OK, as long as it's at least once.) That requires me to make sure that a commit never happens between a call to next() and doSomething(). Kafka guarantees that messages get read off the queue at least once, but as I outlined above, a naive solution allows for messages to get read off the queue but not make it into my app.
> >> >> >> >>
> >> >> >> >> Not only do I not want to commit every message -- I'd really like to have the above guarantee without even acquiring a lock (most of the time). That is what I was getting at with the comment about the spin-lock, just as an idea of how to prevent a commit between next() and doSomething(). I dunno if the spin-lock was really the right idea, that was just a random thought, but the point is, we want some check that should be very cheap for the 99.999% of the time when a commit isn't happening, but can still guarantee proper ordering during the 0.001% of the time when a commit is happening. Otherwise, messages might never make it to my app. (And this is all just to prevent a message getting lost during the 10^-6% of the time when a commit might happen between next() and doSomething(), and the app dies before doSomething completes!)
> >> >> >> >>
> >> >> >> >> Actually, after a bit more thought -- the best way to guarantee this would be with a small api change that would do away with the need for locks completely. The iterator should have a method applyToNext:
> >> >> >> >>
> >> >> >> >> def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
> >> >> >> >>   // all the stuff in next(), *except* currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> >> >>   val item = ...
> >> >> >> >>   f(item)
> >> >> >> >>   // after we've applied the user's function, now we can update the offset that should get committed
> >> >> >> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> >> >> }
> >> >> >> >>
> >> >> >> >> I think this way, you could avoid any problems even with auto-commit.
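If an applyToNext like that existed (to be clear, it is only a proposal in this thread, not part of the consumer API), the basic reader loop from earlier might shrink to something like:

    // Hypothetical usage of the proposed applyToNext; itr and doSomething as in the examples above.
    while (itr.hasNext()) {
      itr.applyToNext { msg =>
        doSomething(msg)  // the consumed offset would only advance after this returns
      }
    }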
> >> >> >> >> Or, if you don't want to add a new method, so we can stick to the Iterator api, then maybe the iterator could let you register a preCommitFunction, so next() would change to:
> >> >> >> >>
> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >> >> >>   ...
> >> >> >> >>   val item = ...
> >> >> >> >>   preCommitFunctions.foreach { f => f(item) }
> >> >> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >> >> >>   ...
> >> >> >> >>   item
> >> >> >> >> }
> >> >> >> >>
> >> >> >> >> thanks,
> >> >> >> >> Imran
> >> >> >> >>
> >> >> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> >> >> >> >> > You likely need to use a custom offset solution if you plan on committing every message. With many partitions this puts a large burden on Zookeeper; you end up needing to roll over your zk transaction logs fast as well, or risk filling up disk.
> >> >> >> >> >
> >> >> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> >> >> >> Hello Imran,
> >> >> >> >> >>
> >> >> >> >> >> The offset will only be updated when the next() function is called:
> >> >> >> >> >>
> >> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >> >> >> >>   ...
> >> >> >> >> >>   *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >> >> >> >>   ...
> >> >> >> >> >>   item
> >> >> >> >> >> }
> >> >> >> >> >>
> >> >> >> >> >> instead of in makeNext(), which will just update consumedOffset, but that is not the value that will be committed by the commitOffsets call. So as long as you turn off auto-commit and only call commitOffsets after the process(msg) call, not after
> >> >> >> >> >>
> >> >> >> >> >> b = iter.next()
> >> >> >> >> >>
> >> >> >> >> >> is called, at-least-once is guaranteed.
> >> >> >> >> >>
> >> >> >> >> >> Does that make sense?
> >> >> >> >> >>
> >> >> >> >> >> Guozhang
> >> >> >> >> >>
> >> >> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
> >> >> >> >> >> > sorry to keep bugging the list, but I feel like I am either missing something important, or I'm finding something wrong w/ the standard consumer api (or maybe just the docs need some clarification).
> >> >> >> >> >> >
> >> >> >> >> >> > I started to think that I should probably just accept at-least-once semantics ... but I eventually realized that I'm not even sure we really get an at-least-once guarantee. I think it really might be zero-or-more. Or rather, messages will get pulled off the kafka queue at least once, but that doesn't mean your app will actually *process* those messages at least once -- there might be messages it never processes.
> >> >> >> >> >> >
> >> >> >> >> >> > Consider a really basic reader of a kafka queue:
> >> >> >> >> >> >
> >> >> >> >> >> > while (it.hasNext()) {
> >> >> >> >> >> >   val msg = it.next()
> >> >> >> >> >> >   doSomething(msg)
> >> >> >> >> >> > }
> >> >> >> >> >> >
> >> >> >> >> >> > The question is, do I have any guarantees on how many times doSomething() is called on everything in the queue? I think the "guarantee" is:
> >> >> >> >> >> > 1) most messages will get processed exactly once
> >> >> >> >> >> > 2) around a restart, a chunk of msgs will get processed at least once, but probably more than once
> >> >> >> >> >> > 3) around a restart, it is possible that one message will get processed ZERO times
> >> >> >> >> >> >
> >> >> >> >> >> > (1) & (2) are probably clear, so lemme explain how I think (3) could happen. Let's imagine messages a, b, c, ... and two threads: one reading from the stream, and one thread that periodically commits the offsets. Imagine this sequence of events:
> >> >> >> >> >> >
> >> >> >> >> >> > ==Reader==
> >> >> >> >> >> > - initializes w/ offset pointing to "a"
> >> >> >> >> >> > - hasNext() ---> makeNext() will read "a" and update the local offset to "b"
> >> >> >> >> >> > - msg = "a"
> >> >> >> >> >> > - doSomething("a")
> >> >> >> >> >> > - hasNext() ---> makeNext() will read "b" and update the local offset to "c"
> >> >> >> >> >> >
> >> >> >> >> >> > ==Committer==
> >> >> >> >> >> > - commitOffsets stores the current offset as "c"
> >> >> >> >> >> >
> >> >> >> >> >> > ===== PROCESS DIES =====
> >> >> >> >> >> > ===== RESTARTS =====
> >> >> >> >> >> >
> >> >> >> >> >> > ==Reader==
> >> >> >> >> >> > - initializes w/ offset pointing to "c"
> >> >> >> >> >> > - hasNext() ---> makeNext() will read "c" and update the local offset to "d"
> >> >> >> >> >> > - msg = "c"
> >> >> >> >> >> > - doSomething("c")
> >> >> >> >> >> > ...
> >> >> >> >> >> >
> >> >> >> >> >> > Note that in this scenario, doSomething("b") was never called. Probably for a lot of applications this doesn't matter, but it seems like this could be terrible for some apps. I can't think of any way of preventing it from user code -- unless, maybe, when the offsets get committed, it is always *before* the last thing read? E.g., in my example, it would store the next offset as "b" or earlier?
> >> >> >> >> >> >
> >> >> >> >> >> > Is there a flaw in my logic? Do committed offsets always "undershoot" to prevent this?
> >> >> >> >> >> >
> >> >> >> >> >> > thanks,
> >> >> >> >> >> > Imran
> >> >> >> >> >>
> >> >> >> >> >> --
> >> >> >> >> >> -- Guozhang
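One way to see why the ordering matters: if the committer thread is only ever allowed to commit a position that the reader has explicitly published *after* doSomething returns, the scenario above can no longer lose "b". A rough sketch of that idea (SafeCommitPoint and the commit callback are made-up names, and it assumes the message carries its offset; it illustrates the hand-off, not the consumer's internals):

    import java.util.concurrent.atomic.AtomicLong

    // Shared between the reader thread and the committer thread.
    class SafeCommitPoint {
      private val processedUpTo = new AtomicLong(-1L)

      // Reader: call this only AFTER doSomething(msg) has returned.
      def markProcessed(offset: Long): Unit = processedUpTo.set(offset)

      // Committer: commit the next offset to read, never past fully-processed work.
      def commitIfNeeded(commit: Long => Unit): Unit = {
        val safe = processedUpTo.get()
        if (safe >= 0) commit(safe + 1)
      }
    }

With the stock high-level consumer and auto-commit there is no such hand-off -- next() advances the offset before the application processes the message -- which is exactly the window the scenario above exploits.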