Thanks Bhavesh. I understand that getting "exactly once" processing of a message requires some de-duplication. What I'm saying is that the current high-level consumer, with automatic offset commits enabled, gives neither an "at most once" nor an "at least once" guarantee: a consumer group might get duplicate messages, but it might also never fully process some messages (which is a bigger problem for me).
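
To make that concrete, here is a minimal sketch of a plain high-level consumer loop with auto-commit enabled (the connection settings, topic name, group id and process() are placeholders of mine, not from this thread). The comments mark the two failure windows, one breaking "at least once" and the other "at most once":

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class AutoCommitLoop {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");   // placeholder
            props.put("group.id", "example-group");             // placeholder
            props.put("auto.commit.enable", "true");            // automatic offset commits
            props.put("auto.commit.interval.ms", "1000");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("example-topic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("example-topic").get(0).iterator();

            while (it.hasNext()) {
                byte[] payload = it.next().message();
                // Window 1: next() has already marked this message as consumed, so the
                // auto-commit thread may commit its offset now. A crash before process()
                // finishes means the message is never fully processed -> not "at least once".
                process(payload);
                // Window 2: a crash after process() but before the next auto-commit means
                // the message is redelivered on restart -> not "at most once".
            }
        }

        private static void process(byte[] payload) {
            // placeholder for the real processing logic
        }
    }

With the change I propose, the first window goes away: the offset for a message only becomes eligible for commit on the following call to next(), i.e. after process() has returned for that message.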
With the code change I propose, I think it changes to "at least once", i.e. one can then do the deduplication you describe (a rough sketch of that dedup step is appended after the quoted thread below) without worrying about "losing" messages. Messages should not get committed without being fully processed. I want to know if this code change has any obvious problems.

Regards
Carl

On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Carl,
>
> Producer-side retries can produce duplicate messages being sent to the brokers, i.e. the same message with different offsets. You may also get duplicates when the high-level consumer offset has not been saved or committed but you have already processed the data and your server restarts, etc.
>
> To guarantee at-least-once processing across partitions (and across servers), you will need to store a message hash or primary key in a distributed LRU cache (with an eviction policy) like Hazelcast <http://www.hazelcast.com> and do dedupping across partitions.
>
> I hope this helps!
>
> Thanks,
>
> Bhavesh
>
> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote:
>
> > So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?
> >
> > 2015-06-17 08:12:50 +0000, the message above from Stevo Slavić:
> >
> >> With auto-commit one can only have an at-most-once delivery guarantee: after the commit but before the message is delivered for processing, or even after it is delivered but before it is processed, things can fail, causing the event not to be processed, which is basically the same outcome as if it had not been delivered.
> >>
> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heym...@gmail.com> wrote:
> >>
> >> > Hi
> >> >
> >> > ** Disclaimer: I know there's a new consumer API on the way, this mail is about the currently available API. I also apologise if the below has already been discussed previously. I did try to check previous discussions on ConsumerIterator **
> >> >
> >> > It seems to me that the high-level consumer would be able to support at-least-once messaging, even if one uses auto-commit, by changing kafka.consumer.ConsumerIterator.next() to call currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a consumer thread for a KafkaStream could just loop:
> >> >
> >> >     while (true) {
> >> >         MyMessage message = iterator.next().message();
> >> >         process(message);
> >> >     }
> >> >
> >> > Each call to "iterator.next()" then updates the offset to commit to the end of the message that was just processed. When offsets are committed for the ConsumerConnector (either automatically or manually), the commit will not include offsets of messages that haven't been fully processed.
> >> >
> >> > I've tested the following ConsumerIterator.next(), and it seems to work as I expect:
> >> >
> >> >     override def next(): MessageAndMetadata[K, V] = {
> >> >       // New code: reset consumer offset to the end of the previously
> >> >       // consumed message:
> >> >       if (consumedOffset > -1L && currentTopicInfo != null) {
> >> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >         val topic = currentTopicInfo.topic
> >> >         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
> >> >       }
> >> >
> >> >       // Old code, excluding reset:
> >> >       val item = super.next()
> >> >       if (consumedOffset < 0)
> >> >         throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
> >> >       val topic = currentTopicInfo.topic
> >> >       consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> >> >       consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> >> >       item
> >> >     }
> >> >
> >> > I've seen several people asking about managing commit offsets manually with the high-level consumer. I suspect that this approach (the modified ConsumerIterator) would scale better than having a separate ConsumerConnector per stream just so that you can commit offsets with at-least-once semantics. The downside of this approach is more duplicate deliveries after recovery from hard failure (but this is "at least once", right, not "exactly once").
> >> >
> >> > I don't propose that the code necessarily be changed like this in trunk, I just want to know if the approach seems reasonable.
> >> >
> >> > Regards
> >> > Carl Heymann
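
For reference, here is a rough single-JVM sketch of the dedup step Bhavesh describes above: a bounded, LRU-style "seen keys" map standing in for a distributed cache such as Hazelcast. This is only an illustration of mine, not code from the thread; in a real multi-consumer setup the cache would have to be shared across processes (e.g. via Hazelcast) rather than kept in-process.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Bounded "seen keys" set used to drop duplicate deliveries.
    public class Deduplicator {
        private final Map<String, Boolean> seen;

        public Deduplicator(final int maxEntries) {
            // Access-ordered LinkedHashMap: evicts the least recently used key
            // once maxEntries is exceeded (a stand-in for a distributed LRU cache).
            this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        // Returns true the first time a key is seen, false for duplicates.
        public synchronized boolean firstTime(String key) {
            return seen.put(key, Boolean.TRUE) == null;
        }
    }

In the at-least-once consumer loop one would then only call process(message) when firstTime(keyOf(message)) returns true, where keyOf() is whatever message hash or primary key extraction fits the application.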