You likely need a custom offset solution if you plan on committing
every message. With many partitions this puts a large burden on
ZooKeeper; you end up needing to roll over your ZooKeeper transaction
logs quickly as well, or you risk filling up the disk.
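
For reference, the per-message commit pattern in question looks roughly
like this with the 0.8 high-level consumer (a sketch only; the topic,
group id, and process() function are made-up placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "example-group")       // hypothetical group id
props.put("auto.commit.enable", "false")     // we commit manually below

val connector = Consumer.create(new ConsumerConfig(props))
val stream =
  connector.createMessageStreams(Map("example-topic" -> 1))("example-topic").head

val it = stream.iterator()
while (it.hasNext()) {
  process(it.next())        // hypothetical processing function
  connector.commitOffsets   // one ZooKeeper write per message
}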


On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Imran,
>
> The offset will only be updated when the next() function is called:
>
> override def next(): MessageAndMetadata[K, V] = {
>     ...
>     // this is the offset that commitOffsets will actually store
>     currentTopicInfo.resetConsumeOffset(consumedOffset)
>     ...
>     item
>   }
>
> instead of in makeNext(), which just updates consumedOffset, but that
> is not the value that will be committed by the commitOffsets call. So as
> long as you turn off auto commit and only call commitOffsets after the
> process(msg) call, not after the
>
> b = iter.next()
>
> is called, at-least-once is guaranteed.
>
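> In code, that pattern would look roughly like this (a minimal sketch;
> here stream is a KafkaStream, connector is the ConsumerConnector with
> auto commit disabled, and process() stands in for your handler):
>
> val iter = stream.iterator()
> while (iter.hasNext()) {
>   val b = iter.next()       // resetConsumeOffset(consumedOffset) runs here
>   process(b)                // may be re-run after a crash, but never skipped
>   connector.commitOffsets   // commit only after process(b) returns
> }
>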
> Does that make sense?
>
> Guozhang
>
>
> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
> wrote:
>
> > sorry to keep bugging the list, but I feel like I am either missing
> > something important, or I'm finding something wrong w/ the standard
> > consumer API (or maybe the docs just need some clarification).
> >
> > I started to think that I should probably just accept at-least-once
> > semantics ... but I eventually realized that I'm not even sure we
> > really get an at-least-once guarantee.  I think it really might be
> > zero-or-more.  Or rather, messages will get pulled off the Kafka queue
> > at least once, but that doesn't mean your app will actually *process*
> > those messages at least once -- there might be messages it never
> > processes.
> >
> > Consider a really basic reader of a Kafka queue:
> >
> > while (it.hasNext()) {  // hasNext() triggers makeNext(), which reads ahead
> >   val msg = it.next()
> >   doSomething(msg)
> > }
> >
> > the question is, do I have any guarantees on how many times
> > doSomething() is called on everything in the queue?  I think the
> > "guarantee" is:
> > 1) most messages will get processed exactly once
> > 2) around a restart, a chunk of msgs will get processed at least once,
> > but probably more than once
> > 3) around a restart, it is possible that one message will get
> > processed ZERO times
> >
> > (1) & (2) are probably clear, so let me explain how I think (3) could
> > happen.  Let's imagine messages a, b, c, ... and two threads: one reading
> > from the stream, and one that periodically commits the offsets.
> > Imagine this sequence of events:
> >
> >
> > ==Reader==
> > -initializes w/ offset pointing to "a"
> >
> > -hasNext()
> >   ---> makeNext() will read "a"
> >         and update the local offset to "b"
> >
> > -msg = "a"
> >
> > -doSomething("a")
> >
> > -hasNext()
> >    ---> makeNext() will read "b"
> >         and update the local offset to "c"
> >
> > ==Committer==
> >
> > -commitOffsets stores the current offset as "c"
> >
> >
> >
> >       =====PROCESS DIES=====
> >       =====  RESTARTS  =====
> >
> > ==Reader==
> > -initializes w/ offset pointing to "c"
> >
> > -hasNext()
> >    ---> makeNext() will read "c"
> >         and update the local offset to "d"
> > -msg = "c"
> > -doSomething("c")
> > ...
> >
> >
> >
> > note that in this scenario, doSomething("b") was never called.
> > Probably for a lot of applications this doesn't matter, but it seems
> > like this could be terrible for some apps.  I can't think of any way
> > of preventing it from user code -- unless, maybe, when the offsets
> > get committed, it is always *before* the last thing read?  E.g., in my
> > example, it would store the next offset as "b" or earlier?
> >
> > Is there a flaw in my logic?  Do committed offsets always "undershoot"
> > to prevent this?
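> >
> > For concreteness, the two-thread setup I have in mind is roughly this
> > (just a sketch; the commit interval and names are made up):
> >
> > import java.util.concurrent.{Executors, TimeUnit}
> >
> > // periodic committer, like auto-commit: it stores whatever the
> > // iterator's current offset is, regardless of processing progress
> > val committer = Executors.newSingleThreadScheduledExecutor()
> > committer.scheduleAtFixedRate(new Runnable {
> >   def run(): Unit = connector.commitOffsets
> > }, 10, 10, TimeUnit.SECONDS)
> >
> > while (it.hasNext()) {   // hasNext() advances the local offset
> >   val msg = it.next()
> >   doSomething(msg)       // crash here after a commit => msg may never be processed
> > }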
> >
> > thanks,
> > Imran
> >
>
>
>
> --
> -- Guozhang
>
