I was also struggling with this problem. I have found one way to do it without making consumers aware of each other's processing or assignment state: set auto-commit (enable.auto.commit) to true. Regardless of the auto-commit interval, setting auto-commit to true will make Kafka commit all records already sent to consumers.
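For reference, here is a minimal sketch of that setting, written as the `java.util.Properties` a `KafkaConsumer` would be constructed from. The broker address and group id are placeholders, not values from this thread. I understand, though I have not checked this against a specific client version, that with auto-commit enabled the consumer commits its current positions on the `auto.commit.interval.ms` timer and also synchronously before it rejoins the group during a rebalance. That appears to be the behaviour relied on above. Those positions cover every record returned by `poll()`, whether or not it has finished being processed.

```java
import java.util.Properties;

public class AutoCommitConfig {

    // Consumer properties with auto-commit enabled. The config keys are the
    // standard Kafka consumer names; broker address and group id are placeholders.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        props.put("enable.auto.commit", "true");
        // Periodic commit interval (5000 ms is the default). Assumption: the
        // pre-rebalance commit happens regardless of this value.
        props.put("auto.commit.interval.ms", "5000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // In a real application: new KafkaConsumer<String, String>(consumerProps())
        System.out.println("enable.auto.commit="
                + consumerProps().getProperty("enable.auto.commit"));
    }
}
```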
On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <phil.luckhu...@encycle.com> wrote:
> This is an example of the scenario I'm trying to avoid, where 2 consumers end
> up processing the same records from a partition at the same time.
>
> 1. I have a topic with 2 partitions and two consumers, A and B, which
>    have each been assigned a partition from the topic.
>
> 2. Consumer A is processing a batch of 100 records from partition 1,
>    and it takes much longer than expected, so after 20 records the consumer
>    appears dead and a rebalance occurs.
>
> 3. Consumer B now gets assigned both partitions and starts
>    processing partition 1 from the last offset committed by consumer A.
>
> 4. Consumer A, which, unknown to it, was the cause of the rebalance,
>    is still processing the remaining records in its batch of 100, which are the
>    same records that Consumer B is now also processing from the same partition.
>
> Is there a way that I can detect in the ConsumerRecords loop of Consumer A
> that it has been marked dead and should skip the remaining records in the
> batch and do the next poll(), which will cause it to trigger another
> rebalance and rejoin the group? I have added a ConsumerRebalanceListener,
> but as onPartitionsRevoked() only gets called when poll() is called, I don't
> get notified until all the records from the current poll batch have been
> processed.
>
> This shows where I think I need the check.
>
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     for (ConsumerRecord<String, String> crs : records) {
>         // I think I need a check here to jump out of the loop and call poll() again?
>         // (isDead() is hypothetical; KafkaConsumer has no such method.)
>         if (consumer.isDead()) {
>             break;
>         }
>         // ... process crs ...
>     }
> }
>
> I've looked at the suggestions for using pause and resume so that poll()
> can be called during the record processing loop, which might allow me to do
> it like this?
>
> 1. Call pause at the start of the loop.
>
> 2. Call poll(0) in the loop, which will trigger the call to
>    ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.
>
> 3. If ConsumerRebalanceListener onPartitionsRevoked() was called,
>    then I would call resume and break out of the record processing loop so the
>    main poll() request is called again.
>
> 4. Call resume at the end of the record processing loop.
>
> Is that a viable solution to the problem, or is there a better way to do
> this?
>
> Thanks
> Phil Luckhurst
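The four pause/resume steps in the question can be sketched as follows. To keep it self-contained, the consumer sits behind a hypothetical `PausableConsumer` interface. Its method names are illustrative, not Kafka API. `FakeConsumer` simulates a rebalance on a chosen paused poll. With a real `KafkaConsumer`, `pauseAll()`/`resumeAll()` would roughly correspond to `pause(consumer.assignment())`/`resume(consumer.assignment())`, `pollPaused()` to `poll(0)`, and `partitionsRevoked()` to a flag your own `ConsumerRebalanceListener` sets in `onPartitionsRevoked()`.

```java
import java.util.List;

public class PauseResumeSketch {

    // Hypothetical stand-in for the consumer calls used in the four steps.
    interface PausableConsumer {
        void pauseAll();
        void resumeAll();
        // Stands in for poll(0) with every partition paused: group coordination
        // runs, so a pending rebalance (and onPartitionsRevoked()) can happen here.
        void pollPaused();
        // Flag a (hypothetical) rebalance listener sets in onPartitionsRevoked().
        boolean partitionsRevoked();
    }

    // Handles one batch; returns how many records were processed before a
    // revocation (if any) cut the batch short.
    static int processBatch(PausableConsumer consumer, List<String> batch) {
        consumer.pauseAll();                        // step 1
        int processed = 0;
        try {
            for (String record : batch) {
                consumer.pollPaused();              // step 2
                if (consumer.partitionsRevoked()) {
                    break;                          // step 3: skip the rest of the batch
                }
                processed++;                        // real record handling would go here
            }
        } finally {
            consumer.resumeAll();                   // steps 3 and 4: always resume
        }
        return processed;
    }

    // Fake consumer that simulates a rebalance on the n-th paused poll.
    static final class FakeConsumer implements PausableConsumer {
        private final int revokeOnPoll;
        private int polls = 0;
        private boolean revoked = false;
        boolean paused = false;

        FakeConsumer(int revokeOnPoll) {
            this.revokeOnPoll = revokeOnPoll;
        }

        public void pauseAll() { paused = true; }
        public void resumeAll() { paused = false; }

        public void pollPaused() {
            polls++;
            if (polls == revokeOnPoll) {
                revoked = true;
            }
        }

        public boolean partitionsRevoked() { return revoked; }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(3);
        int n = processBatch(consumer, List.of("r1", "r2", "r3", "r4", "r5"));
        // prints "2 records processed, paused=false"
        System.out.println(n + " records processed, paused=" + consumer.paused);
    }
}
```

Two caveats, both assumptions worth checking against the client version in use. First, a partition newly assigned during the intermediate `poll(0)` is probably not paused, so that call may return records that a real loop would need to handle rather than discard. Second, the revocation flag would need clearing, for example in `onPartitionsAssigned()`, before the next batch.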