This is an example of the scenario I'm trying to avoid, where 2 consumers end up 
processing the same records from a partition at the same time.


1. I have a topic with 2 partitions and two consumers, A and B, which have 
each been assigned a partition from the topic.

2. Consumer A is processing a batch of 100 records from partition 1 and 
it takes much longer than expected, so after 20 records the consumer appears 
dead and a rebalance occurs.

3. Consumer B now gets assigned both partitions and starts processing 
partition 1 from the last offset committed by Consumer A.

4. Consumer A, which unknown to it was the cause of the rebalance, is 
still processing the remaining records in its batch of 100, which are the same 
records that Consumer B is now also processing from the same partition.

Is there a way that I can detect in the ConsumerRecords loop of Consumer A that 
it has been marked dead, so that it skips the remaining records in the batch and 
does the next poll(), which will cause it to trigger another rebalance and rejoin 
the group? I have added a ConsumerRebalanceListener, but as 
onPartitionsRevoked() only gets called when poll() is called, I don't get 
notified until all the records from the current poll batch have been processed.

This shows where I think I need the check.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // I think I need a check here to jump out of the loop and call
        // poll() again? isDead() is made up; it only shows what I'm after.
        if (consumer.isDead()) {
            break;
        }
        // ... process the record ...
    }
}

I've looked at the suggestions for using pause and resume so that poll() can be 
called during the record processing loop which might allow me to do it like 
this?


1. Call pause at the start of the loop.

2. Call poll(0) in the loop, which will trigger the call to 
ConsumerRebalanceListener.onPartitionsRevoked() and perform the rebalance.

3. If ConsumerRebalanceListener.onPartitionsRevoked() was called then I 
would call resume and break out of the record processing loop so the main 
poll() request is called again.

4. Call resume at the end of the record processing loop.
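
The four steps above could be sketched roughly like this in Java. To keep it 
runnable without a broker, PollingClient is a hypothetical stand-in for the few 
consumer calls involved (poll, pause, resume), and RevocationFlag is a 
hypothetical helper that onPartitionsRevoked() would set. Neither is part of the 
Kafka client, and I haven't verified this against a real consumer.

```java
import java.util.List;
import java.util.function.Consumer;

public class PauseResumeSketch {

    /** Hypothetical stand-in for the consumer calls used below (not a Kafka type). */
    interface PollingClient<R> {
        List<R> poll(long timeoutMs); // fetches records; rebalance callbacks run in here
        void pauseAll();              // pause every currently assigned partition
        void resumeAll();             // resume every currently assigned partition
    }

    /** Hypothetical flag that the rebalance listener would set on revocation. */
    static final class RevocationFlag {
        private volatile boolean revoked;

        void markRevoked() {
            revoked = true;
        }

        boolean checkAndClear() {
            boolean wasRevoked = revoked;
            revoked = false;
            return wasRevoked;
        }
    }

    /**
     * Processes one batch, calling poll(0) before each record so that the
     * client can take part in a rebalance. Returns how many records were handled.
     */
    static <R> int processBatch(PollingClient<R> client, List<R> batch,
                                RevocationFlag flag, Consumer<R> handler) {
        int processed = 0;
        client.pauseAll();                  // step 1: poll(0) should return no new records
        try {
            for (R record : batch) {
                client.poll(0);             // step 2: gives rebalance callbacks a chance to run
                if (flag.checkAndClear()) { // step 3: partitions were revoked
                    break;                  // skip the rest of the batch
                }
                handler.accept(record);
                processed++;
            }
        } finally {
            client.resumeAll();             // step 4, also reached on the early exit
        }
        return processed;
    }
}
```

With a real consumer, pauseAll() and resumeAll() would presumably wrap the 
consumer's pause and resume calls for whatever partitions are assigned at that 
moment. Putting resume in a finally block means steps 3 and 4 share one code path.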

Is that a viable solution to the problem or is there a better way to do this?

Thanks
Phil Luckhurst
