Hi Jason,

Thank you so much!

I was missing the consumer.timeout.ms property.  In fact I had
actually written a full solution using cyclic barriers to make sure
all my readers had stopped consuming ... but threw it out b/c I
thought I couldn't deal w/ iterators blocked in hasNext().  That
property is just what I was missing; I never would have found it
w/out your help.

It's also great to get some confirmation that this is a real issue --
I feel like this should be mentioned in the docs somewhere and ideally
hidden behind a clean api.

I completely agree I can do this w/ the synchronous "shouldCommit"
given the current api, but I'm not sure that is the "right" way.  If I
just had a one-arg commitOffsets(someOffsets) method, then I think
there is a way to do this async that is both cleaner and should have
higher throughput (no need for a full barrier that stops all the
workers simultaneously).  I've been getting influenced by the Akka way
of thinking.  I think I will be able to share some real example code
tomorrow; maybe you and others can take a look and see whether you
think it's an improvement.
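
In the meantime, here's a very rough sketch of the shape I have in mind.
It assumes a hypothetical one-arg commitOffsets(offsets) that does *not*
exist in the current api (I just pass it in as a function below), and the
OffsetTracker / Committer names are made up:

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConverters._

// Each worker records a message's offset only *after* doSomething(msg)
// returns, so anything the committer ever sees has already been processed.
class OffsetTracker {
  private val processed = new ConcurrentHashMap[(String, Int), Long]()

  def markProcessed(topic: String, partition: Int, offset: Long): Unit =
    processed.put((topic, partition), offset)

  def snapshot: Map[(String, Int), Long] = processed.asScala.toMap
}

object Committer {
  // commitOffsets below stands in for the hypothetical one-arg api call
  def start(tracker: OffsetTracker,
            commitOffsets: Map[(String, Int), Long] => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val snap = tracker.snapshot
        if (snap.nonEmpty) commitOffsets(snap)
      }
    }, 60, 60, TimeUnit.SECONDS)
  }
}

The workers never stop: each one just calls tracker.markProcessed(...)
with the topic/partition/offset of the message it just finished, and the
committer periodically commits whatever has been recorded -- no barrier
needed.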

thanks again!
imran


On Thu, Nov 21, 2013 at 9:54 PM, Jason Rosenberg <j...@squareup.com> wrote:
> Hi Imran,
>
> The thing to do is not to have an asynchronous background thread for
> committing.  Instead, have a time-based "shouldCommit()" function, and
> commit periodically, synchronously.
>
> If you have a bursty producer, then you can set a timeout (
> consumer.timeout.ms), so that your iter.hasNext() call will periodically
> wake up and throw a ConsumerTimeoutException.  You can catch this exception
> silently, and check your "shouldCommit()" predicate.
>
> If you have multiple threads, you can use a cyclic barrier, or some such,
> to make sure all threads have paused before doing the synchronous commit.
> In this case, you should set your timeout pretty low, say 100ms, so that
> threads with work to do aren't left waiting for quiescent threads to wake
> up and time out.
>
> I have a configuration like this.  I use a default commit cycle of 60
> seconds, and use 100ms timeout on the consumer iterator.
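>
> Roughly, the single-threaded version of that loop looks like the sketch
> below (just a sketch -- "running", shouldCommit(), doSomething(), consumer,
> and iter are all placeholders for your own flag, predicate, processing,
> connector, and iterator):
>
> while (running) {
>   try {
>     while (iter.hasNext()) {
>       doSomething(iter.next())
>       if (shouldCommit()) consumer.commitOffsets  // synchronous, in this thread
>     }
>   } catch {
>     case _: ConsumerTimeoutException =>
>       // nothing arrived within consumer.timeout.ms; still check the
>       // time-based predicate so commits keep happening on a quiet topic
>       if (shouldCommit()) consumer.commitOffsets
>   }
> }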
>
> The built-in auto-commit mode has the fatal flaw, as you have pointed out,
> of possibly dropping a single message on a restart.
>
> Jason
>
>
> On Thu, Nov 21, 2013 at 9:45 PM, Imran Rashid <im...@therashids.com> wrote:
>
>> Hi Benjamin,
>>
>> Great question; I thought that for a while too.  I think that won't
>> work in my use case, but I could certainly use somebody else checking
>> my logic.
>>
>> I think that would work, *if* I were willing to allow arbitrary time to
>> pass between my commits.  Then I could call commitOffsets within the
>> while loop like so:
>> while(itr.hasNext()) {
>>   val msg = itr.next()
>>   doSomething(msg)
>>   if(shouldICommit)  {
>>     consumer.commitOffsets
>>   }
>> }
>>
>> (this is what you meant, right?)
>>
>> However, itr.hasNext() might block an arbitrary amount of time.  The
>> producers for my queues are very bursty -- they will create a ton of
>> messages, and then go silent for a long time (perhaps days).  Since I
>> do want to ensure commits are happening regularly, that means the commit
>> has to be triggered by another thread (something that can't get blocked
>> at hasNext()).  Which is why I've got to make sure that trigger doesn't
>> happen between next() and doSomething().
>>
>> A second point (much more minor) is that calling
>> consumer.commitOffsets commits the offsets for all partitions that
>> consumer is reading.  Which means I've got to create a separate
>> consumer per thread.  (But this is really minor, just a nuisance,
>> and doesn't affect correctness.)
>>
>> You might say that even my first point is just a nuisance, and I just
>> need to live with it.  Then (1) I'd just like to make sure my
>> understanding is correct -- I think this behavior is really not
>> obvious, so it's worth clarifying; (2) I'd like to confirm that I
>> have a workaround; and (3) if this problem does exist, and the
>> workaround is tricky, then maybe the api change I'm proposing would be
>> a good idea?
>>
>> thanks for reading about this and helping me work through it!
>>
>> imran
>>
>>
>> On Thu, Nov 21, 2013 at 7:58 PM, Benjamin Black <b...@b3k.us> wrote:
>> > Why not just disable autocommit and only call commitOffsets() after
>> > you've processed a batch?  It isn't obvious to me how doing so would
>> > allow a message to be processed zero times.
>> > On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:
>> >
>> >> Hi Edward,
>> >>
>> >> I think you misunderstand ... I definitely do *not* want to commit
>> >> every message.  That would be much too expensive; every few minutes is
>> >> plenty for me.
>> >>
>> >> I want to guarantee that if I commit a message, that message has
>> >> been processed by *my application* at least once.  (In another thread
>> >> I mentioned something about reading exactly once, but that is not what
>> >> I am going for here -- more than once is OK, as long as it's at least
>> >> once.)  That requires me to make sure that a commit never happens
>> >> between a call to next() and doSomething().  Kafka guarantees that
>> >> messages get read off the queue at least once, but as I outlined
>> >> above, a naive solution allows messages to get read off the queue
>> >> but never make it into my app.
>> >>
>> >> Not only do I not want to commit every message -- I'd really like to
>> >> have the above guarantee without even acquiring a lock (most of the
>> >> time).  That is what I was getting at with the comment about the
>> >> spin-lock, just as an idea of how to prevent a commit between next()
>> >> and doSomething().  I dunno if the spin-lock was really the right
>> >> idea -- that was just a random thought -- but the point is, we want a
>> >> check that is very cheap for the 99.999% of the time when a commit
>> >> isn't happening, but can still guarantee proper ordering during the
>> >> 0.001% of the time when a commit is happening.  Otherwise, messages
>> >> might never make it to my app.  (And this is all just to prevent a
>> >> message getting lost during the 10^-6% of the time when a commit might
>> >> happen between next() and doSomething(), and the app dies before
>> >> doSomething() completes!)
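>> >>
>> >> Just to make that concrete (a sketch only, and it cheats by using a
>> >> plain ReentrantLock around the next()/doSomething() critical section
>> >> rather than anything lock-free -- though an uncontended acquisition is
>> >> at least very cheap; itr, doSomething, and consumer are my own
>> >> placeholders):
>> >>
>> >> val commitLock = new java.util.concurrent.locks.ReentrantLock()
>> >>
>> >> // worker thread
>> >> while (itr.hasNext()) {     // don't hold the lock here, hasNext() can block
>> >>   commitLock.lock()
>> >>   try {
>> >>     val msg = itr.next()    // the to-be-committed offset advances here...
>> >>     doSomething(msg)        // ...so finish processing before a commit can run
>> >>   } finally {
>> >>     commitLock.unlock()
>> >>   }
>> >> }
>> >>
>> >> // committer thread, every few minutes
>> >> commitLock.lock()
>> >> try {
>> >>   consumer.commitOffsets
>> >> } finally {
>> >>   commitLock.unlock()
>> >> }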
>> >>
>> >> Actually, after a bit more thought -- the best way to guarantee this
>> >> would be with a small api change that would do away with the need for
>> >> locks completely.  The iterator should have a method applyToNext:
>> >>
>> >> def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
>> >>   // all the stuff in next(), *except*
>> >>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >>   val item = ...
>> >>   f(item)
>> >>   // after we've applied the user's function, now we can update the
>> >>   // offset that should get committed
>> >>   currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> }
>> >>
>> >> I think this way, you could avoid any problems even with auto-commit.
>> >> Or, if you don't want to add a new method (so we can stick to the
>> >> Iterator api), maybe the iterator could let you register a
>> >> preCommitFunction, so next() would change to:
>> >>
>> >> override def next(): MessageAndMetadata[K, V] = {
>> >>     ...
>> >>     val item = ...
>> >>     preCommitFunctions.foreach{ f => f(item) }
>> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >>     ...
>> >>     item
>> >>   }
>> >>
>> >>
>> >> thanks,
>> >> Imran
>> >>
>> >> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com>
>> >> wrote:
>> >> > You likely need to use a custom offset solution if you plan on
>> >> > committing every message.  With many partitions this puts a large
>> >> > burden on ZooKeeper; you end up needing to roll over your zk
>> >> > transaction logs quickly as well, or risk filling up the disk.
>> >> >
>> >> >
>> >> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com>
>> >> wrote:
>> >> >
>> >> >> Hello Imran,
>> >> >>
>> >> >> The offset will only be updated when the next() function is called:
>> >> >>
>> >> >> override def next(): MessageAndMetadata[K, V] = {
>> >> >>     ...
>> >> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>> >> >>     ...
>> >> >>     item
>> >> >>   }
>> >> >>
>> >> >> instead of in makeNext(), which will just update consumedOffset, but
>> >> >> that is not the value that will be committed by the commitOffsets
>> >> >> call.  So as long as you turn off auto commit and only call
>> >> >> commitOffsets after the process(msg) call, not right after
>> >> >>
>> >> >> b = iter.next()
>> >> >>
>> >> >> is called, at-least-once is guaranteed.
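>> >> >>
>> >> >> In other words, roughly (just a sketch -- process(), shouldCommit(),
>> >> >> consumer, and iter stand in for your own processing function, commit
>> >> >> predicate, connector, and iterator, with auto commit turned off):
>> >> >>
>> >> >> while (iter.hasNext()) {
>> >> >>   val b = iter.next()       // the to-be-committed offset only moves here
>> >> >>   process(b)                // finish processing first...
>> >> >>   if (shouldCommit())
>> >> >>     consumer.commitOffsets  // ...then commit, so committed offsets only
>> >> >>                             // ever cover messages that were processed
>> >> >> }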
>> >> >>
>> >> >> Does that make sense?
>> >> >>
>> >> >> Guozhang
>> >> >>
>> >> >>
>> >> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Sorry to keep bugging the list, but I feel like I am either missing
>> >> >> > something important, or I'm finding something wrong w/ the standard
>> >> >> > consumer api (or maybe just the docs need some clarification).
>> >> >> >
>> >> >> > I started to think that I should probably just accept at-least-once
>> >> >> > semantics ... but I eventually realized that I'm not even sure we
>> >> >> > really get an at-least-once guarantee.  I think it really might be
>> >> >> > zero-or-more.  Or rather, messages will get pulled off the kafka
>> >> >> > queue at least once, but that doesn't mean your app will actually
>> >> >> > *process* those messages at least once -- there might be messages
>> >> >> > it never processes.
>> >> >> >
>> >> >> > Consider a really basic reader of a kafka queue:
>> >> >> >
>> >> >> > while(it.hasNext()){
>> >> >> >   val msg = it.next()
>> >> >> >   doSomething(msg)
>> >> >> > }
>> >> >> >
>> >> >> > The question is, do I have any guarantees on how many times
>> >> >> > doSomething() is called on everything in the queue?  I think the
>> >> >> > "guarantee" is:
>> >> >> > 1) most messages will get processed exactly once
>> >> >> > 2) around a restart, a chunk of msgs will get processed at least
>> >> >> > once, but probably more than once
>> >> >> > 3) around a restart, it is possible that one message will get
>> >> >> > processed ZERO times
>> >> >> >
>> >> >> > (1) & (2) are probably clear, so lemme explain how I think (3)
>> >> >> > could happen.  Let's imagine messages a, b, c, ... and two threads:
>> >> >> > one reading from the stream, and one that periodically commits the
>> >> >> > offsets.  Imagine this sequence of events:
>> >> >> >
>> >> >> >
>> >> >> > ==Reader==
>> >> >> > -initializes w/ offset pointing to "a"
>> >> >> >
>> >> >> > -hasNext()
>> >> >> >   ---> makeNext() will read "a"
>> >> >> >         and update the local offset to "b"
>> >> >> >
>> >> >> > -msg = "a"
>> >> >> >
>> >> >> > -doSomething("a")
>> >> >> >
>> >> >> > -hasNext()
>> >> >> >    ----> makeNext() will read "b"
>> >> >> >          and  update the local offset "c"
>> >> >> >
>> >> >> > ==Commiter==
>> >> >> >
>> >> >> > -commitOffsets stores the current offset as "c"
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >       =====PROCESS DIES=====
>> >> >> >       =====  RESTARTS  =====
>> >> >> >
>> >> >> > ==Reader==
>> >> >> > -initializes w/ offset pointing to "c"
>> >> >> >
>> >> >> > -hasNext()
>> >> >> >    --> makeNext() will read "c"
>> >> >> >        and update local offset to "d"
>> >> >> > -msg = "c"
>> >> >> > -doSomething("c")
>> >> >> > ...
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > Note that in this scenario, doSomething("b") was never called.
>> >> >> > Probably for a lot of applications this doesn't matter, but it
>> >> >> > seems like this could be terrible for some apps.  I can't think of
>> >> >> > any way of preventing it from user code.  Unless, maybe, when the
>> >> >> > offsets get committed, the committed offset is always *before* the
>> >> >> > last thing read?  E.g., in my example, it would store the next
>> >> >> > offset as "b" or earlier?
>> >> >> >
>> >> >> > Is there a flaw in my logic?  Do committed offsets always
>> >> >> > "undershoot" to prevent this?
>> >> >> >
>> >> >> > thanks,
>> >> >> > Imran
>> >> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> -- Guozhang
>> >> >>
>> >>
>>
