I don't think I need control over which partitions are processed by a
thread -- however, I do need the partition --> thread assignment to be
consistent.  I didn't realize that assignment could change; I guess
that could mean my solution might not work.  It depends a bit on what
happens during reassignment.

Here's an example: initially Thread A is processing a partition.  It
commits its read up to offset 50, but keeps reading up till offset 60.
At that point, the partition is re-assigned to Thread B (which I guess
can be in a completely different process).  Where will Thread B start reading?
It seems like it definitely can't start reading at 60 b/c then you
can't guarantee that messages 50-60 ever get processed.  (Not to
mention that it probably doesn't even know A got to 60.)

But if B starts at 50, then I don't see how each thread committing
its own offsets is a problem.  We just need some rule that you can't
move an offset backwards on a commit (at least, not without some force
option).  Without that check, I suppose you could have B read up to
100, commit, and then A commits its read up to 60 (which wouldn't
even violate "at least once", but it would certainly be undesirable).

The fact that A & B could also be reading from other partitions at the
same time doesn't seem like a problem to me; each thread would commit
its positions in all of its partitions.
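
Just to make the "no backwards commits" rule concrete, here's a rough
sketch of the kind of guard I'm imagining (the one-arg
commitOffsets(offsets) keyed by (topic, partition) is still the
hypothetical method proposed earlier in this thread, and "consumer" is
whatever connector the thread holds):

// never move a partition's committed offset backwards
val lastCommitted = collection.mutable.Map[(String, Int), Long]()

def safeCommit(myOffsets: Map[(String, Int), Long]) {
  lastCommitted.synchronized {
    // keep only the offsets that move a partition forward
    val forwardOnly = myOffsets.filter { case (tp, offset) =>
      lastCommitted.get(tp).forall(offset > _)
    }
    if (forwardOnly.nonEmpty) {
      consumer.commitOffsets(forwardOnly)  // hypothetical one-arg commit
      lastCommitted ++= forwardOnly
    }
  }
}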

what am I overlooking?

thanks again for taking the time to work through this with me, I hope
this is helpful for a broader audience :)

imran

On Fri, Nov 22, 2013 at 2:37 PM, Jason Rosenberg <j...@squareup.com> wrote:
> With the high-level consumer, the design is that you really don't have
> programmatic control over which threads will process which partitions (and
> usually multiple per thread).  Furthermore, the high-level consumer can
> re-balance and redistribute which threads (and which other processes) are
> consuming which partitions, at any time, etc.
>
> So, the notion of tracking specific offsets for commit, which only have
> meaning per partition, doesn't really scale with this design.
>
> Jason
>
>
> On Fri, Nov 22, 2013 at 9:06 AM, Imran Rashid <im...@therashids.com> wrote:
>
>> hmm, ok, after some more thought I see what you mean.  I was thinking
>> async because of a more complicated use case -- my app actually
>> doesn't process messages one at a time, it builds a batch of messages,
>> merges the batches from all threads, and processes those batches, and
>> that's when it should commit.
>>
>> But, let's not worry about batches for the moment -- first let's just
>> make sure we can get behavior correct for an app which processes
>> messages one at a time.
>>
>> 1) I agree that synchronous makes more sense in that case.  But
>> wouldn't that also benefit from commitOffsets(offsets)?  That way each
>> thread could commit just its own offsets, completely avoiding the need
>> for locks & stopping other workers.  So the code would look like this:
>>
>> while(itr.hasNext()) {
>>   val msg = itr.next()
>>   updateOffsets(msg)  // updates some worker state w/ the current position
>>                       // of this worker in all its partitions
>>   doSomething(msg)
>>   if (shouldCommit) {
>>     consumer.commitOffsets(myOffsets)  // myOffsets would be the set of
>>                                        // offsets for *only* this worker
>>   }
>> }
>>
>>
>> (the actual code would be a bit more complicated b/c of handling
>> TimeoutExceptions in hasNext(), but still lock-free)
>>
>> 2) Though async doesn't have any advantages in this case, it's not
>> *incorrect*, is it?  I just want to make sure, for building up to the
>> batch case.  So instead of directly committing its offsets, the worker
>> would just put its offsets in a queue and keep going.  (You can
>> probably see where this is going for batches ...)
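>>
>> Roughly like this (again, just a sketch -- the one-arg commitOffsets is
>> still the hypothetical method, and the committer could be a scheduled
>> task or its own thread):
>>
>> import java.util.concurrent.ConcurrentLinkedQueue
>>
>> // each worker offers a map of (topic, partition) -> offset for messages
>> // it has already processed, then keeps going
>> val pending = new ConcurrentLinkedQueue[Map[(String, Int), Long]]()
>>
>> // the committer periodically drains the queue and commits the highest
>> // offset seen for each partition
>> def drainAndCommit() {
>>   val merged = collection.mutable.Map[(String, Int), Long]()
>>   var offsets = pending.poll()
>>   while (offsets != null) {
>>     offsets.foreach { case (tp, offset) =>
>>       if (merged.get(tp).forall(offset > _)) merged(tp) = offset
>>     }
>>     offsets = pending.poll()
>>   }
>>   if (merged.nonEmpty) consumer.commitOffsets(merged.toMap)  // hypothetical
>> }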
>>
>>
>> thanks a lot, this discussion has been fascinating!
>>
>> Imran
>>
>>
>> On Fri, Nov 22, 2013 at 1:06 AM, Jason Rosenberg <j...@squareup.com> wrote:
>> > Again,
>> >
>> > Remember that if you have multiple threads processing, then it means they
>> > are consuming from different partitions, so the offset of one partition
>> > will not be relevant to another.  So, you really wouldn't have a use case
>> > to be committing specific offsets async.  Furthermore, if your consumer is
>> > using a filter to select multiple topics, then even a single thread is
>> > likely going to be consuming multiple topics, so again, a single offset
>> > commit wouldn't make much sense.
>> >
>> > Jason
>> >
>> >
>> > On Fri, Nov 22, 2013 at 1:30 AM, Imran Rashid <im...@therashids.com> wrote:
>> >
>> >> Hi Jason,
>> >>
>> >> thank you so much!
>> >>
>> >> I was missing the consumer.timeout.ms property.   In fact I had
>> >> actually written a full solution using cyclic barriers to make sure
>> >> all my readers had stopped consuming ... but threw it out b/c I
>> >> thought I couldn't deal w/ iterators blocked in hasNext().  That
>> >> property is just what I was missing, I never would have found it w/out
>> >> your help.
>> >>
>> >> It's also great to get some confirmation that this is a real issue --
>> >> I feel like this should be mentioned in the docs somewhere and ideally
>> >> hidden behind a clean api.
>> >>
>> >> I completely agree I can do this w/ the synchronous "shouldCommit"
>> >> given the current api. But I'm not sure that is the "right" way.  If I
>> >> just had a one-arg commitOffsets(someOffsets) method, then I think
>> >> there is a way to do this async that is both cleaner and should have
>> >> higher throughput (no need to have a full barrier that stops all the
>> >> workers simultaneously).  I've been getting influenced by the akka way
>> >> of thinking.  I think I will be able to share some example code
>> >> tomorrow, maybe you and others can take a look and see if you think
>> >> it's an improvement or not.
>> >>
>> >> thanks again!
>> >> imran
>> >>
>> >>
>> >> On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:
>> >> > Hi Imran,
>> >> >
>> >> > The thing to do is not to have an asynchronous background thread for
>> >> > committing.  Instead, have a time-based "shouldCommit()" function, and
>> >> > commit periodically, synchronously.
>> >> >
>> >> > If you have a bursty producer, then you can set a timeout
>> >> > (consumer.timeout.ms), so that your iter.hasNext() call will
>> >> > periodically wake up and throw a ConsumerTimeoutException.  You can
>> >> > catch this exception silently, and check your "shouldCommit()"
>> >> > predicate.
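>> >> >
>> >> > Roughly (just a sketch -- shouldCommit() is whatever time-based check
>> >> > you use, and consumer.timeout.ms is set in the consumer config):
>> >> >
>> >> > while (running) {
>> >> >   try {
>> >> >     if (iter.hasNext()) {
>> >> >       val msg = iter.next()
>> >> >       doSomething(msg)
>> >> >     }
>> >> >   } catch {
>> >> >     case e: ConsumerTimeoutException =>  // no message right now, fine
>> >> >   }
>> >> >   if (shouldCommit()) consumer.commitOffsets()  // synchronous, auto-commit off
>> >> > }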
>> >> >
>> >> > If you have multiple threads, you can use a cyclic barrier, or some
>> >> > such, to make sure all threads have paused before doing the synchronous
>> >> > commit.  In this case, you should set your timeout pretty low, say
>> >> > 100ms, so that threads with work to do aren't waiting for quiescent
>> >> > threads to wake up and time out already.
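>> >> >
>> >> > The barrier part, bare-bones (just a sketch, assuming numThreads
>> >> > consumer threads all running the loop above):
>> >> >
>> >> > import java.util.concurrent.CyclicBarrier
>> >> >
>> >> > // the barrier action runs in exactly one thread, once all have paused
>> >> > val barrier = new CyclicBarrier(numThreads, new Runnable {
>> >> >   def run() { consumer.commitOffsets() }
>> >> > })
>> >> >
>> >> > // each consumer thread, when shouldCommit() is true:
>> >> > barrier.await()  // blocks until every thread reaches this point
>> >> >
>> >> > The low consumer timeout matters here, since every thread has to reach
>> >> > the await() call before any of them can continue.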
>> >> >
>> >> > I have a configuration like this.  I use a default commit cycle of 60
>> >> > seconds, and use 100ms timeout on the consumer iterator.
>> >> >
>> >> > The auto-commit mode built in has the fatal flaw, as you have pointed
>> >> > out, of possibly dropping a single message on a restart.
>> >> >
>> >> > Jason
>> >> >
>> >> >
>> >> > On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
>> >> >
>> >> >> Hi Benjamin,
>> >> >>
>> >> >> great question, I thought that for a while too.  I think that won't
>> >> >> work in my use case, but I could certainly use somebody else checking
>> >> >> my logic.
>> >> >>
>> >> >> I think that would work, *if* I were willing to allow arbitrary time
>> >> >> to pass between my commits.  Then I could call commitOffsets within
>> >> >> the while loop like so:
>> >> >> while(itr.hasNext()) {
>> >> >>   val msg = itr.next()
>> >> >>   doSomething(msg)
>> >> >>   if(shouldICommit)  {
>> >> >>     consumer.commitOffsets
>> >> >>   }
>> >> >> }
>> >> >>
>> >> >> (this is what you meant, right?)
>> >> >>
>> >> >> however, itr.hasNext() might block an arbitrary amount of time.  The
>> >> >> producers for my queues are very bursty -- they will create a ton of
>> >> >> messages, and then go silent for a long time (perhaps days).  Since I
>> >> >> do want to ensure commits are happening regularly, that means it's got
>> >> >> to be triggered by another thread (something that can't get blocked at
>> >> >> hasNext()).  Which is why I've got to make sure that trigger doesn't
>> >> >> happen between next() and doSomething().
>> >> >>
>> >> >> A second point (much more minor) is that calling
>> >> >> consumer.commitOffsets commits the offsets for all partitions that
>> >> >> consumer is reading.  Which means I've got to create a separate
>> >> >> consumer per thread.  (But this is really minor, just a nuisance, it
>> >> >> doesn't affect correctness.)
>> >> >>
>> >> >> You might say that even my first point is just a nuisance, and I just
>> >> >> need to live with it.  Then (1) I'd just like to make sure my
>> >> >> understanding is correct -- I think this behavior is really not
>> >> >> obvious, so it's worth clarifying; (2) I'd like to confirm that I
>> >> >> have a workaround; and (3) if this problem does exist, and the
>> >> >> workaround is tricky, then maybe the api change I'm proposing would be
>> >> >> a good idea?
>> >> >>
>> >> >> thanks for reading about this and helping me work through it!
>> >> >>
>> >> >> imran
>> >> >>
>> >> >>
>> >> >> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
>> >> >> > Why not just disable autocommit and only call commitOffsets() after
>> >> >> > you've processed a batch?  It isn't obvious to me how doing so would
>> >> >> > allow a message to be processed zero times.
>> >> >> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
>> >> >> >
>> >> >> >> Hi Edward,
>> >> >> >>
>> >> >> >> I think you misunderstand ... I definitely do *not* want to commit
>> >> >> >> every message.  That would be much too expensive; every few minutes
>> >> >> >> is plenty for me.
>> >> >> >>
>> >> >> >> I want to guarantee that if I commit a message, my message has been
>> >> >> >> processed by *my application* at least once.  (In another thread I
>> >> >> >> mentioned something about reading exactly once, but that is not what
>> >> >> >> I am going for here -- more than once is OK, as long as it's at
>> >> >> >> least once.)  That requires me to make sure that a commit never
>> >> >> >> happens between a call to next() and doSomething().  Kafka
>> >> >> >> guarantees that messages get read off the queue at least once, but
>> >> >> >> as I outlined above, a naive solution allows for messages to get
>> >> >> >> read off the queue but not make it into my app.
>> >> >> >>
>> >> >> >> Not only do I not want to commit every message -- I'd really like
>> >> >> >> to have the above guarantee without even acquiring a lock (most of
>> >> >> >> the time).  That is what I was getting at with the comment about
>> >> >> >> the spin-lock, just as an idea of how to prevent a commit between
>> >> >> >> next() and doSomething().  I dunno if the spin-lock was really the
>> >> >> >> right idea, that was just a random thought, but the point is, we
>> >> >> >> want some check that should be very cheap for 99.999% of the time
>> >> >> >> when a commit isn't happening, but can still guarantee proper
>> >> >> >> ordering during that 0.001% of the time when a commit is happening.
>> >> >> >> Otherwise, messages might never make it to my app.  (And this is
>> >> >> >> all just to prevent a message getting lost during the 10^-6% of the
>> >> >> >> time when a commit might happen between next() and doSomething(),
>> >> >> >> and the app dies before doSomething() completes!)
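>> >> >> >>
>> >> >> >> For what it's worth, here is one cheap-ish way I could imagine doing
>> >> >> >> that guard (just a sketch, swapping the spin-lock idea for a
>> >> >> >> read/write lock; hasNext() stays outside the lock since it can
>> >> >> >> block):
>> >> >> >>
>> >> >> >> import java.util.concurrent.locks.ReentrantReadWriteLock
>> >> >> >>
>> >> >> >> val guard = new ReentrantReadWriteLock()
>> >> >> >>
>> >> >> >> // worker: an uncontended read-lock acquire is very cheap
>> >> >> >> while (itr.hasNext()) {
>> >> >> >>   guard.readLock().lock()
>> >> >> >>   try {
>> >> >> >>     val msg = itr.next()
>> >> >> >>     doSomething(msg)
>> >> >> >>   } finally {
>> >> >> >>     guard.readLock().unlock()
>> >> >> >>   }
>> >> >> >> }
>> >> >> >>
>> >> >> >> // committer: while the write lock is held, no worker can be between
>> >> >> >> // next() and doSomething()
>> >> >> >> guard.writeLock().lock()
>> >> >> >> try { consumer.commitOffsets() } finally { guard.writeLock().unlock() }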
>> >> >> >>
>> >> >> >> Actually, after a bit more thought -- the best way to guarantee
>> >> >> >> this would be with a small api change, which would do away with the
>> >> >> >> need for locks completely.  The iterator should have a method
>> >> >> >> applyToNext:
>> >> >> >>
>> >> >> >> def applyToNext(f: MessageAndMetadata[K,V] => Unit) {
>> >> >> >>   // all the stuff in next(), *except*
>> >> >> >>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >> >>   val item = ...
>> >> >> >>   f(item)
>> >> >> >>   // after we've applied the user's function, now we can update the
>> >> >> >>   // offset that should get committed
>> >> >> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >> >> }
>> >> >> >>
>> >> >> >> I think this way, you could avoid any problems even with
>> >> >> >> auto-commit.  Or, if you don't want to add a new method, so we can
>> >> >> >> stick to the Iterator api, then maybe the iterator could let you
>> >> >> >> register a preCommitFunction, so next() would change to:
>> >> >> >>
>> >> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >> >>     ...
>> >> >> >>     val item = ...
>> >> >> >>     preCommitFunctions.foreach{ f => f(item) }
>> >> >> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >> >>     ...
>> >> >> >>     item
>> >> >> >>   }
>> >> >> >>
>> >> >> >>
>> >> >> >> thanks,
>> >> >> >> Imran
>> >> >> >>
>> >> >> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
>> >> >> >> > You likely need to use a custom offset solution if you plan on
>> >> >> >> > committing every message.  With many partitions this puts a large
>> >> >> >> > burden on ZooKeeper; you end up needing to roll over your zk
>> >> >> >> > transaction logs fast as well, or risk filling up the disk.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> >> >> >
>> >> >> >> >> Hello Imran,
>> >> >> >> >>
>> >> >> >> >> The offset will only be updated when the next() function is
>> >> >> >> >> called:
>> >> >> >> >>
>> >> >> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >> >> >>     ...
>> >> >> >> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >> >> >>     ...
>> >> >> >> >>     item
>> >> >> >> >>   }
>> >> >> >> >>
>> >> >> >> >> instead of in makeNext(), which will just update consumedOffset,
>> >> >> >> >> but that is not the value that will be committed using the
>> >> >> >> >> commitOffsets call.  So as long as you turn off auto commit and
>> >> >> >> >> only call commitOffsets after the process(msg) call, not after
>> >> >> >> >>
>> >> >> >> >> b = iter.next()
>> >> >> >> >>
>> >> >> >> >> is called, at-least-once is guaranteed.
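>> >> >> >> >>
>> >> >> >> >> In other words (just a sketch, with auto commit turned off; in
>> >> >> >> >> practice you would commit periodically rather than per message):
>> >> >> >> >>
>> >> >> >> >> while (iter.hasNext()) {
>> >> >> >> >>   val b = iter.next()
>> >> >> >> >>   process(b)                // finish the work first
>> >> >> >> >>   consumer.commitOffsets()  // only then commit
>> >> >> >> >> }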
>> >> >> >> >>
>> >> >> >> >> Does that make sense?
>> >> >> >> >>
>> >> >> >> >> Guozhang
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com> wrote:
>> >> >> >> >>
>> >> >> >> >> > Sorry to keep bugging the list, but I feel like I am either
>> >> >> >> >> > missing something important, or I'm finding something wrong w/
>> >> >> >> >> > the standard consumer api (or maybe just the docs need some
>> >> >> >> >> > clarification).
>> >> >> >> >> >
>> >> >> >> >> > I started to think that I should probably just accept
>> >> >> >> >> > at-least-once semantics ... but I eventually realized that I'm
>> >> >> >> >> > not even sure we really get an at-least-once guarantee.  I
>> >> >> >> >> > think it really might be zero-or-more.  Or rather, messages
>> >> >> >> >> > will get pulled off the kafka queue at least once, but that
>> >> >> >> >> > doesn't mean your app will actually *process* those messages
>> >> >> >> >> > at least once -- there might be messages it never processes.
>> >> >> >> >> >
>> >> >> >> >> > Consider a really basic reader of a kafka queue:
>> >> >> >> >> >
>> >> >> >> >> > while(it.hasNext()){
>> >> >> >> >> >   val msg = it.next()
>> >> >> >> >> >   doSomething(msg)
>> >> >> >> >> > }
>> >> >> >> >> >
>> >> >> >> >> > The question is, do I have any guarantees on how many times
>> >> >> >> >> > doSomething() is called on everything in the queue?  I think
>> >> >> >> >> > the "guarantee" is:
>> >> >> >> >> > 1) most messages will get processed exactly once
>> >> >> >> >> > 2) around a restart, a chunk of msgs will get processed at
>> >> >> >> >> > least once, but probably more than once
>> >> >> >> >> > 3) around a restart, it is possible that one message will get
>> >> >> >> >> > processed ZERO times
>> >> >> >> >> >
>> >> >> >> >> > (1) & (2) are probably clear, so lemme explain how I think (3)
>> >> >> >> >> > could happen.  Let's imagine messages a,b,c,... and two
>> >> >> >> >> > threads, one reading from the stream, and one thread that
>> >> >> >> >> > periodically commits the offsets.  Imagine this sequence of
>> >> >> >> >> > events:
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> > ==Reader==
>> >> >> >> >> > -initializes w/ offset pointing to "a"
>> >> >> >> >> >
>> >> >> >> >> > -hasNext()
>> >> >> >> >> >   ---> makeNext() will read "a"
>> >> >> >> >> >         and update the local offset to "b"
>> >> >> >> >> >
>> >> >> >> >> > -msg = "a"
>> >> >> >> >> >
>> >> >> >> >> > -doSomething("a")
>> >> >> >> >> >
>> >> >> >> >> > -hasNext()
>> >> >> >> >> >    ----> makeNext() will read "b"
>> >> >> >> >> >          and update the local offset to "c"
>> >> >> >> >> >
>> >> >> >> >> > ==Committer==
>> >> >> >> >> >
>> >> >> >> >> > -commitOffsets stores the current offset as "c"
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> >       =====PROCESS DIES=====
>> >> >> >> >> >       =====  RESTARTS  =====
>> >> >> >> >> >
>> >> >> >> >> > ==Reader==
>> >> >> >> >> > -initializes w/ offset pointing to "c"
>> >> >> >> >> >
>> >> >> >> >> > -hasNext()
>> >> >> >> >> >    --> makeNext() will read "c"
>> >> >> >> >> >        and update local offset to "d"
>> >> >> >> >> > -msg = "c"
>> >> >> >> >> > -doSomething("c")
>> >> >> >> >> > ...
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> > Note that in this scenario, doSomething("b") was never
>> >> >> >> >> > called.  Probably for a lot of applications this doesn't
>> >> >> >> >> > matter.  But it seems like this could be terrible for some
>> >> >> >> >> > apps.  I can't think of any way of preventing it from user
>> >> >> >> >> > code -- unless, maybe, when the offsets get committed, it is
>> >> >> >> >> > always *before* the last thing read?  E.g., in my example, it
>> >> >> >> >> > would store the next offset as "b" or earlier?
>> >> >> >> >> >
>> >> >> >> >> > Is there a flaw in my logic?  Do committed offsets always
>> >> >> >> >> > "undershoot" to prevent this?
>> >> >> >> >> >
>> >> >> >> >> > thanks,
>> >> >> >> >> > Imran
>> >> >> >> >> >
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> --
>> >> >> >> >> -- Guozhang
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>>
