We have also encountered this problem in the production environment, and the 
specific manifestation is that some groups have site resets

kafka server 2.7.2 and 2.4.2
[2025-03-11 19:45:03,061] INFO [GroupCoordinator 2]: Preparing to rebalance 
group nihao-newbee-bill in state PreparingRebalance with old generation 4549 
(__consumer_offsets-16) (reason: removing member 
nihao-newbee-bill_80ab55b3f31540da-86e2b6ec-8932-4869-b53c-dbd4862491bf on 
heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2025-03-11 19:45:03,985] INFO [GroupCoordinator 2]: Member 
nihao-newbee-bill_0eff5fbfbf1cf62f-e65cc081-e44e-41af-9fac-262f8a3a0f6a in 
group nihao-newbee-bill has failed, removing it from the group 
[2025-03-11 19:45:03,992] INFO [GroupCoordinator 2]: Member 
nihao-newbee-bill_dbde5089929e8b07-b4967d22-c142-45b1-823f-0f09f296c625 in 
group nihao-newbee-bill has failed, removing it from the group 

not have logs 
Removed 2 expired offsets in 5 milliseconds

> We've recently encountered a scenario where a consumer group got their 
> committed offsets deleted some minutes (around 3 minutes)  after the consumer 
> got into inactive state (the underlying node went away).
> As per 
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
>  committed offsets for an active (i.e running) consumer group should not be 
> deleted. However, if a consumer becomes inactive, {+}the deletion of 
> committed offsets will not occur immediately{+}. Instead, the committed 
> offsets will only be removed if the consumer remains inactive for at least 
> the duration specified by 
> [offset.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offset.retention.minutes}} is set to 7 days and the consumer 
> was only inactive for 5 minutes, so deletion should have not occurred.
> Later on 
> [KIP-496|https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]
>  was introduced to fix the following issue in KIP-211:
> {quote}When a consumer subscription changes, we are still left with the 
> committed offsets of the previous subscription. These will never be cleaned 
> up as long as the group remains active. We were aware of this problem in 
> KIP-211, but the solution was not implemented because the coordinator is 
> presently agnostic to join group metadata and we were unclear about the 
> compatibility implications of changing that.
> {quote}
> However this introduced a regression as explained in 
> https://issues.apache.org/jira/browse/KAFKA-13636.
> {quote}The group coordinator might delete invalid offsets during a group 
> rebalance. During a rebalance, the coordinator is relying on the last commit 
> timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
> modification {_}timestamp (currentStateTimestamp{_}) to detect expired 
> offsets.
> {quote}
> It is implied, that Kafka employs two approaches for offset expiration:
>  * The deletion timer is activated when a consumer group enters the Empty 
> state (i.e., not running). Once the timer exceeds the 
> {{offset.retention.minutes}} threshold, the committed offsets are deleted.
>  * If a consumer is in a "running" state (i.e., not in the Empty state) but 
> is no longer consuming from topics with committed offsets older than the 
> offset.retention.minutes duration, the committed offsets are deleted.
> However, the Kafka issue KAFKA-13636 specifically states that this situation 
> could occur during a group rebalance. In my particular scenario, I have 
> observed that the affected consumer group did not transition into the Empty 
> state. I'm uncertain whether the rebalance has finished. The reason why Kafka 
> did not detect this consumer group as Empty remains unclear.
> *Logs*
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
> has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in 
> state PreparingRebalance with old generation 432 (__consumer_offsets-16) 
> (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
> has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
> (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
> mycg for generation 433{noformat}
> This suggests that kafka might have followed the second approach and that's 
> why kafka deleted the offsets 3 minutes later.
> {noformat}
> 1:33:17 am - 
> [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 
> milliseconds.{noformat}
> h4. Was the consumer group in a rebalance state when the offsets were removed?
> By looking at the logs and the kafka codebase, we can find the following:
>  * The [onExpireHeartbeat 
> callback|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1278]
>  on the GroupCoordinator is called. The 
> [removeMemberAndUpdateGroup|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1291]
>  method is called.
>  * The removeMemberAndUpdateGroup method will call [maybePrepareRebalance 
> when the group is 
> stable|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1158].
>  This was the case as the group was not empty (not running), dead (removed) 
> or PreparingRebalance state from kafka perspective.
>  * maybePrepareRebalance [just calls 
> prepareRebalance|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1118]
>  if the group can rebalance.
>  * PrepareRebalance will call 
> [DelayedJoin|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1136]
>  as the group is not empty. [This 
> object|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala#L25-L28]
>  takes care of the join-group operation (first phase of a rebalance).
>  * When the group is not empty nor dead, it [will 
> log|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1222]
>  {{{}Stabilized group ${group.groupId} generation 
> ${group.generationId}{{}}}}. {*}This does not mean the rebalance phase is 
> over{*}. It will immediately after wait on the JoinGroupResponse requests 
> from all members. This is still the first phase of the rebalance.
>  * Later on we can find a "Assignment received from leader for group" log 
> line. This log line comes from the group coordinator when handling a sync 
> group request (2nd phase of the rebalance) in the [CompletingRebalance 
> state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L412].
>  * By following that codepath, if no errors are logged, then [the assignment 
> will be propagated and the group transition to Stable 
> state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L432-L433].
> This seems to indicate that we were not in a rebalance state and [the 
> fix|https://github.com/apache/kafka/pull/11742/files] for KAFKA-13636 won't 
> fix the issue seen here. The main question remains as to why Kafka did not 
> transition the consumer group to an empty state.

