[ https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934211#comment-17934211 ]
zhangzhisheng edited comment on KAFKA-15035 at 3/11/25 1:24 PM:
----------------------------------------------------------------

We have also encountered this problem in our production environment. The symptom is that some consumer groups had their offsets reset. Kafka server versions: 2.7.2 and 2.4.2.
{code:java}
[2025-03-11 19:45:03,061] INFO [GroupCoordinator 2]: Preparing to rebalance group nihao-newbee-bill in state PreparingRebalance with old generation 4549 (__consumer_offsets-16) (reason: removing member nihao-newbee-bill_80ab55b3f31540da-86e2b6ec-8932-4869-b53c-dbd4862491bf on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2025-03-11 19:45:03,985] INFO [GroupCoordinator 2]: Member nihao-newbee-bill_0eff5fbfbf1cf62f-e65cc081-e44e-41af-9fac-262f8a3a0f6a in group nihao-newbee-bill has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2025-03-11 19:45:03,992] INFO [GroupCoordinator 2]: Member nihao-newbee-bill_dbde5089929e8b07-b4967d22-c142-45b1-823f-0f09f296c625 in group nihao-newbee-bill has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
{code}
Not included in the log excerpt above: {{Removed 2 expired offsets in 5 milliseconds}}

> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major
>
> We've recently encountered a scenario where a consumer group had its committed offsets deleted a few minutes (around 3 minutes) after the consumer became inactive (the underlying node went away).
> As per [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets], committed offsets for an active (i.e., running) consumer group should not be deleted. However, if a consumer becomes inactive, {+}the deletion of committed offsets will not occur immediately{+}. Instead, the committed offsets will only be removed if the consumer remains inactive for at least the duration specified by [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer was only inactive for 5 minutes, so deletion should not have occurred.
> Later on, [KIP-496|https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets] was introduced to fix the following issue in KIP-211:
> {quote}When a consumer subscription changes, we are still left with the committed offsets of the previous subscription. These will never be cleaned up as long as the group remains active.
> We were aware of this problem in KIP-211, but the solution was not implemented because the coordinator is presently agnostic to join group metadata and we were unclear about the compatibility implications of changing that.
> {quote}
> However, this introduced a regression, as explained in https://issues.apache.org/jira/browse/KAFKA-13636:
> {quote}The group coordinator might delete invalid offsets during a group rebalance. During a rebalance, the coordinator is relying on the last commit timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.
> {quote}
> This implies that Kafka employs two approaches to offset expiration:
> * The deletion timer starts when a consumer group enters the Empty state (i.e., not running). Once the timer exceeds the {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
> * If a consumer group is running (i.e., not in the Empty state) but is no longer consuming from topics whose committed offsets are older than {{offsets.retention.minutes}}, those committed offsets are deleted.
> However, KAFKA-13636 specifically states that this situation could occur during a group rebalance. In my scenario, I observed that the affected consumer group did not transition into the Empty state, and I'm uncertain whether the rebalance had finished. Why Kafka did not detect this consumer group as Empty remains unclear.
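The two expiration approaches listed above can be sketched as a small model. This is a hypothetical, simplified Java illustration, not Kafka's implementation (in 2.7 the real logic is Scala code in the group coordinator). The names {{OffsetExpirationSketch}}, {{CommittedOffset}}, {{expiredOffsets}} and the {{TopicPartition}} record (not Kafka's class of the same name) are made up here, and pending offset commits, the legacy per-offset expire timestamp and older metadata formats are ignored. It shows the distinction KAFKA-13636 points at: an Empty group is aged from the moment it changed state, while a non-empty group ages offsets of topics outside its known subscription from their last commit. The numbers in {{main}} are invented to mirror the 7-day retention and the 5-minute gap in the report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class OffsetExpirationSketch {

    // Simplified subset of the group states named in the report.
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    // Hypothetical stand-ins for Kafka's TopicPartition / OffsetAndMetadata.
    record TopicPartition(String topic, int partition) {}
    record CommittedOffset(long offset, long commitTimestampMs) {}

    /**
     * Returns the committed offsets this simplified model would expire.
     * Approach 1 (group is Empty): age is measured from stateTimestampMs,
     * i.e. when the group became Empty, so all offsets expire together.
     * Approach 2 (group has members and known subscriptions): only offsets of
     * topics outside the subscription expire, aged by their commit timestamp.
     */
    static Map<TopicPartition, CommittedOffset> expiredOffsets(
            GroupState state,
            long stateTimestampMs,
            Optional<Set<String>> subscribedTopics,
            Map<TopicPartition, CommittedOffset> offsets,
            long nowMs,
            long retentionMs) {
        Map<TopicPartition, CommittedOffset> expired = new HashMap<>();
        for (Map.Entry<TopicPartition, CommittedOffset> entry : offsets.entrySet()) {
            boolean isExpired;
            if (state == GroupState.EMPTY) {
                isExpired = nowMs - stateTimestampMs >= retentionMs;
            } else if (subscribedTopics.isPresent()) {
                isExpired = !subscribedTopics.get().contains(entry.getKey().topic())
                        && nowMs - entry.getValue().commitTimestampMs() >= retentionMs;
            } else {
                isExpired = false; // subscriptions unknown: keep everything
            }
            if (isExpired) {
                expired.put(entry.getKey(), entry.getValue());
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long day = 24 * 60 * minute;
        long retentionMs = 7 * day;              // offsets.retention.minutes = 7 days
        long nowMs = 30 * day;                   // arbitrary clock value
        long stateChangeMs = nowMs - 5 * minute; // group changed state 5 minutes ago

        // Made-up offset whose last commit is older than the retention period.
        Map<TopicPartition, CommittedOffset> offsets =
                Map.of(new TopicPartition("orders", 0), new CommittedOffset(42L, nowMs - 8 * day));

        // Empty group: measured from the state change, so nothing expires yet.
        System.out.println(expiredOffsets(GroupState.EMPTY, stateChangeMs,
                Optional.empty(), offsets, nowMs, retentionMs).size()); // 0

        // Non-empty group whose known subscriptions no longer include "orders"
        // (e.g. members dropped mid-rebalance): aged by commit time, so it expires.
        System.out.println(expiredOffsets(GroupState.PREPARING_REBALANCE, stateChangeMs,
                Optional.of(Set.of()), offsets, nowMs, retentionMs).size()); // 1
    }
}
```

With the same 8-day-old commit, the offset survives in an Empty group that changed state only 5 minutes ago, but is expired for a non-empty group whose subscription set no longer contains the topic. {{Optional}} stands in for "the coordinator may not know the subscriptions", in which case this model expires nothing.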
> *Logs*
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group mycg for generation 433
> {noformat}
> This suggests that Kafka might have followed the second approach, which is why it deleted the offsets about 3 minutes later:
> {noformat}
> 1:33:17 am - [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 milliseconds.
> {noformat}
> h4. Was the consumer group in a rebalance state when the offsets were removed?
> Looking at the logs and the Kafka codebase, we find the following:
> * The [onExpireHeartbeat callback|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1278] on the GroupCoordinator is invoked, which calls the [removeMemberAndUpdateGroup|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1291] method.
> * The removeMemberAndUpdateGroup method calls [maybePrepareRebalance when the group is stable|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1158]. That was the case here: from Kafka's perspective, the group was neither Empty (not running), Dead (removed), nor in the PreparingRebalance state.
> * maybePrepareRebalance [just calls prepareRebalance|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1118] if the group can rebalance.
> * prepareRebalance creates a [DelayedJoin|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1136] operation because the group is not empty. [This object|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala#L25-L28] handles the join-group operation (the first phase of a rebalance).
> * When the group is neither empty nor dead, it [logs|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1222] {{{}Stabilized group ${group.groupId} generation ${group.generationId}{}}}. {*}This does not mean the rebalance is over{*}. Right afterwards, the coordinator sends the JoinGroup responses to all members, and the group waits in the CompletingRebalance state for their SyncGroup requests.
> * Later on we find an "Assignment received from leader for group" log line. It comes from the group coordinator handling a SyncGroup request (the second phase of the rebalance) in the [CompletingRebalance state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L412].
> * Following that code path, if no errors are logged, [the assignment is propagated and the group transitions to the Stable state|https://github.com/apache/kafka/blob/2.7.2/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L432-L433].
> This seems to indicate that the group was not in a rebalance state when the offsets were removed, so [the fix|https://github.com/apache/kafka/pull/11742/files] for KAFKA-13636 would not fix the issue seen here. The main question remains why Kafka did not transition the consumer group to the Empty state.
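The coordinator walk-through above can be replayed with a tiny state machine. This is a hypothetical Java sketch, heavily simplified (no purgatory, timers, static members or Dead state; {{RebalanceSketch}} and its methods are made-up names), meant only to show the order of transitions described for Kafka 2.7: a heartbeat expiry moves a Stable group to PreparingRebalance, the "Stabilized group" log line coincides with the move to CompletingRebalance rather than Stable, the leader's SyncGroup makes the group Stable again, and the group only becomes Empty when no member is left at the end of the join phase. Member {{c3}} is an assumed survivor; the report's logs do not list the remaining members.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebalanceSketch {

    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    private State state = State.STABLE;
    private int generation;
    private final Set<String> members = new HashSet<>();
    private final List<String> log = new ArrayList<>();

    RebalanceSketch(int generation, Set<String> initialMembers) {
        this.generation = generation;
        this.members.addAll(initialMembers);
    }

    State state() { return state; }
    int generation() { return generation; }
    List<String> log() { return log; }

    // Heartbeat expiry: drop the member; a Stable or CompletingRebalance group starts a rebalance.
    void onHeartbeatExpired(String memberId) {
        log.add("Member " + memberId + " has failed, removing it from the group");
        members.remove(memberId);
        if (state == State.STABLE || state == State.COMPLETING_REBALANCE) {
            state = State.PREPARING_REBALANCE;
            log.add("Preparing to rebalance with old generation " + generation);
        }
    }

    // End of the join phase: bump the generation, then go Empty or CompletingRebalance.
    void completeJoin() {
        if (state != State.PREPARING_REBALANCE) {
            return;
        }
        generation++;
        if (members.isEmpty()) {
            state = State.EMPTY;
            log.add("Generation " + generation + " is now empty");
        } else {
            // "Stabilized" is logged here, yet the group is still mid-rebalance.
            state = State.COMPLETING_REBALANCE;
            log.add("Stabilized group generation " + generation);
        }
    }

    // SyncGroup from the leader carries the assignment; the group becomes Stable.
    void onLeaderSync() {
        if (state != State.COMPLETING_REBALANCE) {
            return;
        }
        log.add("Assignment received from leader for generation " + generation);
        state = State.STABLE;
    }

    public static void main(String[] args) {
        // Mirrors the report's log sequence; "c3" is a hypothetical member that stays.
        RebalanceSketch group = new RebalanceSketch(432, Set.of("c1", "c2", "c3"));
        group.onHeartbeatExpired("c1");
        group.onHeartbeatExpired("c2");
        group.completeJoin();
        System.out.println(group.state()); // COMPLETING_REBALANCE
        group.onLeaderSync();
        System.out.println(group.state() + " " + group.generation()); // STABLE 433
        group.log().forEach(System.out::println);

        // A group only ends up Empty when no member survives the join phase.
        RebalanceSketch single = new RebalanceSketch(432, Set.of("c1"));
        single.onHeartbeatExpired("c1");
        single.completeJoin();
        System.out.println(single.state()); // EMPTY
    }
}
```

With the report's log sequence, the model returns to Stable at generation 433, so in this model the Empty-based retention timer never starts; only the single-member replay ends in Empty.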