Tomasz Kaszuba created KAFKA-12761:
--------------------------------------

             Summary: Consumer offsets are deleted 7 days after last offset 
instead of EMPTY status
                 Key: KAFKA-12761
                 URL: https://issues.apache.org/jira/browse/KAFKA-12761
             Project: Kafka
          Issue Type: Bug
          Components: core, streams
    Affects Versions: 2.7.0, 2.5.0
            Reporter: Tomasz Kaszuba


If I understand 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]
 correctly, consumer offsets should only be cleared once the group has been in 
the Empty state for the retention period: 

{{Empty}}: The field {{current_state_timestamp}} is set to when group last 
transitioned to this state. If the group stays in this for 
{{offsets.retention.minutes}}, the following offset cleanup scheduled task will 
remove all offsets in the group (as explained above).
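
As a minimal sketch of the rule quoted above (not the broker's actual code), the 
expiration check could be written as follows. The function name, its parameters 
and the sample values are hypothetical and only illustrate the KIP text; whether 
the comparison is strict is also an assumption.

```python
from datetime import datetime, timedelta, timezone


def offsets_expired(state, current_state_timestamp, now, retention):
    """Return True if a group's offsets are eligible for cleanup.

    Sketch of the KIP-211 text quoted above: only a group that is Empty,
    and has stayed Empty for at least the retention period, qualifies.
    The time of the last commit plays no role in this rule.
    """
    if state != "Empty":
        return False
    return now - current_state_timestamp >= retention


# Hypothetical values: retention stands in for offsets.retention.minutes.
retention = timedelta(minutes=10080)  # 7 days
now = datetime(2021, 1, 10, tzinfo=timezone.utc)

# A Stable group never qualifies, however long ago it last committed.
stable_idle = offsets_expired("Stable", now - timedelta(days=30), now, retention)

# A group that only just became Empty does not qualify yet.
just_emptied = offsets_expired("Empty", now - timedelta(minutes=14), now, retention)

# A group that has been Empty longer than the retention period qualifies.
long_empty = offsets_expired("Empty", now - timedelta(days=8), now, retention)
```

Under this reading, a group that is still Stable, or that became Empty only 
minutes ago, should keep its offsets.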

After a week of not consuming any new messages, but while still connected to 
the consumer group, the consumer offsets were deleted when the k8s pod was 
restarted.
{noformat}
2021-05-06 10:10:04.684  INFO 1 --- [ncurred-pattern] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=ieb-x07-baseline-pc-data-storage-incurred-pattern-86c84635-4c96-4941-b440-5ecd4584d3fd-StreamThread-1-consumer,
 groupId=ieb-x07-baseline-pc-data-storage-incurred-pattern] Found no committed 
offset for partition ieb.publish.baseline_pc.incurred_pattern-0 {noformat}
I looked at what is happening in the system topic __consumer_offsets and I 
see the following:
{noformat}
17138150 2021-04-27 07:14:50 
[ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=646,
 leaderEpoch=Optional.empty, metadata=AQAAAXkOMJr2, 
commitTimestamp=1619500490253, expireTimestamp=None)

53670252 2021-05-03 17:44:11 
ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
 generation=13, protocolType=Some(consumer), currentState=Stable, 
members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38
 -> 
MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer-50603fe4-10f7-432e-b306-115329e82b38,
 groupInstanceId=Some(null), 
clientId=ieb-x07-baseline-pc-data-storage-due-pattern-a8f21ea3-3bc0-4dc9-b82c-6b88f9c74008-StreamThread-1-consumer,
 clientHost=/172.23.194.239, sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, 
supportedProtocols=List(stream), )))

65226775 2021-05-06 11:56:13 
ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
 generation=14, protocolType=Some(consumer), currentState=Empty, members=Map())

65226793 2021-05-06 12:10:00 
[ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::NULL

65226795 2021-05-06 12:10:03 
ieb-x07-baseline-pc-data-storage-due-pattern::GroupMetadata(groupId=ieb-x07-baseline-pc-data-storage-due-pattern,
 generation=15, protocolType=Some(consumer), currentState=Stable, 
members=Map(ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944
 -> 
MemberMetadata(memberId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer-efb312c3-9c24-4088-a5e0-563a3d52c944,
 groupInstanceId=Some(null), 
clientId=ieb-x07-baseline-pc-data-storage-due-pattern-0fb01327-b21e-4be7-851a-9985e381f8b8-StreamThread-1-consumer,
 clientHost=/172.23.193.184, sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, 
supportedProtocols=List(stream), )))

65226809 2021-05-06 12:10:09 
[ieb-x07-baseline-pc-data-storage-due-pattern,ieb.publish.baseline_pc.due_pattern,0]::OffsetAndMetadata(offset=2,
 leaderEpoch=Optional.empty, metadata=AQAAAXlBJ/Sy, 
commitTimestamp=1620295809338, expireTimestamp=None)
 {noformat}
As you can see, the last committed offset was on the 27th of April, but the 
group still had status "Stable" on the 3rd of May. It transitioned to "Empty" 
on the 6th of May, when the pod was restarted. Right after that you can see the 
tombstone message that deletes the offsets, which corresponds to the streams 
logs (the timestamps in this dump are UTC+2).
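
To make the timeline concrete, here is a small sketch that computes the two 
intervals from the values in the dump above. It assumes the dump timestamps are 
UTC+2 (which agrees with the commitTimestamp epoch values) and that the 
retention is 7 days, as in the summary; the variable names are only 
illustrative.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the printed dump timestamps are local time at UTC+2.
DUMP_TZ = timezone(timedelta(hours=2))


def dump_time(text):
    """Parse a timestamp as printed in the __consumer_offsets dump."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=DUMP_TZ)


# commitTimestamp of the last commit before the idle period (epoch millis).
last_commit = datetime.fromtimestamp(1619500490253 / 1000, tz=timezone.utc)
became_empty = dump_time("2021-05-06 11:56:13")  # generation=14, Empty
tombstone = dump_time("2021-05-06 12:10:00")     # offset record set to NULL

# Assumption: 7-day retention, taken from the issue summary.
retention = timedelta(days=7)

since_commit = tombstone - last_commit
since_empty = tombstone - became_empty

print("time since last commit:", since_commit)
print("time since group became Empty:", since_empty)
print("last commit older than retention:", since_commit > retention)
print("Empty longer than retention:", since_empty > retention)
```

The offsets were removed less than 14 minutes after the group became Empty, but 
more than 9 days after the last commit.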

To me it looks like the cleanup only took the last commit timestamp into 
consideration and ignored the fact that the group was Stable. Am I 
misunderstanding how this should work?

The client is a Kafka Streams client using version 2.5.0 with EOS turned on, 
and the broker is 2.7.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
