Thanks Jiangjie So you agree that with the modified ConsumerIterator.next() code, the high level consumer becomes at-least-once, even with auto-commit enabled? That is what I really want to know.
I'll have a look at the rebalancing code. I think I understand: during rebalancing, with auto-commit enabled, the offsets are committed in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing might still be happening at this point. The rebalance listener is called only after this commit. So the current code (without my change) would lead to fewer duplicate messages, because it assumes that these transactions normally complete. This seems prudent, since rebalancing happens much more frequently than java processes being killed unexpectedly. On the other hand it means giving up at-least-once guarantees for message processing, when a java process actually does die unexpectedly. So I see it should be better to create a custom offset tracking&commit component, with some ability to wait a reasonable amount of time for consumer threads on streams to complete their current transaction, on rebalance, before committing from a rebalance listener. Is it OK to block for a second or two in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for processing threads to complete? Will this hold up the whole cluster's rebalancing? The new KafkaConsumer code doesn't appear to do a commit in the same way during rebalance, when autocommit is enabled. So if current users of the high level consumer switch to the new consumer, they might get more duplicates on rebalance, right? Regards Carl On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Hi Carl, > > Generally, you approach works to guarantee at least once consumption - > basically people have to commit offset only after they have processed the > message. > The only problem is that in old high level consumer, during consumer > rebalance consumer will (and should) commit offsets. To guarantee > at-least-once and avoid unecessary duplicates on rebalance, ideally we > should wait until all the messages returned by iterator to be processed > before commit offsets. > > In LinkedIn, we have wrapper around open source consumer iterator where we > can implants those logics. > > Jiangjie (Becket) Qin > > On 6/19/15, 12:22 AM, "Carl Heymann" <ch.heym...@gmail.com> wrote: > > >Thanks Bhavesh. > > > >I understand that to get "exactly once" processing of a message requires > >some de-duplication. What I'm saying, is that the current high level > >consumer, with automatic offset commits enabled, gives neither "at most > >once" nor "at least once" guarantees: A consumer group might get duplicate > >messages, but might also never fully process some messages (which is a > >bigger problem for me). > > > >With the code change I propose, I think it changes to "at least once", > >i.e. > >one can then do the deduplication you describe, without worrying about > >"losing" messages. Messages should not get committed without being fully > >processed. I want to know if this code change has any obvious problems. > > > >Regards > >Carl > > > > > >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry > ><mistry.p.bhav...@gmail.com > >> wrote: > > > >> HI Carl, > >> > >> Produce side retry can produce duplicated message being sent to brokers > >> with different offset with same message. Also, you may get duplicated > >>when > >> the High Level Consumer offset is not being saved or commit but you have > >> processed data and your server restart etc... > >> > >> > >> > >> To guaranteed at-least one processing across partitions (and across > >> servers), you will need to store message hash or primary key into > >> distributed LRU cache (with eviction policy ) like Hazelcast > >> <http://www.hazelcast.com> and do dedupping across partitions. > >> > >> > >> > >> I hope this help ! > >> > >> > >> > >> Thanks, > >> > >> Bhavesh > >> > >> > >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote: > >> > >> > So Carl Heymann's ConsumerIterator.next hack approach is not > >>reasonable? > >> > > >> > 2015-06-17 08:12:50 +0000 上のメッセージ Stevo Slavić: > >> > > >> > --047d7bfcf30ed09b460518b241db > >> >> > >> >> Content-Type: text/plain; charset=UTF-8 > >> >> > >> >> > >> >> > >> >> > >> >> With auto-commit one can only have at-most-once delivery guarantee - > >> after > >> >> > >> >> commit but before message is delivered for processing, or even after > >>it > >> is > >> >> > >> >> delivered but before it is processed, things can fail, causing event > >>not > >> >> to > >> >> > >> >> be processed, which is basically same outcome as if it was not > >> delivered. > >> >> > >> >> > >> >> > >> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heym...@gmail.com> > >> >> wrote: > >> >> > >> >> > >> >> > >> >> > Hi > >> >> > >> >> > > >> >> > >> >> > ** Disclaimer: I know there's a new consumer API on the way, this > >>mail > >> >> is > >> >> > >> >> > about the currently available API. I also apologise if the below > >>has > >> >> > >> >> > already been discussed previously. I did try to check previous > >> >> discussions > >> >> > >> >> > on ConsumerIterator ** > >> >> > >> >> > > >> >> > >> >> > It seems to me that the high-level consumer would be able to > >>support > >> >> > >> >> > at-least-once messaging, even if one uses auto-commit, by changing > >> >> > >> >> > kafka.consumer.ConsumerIterator.next() to call > >> >> > >> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This > >> >> way, a > >> >> > >> >> > consumer thread for a KafkaStream could just loop: > >> >> > >> >> > > >> >> > >> >> > while (true) { > >> >> > >> >> > MyMessage message = iterator.next().message(); > >> >> > >> >> > process(message); > >> >> > >> >> > } > >> >> > >> >> > > >> >> > >> >> > Each call to "iterator.next()" then updates the offset to commit to > >> the > >> >> end > >> >> > >> >> > of the message that was just processed. When offsets are committed > >>for > >> >> the > >> >> > >> >> > ConsumerConnector (either automatically or manually), the commit > >>will > >> >> not > >> >> > >> >> > include offsets of messages that haven't been fully processed. > >> >> > >> >> > > >> >> > >> >> > I've tested the following ConsumerIterator.next(), and it seems to > >> work > >> >> as > >> >> > >> >> > I expect: > >> >> > >> >> > > >> >> > >> >> > override def next(): MessageAndMetadata[K, V] = { > >> >> > >> >> > // New code: reset consumer offset to the end of the previously > >> >> > >> >> > consumed message: > >> >> > >> >> > if (consumedOffset > -1L && currentTopicInfo != null) { > >> >> > >> >> > currentTopicInfo.resetConsumeOffset(consumedOffset) > >> >> > >> >> > val topic = currentTopicInfo.topic > >> >> > >> >> > trace("Setting %s consumed offset to %d".format(topic, > >> >> > >> >> > consumedOffset)) > >> >> > >> >> > } > >> >> > >> >> > > >> >> > >> >> > // Old code, excluding reset: > >> >> > >> >> > val item = super.next() > >> >> > >> >> > if(consumedOffset < 0) > >> >> > >> >> > throw new KafkaException("Offset returned by the message set > >>is > >> >> > >> >> > invalid %d".format(consumedOffset)) > >> >> > >> >> > val topic = currentTopicInfo.topic > >> >> > >> >> > > >>consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() > >> >> > >> >> > > >>consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() > >> >> > >> >> > item > >> >> > >> >> > } > >> >> > >> >> > > >> >> > >> >> > I've seen several people asking about managing commit offsets > >>manually > >> >> with > >> >> > >> >> > the high level consumer. I suspect that this approach (the modified > >> >> > >> >> > ConsumerIterator) would scale better than having a separate > >> >> > >> >> > ConsumerConnecter per stream just so that you can commit offsets > >>with > >> >> > >> >> > at-least-once semantics. The downside of this approach is more > >> duplicate > >> >> > >> >> > deliveries after recovery from hard failure (but this is "at least > >> >> once", > >> >> > >> >> > right, not "exactly once"). > >> >> > >> >> > > >> >> > >> >> > I don't propose that the code necessarily be changed like this in > >> >> trunk, I > >> >> > >> >> > just want to know if the approach seems reasonable. > >> >> > >> >> > > >> >> > >> >> > Regards > >> >> > >> >> > Carl Heymann > >> >> > >> >> > > >> >> > >> >> > >> >> > >> >> --047d7bfcf30ed09b460518b241db-- > >> >> > >> >> > >> >> > >> >> > >> > > >> > > >> > > >> > >