Regarding the pause and resume approach, I think there is still a chance that you end up processing duplicate records. A rebalance can still be triggered for numerous reasons while you are processing records.
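
A minimal sketch of the four pause/resume steps from Phil's message quoted below may help show where the gap is. It is written against a small stand-in interface rather than the real KafkaConsumer so that it runs on its own: PollingConsumer, FakeConsumer, pollForRebalance() and processBatch() are hypothetical names made up for illustration. In a real consumer the flag would presumably be set from ConsumerRebalanceListener.onPartitionsRevoked(), and pollForRebalance() would be a poll(0) on paused partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class PauseResumeSketch {

    /** Hypothetical stand-in for the few consumer calls the steps rely on. */
    interface PollingConsumer {
        void pause();
        void resume();
        /** Stands in for poll(0) while paused: fetches nothing, may run rebalance callbacks. */
        void pollForRebalance();
    }

    /** Fake consumer that simulates a partition revocation on the Nth poll. */
    static final class FakeConsumer implements PollingConsumer {
        private final int rebalanceOnPoll;
        private final AtomicBoolean revoked;
        private int polls = 0;
        boolean paused = false;

        FakeConsumer(int rebalanceOnPoll, AtomicBoolean revoked) {
            this.rebalanceOnPoll = rebalanceOnPoll;
            this.revoked = revoked;
        }

        @Override public void pause() { paused = true; }
        @Override public void resume() { paused = false; }

        @Override public void pollForRebalance() {
            polls++;
            if (polls == rebalanceOnPoll) {
                revoked.set(true); // what an onPartitionsRevoked() callback would do
            }
        }
    }

    /** Processes records until the batch ends or a revocation is seen; returns the count handled. */
    static int processBatch(List<String> batch, PollingConsumer consumer,
                            AtomicBoolean revoked, Consumer<String> handler) {
        int processed = 0;
        consumer.pause();                    // step 1: pause at the start of the loop
        try {
            for (String record : batch) {
                consumer.pollForRebalance(); // step 2: give the rebalance callback a chance to run
                if (revoked.get()) {         // step 3: stop and go back to the main poll()
                    break;
                }
                handler.accept(record);      // a rebalance during this call goes unnoticed for now
                processed++;
            }
        } finally {
            consumer.resume();               // steps 3 and 4: always resume before returning
        }
        return processed;
    }

    public static void main(String[] args) {
        AtomicBoolean revoked = new AtomicBoolean(false);
        FakeConsumer consumer = new FakeConsumer(21, revoked);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            batch.add("record-" + i);
        }
        int done = processBatch(batch, consumer, revoked, record -> { });
        System.out.println("processed " + done + " of " + batch.size());
    }
}
```

Note that the flag is only checked between records. A rebalance that happens while the handler is still working on a record is not noticed until the next iteration, so that record may also be processed by whichever consumer takes over the partition.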

On Thu, Apr 21, 2016 at 10:34 AM, vinay sharma <vinsharma.t...@gmail.com> wrote:

> I was also struggling with this problem. I have found one way to do it
> without making consumers aware of each others processing or assignment
> state. You can set autocommit to true. Irrespective of autocommit interval
> setting autocommit true will make kafka commit all records already sent to
> consumers.
>
> On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <
> phil.luckhu...@encycle.com> wrote:
>
>> This is an example of the scenario I'm trying avoid where 2 consumers
>> end up processing the same records from a partition at the same time.
>>
>> 1. I have a topic with 2 partitions and two consumers A and B which
>> have each been assigned a partition from the topic.
>>
>> 2. Consumer A is processing a batch of 100 records from partition 1
>> and it takes much longer than expected so after 20 records the consumer
>> appears dead so a rebalance occurs.
>>
>> 3. Consumer B now gets assigned both partitions and starts
>> processing partition 1 from the last committed offset by consumer A.
>>
>> 4. Consumer A which, unknown to it, was the cause of the rebalance
>> is still processing the remaining records in its batch of 100 which are the
>> same records that Consumer B is also now processing from the same partition.
>>
>> Is there a way that I can detect in the ConsumerRecords loop of Consumer
>> A that it has been marked dead and should skip the remaining records in the
>> batch and do the next poll() which will cause it to trigger another
>> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
>> but as onPartitionsRevoked() only gets called when poll() is called I don't
>> get notified until all the records from the current poll batch have been
>> processed.
>>
>> This shows where I think I need the check.
>>
>>     while (true) {
>>         ConsumerRecords<String, String> records = consumer.poll(100);
>>         for (ConsumerRecord<String, String> record : records) {
>>             // I think I need a check here to jump out of the loop
>>             // and call poll() again?
>>             if (consumer.isDead())
>>                 break;
>>         }
>>     }
>>
>> I've looked at the suggestions for using pause and resume so that poll()
>> can be called during the record processing loop which might allow me to do
>> it like this?
>>
>> 1. Call pause at the start of the loop.
>>
>> 2. Call poll(0) in the loop which will trigger the call to
>> ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.
>>
>> 3. If ConsumerRebalanceListener onPartitionsRevoked() was called
>> then I would call resume and break out of the record processing loop so the
>> main poll() request is called again.
>>
>> 4. Call resume at the end of the record processing loop.
>>
>> Is that a viable solution to the problem or is there a better way to do
>> this?
>>
>> Thanks
>> Phil Luckhurst