Thanks Bhavesh.

I understand that getting "exactly once" processing of a message requires
some de-duplication. What I'm saying is that the current high level
consumer, with automatic offset commits enabled, gives neither "at most
once" nor "at least once" guarantees: a consumer group might get duplicate
messages, but might also never fully process some messages (which is a
bigger problem for me).
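
To make that failure mode concrete, here is a minimal sketch of a plain
high-level consumer loop with auto-commit enabled (the topic, group id and
property values below are placeholders, not from any real setup):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class AutoCommitLossSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "example-group");             // placeholder
        props.put("auto.commit.enable", "true");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("example-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
            streams.get("example-topic").get(0).iterator();

        while (it.hasNext()) {
            byte[] message = it.next().message();
            // next() has already moved the in-memory consumed offset past this
            // message, so the auto-commit thread may commit it at any moment.
            // If the process dies right here, before process() completes, the
            // message is never redelivered to the group: not "at least once".
            process(message);
        }
    }

    private static void process(byte[] message) {
        // application-specific work
    }
}

That window between next() and the end of process() is exactly where a
message can be lost today.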

With the code change I propose, I think the guarantee becomes "at least
once", i.e. one can then do the deduplication you describe without worrying
about "losing" messages. Messages should not get committed without being
fully processed. I want to know whether this code change has any obvious
problems.
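
For what it's worth, here is a rough sketch of the kind of de-duplication
you describe, using Hazelcast's IMap as the distributed cache (the map name,
the choice of message key and the 24-hour TTL are arbitrary placeholders):

import java.util.concurrent.TimeUnit;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class DedupSketch {

    private final IMap<String, Boolean> seen;

    public DedupSketch(HazelcastInstance hz) {
        // Shared across all consumer instances in the cluster.
        this.seen = hz.getMap("processed-message-keys");   // placeholder name
    }

    /** Process the message only if its key has not been seen recently. */
    public void processOnce(String messageKey, byte[] message) {
        // putIfAbsent is atomic cluster-wide: only one caller gets null back
        // for a given key; duplicate deliveries see the existing entry and
        // skip processing. The TTL keeps the map bounded, like the LRU cache
        // with an eviction policy you mention.
        if (seen.putIfAbsent(messageKey, Boolean.TRUE, 24, TimeUnit.HOURS) == null) {
            process(message);
        }
    }

    private void process(byte[] message) {
        // application-specific work
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch(Hazelcast.newHazelcastInstance());
        sketch.processOnce("message-key-1", new byte[0]);
        sketch.processOnce("message-key-1", new byte[0]);   // skipped as a duplicate
    }
}

Of course, if process() fails after the key has been recorded, the
redelivered message would be skipped, so in practice one would record the
key only after successful processing (accepting a small duplicate window)
or make process() itself idempotent.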

Regards
Carl


On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Carl,
>
> Producer-side retries can cause the same message to be sent to the brokers
> more than once, under different offsets. You may also get duplicates when
> the High Level Consumer has processed data but its offset was not saved or
> committed before your server restarted, etc.
>
> To guarantee at-least-once processing across partitions (and across
> servers), you will need to store a message hash or primary key in a
> distributed LRU cache (with an eviction policy) like Hazelcast
> <http://www.hazelcast.com> and do de-duplication across partitions.
>
> I hope this helps!
>
> Thanks,
> Bhavesh
>
> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote:
>
> > So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?
> >
> > The message above from Stevo Slavić, 2015-06-17 08:12:50 +0000:
> >
> >> With auto-commit one can only have an at-most-once delivery guarantee -
> >> after commit but before the message is delivered for processing, or even
> >> after it is delivered but before it is processed, things can fail, causing
> >> the event not to be processed, which is basically the same outcome as if
> >> it was not delivered.
> >>
> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heym...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > ** Disclaimer: I know there's a new consumer API on the way, this mail
> >> > is about the currently available API. I also apologise if the below has
> >> > already been discussed previously. I did try to check previous
> >> > discussions on ConsumerIterator **
> >> >
> >> > It seems to me that the high-level consumer would be able to support
> >> > at-least-once messaging, even if one uses auto-commit, by changing
> >> > kafka.consumer.ConsumerIterator.next() to call
> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
> >> > way, a consumer thread for a KafkaStream could just loop:
> >> >
> >> > while (true) {
> >> >     MyMessage message = iterator.next().message();
> >> >     process(message);
> >> > }
> >> >
> >> > Each call to "iterator.next()" then updates the offset to commit to the
> >> > end of the message that was just processed. When offsets are committed
> >> > for the ConsumerConnector (either automatically or manually), the commit
> >> > will not include offsets of messages that haven't been fully processed.
> >> >
> >> > I've tested the following ConsumerIterator.next(), and it seems to work
> >> > as I expect:
> >> >
> >> >   override def next(): MessageAndMetadata[K, V] = {
> >> >     // New code: reset consumer offset to the end of the previously
> >> >     // consumed message:
> >> >     if (consumedOffset > -1L && currentTopicInfo != null) {
> >> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >         val topic = currentTopicInfo.topic
> >> >         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
> >> >     }
> >> >
> >> >     // Old code, excluding reset:
> >> >     val item = super.next()
> >> >     if(consumedOffset < 0)
> >> >       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
> >> >     val topic = currentTopicInfo.topic
> >> >     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> >> >     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> >> >     item
> >> >   }
> >> >
> >> > I've seen several people asking about managing commit offsets manually
> >> > with the high level consumer. I suspect that this approach (the modified
> >> > ConsumerIterator) would scale better than having a separate
> >> > ConsumerConnector per stream just so that you can commit offsets with
> >> > at-least-once semantics. The downside of this approach is more duplicate
> >> > deliveries after recovery from hard failure (but this is "at least once",
> >> > right, not "exactly once").
> >> >
> >> > I don't propose that the code necessarily be changed like this in trunk,
> >> > I just want to know if the approach seems reasonable.
> >> >
> >> > Regards
> >> > Carl Heymann
