I think the problem is that you don't know which partitions a thread
currently 'owns', and therefore you don't know which partitions you can
commit offsets for.


On Fri, Nov 22, 2013 at 5:28 PM, Imran Rashid <im...@therashids.com> wrote:

> I don't think I need control over which partitions are processed by a
> thread -- however, I do need the partition --> thread assignment to be
> consistent.  I didn't realize that assignment could change; I guess
> that means my solution might not work.  It depends a bit on what
> happens during reassignment.
>
> Here's an example: initially Thread A is processing a partition.  It
> commits its read up to offset 50, but keeps reading up till offset 60.
> At that point, the partition is re-assigned to Thread B (which I guess
> can be in a completely different process).  Where will thread B start reading?
> It seems like it definitely can't start reading at 60 b/c then you
> can't guarantee that messages 50-60 ever get processed.  (Not to
> mention that it probably doesn't even know A got to 60.)
>
> But if B starts at 50, then I don't see how each thread committing
> its own offsets is a problem.  We just need some rule that you can't
> move an offset backwards on a commit (at least, not without some force
> option).  Without that check, I suppose you could have B read up to
> 100, commit, and then A commits its read up to 60.  (which wouldn't
> even violate "at least once", but it would certainly be undesirable).
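>
> (Just to make that rule concrete, a client-side guard could look something
> like the sketch below.  None of these names exist in the kafka api -- it's
> purely illustrative.)
>
> object OffsetGuard {
>   // highest offset committed so far, keyed by "topic-partition"
>   private var committed = Map.empty[String, Long]
>
>   // run doCommit only if it moves the committed offset forward
>   def commitIfForward(topicPartition: String, offset: Long)(doCommit: => Unit): Boolean =
>     synchronized {
>       if (offset > committed.getOrElse(topicPartition, -1L)) {
>         doCommit                              // the real commit call would go here
>         committed += topicPartition -> offset
>         true
>       } else {
>         false                                 // another thread already committed further ahead
>       }
>     }
> }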
>
> The fact that A & B could also be reading from other partitions at the
> same time doesn't seem like a problem to me; each thread would commit
> its position in all of its partitions.
>
> what am I overlooking?
>
> thanks again for taking the time to work through this with me, I hope
> this is helpful for a broader audience :)
>
> imran
>
> On Fri, Nov 22, 2013 at 2:37 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > With the high-level consumer, the design is that you really don't have
> > programmatic control over which threads will process which partitions
> (and
> > usually multiple per thread).  Furthermore, the high-level consumer can
> > re-balance and redistribute which threads (and which other processes) are
> > consuming which partitions, at any time, etc.
> >
> > So, the notion of tracking specific offsets for commit, which only have
> > meaning per partition, doesn't really scale with this design.
> >
> > Jason
> >
> >
> > On Fri, Nov 22, 2013 at 9:06 AM, Imran Rashid <im...@therashids.com>
> wrote:
> >
> >> hmm, ok, after some more thought I see what you mean.  I was thinking
> >> async because of a more complicated use case -- my app actually
> >> doesn't process messages one at a time, it builds a batch of messages,
> >> merges the batches from all threads, and processes those batches, and
> >> that's when it should commit.
> >>
> >> But, let's not worry about batches for the moment -- first let's just
> >> make sure we can get the behavior correct for an app which processes
> >> messages one at a time.
> >>
> >> 1) I agree that synchronous makes more sense in that case.  But
> >> wouldn't that also benefit from commitOffsets(offsets)?  that way each
> >> thread could commit just its own offsets, completely avoiding the need
> >> for locks & stopping other workers.  So the code would look like this:
> >>
> >> while (itr.hasNext()) {
> >>   val msg = itr.next()
> >>   updateOffsets(msg)   //updates some worker state w/ the current position
> >>                        //of this worker in all its partitions
> >>   doSomething(msg)
> >>   if (shouldCommit) {
> >>     consumer.commitOffsets(myOffsets)  //myOffsets would be the set of
> >>                                        //offsets for *only* this worker
> >>   }
> >> }
> >>
> >>
> >> (the actual code would be a bit more complicated b/c of handling
> >> TimeoutExceptions in hasNext(), but still lock-free)
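> >>
> >> (For concreteness, a lock-free version with the timeout handling might
> >> look roughly like the sketch below -- it reuses itr, updateOffsets,
> >> doSomething, shouldCommit and myOffsets from above, and the one-arg
> >> consumer.commitOffsets(myOffsets) is still the *proposed* method, not
> >> something that exists today.)
> >>
> >> import kafka.consumer.ConsumerTimeoutException
> >>
> >> var running = true
> >> while (running) {
> >>   try {
> >>     if (itr.hasNext()) {
> >>       val msg = itr.next()
> >>       updateOffsets(msg)       // track this worker's position in its partitions
> >>       doSomething(msg)
> >>     } else {
> >>       running = false          // stream was shut down cleanly
> >>     }
> >>   } catch {
> >>     case _: ConsumerTimeoutException =>   // nothing arrived within consumer.timeout.ms
> >>   }
> >>   if (shouldCommit) {
> >>     consumer.commitOffsets(myOffsets)     // commit only this worker's offsets, no locks
> >>   }
> >> }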
> >>
> >> 2) Though async doesn't have any advantages in this case, it's not
> >> *incorrect*, is it?  I just want to make sure, for building up to the
> >> batch case.  So instead of directly committing its offsets, the worker
> >> would just put its offsets in a queue and keep going.  (You can
> >> probably see where this is going for batches ...)
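> >>
> >> (Something like this sketch -- OffsetUpdate and pendingCommits are
> >> made-up names, and I'm assuming the message exposes its
> >> topic/partition/offset:)
> >>
> >> import java.util.concurrent.ConcurrentLinkedQueue
> >>
> >> case class OffsetUpdate(topic: String, partition: Int, offset: Long)
> >> val pendingCommits = new ConcurrentLinkedQueue[OffsetUpdate]()
> >>
> >> // worker thread: hand off offsets, never block on a commit
> >> while (itr.hasNext()) {
> >>   val msg = itr.next()
> >>   doSomething(msg)
> >>   pendingCommits.offer(OffsetUpdate(msg.topic, msg.partition, msg.offset))
> >> }
> >>
> >> // committer thread, run periodically: keep only the latest offset per partition
> >> def drainAndCommit(): Unit = {
> >>   var latest = Map.empty[(String, Int), Long]
> >>   var u = pendingCommits.poll()
> >>   while (u != null) {
> >>     val key = (u.topic, u.partition)
> >>     if (u.offset > latest.getOrElse(key, -1L)) latest += key -> u.offset
> >>     u = pendingCommits.poll()
> >>   }
> >>   // then commit `latest`, e.g. with the proposed commitOffsets(offsets)
> >> }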
> >>
> >>
> >> thanks a lot, this discussion has been fascinating!
> >>
> >> Imran
> >>
> >>
> >> On Fri, Nov 22, 2013 at 1:06 AM, Jason Rosenberg <j...@squareup.com>
> wrote:
> >> > Again,
> >> >
> >> > Remember that if you have multiple threads processing, then it means
> they
> >> > are consuming from different partitions, so the offset of one
> partition
> >> > will not be relevant to another.  So, you really wouldn't have a use
> case
> >> > to be committing specific offsets async.  Furthermore, if your
> consumer
> >> is
> >> > using a filter to select multiple topics, then even a single thread is
> >> > likely going to be consuming multiple topics, so again, a single
> offset
> >> > commit wouldn't make much sense.
> >> >
> >> > Jason
> >> >
> >> >
> >> > On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid <im...@therashids.com>
> >> wrote:
> >> >
> >> >> Hi Jason,
> >> >>
> >> >> thank you so much!
> >> >>
> >> >> I was missing the consumer.timeout.ms property.   In fact I had
> >> >> actually written a full solution using cyclic barriers to make sure
> >> >> all my readers had stopped consuming ... but threw it out b/c I
> >> >> thought I couldn't deal w/ iterators blocked in hasNext().  That
> >> >> property is just what I was missing, I never would have found it
> w/out
> >> >> your help.
> >> >>
> >> >> It's also great to get some confirmation that this is a real issue
> --
> >> >> I feel like this should be mentioned in the docs somewhere and
> ideally
> >> >> hidden behind a clean api.
> >> >>
> >> >> I completely agree I can do this w/ the synchronous "shouldCommit"
> >> >> given the current api. But I'm not sure that is the "right" way.  If
> I
> >> >> just had a one-arg commitOffsets(someOffsets) method, then I think
> >> >> there is a way to do this async that is both cleaner and should have
> >> >> higher throughput (no need to have a full barrier that stops all the
> >> >> workers simultaneously).  I've been getting influenced by the akka
> way
> >> >> of thinking.  I think I will be able to share some example code
> >> >> tomorrow, maybe you and others can take a look and see if you think
> >> >> its an improvement or not.
> >> >>
> >> >> thanks again!
> >> >> imran
> >> >>
> >> >>
> >> >> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com>
> >> wrote:
> >> >> > Hi Imran,
> >> >> >
> >> >> > The thing to do is to not have an asynchronous background thread for
> >> >> > committing.  Instead have a time based "shouldCommit()" function,
> and
> >> >> > commit periodically, synchronously.
> >> >> >
> >> >> > If you have a bursty producer, then you can set a timeout (
> >> >> > consumer.timeout.ms), so that your iter.hasNext() call will
> >> periodically
> >> >> > wake up and throw a ConsumerTimeoutException.  You can catch this
> >> >> exception
> >> >> > silently, and check your "shouldCommit()" predicate.
> >> >> >
> >> >> > If you have multiple threads, you can use a cyclic barrier, or some
> >> such,
> >> >> > to make sure all threads have paused before doing the synchronous
> commit.
> >> >>  In
> >> >> > this case, you should set your timeout pretty low, say 100ms, so
> that
> >> >> > threads with work to do aren't waiting for quiescent threads to
> wake
> >> up
> >> >> and
> >> >> > timeout already.
> >> >> >
> >> >> > I have a configuration like this.  I use a default commit cycle of
> 60
> >> >> > seconds, and use 100ms timeout on the consumer iterator.
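> >> >> >
> >> >> > Roughly, the structure is something like the sketch below (just an
> >> >> > illustration -- numThreads, consumer, doSomething and shouldCommit
> >> >> > stand in for your own setup and commit-interval check):
> >> >> >
> >> >> > import java.util.concurrent.CyclicBarrier
> >> >> > import kafka.consumer.{ConsumerIterator, ConsumerTimeoutException}
> >> >> >
> >> >> > // when all worker threads reach the barrier, one synchronous commit runs
> >> >> > val barrier = new CyclicBarrier(numThreads, new Runnable {
> >> >> >   def run(): Unit = consumer.commitOffsets
> >> >> > })
> >> >> >
> >> >> > def worker(itr: ConsumerIterator[_, _]): Unit = {
> >> >> >   var running = true
> >> >> >   while (running) {
> >> >> >     try {
> >> >> >       if (itr.hasNext()) doSomething(itr.next())
> >> >> >       else running = false                  // stream shut down
> >> >> >     } catch {
> >> >> >       case _: ConsumerTimeoutException =>   // woke up after consumer.timeout.ms
> >> >> >     }
> >> >> >     if (shouldCommit()) barrier.await()     // all threads pause; commit happens once
> >> >> >   }
> >> >> > }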
> >> >> >
> >> >> > The auto-commit mode built in has the fatal flaw, as you have
> pointed
> >> >> out,
> >> >> > of possibly dropping a single message on a restart.
> >> >> >
> >> >> > Jason
> >> >> >
> >> >> >
> >> >> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <
> im...@therashids.com>
> >> >> wrote:
> >> >> >
> >> >> >> HI Benjamin,
> >> >> >>
> >> >> >> great question, I thought that for a while too.  I think that
> won't
> >> >> >> work, in my use case, but I could certainly use somebody else
> >> checking
> >> >> >> my logic.
> >> >> >>
> >> >> >> I think that would work, *if* I was willing to allow arbitrary
> time
> >> to
> >> >> >> pass between my commits.  Then I could call commitOffsets within
> the
> >> >> >> while loop like so:
> >> >> >> while(itr.hasNext()) {
> >> >> >>   val msg = itr.next()
> >> >> >>   doSomething(msg)
> >> >> >>   if(shouldICommit)  {
> >> >> >>     consumer.commitOffsets
> >> >> >>   }
> >> >> >> }
> >> >> >>
> >> >> >> (this is what you meant, right?)
> >> >> >>
> >> >> >> however, itr.hasNext() might block an arbitrary amount of time.
>  The
> >> >> >> producers for my queues are very bursty -- they will create a ton
> of
> >> >> messages, and then go silent for a long time (perhaps days).  Since I
> >> >> do want to ensure commits are happening regularly, that means it's got
> >> >> to be triggered by another thread (something that can't get blocked at
> >> >> hasNext()).  Which is why I've got to make sure that trigger doesn't
> >> >> happen between next() and doSomething().
> >> >> >>
> >> >> >> a second point (much more minor) is that calling
> >> >> >> consumer.commitOffsets commits the offsets for all partitions that
> >> >> >> consumer is reading.  Which means I've got to create a separate
> >> >> >> consumer per thread.  (But this is really minor, just a nuisance,
> >> >> doesn't affect correctness.)
> >> >> >>
> >> >> >> You might say that even my first point is just a nuisance, and I
> just
> >> >> >> need to live with it.  Then (1) I'd just like to make sure my
> >> >> >> understanding is correct -- I think this behavior is really not
> >> >> obvious, so it's worth clarifying and (2) I'd like to confirm that
> I
> >> >> >> have a workaround and (3) if this problem does exist, and the
> >> >> workaround is tricky, then maybe the api change I'm proposing would
> >> >> be a good idea?
> >> >> >>
> >> >> >> thanks for reading about this and helping me work through it!
> >> >> >>
> >> >> >> imran
> >> >> >>
> >> >> >>
> >> >> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
> >> >> >> > why not just disable autocommit and only call commitOffsets()
> >> after
> >> >> >> you've
> >> >> >> > processed a batch? it isn't obvious to me how doing so would
> allow
> >> a
> >> >> >> > message to be processed zero times.
> >> >> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com>
> >> wrote:
> >> >> >> >
> >> >> >> >> Hi Edward,
> >> >> >> >>
> >> >> >> >> I think you misunderstand ... I definitely do *not* want to
> commit
> >> >> >> >> every message.  That would be much too expensive, every few
> >> minutes
> >> >> is
> >> >> >> >> plenty for me.
> >> >> >> >>
> >> >> >> >> I want to guarantee that if I commit a message, that my message
> >> has
> >> >> >> >> been processed by *my application* at least once.  (In another
> >> thread
> >> >> >> >> I mentioned something about reading exactly once, but that is
> not
> >> >> what
> >> >> >> >> I am going for here -- more than once is OK, as long as it's at
> >> least
> >> >> >> >> once.)  That requires me to make sure that a commit never
> happens
> >> >> >> >> between a call to next() and doSomething().  Kafka guarantees
> that
> >> >> >> >> messages get read off the queue at least once, but as I
> outlined
> >> >> >> >> above, a naive solution allows for messages to get read off the
> >> >> queue,
> >> >> >> >> but not make it into my app.
> >> >> >> >>
> >> >> >> >> Not only do I not want to commit every message -- I'd really
> like
> >> to
> >> >> >> >> have the above guarantee without even acquiring a lock (most of
> >> the
> >> >> >> >> time).  That is what I was getting at with the comment about
> the
> >> >> >> >> spin-lock, just as an idea of how to prevent a commit between
> >> next()
> >> >> >> >> and doSomething().  I dunno if the spin-lock was really the
> right
> >> >> >> >> idea, that was just a random thought, but the point is, we want
> >> some
> >> >> >> >> check that should be very cheap for 99.999% of the time when a
> >> commit
> >> >> >> >> isn't happening, but can still guarantee proper ordering during
> >> that
> >> >> >> >> 0.001% of the time when a commit is happening.  otherwise,
> >> messages
> >> >> >> >> might never make it to my app.  (And this is all just to
> prevent a
> >> >> >> >> message getting lost during the 10^-6% of the time when a
> commit
> >> >> might
> >> >> >> >> happen between next() and doSomething(), and the app dies
> before
> >> >> >> >> doSomething completes!)
> >> >> >> >>
> >> >> >> >> Actually after a bit more thought -- the best way to guarantee
> >> this
> >> >> >> >> would be with a small api change, that would do away with the
> need
> >> >> for
> >> >> >> >> locks completely.  The iterator should have a method
> applyToNext:
> >> >> >> >>
> >> >> >> >> def applyToNext(f: MessageAndMetadata[K,V] => Unit) {
> >> >> >> >>   //all the stuff in next(), *except*
> >> >> >> >>   //currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> >> >>   val item = ...
> >> >> >> >>   f(item)
> >> >> >> >>   //after we've applied the user's function, now we can update
> >> >> >> >>   //the offset that should get committed
> >> >> >> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> >> >> }
> >> >> >> >>
> >> >> >> >> I think this way, you could avoid any problems even with
> >> auto-commit.
> >> >> >> >> Or, if you don't want to add a new method, so we can stick to the
> >> >> >> >> Iterator api, then maybe the iterator could let you register a
> >> >> >> >> preCommitFunction, so next() would change to:
> >> >> >> >>
> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >> >> >>     ...
> >> >> >> >>     val item = ...
> >> >> >> >>     preCommitFunctions.foreach{ f => f(item) }
> >> >> >> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >> >> >>     ...
> >> >> >> >>     item
> >> >> >> >>   }
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> thanks,
> >> >> >> >> Imran
> >> >> >> >>
> >> >> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <
> >> >> edlinuxg...@gmail.com
> >> >> >> >
> >> >> >> >> wrote:
> >> >> >> >> > You likely need to use a custom offset solution if you plan
> on
> >> >> >> committing
> >> >> >> >> > every message. With many partitions this puts a large burden
> on
> >> >> >> >> Zookeeper,
> >> >> >> >> > you end up needing to roll over your zk transaction logs
> fast as
> >> >> well
> >> >> >> or
> >> >> >> >> > risk filling up disk.
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <
> >> wangg...@gmail.com
> >> >> >
> >> >> >> >> wrote:
> >> >> >> >> >
> >> >> >> >> >> Hello Imran,
> >> >> >> >> >>
> >> >> >> >> >> The offset will only be updated when the next() function is
> >> >> called:
> >> >> >> >> >>
> >> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
> >> >> >> >> >>     ...
> >> >> >> >> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >> >> >> >> >>     ...
> >> >> >> >> >>     item
> >> >> >> >> >>   }
> >> >> >> >> >>
> >> >> >> >> >> instead of in makeNext(), which will just update
> >> consumedOffset,
> >> >> but
> >> >> >> >> that
> >> >> >> >> >> is not the value that will be committed using the
> commitOffset
> >> >> call.
> >> >> >> So
> >> >> >> >> as
> >> >> >> >> >> long as you turn off auto commit and only call commitOffset
> >> after
> >> >> the
> >> >> >> >> >> process(msg) call, not after the
> >> >> >> >> >>
> >> >> >> >> >> b = iter.next()
> >> >> >> >> >>
> >> >> >> >> >> is called, at-least-once is guaranteed.
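> >> >> >> >> >>
> >> >> >> >> >> In other words (just a sketch, with timeToCommit() standing in
> >> >> >> >> >> for whatever periodic check you use):
> >> >> >> >> >>
> >> >> >> >> >> while (iter.hasNext()) {
> >> >> >> >> >>   val msg = iter.next()      // next() records this offset locally
> >> >> >> >> >>   process(msg)               // may run again after a crash, never skipped
> >> >> >> >> >>   if (timeToCommit())
> >> >> >> >> >>     consumer.commitOffsets   // persists only offsets already returned by next()
> >> >> >> >> >> }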
> >> >> >> >> >>
> >> >> >> >> >> Does that make sense?
> >> >> >> >> >>
> >> >> >> >> >> Guozhang
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <
> >> >> im...@therashids.com>
> >> >> >> >> >> wrote:
> >> >> >> >> >>
> >> >> >> >> >> > sorry to keep bugging the list, but I feel like I am
> either
> >> >> missing
> >> >> >> >> >> > something important, or I'm finding something wrong w/ the
> >> >> standard
> >> >> >> >> >> > consumer api, (or maybe just the docs need some
> >> clarification).
> >> >> >> >> >> >
> >> >> >> >> >> > I started to think that I should probably just accept at
> >> least
> >> >> once
> >> >> >> >> >> > semantics ... but I eventually realized that I'm not even
> >> sure
> >> >> we
> >> >> >> >> >> > really get an at least once guarantee.  I think it really
> >> might
> >> >> be
> >> >> >> >> >> > zero-or-more.  Or rather, messages will get pulled off the
> >> kafka
> >> >> >> queue
> >> >> >> >> >> > at least once.  but that doesn't mean your app will
> actually
> >> >> >> *process*
> >> >> >> >> >> > those messages at least once -- there might be messages it
> >> never
> >> >> >> >> >> > processes.
> >> >> >> >> >> >
> >> >> >> >> >> > Consider a really basic reader of a kafka queue:
> >> >> >> >> >> >
> >> >> >> >> >> > while(it.hasNext()){
> >> >> >> >> >> >   val msg = it.next()
> >> >> >> >> >> >   doSomething(msg)
> >> >> >> >> >> > }
> >> >> >> >> >> >
> >> >> >> >> >> > the question is, do I have any guarantees on how many
> times
> >> >> >> >> >> > doSomething() is called on everything in the queue?  I
> think
> >> the
> >> >> >> >> >> > "guarantee" is:
> >> >> >> >> >> > 1) most messages will get processed exactly once
> >> >> >> >> >> > 2) around a restart, a chunk of msgs will get processed at
> >> least
> >> >> >> once,
> >> >> >> >> >> > but probably more than once
> >> >> >> >> >> > 3) around a restart, it is possible that one message will
> get
> >> >> >> >> >> > processed ZERO times
> >> >> >> >> >> >
> >> >> >> >> >> > (1) & (2) are probably clear, so lemme explain how I think
> >> (3)
> >> >> >> could
> >> >> >> >> >> > happen.  Let's imagine messages a,b,c,... and two threads, one
> one
> >> >> >> reading
> >> >> >> >> >> > from the stream, and one thread that periodically commits
> the
> >> >> >> offsets.
> >> >> >> >> >> >  Imagine this sequence of events:
> >> >> >> >> >> >
> >> >> >> >> >> >
> >> >> >> >> >> > ==Reader==
> >> >> >> >> >> > -initializes w/ offset pointing to "a"
> >> >> >> >> >> >
> >> >> >> >> >> > -hasNext()
> >> >> >> >> >> >   ---> makeNext() will read "a"
> >> >> >> >> >> >         and update the local offset to "b"
> >> >> >> >> >> >
> >> >> >> >> >> > -msg = "a"
> >> >> >> >> >> >
> >> >> >> >> >> > -doSomething("a")
> >> >> >> >> >> >
> >> >> >> >> >> > -hasNext()
> >> >> >> >> >> >    ----> makeNext() will read "b"
> >> >> >> >> >> >          and  update the local offset "c"
> >> >> >> >> >> >
> >> >> >> >> >> > ==Committer==
> >> >> >> >> >> >
> >> >> >> >> >> > -commitOffsets stores the current offset as "c"
> >> >> >> >> >> >
> >> >> >> >> >> >
> >> >> >> >> >> >
> >> >> >> >> >> >       =====PROCESS DIES=====
> >> >> >> >> >> >       =====  RESTARTS  =====
> >> >> >> >> >> >
> >> >> >> >> >> > ==Reader==
> >> >> >> >> >> > -initializes w/ offset pointing to "c"
> >> >> >> >> >> >
> >> >> >> >> >> > -hasNext()
> >> >> >> >> >> >    --> makeNext() will read "c"
> >> >> >> >> >> >        and update local offset to "d"
> >> >> >> >> >> > -msg = "c"
> >> >> >> >> >> > -doSomething("c")
> >> >> >> >> >> > ...
> >> >> >> >> >> >
> >> >> >> >> >> >
> >> >> >> >> >> >
> >> >> >> >> >> > note that in this scenario, doSomething("b") was never
> >> called.
> >> >> >> >> >> > Probably for a lot of applications this doesn't matter.  But
> >> >> >> >> >> > it seems like this could be terrible for some apps.  I can't
> think
> >> of
> >> >> any
> >> >> >> >> >> > way of preventing it from user code.  unless, maybe when
> the
> >> >> >> offsets
> >> >> >> >> >> > get committed, it is always *before* the last thing read?
> >>  eg.,
> >> >> in
> >> >> >> my
> >> >> >> >> >> > example, it would store the next offset as "b" or earlier?
> >> >> >> >> >> >
> >> >> >> >> >> > Is there a flaw in my logic?  Do committed offsets always
> >> >> >> "undershoot"
> >> >> >> >> >> > to prevent this?
> >> >> >> >> >> >
> >> >> >> >> >> > thanks,
> >> >> >> >> >> > Imran
> >> >> >> >> >> >
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> --
> >> >> >> >> >> -- Guozhang
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
