[ 
https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Cantero updated KAFKA-15035:
--------------------------------
    Description: 
We recently encountered a scenario where a consumer group had its committed 
offsets deleted a few minutes (around 3 minutes) after its consumer became 
inactive (the underlying node went away).

According to 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
 committed offsets for an active (i.e., running) consumer group should not be 
deleted. However, if a consumer group becomes inactive, {+}the deletion of 
committed offsets will not occur immediately{+}. Instead, the committed offsets 
will only be removed if the group remains inactive for at least the duration 
specified by 
[offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].

In our case, {{offsets.retention.minutes}} is set to 7 days (10080 minutes) and 
the consumer was inactive for only 5 minutes, so the offsets should not have 
been deleted.

Later on 
[KIP-496|https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets]
 was introduced to fix the following issue in KIP-211:
{quote}When a consumer subscription changes, we are still left with the 
committed offsets of the previous subscription. These will never be cleaned up 
as long as the group remains active. We were aware of this problem in KIP-211, 
but the solution was not implemented because the coordinator is presently 
agnostic to join group metadata and we were unclear about the compatibility 
implications of changing that.
{quote}
However, this introduced a regression, as explained in 
https://issues.apache.org/jira/browse/KAFKA-13636:
{quote}The group coordinator might delete invalid offsets during a group 
rebalance. During a rebalance, the coordinator is relying on the last commit 
timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.
{quote}
This implies that Kafka uses two approaches to offset expiration:
 * The deletion timer starts when a consumer group enters the Empty state 
(i.e., no members are running). Once the timer exceeds the 
{{offsets.retention.minutes}} threshold, the committed offsets are deleted.
 * If a consumer group is running (i.e., not in the Empty state) but is no 
longer subscribed to (consuming from) a topic whose committed offsets are older 
than {{offsets.retention.minutes}}, those committed offsets are deleted.
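
To make the two approaches concrete, here is a simplified Python model. It is loosely based on our reading of the broker's expiration logic ({{GroupMetadata.removeExpiredOffsets}} in the 2.x coordinator) and is not Kafka's actual code; the names {{CommittedOffset}} and {{expired_offsets}}, the topic "orders" and all timestamps are hypothetical. The third call shows the suspected failure mode: if the coordinator considers a running group's subscription to be empty or stale, an old commit is expired even though the group never became Empty.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set

# offsets.retention.minutes = 10080 (7 days), expressed in milliseconds.
RETENTION_MS = 10080 * 60 * 1000


@dataclass
class CommittedOffset:
    topic: str
    commit_timestamp_ms: int


def expired_offsets(
    state: str,
    offsets: Dict[str, CommittedOffset],
    now_ms: int,
    state_timestamp_ms: Optional[int],
    subscribed_topics: Optional[Set[str]],
    retention_ms: int = RETENTION_MS,
) -> Set[str]:
    """Hypothetical, simplified model of the two expiration paths."""
    expired = set()
    for partition, committed in offsets.items():
        if state == "Empty":
            # Path 1: no members left. Age is measured from when the group
            # became Empty, falling back to the commit time if that is unknown.
            if state_timestamp_ms is not None:
                base_ms = state_timestamp_ms
            else:
                base_ms = committed.commit_timestamp_ms
        elif subscribed_topics is not None:
            # Path 2: the group has members. Only offsets for topics outside
            # the current subscription are candidates, aged from last commit.
            if committed.topic in subscribed_topics:
                continue
            base_ms = committed.commit_timestamp_ms
        else:
            # Subscription unknown: this model expires nothing.
            continue
        if now_ms - base_ms >= retention_ms:
            expired.add(partition)
    return expired


DAY_MS = 24 * 60 * 60 * 1000
now = 10 * DAY_MS  # arbitrary "current time" for the example
# Hypothetical offset last committed 8 days ago.
offsets = {"orders-0": CommittedOffset("orders", commit_timestamp_ms=now - 8 * DAY_MS)}
became_empty = now - 5 * 60 * 1000  # group became Empty 5 minutes ago

print(expired_offsets("Empty", offsets, now, became_empty, None))  # set()
print(expired_offsets("Stable", offsets, now, None, {"orders"}))  # set()
print(expired_offsets("Stable", offsets, now, None, set()))  # {'orders-0'}
```

Under this model, an Empty group keeps the 8-day-old offset for another week, while a running group whose recorded subscription no longer includes "orders" loses it at once.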

However, KAFKA-13636 specifically states that this situation can occur during 
a group rebalance. In our scenario, the affected consumer group never 
transitioned into the Empty state, and the logs contain a "Stabilized" line. 
I'm unsure whether this line means the rebalance had concluded at that point. 
It also remains unclear why Kafka did not detect this consumer group as Empty.

*Logs*
{noformat}
01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
has failed, removing it from the group

01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: 
removing member consumer-mycg-1-uuid on heartbeat expiration)

1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
has failed, removing it from the group

01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
(__consumer_offsets-16)

01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
mycg for generation 433{noformat}
This suggests that Kafka may have applied the second approach, which would 
explain why it deleted the offsets about 3 minutes later:
{noformat}
1:33:17 am - 
[GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 
milliseconds.{noformat}
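
The timeline can be checked with a little arithmetic. This sketch uses only the times printed in the logs above (the date is not logged) and assumes the broker runs with the default {{offsets.retention.check.interval.ms}} of 600000 ms (10 minutes), which is not stated in this issue. Under that assumption, the gap is far shorter than the retention period and shorter than one expiration-check interval, which is consistent with the offsets being treated as already expired on the first cleanup pass after the rebalance, rather than aged from the moment the consumer disappeared.

```python
from datetime import datetime, timedelta

# Times copied from the log lines above; note the missing leading zero in the
# second one, which %H still accepts.
first_failure = datetime.strptime("01:30:47", "%H:%M:%S")
offsets_removed = datetime.strptime("1:33:17", "%H:%M:%S")

retention = timedelta(minutes=10080)  # offsets.retention.minutes (7 days)
# Assumed default of offsets.retention.check.interval.ms (not confirmed here).
check_interval = timedelta(milliseconds=600000)

elapsed = offsets_removed - first_failure
print(elapsed)  # 0:02:30
print(elapsed < retention)  # True
print(elapsed < check_interval)  # True
```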
For reference, a regular consumer join/startup looks like this in the logs: the 
group is stabilized and the assignment is received from the leader.
{noformat}
 [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: 
Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with 
group instance id None) (kafka.coordinator.group.GroupCoordinator)
 
[GroupCoordinator 0]: Stabilized group mycg generation 7 
(__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Assignment received from leader for group mycg for 
generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
For reference, a regular consumer leave/shutdown looks like this. Note how the 
consumer group moves into the Empty state:
{noformat}
[GroupCoordinator 0]: Member[group.instance.id None, member.id 
consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, 
removing it from the group (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Preparing to rebalance group mycg in state 
PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: 
removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on 
LeaveGroup) (kafka.coordinator.group.GroupCoordinator)

[GroupCoordinator 0]: Group mycg with generation 9 is now empty 
(__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat}
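
Because the question hinges on whether the group ever reached Empty, a small script can extract the state sequence from coordinator logs. This is a sketch: the regexes only cover the message formats quoted in this issue, the names {{TRANSITIONS}} and {{group_states}} are hypothetical, and the mapping assumes that "Stabilized group" marks the end of the join phase (CompletingRebalance) and "Assignment received from leader" the move to Stable. That mapping is our reading of the 2.x GroupCoordinator, not something confirmed in this issue.

```python
import re
from typing import List

# Assumed mapping from GroupCoordinator messages (as quoted in this issue)
# to the group state each one implies.
TRANSITIONS = [
    (re.compile(r"Preparing to rebalance group (\S+)"), "PreparingRebalance"),
    (re.compile(r"Stabilized group (\S+) generation \d+"), "CompletingRebalance"),
    (re.compile(r"Assignment received from leader for group (\S+)"), "Stable"),
    (re.compile(r"Group (\S+) with generation \d+ is now empty"), "Empty"),
]


def group_states(log: str, group: str) -> List[str]:
    """Return the sequence of states implied by the log lines for `group`."""
    states = []
    for line in log.splitlines():
        for pattern, state in TRANSITIONS:
            match = pattern.search(line)
            if match and match.group(1) == group:
                states.append(state)
                break
    return states


# Incident and shutdown logs from this issue, joined onto single lines and
# shortened (reason clauses omitted).
incident = """\
01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg has failed, removing it from the group
01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 432 (__consumer_offsets-16)
1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg has failed, removing it from the group
01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 (__consumer_offsets-16)
01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group mycg for generation 433
"""

shutdown = """\
[GroupCoordinator 0]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 8 (__consumer_offsets-22)
[GroupCoordinator 0]: Group mycg with generation 9 is now empty (__consumer_offsets-22)
"""

print(group_states(incident, "mycg"))  # ['PreparingRebalance', 'CompletingRebalance', 'Stable']
print(group_states(shutdown, "mycg"))  # ['PreparingRebalance', 'Empty']
```

For the incident, the sequence ends in Stable rather than Empty, matching the observation that the group never transitioned into the Empty state.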
 
 



> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
