Hello,

I am using 0.11.0.0 version of Kakfa broker and Java client library. My
consumer code tracks offsets for each assigned partition and at some time
interval manually commits offsets by specifying partition->offset map.

What I noticed is, after the rebalance, even if consumer loses some
partitions that were assigned to it previously, offset commit for those
lost partitions still succeeds by that same consumer! Shouldn't offset
commit fail in this scenario since consumer is trying to commit offsets for
partitions that are not assigned to it?

For clarity below are the logs I see with comments:

// This is when consumer starts for "test" topic and it picks up 3
partitions
log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]

// Now consumer processes 3 records from partition 0 and 7 records from
partition 2 - confirmed with log statements
log>> ...

// Rebalance happens - right now, my code does not commit any pending
offsets here and just prints the log statement
log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]

// After re-balance, consumer loses partition 0 and 1. Again, my code does
not do anything on this callback and just prints the log statement
onPartitionsAssigned: partitions=[test-2]

// Since the code did not commit offsets during revoke call, after
rebalance, poll() returns all records for assigned partitions since last
offset commit.
// ... So we re-process 7 records from partition 2. This was confirmed with
log statements.
log>> ...

// Offset commit gets triggered after some time and due to the bug in the
code, it tries to commit offsets for both partition 0 and 2.
// There is no failure however! I can see on Kafka broker side that offset
for partition 0 is updated to 3.
// I made sure that another consumer that is actually assigned partition 0
after re-balance has not committed offset yet.
commitOffsets: {0=3, 2=7}


Thanks,
M

Reply via email to