Hi

** Disclaimer: I know there's a new consumer API on the way; this mail is
about the currently available API. I also apologise if the following has
already been discussed. I did try to check previous discussions of
ConsumerIterator. **

It seems to me that the high-level consumer could support at-least-once
delivery, even with auto-commit enabled, if
kafka.consumer.ConsumerIterator.next() called
currentTopicInfo.resetConsumeOffset(..) _before_ super.next() instead of
after it. A consumer thread for a KafkaStream could then simply loop:

while (true) {
    MyMessage message = iterator.next().message();
    process(message);
}

With this change, each call to "iterator.next()" first moves the offset to
commit to the end of the message returned by the previous call, i.e. the
one that was just processed. When offsets are committed for the
ConsumerConnector (either automatically or manually), the commit therefore
never includes the offset of a message that hasn't been fully processed.
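To make the intended semantics concrete, here is a toy model in Scala. It
is a sketch under stated assumptions, not Kafka code: ToyIterator, ToyDemo
and offsetToCommit are hypothetical names, there is a single partition,
offsets are plain list indices, and a "commit" is modeled as reading the
value that resetConsumeOffset(..) would have set at that moment.

```scala
/** Hypothetical toy stand-in for ConsumerIterator using the proposed ordering.
  * Offsets are list indices; none of these names are Kafka API.
  */
final class ToyIterator(messages: Vector[String]) {
  private var position = 0          // index of the next message to hand out
  private var consumedOffset = -1L  // nextOffset of the message returned by the previous next()
  private var consumeOffset = 0L    // what a commit (auto or manual) would store

  def hasNext: Boolean = position < messages.length

  def next(): String = {
    // Proposed change: mark the *previous* message as consumed first...
    if (consumedOffset > -1L) consumeOffset = consumedOffset
    // ...then fetch the next message (stand-in for super.next()).
    if (!hasNext) throw new NoSuchElementException("end of toy stream")
    val msg = messages(position)
    position += 1
    consumedOffset = position.toLong // nextOffset of the message just returned
    msg
  }

  /** The offset a commit taken right now would record. */
  def offsetToCommit: Long = consumeOffset
}

object ToyDemo {
  /** Returns the offset a commit would store while each message is in flight,
    * plus the offset still left to commit once the loop ends.
    */
  def run(): (Vector[Long], Long) = {
    val it = new ToyIterator(Vector("a", "b", "c"))
    val inFlight = Vector.newBuilder[Long]
    while (it.hasNext) {
      val msg = it.next()
      inFlight += it.offsetToCommit // commit taken before process(msg) completes
      // process(msg) would run here
    }
    (inFlight.result(), it.offsetToCommit)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running it prints (Vector(0, 1, 2),2): a commit taken while a message is in
flight never covers that message, and after the loop the committable offset
still points at "c", because nothing has called next() again.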

I've tested the following version of ConsumerIterator.next(), and it seems
to work as I expect:

  override def next(): MessageAndMetadata[K, V] = {
    // New code: reset consumer offset to the end of the previously
    // consumed message:
    if (consumedOffset > -1L && currentTopicInfo != null) {
      currentTopicInfo.resetConsumeOffset(consumedOffset)
      val topic = currentTopicInfo.topic
      trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    }

    // Old code, excluding reset:
    val item = super.next()
    if (consumedOffset < 0)
      throw new KafkaException(
        "Offset returned by the message set is invalid %d".format(consumedOffset))
    val topic = currentTopicInfo.topic
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }
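The difference between the two orderings can be sketched with a second toy
model (again hypothetical, not Kafka code). resumeOffset simulates one
partition where an auto-commit fires right after next() returns message
crashAt and the process then dies before finishing it. resetBeforeFetch
selects the original ordering (reset after super.next()) or the modified
one above; the result is the offset a restarted consumer would resume from.

```scala
object OrderingDemo {
  /** Offset a restarted consumer resumes from, in a hypothetical toy model.
    * Assumes an auto-commit fires right after next() returns message `crashAt`
    * and the process dies before process(message) finishes.
    * resetBeforeFetch = true models the modified ordering, false the original.
    */
  def resumeOffset(resetBeforeFetch: Boolean, crashAt: Long): Long = {
    var consumeOffset = 0L    // value a commit would store
    var consumedOffset = -1L  // nextOffset of the message returned by the previous next()
    var item = -1L            // offset of the message most recently returned
    while (item < crashAt) {
      // --- body of next() ---
      if (resetBeforeFetch && consumedOffset > -1L) consumeOffset = consumedOffset
      item += 1                                  // stand-in for super.next()
      consumedOffset = item + 1
      if (!resetBeforeFetch) consumeOffset = consumedOffset
      // --- the caller would run process(item) here ---
    }
    consumeOffset // the auto-commit fires now, then the crash happens
  }

  def main(args: Array[String]): Unit = {
    println(resumeOffset(resetBeforeFetch = false, crashAt = 2)) // original ordering
    println(resumeOffset(resetBeforeFetch = true, crashAt = 2))  // modified ordering
  }
}
```

With crashAt = 2 it prints 3 for the original ordering, so message 2 is
never redelivered, and 2 for the modified ordering, so message 2 is
delivered again after the restart.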

I've seen several people asking about managing commit offsets manually with
the high-level consumer. I suspect that this approach (the modified
ConsumerIterator) would scale better than having a separate
ConsumerConnector per stream just so that you can commit offsets with
at-least-once semantics. The downside of this approach is more duplicate
deliveries after recovering from a hard failure, but this is "at least
once", right, not "exactly once".

I don't propose that the code necessarily be changed like this in trunk, I
just want to know if the approach seems reasonable.

Regards
Carl Heymann
