I ran into a strange scenario today, and after looking closer I'm seeing that 
Kafka doesn't fail on several invalid offset-commit scenarios. I have two 
partitions and two processes in the same consumer group, so each process has 
one partition. Process 1 retrieved a set of records from partition 0 (say 
offsets 1 to 10) and then, as it processed them one by one, it committed the 
offsets one by one. Process 2 was consuming from partition 1 at this time. 
Meanwhile a rebalance occurred; I see these messages in the Kafka logs:


[2016-08-10 11:00:33,702] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier 
with old generation 2

[2016-08-10 11:00:35,503] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Group casemonGatewayNotifier generation 2 is dead and 
removed

[2016-08-10 11:00:39,700] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier 
with old generation 0

[2016-08-10 11:00:39,700] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Stabilized group casemonGatewayNotifier generation 1

[2016-08-10 11:00:39,701] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Assignment received from leader for group 
casemonGatewayNotifier for generation 1

[2016-08-10 11:00:39,751] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier 
with old generation 1

[2016-08-10 11:00:43,699] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Stabilized group casemonGatewayNotifier generation 2

[2016-08-10 11:00:43,701] INFO [kafka.coordinator.GroupCoordinator] 
[GroupCoordinator 1]: Assignment received from leader for group 
casemonGatewayNotifier for generation 2

At this point the two processes swapped partitions, but process 1 was still 
processing and committing records from partition 0; some of these commits 
failed, but then they started to succeed again. While it was doing this, 
process 2 called "poll", received records from partition 0, and got offsets 7 
to 10. So at this point both processes hold some of the same records from 
partition 0 and are both processing and committing them. The strange part is 
that none of the commits fail. Both processes are able to commit the same 
records, and process 1 is able to commit offsets to a partition it doesn't 
even own.
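
For context, this is roughly the commit pattern process 1 uses (a trimmed-down 
sketch, not my actual code; the topic name, deserializers, and handle() method 
are placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class PerRecordCommitter {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "casemonGatewayNotifier");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("notifications"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    handle(record); // process one record at a time

                    // Commit this record's offset (+1 = next position to read)
                    // against exactly the partition it came from.
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }

        private static void handle(ConsumerRecord<String, String> record) {
            // placeholder for the real processing
        }
    }

With this pattern the per-record commitSync calls from process 1 kept 
succeeding against partition 0 even after the rebalance moved that partition 
to process 2.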

I created some test code to verify this further: I continuously commit the 
same offset every time, and what I see is that "poll" continues to return new 
records (records with a higher offset than what I committed). 
kafka.admin.ConsumerGroupCommand continues to show the old offset and a lag. 
When I restart the process it correctly starts back at the old offset, but 
then keeps getting new records beyond that offset again. It seems that invalid 
commits confuse the consumer logic.
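
The test code is essentially the following (a simplified sketch; the topic 
name and the fixed offset value are just illustrative):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RepeatCommitTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "repeat-commit-test");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("notifications"));

            TopicPartition tp = new TopicPartition("notifications", 0);
            long fixedOffset = 5L; // commit the same offset on every iteration

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // poll keeps handing back records well past the committed offset
                    System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
                }
                // Re-commit the same offset; the commit succeeds, and the group
                // command shows this value (with a growing lag), yet poll keeps advancing.
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(fixedOffset)));
            }
        }
    }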

Since the commits don't fail after the rebalance, I don't seem to have a way 
to know that I should no longer be processing these records, so I end up 
processing them in both processes. I know that in 0.10 poll can be limited to 
a maximum number of records (max.poll.records), but it seems the only way to 
avoid this kind of duplicate processing is to only ever process the first 
record returned by "poll", which seems inefficient. I know there is a 
rebalance callback (ConsumerRebalanceListener), but if I read the 
documentation correctly it only gets invoked from within a call to "poll", so 
while I'm working off my in-memory buffer I wouldn't know a rebalance had 
happened. How are people handling this currently?
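
To illustrate what I mean about the callback, something like the sketch below 
(buffer handling and topic name are placeholders) only helps if the consumer 
is inside poll() when the rebalance happens; while another thread is draining 
the buffer between polls, onPartitionsRevoked never fires:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class BufferingConsumer {
        // in-memory buffer of records still waiting to be processed
        private static final Queue<ConsumerRecord<String, String>> buffer =
                new ConcurrentLinkedQueue<>();

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "casemonGatewayNotifier");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("notifications"),
                    new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Only invoked from within poll(): drop buffered records for
                    // partitions we are about to lose so we stop processing/committing them.
                    buffer.removeIf(r ->
                            partitions.contains(new TopicPartition(r.topic(), r.partition())));
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // nothing needed for this example
                }
            });

            while (true) {
                consumer.poll(1000).forEach(buffer::add);
                // ... a separate thread drains the buffer; if a rebalance happens
                // while we're between polls, the listener above doesn't run in time.
            }
        }
    }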

Thanks,
Sean


