Why not just disable autocommit and only call commitOffsets() after you've
processed a batch?  It isn't obvious to me how doing so would allow a
message to be processed zero times.
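For concreteness, here is a toy sketch of that pattern (plain Java against a simulated in-memory log, not the real consumer API; all the names are invented): the committed offset only moves after doSomething() has returned, so a crash can cause re-processing, but never a skip.

```java
import java.util.*;

// Simulated log + consumer to illustrate commit-after-processing.
// `committed` stores the offset of the next unprocessed message, and it
// only moves after processing has finished for everything before it.
public class CommitAfterBatch {
    static List<String> log = Arrays.asList("a", "b", "c", "d", "e");
    static int committed = 0;                  // durable committed offset
    static List<String> processed = new ArrayList<>();

    // Run from the last committed offset; "crash" (return) after handling
    // `crashAfter` messages, committing once per `batch` messages.
    static void run(int batch, int crashAfter) {
        int pos = committed;
        int sinceCommit = 0, handled = 0;
        while (pos < log.size()) {
            processed.add(log.get(pos));       // doSomething(msg)
            pos++;
            handled++;
            if (++sinceCommit == batch) {      // commit only AFTER processing
                committed = pos;
                sinceCommit = 0;
            }
            if (handled == crashAfter) return; // crash: last msgs uncommitted
        }
        committed = pos;                       // final commit
    }

    public static void main(String[] args) {
        run(2, 3);                  // handles a,b,c; commits after b; dies
        run(2, Integer.MAX_VALUE);  // restart: resumes at c
        System.out.println(processed);  // [a, b, c, c, d, e]
    }
}
```

On the simulated crash and restart, "c" is processed twice and nothing is processed zero times, which is the at-least-once behavior in question.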
On Nov 21, 2013 5:52 PM, "Imran Rashid" <im...@therashids.com> wrote:

> Hi Edward,
>
> I think you misunderstand ... I definitely do *not* want to commit
> every message.  That would be much too expensive, every few minutes is
> plenty for me.
>
> I want to guarantee that if I commit a message, that my message has
> been processed by *my application* at least once.  (In another thread
> I mentioned something about reading exactly once, but that is not what
> I am going for here -- more than once is OK, as long as it's at least
> once.)  That requires me to make sure that a commit never happens
> between a call to next() and doSomething().  Kafka guarantees that
> messages get read off the queue at least once, but as I outlined
> above, a naive solution allows for messages to get read off the queue,
> but not make it into my app.
>
> Not only do I not want to commit every message -- I'd really like to
> have the above guarantee without even acquiring a lock (most of the
> time).  That is what I was getting at with the comment about the
> spin-lock, just as an idea of how to prevent a commit between next()
> and doSomething().  I dunno if the spin-lock was really the right
> idea, that was just a random thought, but the point is, we want some
> check that should be very cheap for 99.999% of the time when a commit
> isn't happening, but can still guarantee proper ordering during that
> 0.001% of the time when a commit is happening.  Otherwise, messages
> might never make it to my app.  (And this is all just to prevent a
> message getting lost during the 10^-6% of the time when a commit might
> happen between next() and doSomething(), and the app dies before
> doSomething completes!)
>
> Actually after a bit more thought -- the best way to guarantee this
> would be with a small API change that would do away with the need for
> locks completely.  The iterator should have a method applyToNext:
>
> def applyToNext(f: MessageAndMetadata[K,V] => Unit) {
>   // all the stuff in next(), *except*
>   // currentTopicInfo.resetConsumeOffset(consumedOffset)
>   val item = ...
>   f(item)
>   // after we've applied the user's function, now we can update the
>   // offset that should get committed
>   currentTopicInfo.resetConsumeOffset(consumedOffset)
> }
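To make the shape concrete, here is a self-contained toy version of the proposed applyToNext (plain Java stand-ins for illustration; applyToNext is only a proposal, not an existing API): the committable offset moves only after the user's function returns, so no commit taken at any instant can ever cover an unprocessed message.

```java
import java.util.*;
import java.util.function.Consumer;

// Sketch of the proposed applyToNext: the committable offset is only
// advanced AFTER the user's function has run, so a commit can never
// persist an offset past a message the application hasn't processed.
public class ApplyToNextSketch {
    static List<String> log = Arrays.asList("a", "b", "c");
    static int consumedOffset = 0;    // what the fetcher has handed out
    static int committableOffset = 0; // what a commit would persist
    static List<String> processed = new ArrayList<>();

    static void applyToNext(Consumer<String> f) {
        String item = log.get(consumedOffset); // like next(), minus the offset update
        consumedOffset++;
        f.accept(item);                        // user code runs first...
        committableOffset = consumedOffset;    // ...then the offset becomes committable
    }

    public static void main(String[] args) {
        while (committableOffset < log.size()) {
            applyToNext(msg -> {
                // If an auto-commit fired at this instant, it would persist
                // committableOffset, which still points at `msg`, so a
                // restart would re-process it: at least once, never zero.
                if (committableOffset != processed.size())
                    throw new AssertionError("commit window would skip a message");
                processed.add(msg);
            });
        }
        System.out.println(processed + " committable=" + committableOffset);
    }
}
```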
>
> I think this way, you could avoid any problems even with auto-commit.
> Or, if we don't want to add a new method, so that we stick to the
> Iterator API, then maybe the iterator could let you register a
> preCommitFunction, so next() would change to:
>
> override def next(): MessageAndMetadata[K, V] = {
>     ...
>     val item = ...
>     preCommitFunctions.foreach{f => f(item)}
>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>     ...
>     item
>   }
>
>
> thanks,
> Imran
>
> On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com>
> wrote:
> > You likely need a custom offset solution if you plan on committing
> > every message.  With many partitions this puts a large burden on
> > ZooKeeper; you end up needing to roll over your ZK transaction logs
> > quickly as well, or you risk filling up the disk.
> >
> >
> > On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hello Imran,
> >>
> >> The offset will only be updated when the next() function is called:
> >>
> >> override def next(): MessageAndMetadata[K, V] = {
> >>     ...
> >>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >>     ...
> >>     item
> >>   }
> >>
> >> instead of in makeNext(), which will just update consumedOffset, but
> >> that is not the value that will be committed using the commitOffsets
> >> call.  So as long as you turn off auto-commit and only call
> >> commitOffsets after the process(msg) call, not after the
> >>
> >> b = iter.next()
> >>
> >> is called, at-least-once is guaranteed.
> >>
> >> Does that make sense?
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
> >> wrote:
> >>
> >> > sorry to keep bugging the list, but I feel like I am either missing
> >> > something important, or I'm finding something wrong with the standard
> >> > consumer API (or maybe the docs just need some clarification).
> >> >
> >> > I started to think that I should probably just accept at-least-once
> >> > semantics ... but I eventually realized that I'm not even sure we
> >> > really get an at-least-once guarantee.  I think it really might be
> >> > zero-or-more.  Or rather, messages will get pulled off the Kafka queue
> >> > at least once, but that doesn't mean your app will actually *process*
> >> > those messages at least once -- there might be messages it never
> >> > processes.
> >> >
> >> > Consider a really basic reader of a kafka queue:
> >> >
> >> > while(it.hasNext()){
> >> >   val msg = it.next()
> >> >   doSomething(msg)
> >> > }
> >> >
> >> > The question is: do I have any guarantees on how many times
> >> > doSomething() is called on everything in the queue?  I think the
> >> > "guarantee" is:
> >> > 1) Most messages will get processed exactly once.
> >> > 2) Around a restart, a chunk of messages will get processed at least
> >> > once, but probably more than once.
> >> > 3) Around a restart, it is possible that one message will get
> >> > processed ZERO times.
> >> >
> >> > (1) & (2) are probably clear, so let me explain how I think (3) could
> >> > happen.  Let's imagine messages a, b, c, ... and two threads: one
> >> > reading from the stream, and one that periodically commits the offsets.
> >> >  Imagine this sequence of events:
> >> >
> >> >
> >> > ==Reader==
> >> > -initializes w/ offset pointing to "a"
> >> >
> >> > -hasNext()
> >> >   ---> makeNext() will read "a"
> >> >         and update the local offset to "b"
> >> >
> >> > -msg = "a"
> >> >
> >> > -doSomething("a")
> >> >
> >> > -hasNext()
> >> >    ----> makeNext() will read "b"
> >> >          and update the local offset to "c"
> >> >
> >> > ==Committer==
> >> >
> >> > -commitOffsets stores the current offset as "c"
> >> >
> >> >
> >> >
> >> >       =====PROCESS DIES=====
> >> >       =====  RESTARTS  =====
> >> >
> >> > ==Reader==
> >> > -initializes w/ offset pointing to "c"
> >> >
> >> > -hasNext()
> >> >    --> makeNext() will read "c"
> >> >        and update local offset to "d"
> >> > -msg = "c"
> >> > -doSomething("c")
> >> > ...
> >> >
> >> >
> >> >
> >> > Note that in this scenario, doSomething("b") was never called.
> >> > Probably for a lot of applications this doesn't matter, but it seems
> >> > like this could be terrible for some apps.  I can't think of any
> >> > way of preventing it from user code.  Unless, maybe, when the offsets
> >> > get committed, it is always *before* the last thing read?  E.g., in my
> >> > example, it would store the next offset as "b" or earlier?
> >> >
> >> > Is there a flaw in my logic?  Do committed offsets always "undershoot"
> >> > to prevent this?
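The interleaving above can be condensed into a runnable toy model (invented names, not the real client API): next() makes the *following* offset committable before the current message has been processed; a commit snapshotted in that window, followed by a crash, loses "b" for good.

```java
import java.util.*;

// Toy model of the race: next() makes the *following* offset
// committable before the current message has been processed.  A commit
// taken in that window, followed by a crash, skips a message forever.
public class ZeroTimesRace {
    static List<String> log = Arrays.asList("a", "b", "c", "d");
    static List<String> processed = new ArrayList<>();

    public static void main(String[] args) {
        int committable = 0;
        // First run: read "a", process it; read "b"; the auto-committer
        // fires between next() and doSomething("b"); then the process dies.
        String a = log.get(0); committable = 1;
        processed.add(a);                       // doSomething("a")
        String b = log.get(1); committable = 2; // next() already advanced past "b"
        int committed = committable;            // background commitOffsets -> 2
        /* crash before doSomething(b) */

        // Restart from the committed offset: "b" is never processed.
        for (int pos = committed; pos < log.size(); pos++) {
            processed.add(log.get(pos));        // doSomething(msg)
        }
        System.out.println(processed);          // [a, c, d]
    }
}
```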
> >> >
> >> > thanks,
> >> > Imran
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
