Damien Gasparina created KAFKA-13636:
----------------------------------------
Summary: Committed offsets could be deleted during a rebalance if
a group did not commit for a while
Key: KAFKA-13636
URL: https://issues.apache.org/jira/browse/KAFKA-13636
Project: Kafka
Issue Type: Bug
Components: core, offset manager
Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.2, 2.5.1, 2.4.0
Reporter: Damien Gasparina
The group coordinator might incorrectly delete committed offsets during a group rebalance.
During a rebalance, the coordinator relies on the last commit timestamp
({{offsetAndMetadata.commitTimestamp}}) instead of the last state
modification timestamp ({{currentStateTimestamp}}) to detect expired offsets.
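To make the difference concrete, here is a minimal Scala sketch of a retention check that takes the base timestamp as a parameter. The names {{OffsetInfo}} and {{isExpired}} and all the numbers are hypothetical and only illustrate the idea; this is not the coordinator's actual code.

```scala
object ExpiryBaseSketch {
  // Hypothetical view of one committed offset and the state timestamp of its group.
  final case class OffsetInfo(commitTimestamp: Long, currentStateTimestamp: Long)

  // An offset counts as expired once retentionMs has elapsed since the chosen base timestamp.
  def isExpired(base: Long, nowMs: Long, retentionMs: Long): Boolean =
    nowMs - base >= retentionMs

  val retentionMs: Long = 2L * 60 * 1000 // offsets.retention.minutes=2

  // Assumed scenario: last commit 3 minutes ago, group state changed 5 seconds ago.
  val now = 1000000L
  val info = OffsetInfo(commitTimestamp = now - 180000L, currentStateTimestamp = now - 5000L)

  // Using the last commit as the base: the offset is expired (the behavior reported here).
  val byCommit: Boolean = isExpired(info.commitTimestamp, now, retentionMs)
  // Using the last state change as the base: the offset is still retained.
  val byState: Boolean = isExpired(info.currentStateTimestamp, now, retentionMs)
}
```

The same offset is expired or retained depending only on which timestamp is used as the base.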
This is relatively easy to reproduce by tuning group.initial.rebalance.delay.ms,
offsets.retention.minutes and offsets.retention.check.interval.ms. I uploaded an example at:
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention]
The script does the following:
* Starts a broker with {{offsets.retention.minutes=2}},
{{offsets.retention.check.interval.ms=1000}} and
{{group.initial.rebalance.delay.ms=20000}}
* Produces 10 messages
* Creates a consumer group that consumes the 10 messages, with {{enable.auto.commit}} disabled
so that offsets are committed only a few times
* Waits 3 minutes, then kills the consumer with {{kill -9}}
* Restarts the consumer after a few seconds
* The consumer restarts from the position given by {{auto.offset.reset}} because its committed offsets were removed
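The timing of the reproduction can be checked with simple arithmetic. The Scala sketch below converts the broker settings to milliseconds; it assumes the last commit is at least 3 minutes old when the consumer rejoins and that the rejoin of the emptied group goes through the initial rebalance delay. The object and field names are hypothetical.

```scala
object ReproTimelineSketch {
  // Broker settings from the reproduction, converted to milliseconds.
  val retentionMs: Long = 2L * 60 * 1000 // offsets.retention.minutes=2
  val checkIntervalMs: Long = 1000L      // offsets.retention.check.interval.ms=1000
  val initialDelayMs: Long = 20000L      // group.initial.rebalance.delay.ms=20000

  // Assumption: the last commit happened at least 3 minutes before the consumer rejoined.
  val commitAgeAtRejoinMs: Long = 3L * 60 * 1000

  // Number of expiration checks that run while the rebalance is held by the initial delay.
  val cleanupRunsDuringDelay: Long = initialDelayMs / checkIntervalMs

  // The commit is already older than the retention period when the member rejoins, so any
  // of those checks expires the offset if it only looks at commitTimestamp.
  val pastRetentionOnRejoin: Boolean = commitAgeAtRejoinMs >= retentionMs
}
```

Under these assumptions, about 20 expiration checks run while the rebalance is pending, and the commit is already past retention for every one of them.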
The root cause is in GroupMetadata.scala:
* When the group gets emptied, {{subscribedTopics}} is set to {{Some(Set.empty)}}
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
* When a new member joins, it is added to the group right away, BUT {{subscribedTopics}}
is only updated once the rebalance is over (in initNewGeneration), which can take a while
because of {{group.initial.rebalance.delay.ms}}
* When the periodic expired-offset cleanup runs (every {{offsets.retention.check.interval.ms}}),
{{subscribedTopics.isDefined}} returns true, because {{Some(Set.empty) != None}}
* Thus we enter
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
with an empty {{subscribedTopics}} set and rely on the
{{commitTimestamp}} regardless of the {{currentStateTimestamp}}. Since no topic belongs to the
empty subscription, every offset whose last commit is older than the retention period is treated as expired
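The mechanism above can be modeled with a small Scala sketch. It only mimics the shape of the subscribed-topics branch (offsets for topics outside the subscription expire once retention has passed since the last commit); the names {{Offset}} and {{expiredOffsets}} and the topic "orders" are hypothetical, and the other branches of the real code are not modeled.

```scala
object SubscribedTopicsSketch {
  // Simplified stand-in for a committed offset: its topic and its commit time.
  final case class Offset(topic: String, commitTimestamp: Long)

  // Offsets for topics the group is NOT subscribed to are expired once retention has
  // passed since the last commit. Other branches of the real logic are not modeled.
  def expiredOffsets(
      offsets: Seq[Offset],
      subscribedTopics: Option[Set[String]],
      nowMs: Long,
      retentionMs: Long
  ): Seq[Offset] = subscribedTopics match {
    case Some(topics) =>
      offsets.filter(o => !topics.contains(o.topic) && nowMs - o.commitTimestamp >= retentionMs)
    case None =>
      Seq.empty // not modeled in this sketch
  }

  val now = 1000000L
  val retentionMs = 120000L
  val offsets = Seq(Offset("orders", now - 180000L)) // last commit 3 minutes ago

  // After the group is emptied: Some(Set.empty) is defined even though it holds no topics.
  val emptied: Option[Set[String]] = Some(Set.empty[String])
  val isDefinedWhenEmpty: Boolean = emptied.isDefined

  // While the rebalance is pending, nothing is "subscribed", so the old offset is expired.
  val expiredWhileRebalancing: Seq[Offset] = expiredOffsets(offsets, emptied, now, retentionMs)
  // Once the subscription is repopulated with the topic, the same offset is kept.
  val expiredAfterGeneration: Seq[Offset] =
    expiredOffsets(offsets, Some(Set("orders")), now, retentionMs)
}
```

The only difference between the two results is the contents of the subscription set, which matches the window described above between the member joining and {{subscribedTopics}} being updated.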
This seems to be a regression introduced by KIP-496:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges