1) Yes. And the worst case is that you may lose one message, but you may
get more than one duplicate per partition, since the commit thread will
call commitOffset periodically.
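
For reference, here is a minimal sketch of creating a consumer with that
background commit thread disabled (illustration only, against the 0.8
high-level consumer API; the ZooKeeper address and group id are
placeholders):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // placeholder
    props.put("group.id", "my-group")                 // placeholder
    // no background commit thread; offsets are only written when the
    // application itself calls commitOffsets
    props.put("auto.commit.enable", "false")

    val connector = Consumer.create(new ConsumerConfig(props))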

2) I think locking is also fine, since it will just leave the thread
blocked waiting on the lock, which is better than a spin-lock since that
still burns CPU.
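
To illustrate the difference with a generic sketch (not Kafka code):
blocking parks the waiting thread until it is woken up, while a spin-loop
keeps a core busy the whole time:

    import java.util.concurrent.LinkedBlockingQueue

    val queue = new LinkedBlockingQueue[String]()

    val waiter = new Thread(new Runnable {
      def run(): Unit = {
        // take() parks this thread until an element arrives -- no CPU is
        // used while waiting. A spin such as
        //   while (queue.peek() == null) {}
        // would instead keep one core at 100%.
        println("got: " + queue.take())
      }
    })
    waiter.start()

    queue.put("wake up")
    waiter.join()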

Guozhang


On Thu, Nov 21, 2013 at 4:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:

> You likely need to use a custom offset solution if you plan on committing
> every message. With many partitions this puts a large burden on ZooKeeper,
> and you end up needing to roll over your ZK transaction logs quickly as
> well, or you risk filling up the disk.
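>
> For a rough idea of what such a custom solution could look like, here is
> a purely illustrative sketch (process() and saveOffsets() are hypothetical
> placeholders, iter is the stream's ConsumerIterator, and the external
> store could be a file, a database, etc.):
>
>   val processedOffsets = scala.collection.mutable.Map[Int, Long]()
>   var sinceLastSave = 0
>
>   while (iter.hasNext()) {
>     val msg = iter.next()
>     process(msg.message)                          // application logic
>     processedOffsets(msg.partition) = msg.offset  // single topic assumed
>     sinceLastSave += 1
>     if (sinceLastSave >= 1000) {                  // batch the writes instead
>       saveOffsets(processedOffsets)               // of hitting ZK per message
>       sinceLastSave = 0
>     }
>   }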
>
>
> On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Imran,
> >
> > The offset will only be updated when the next() function is called:
> >
> > override def next(): MessageAndMetadata[K, V] = {
> >     ...
> >     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
> >     ...
> >     item
> >   }
> >
> > instead of in makeNext(), which will just update consumedOffset, but that
> > is not the value that will be committed using the commitOffset call. So as
> > long as you turn off auto commit and only call commitOffset after the
> > process(msg) call, not after the
> >
> > b = iter.next()
> >
> > is called, at-least-once is guaranteed.
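> >
> > As a concrete sketch of that pattern (auto commit off, process first,
> > then commit) -- this is only an illustration, assuming connector is a
> > ConsumerConnector created with auto.commit.enable set to false, and
> > process() stands in for the application logic:
> >
> >   val streams = connector.createMessageStreams(Map("my-topic" -> 1))
> >   val iter = streams("my-topic").head.iterator()
> >   while (iter.hasNext()) {
> >     val msg = iter.next()       // in-memory offset now points past msg
> >     process(msg.message)        // handle the message first
> >     connector.commitOffsets     // only then record the progress
> >   }
> >
> > If the process dies after next() but before commitOffsets, the message is
> > simply re-delivered after restart: duplicates are possible, but nothing is
> > skipped.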
> >
> > Does that make sense?
> >
> > Guozhang
> >
> >
> > On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
> > wrote:
> >
> > > sorry to keep bugging the list, but I feel like I am either missing
> > > something important, or I'm finding something wrong w/ the standard
> > > consumer API (or maybe the docs just need some clarification).
> > >
> > > I started to think that I should probably just accept at-least-once
> > > semantics ... but I eventually realized that I'm not even sure we
> > > really get an at-least-once guarantee.  I think it really might be
> > > zero-or-more.  Or rather, messages will get pulled off the Kafka queue
> > > at least once, but that doesn't mean your app will actually *process*
> > > those messages at least once -- there might be messages it never
> > > processes.
> > >
> > > Consider a really basic reader of a kafka queue:
> > >
> > > while(it.hasNext()){
> > >   val msg = it.next()
> > >   doSomething(msg)
> > > }
> > >
> > > the question is, do I have any guarantees on how many times
> > > doSomething() is called on everything in the queue?  I think the
> > > "guarantee" is:
> > > 1) most messages will get processed exactly once
> > > 2) around a restart, a chunk of msgs will get processed at least once,
> > > but probably more than once
> > > 3) around a restart, it is possible that one message will get
> > > processed ZERO times
> > >
> > > (1) & (2) are probably clear, so lemme explain how I think (3) could
> > > happen.  Let's imagine messages a,b,c,... and two threads, one reading
> > > from the stream, and one thread that periodically commits the offsets.
> > >  Imagine this sequence of events:
> > >
> > >
> > > ==Reader==
> > > -initializes w/ offset pointing to "a"
> > >
> > > -hasNext()
> > >   ---> makeNext() will read "a"
> > >         and update the local offset to "b"
> > >
> > > -msg = "a"
> > >
> > > -doSomething("a")
> > >
> > > -hasNext()
> > >    ----> makeNext() will read "b"
> > >          and update the local offset to "c"
> > >
> > > ==Committer==
> > >
> > > -commitOffsets stores the current offset as "c"
> > >
> > >
> > >
> > >       =====PROCESS DIES=====
> > >       =====  RESTARTS  =====
> > >
> > > ==Reader==
> > > -initializes w/ offset pointing to "c"
> > >
> > > -hasNext()
> > >    --> makeNext() will read "c"
> > >        and update local offset to "d"
> > > -msg = "c"
> > > -doSomething("c")
> > > ...
> > >
> > >
> > >
> > > note that in this scenario, doSomething("b") was never called.
> > > Probably for a lot of applications this doesn't matter.  But it seems
> > > like this could be terrible for some apps.  I can't think of any way
> > > of preventing it from user code.  Unless, maybe, when the offsets get
> > > committed, it is always *before* the last thing read?  E.g., in my
> > > example, it would store the next offset as "b" or earlier?
> > >
> > > Is there a flaw in my logic?  Do committed offsets always "undershoot"
> > > to prevent this?
> > >
> > > thanks,
> > > Imran
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
