Hi Edward,

I think you misunderstand ... I definitely do *not* want to commit
every message.  That would be much too expensive; committing every few
minutes is plenty for me.

I want to guarantee that if I commit a message, my message has been
processed by *my application* at least once.  (In another thread I
mentioned something about reading exactly once, but that is not what I
am going for here -- more than once is OK, as long as it's at least
once.)  That requires me to make sure that a commit never happens
between a call to next() and doSomething().  Kafka guarantees that
messages get read off the queue at least once, but as I outlined
above, a naive solution allows messages to get read off the queue
without ever making it into my app.
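
Concretely, here is the naive loop again, annotated with the window I
am worried about (just a sketch):

while (it.hasNext()) {
  val msg = it.next()   // the in-memory offset now points *past* msg
  // <-- if a commit fires right here and the process then dies, msg's
  //     offset is committed even though doSomething(msg) never ran
  doSomething(msg)
}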

Not only do I not want to commit every message -- I'd really like to
have the above guarantee without even acquiring a lock (most of the
time).  That is what I was getting at with the comment about the
spin-lock, just as an idea of how to prevent a commit between next()
and doSomething().  I dunno if the spin-lock was really the right
idea, that was just a random thought, but the point is: we want a
check that is very cheap during the 99.999% of the time when a commit
isn't happening, but can still guarantee proper ordering during the
0.001% of the time when a commit is happening.  Otherwise, messages
might never make it to my app.  (And this is all just to prevent a
message getting lost during the 10^-6% of the time when a commit
happens between next() and doSomething() and the app dies before
doSomething() completes!)
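
For example, here is one lock-free version of that idea -- just a
sketch from me, where committableOffset and the bare commitOffset(off)
call are made-up names, and I'm assuming the message exposes its
offset.  The reader publishes an offset as committable only *after*
doSomething() returns, so the committer can never commit past an
unprocessed message:

import java.util.concurrent.atomic.AtomicLong

val committableOffset = new AtomicLong(-1L)

// reader thread
while (it.hasNext()) {
  val msg = it.next()
  doSomething(msg)
  committableOffset.set(msg.offset + 1)  // publish only after processing
}

// committer thread, every few minutes
val off = committableOffset.get()
if (off >= 0) commitOffset(off)  // never commits past an unprocessed msg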

Actually, after a bit more thought -- the best way to guarantee this
would be with a small api change that would do away with the need for
locks completely.  The iterator should have a method applyToNext:

def applyToNext(f: MessageAndMetadata[K, V] => Unit) {
  // all the stuff in next(), *except*
  // currentTopicInfo.resetConsumeOffset(consumedOffset)
  val item = ...
  f(item)
  // after we've applied the user's function, now we can update the
  // offset that should get committed
  currentTopicInfo.resetConsumeOffset(consumedOffset)
}
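
Then my reader loop would shrink to just (sketch):

while (it.hasNext()) {
  it.applyToNext { msg => doSomething(msg) }
}

and there is no window at all, since the committable offset only moves
after f has run.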

I think this way, you could avoid any problems even with auto-commit.
Or, if you don't want to add a new method, so that we stick to the
Iterator api, maybe the iterator could let you register a
preCommitFunction, and next() would change to:

override def next(): MessageAndMetadata[K, V] = {
    ...
    val item = ...
    preCommitFunctions.foreach{f => f(item)}
    *currentTopicInfo.resetConsumeOffset(consumedOffset)*
    ...
    item
  }
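
and user code would do something like this (sketch; the
registerPreCommitFunction name is made up):

iter.registerPreCommitFunction { msg => doSomething(msg) }

while (iter.hasNext()) {
  iter.next()  // runs the registered functions before the committable
               // offset advances
}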


thanks,
Imran

On Thu, Nov 21, 2013 at 6:55 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> You likely need to use a custom offset solution if you plan on committing
> every message. With many partitions this puts a large burden on Zookeeper;
> you end up needing to roll over your zk transaction logs fast as well, or
> risk filling up disk.
>
>
> On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Imran,
>>
>> The offset will only be updated when the next() function is called:
>>
>> override def next(): MessageAndMetadata[K, V] = {
>>     ...
>>     *currentTopicInfo.resetConsumeOffset(consumedOffset)*
>>     ...
>>     item
>>   }
>>
>> instead of in makeNext(), which will just update consumedOffset, but that
>> is not the value that will be committed using the commitOffset call. So as
>> long as you turn off auto commit and only call commitOffset after the
>> process(msg) call, not after the
>>
>> b = iter.next()
>>
>> is called, at-least-once is guaranteed.
>>
>> Does that make sense?
>>
>> Guozhang
>>
>>
>> On Thu, Nov 21, 2013 at 2:14 PM, Imran Rashid <im...@therashids.com>
>> wrote:
>>
>> > sorry to keep bugging the list, but I feel like I am either missing
>> > something important, or I'm finding something wrong w/ the standard
>> > consumer api (or maybe just the docs need some clarification).
>> >
>> > I started to think that I should probably just accept at least once
>> > semantics ... but I eventually realized that I'm not even sure we
>> > really get an at least once guarantee.  I think it really might be
>> > zero-or-more.  Or rather, messages will get pulled off the kafka queue
>> > at least once.  But that doesn't mean your app will actually *process*
>> > those messages at least once -- there might be messages it never
>> > processes.
>> >
>> > Consider a really basic reader of a kafka queue:
>> >
>> > while(it.hasNext()){
>> >   val msg = it.next()
>> >   doSomething(msg)
>> > }
>> >
>> > The question is: do I have any guarantees on how many times
>> > doSomething() is called on everything in the queue?  I think the
>> > "guarantee" is:
>> > 1) most messages will get processed exactly once
>> > 2) around a restart, a chunk of msgs will get processed at least once,
>> > but probably more than once
>> > 3) around a restart, it is possible that one message will get
>> > processed ZERO times
>> >
>> > (1) & (2) are probably clear, so lemme explain how I think (3) could
>> > happen.  Let's imagine messages a, b, c, ... and two threads, one reading
>> > from the stream, and one thread that periodically commits the offsets.
>> >  Imagine this sequence of events:
>> >
>> >
>> > ==Reader==
>> > -initializes w/ offset pointing to "a"
>> >
>> > -hasNext()
>> >   ---> makeNext() will read "a"
>> >         and update the local offset to "b"
>> >
>> > -msg = "a"
>> >
>> > -doSomething("a")
>> >
>> > -hasNext()
>> >    ----> makeNext() will read "b"
>> >          and update the local offset to "c"
>> >
>> > ==Committer==
>> >
>> > -commitOffsets stores the current offset as "c"
>> >
>> >
>> >
>> >       =====PROCESS DIES=====
>> >       =====  RESTARTS  =====
>> >
>> > ==Reader==
>> > -initializes w/ offset pointing to "c"
>> >
>> > -hasNext()
>> >    --> makeNext() will read "c"
>> >        and update local offset to "d"
>> > -msg = "c"
>> > -doSomething("c")
>> > ...
>> >
>> >
>> >
>> > Note that in this scenario, doSomething("b") was never called.
>> > Probably for a lot of applications this doesn't matter.  But it seems
>> > like this could be terrible for some apps.  I can't think of any
>> > way of preventing it from user code.  Unless, maybe, when the offsets
>> > get committed, it is always *before* the last thing read?  E.g., in my
>> > example, it would store the next offset as "b" or earlier?
>> >
>> > Is there a flaw in my logic?  Do committed offsets always "undershoot"
>> > to prevent this?
>> >
>> > thanks,
>> > Imran
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
