Regarding the pause-and-resume approach, I think there is still a chance
that you end up processing duplicate records. A rebalance can still be
triggered for numerous reasons while you are processing records.
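
To make that window concrete, here is a rough, Kafka-free sketch of a
processing loop that stops as soon as it learns of a revocation. Everything
in it is hypothetical (the class name, markRevoked(), and the rebalanceAfter
parameter that simulates when the callback arrives); in a real consumer the
flag would be set from ConsumerRebalanceListener.onPartitionsRevoked(),
which only runs inside poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Kafka-free model of a processing loop that stops early once the
 * consumer learns its partitions were revoked. All names are hypothetical.
 */
class RevocationAwareLoop {
    // Set by the (simulated) rebalance callback, read by the processing loop.
    private final AtomicBoolean revoked = new AtomicBoolean(false);
    // Offsets this consumer has actually processed.
    private final List<Long> processed = new ArrayList<>();

    /** Stand-in for what onPartitionsRevoked() would do. */
    public void markRevoked() {
        revoked.set(true);
    }

    /**
     * Processes offsets in order and stops as soon as the flag is seen.
     * rebalanceAfter simulates the point at which an in-loop poll(0)
     * would deliver the revocation callback (negative means never).
     */
    public List<Long> processBatch(List<Long> offsets, int rebalanceAfter) {
        int handled = 0;
        for (long offset : offsets) {
            if (revoked.get()) {
                break; // abandon the rest of the batch and go back to poll()
            }
            processed.add(offset); // the "work" for this record
            handled++;
            if (handled == rebalanceAfter) {
                markRevoked(); // callback fires only after this record is done
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        RevocationAwareLoop loop = new RevocationAwareLoop();
        List<Long> done = loop.processBatch(List.of(0L, 1L, 2L, 3L, 4L), 2);
        // Offsets 0 and 1 were processed but never committed, so the new
        // owner of the partition would process them again.
        System.out.println("processed before stopping: " + done);
    }
}
```

The loop skips offsets 2 to 4, but anything it handled after the last commit
and before it noticed the revocation is still picked up again by the new
owner, so the check narrows the window without closing it.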

On Thu, Apr 21, 2016 at 10:34 AM, vinay sharma <vinsharma.t...@gmail.com>
wrote:

> I was also struggling with this problem. I have found one way to do it
> without making consumers aware of each other's processing or assignment
> state. You can set autocommit to true. Irrespective of the autocommit
> interval, setting autocommit to true will make Kafka commit all records
> already sent to consumers.
>
> On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <
> phil.luckhu...@encycle.com> wrote:
>
>> This is an example of the scenario I'm trying to avoid, where 2 consumers
>> end up processing the same records from a partition at the same time.
>>
>>
>> 1. I have a topic with 2 partitions and two consumers A and B which
>> have each been assigned a partition from the topic.
>>
>> 2. Consumer A is processing a batch of 100 records from partition 1
>> and it takes much longer than expected so after 20 records the consumer
>> appears dead so a rebalance occurs.
>>
>> 3. Consumer B now gets assigned both partitions and starts
>> processing partition 1 from the last committed offset by consumer A.
>>
>> 4. Consumer A which, unknown to it, was the cause of the rebalance
>> is still processing the remaining records in its batch of 100 which are the
>> same records that Consumer B is also now processing from the same partition.
>>
>> Is there a way that I can detect in the ConsumerRecords loop of Consumer
>> A that it has been marked dead and should skip the remaining records in the
>> batch and do the next poll() which will cause it to trigger another
>> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
>> but as onPartitionsRevoked() only gets called when poll() is called I don't
>> get notified until all the records from the current poll batch have been
>> processed.
>>
>> This shows where I think I need the check.
>>
>> while (true) {
>>     ConsumerRecords<String, String> records = consumer.poll(100);
>>     for (ConsumerRecord<String, String> record : records) {
>>         // I think I need a check here to jump out of the loop and
>>         // call poll() again? (isDead() is hypothetical, it is not an
>>         // existing KafkaConsumer method.)
>>         if (consumer.isDead())
>>             break;
>>     }
>> }
>>
>> I've looked at the suggestions for using pause and resume so that poll()
>> can be called during the record processing loop which might allow me to do
>> it like this?
>>
>>
>> 1. Call pause at the start of the loop.
>>
>> 2. Call poll(0) in the loop which will trigger the call to
>> ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.
>>
>> 3. If ConsumerRebalanceListener onPartitionsRevoked() was called
>> then I would call resume and break out of the record processing loop so the
>> main poll() request is called again.
>>
>> 4. Call resume at the end of the record processing loop.
>>
>> Is that a viable solution to the problem or is there a better way to do
>> this?
>>
>> Thanks
>> Phil Luckhurst
>>
>
>
