[ https://issues.apache.org/jira/browse/KAFKA-16371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-16371.
---------------------------------
Fix Version/s: 3.8.0, 3.7.1
Assignee: David Jacot
Resolution: Fixed

Unstable committed offsets after triggering commits where metadata for some partitions are over the limit
----------------------------------------------------------------------------------------------------------

Key: KAFKA-16371
URL: https://issues.apache.org/jira/browse/KAFKA-16371
Project: Kafka
Issue Type: Bug
Components: offset manager
Affects Versions: 3.7.0
Reporter: mlowicki
Assignee: David Jacot
Priority: Major
Fix For: 3.8.0, 3.7.1

The issue is reproducible with a simple CLI tool - [https://gist.github.com/mlowicki/c3b942f5545faced93dc414e01a2da70]

{code:java}
#!/usr/bin/env bash

for i in {1..100}
do
    kafka-committer --bootstrap "ADDR:9092" --topic "TOPIC" --group foo --metadata-min 6000 --metadata-max 10000 --partitions 72 --fetch
done
{code}

What it does is that it initially fetches committed offsets and then tries to commit for multiple partitions. If some of the commits have metadata over the allowed limit, then:
1. I see errors about too-large commits (expected).
2. On another run, the tool fails at the stage of fetching committed offsets with (this is the problem):

{code:java}
config: ClientConfig { conf_map: { "group.id": "bar", "bootstrap.servers": "ADDR:9092", }, log_level: Error, }
fetching committed offsets..
Error: Meta data fetch error: OperationTimedOut (Local: Timed out) Caused by: OperationTimedOut (Local: Timed out)
{code}

On the Kafka side I see _unstable_offset_commits_ errors reported by our internal metric, which is derived from:

{noformat}
kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=X,error=Y
{noformat}

Increasing the timeout doesn't help, and the only solutions I've found are to trigger commits for all partitions with metadata below the limit, or to use:

{code:java}
isolation.level=read_uncommitted
{code}
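For illustration only (not part of the original report): a minimal sketch of the same commit pattern using the plain Java consumer from Scala rather than the reporter's Rust tool. The bootstrap address, topic, group id, and object name are placeholders, and it assumes the broker's default offset.metadata.max.bytes of 4096, so the ~6000-byte metadata entry should be rejected while the small one is accepted; with isolation.level=read_committed the subsequent committed() call would then keep seeing unstable offsets and eventually time out.

{code:scala}
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object OversizedMetadataCommit {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ADDR:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    // read_committed makes the offset fetch require stable offsets;
    // the reporter's workaround is switching this to read_uncommitted.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // One entry over the assumed default offset.metadata.max.bytes (4096), one under it.
      val offsets = Map(
        new TopicPartition("TOPIC", 0) -> new OffsetAndMetadata(1L, "x" * 6000),
        new TopicPartition("TOPIC", 1) -> new OffsetAndMetadata(1L, "small")
      ).asJava
      consumer.assign(offsets.keySet())

      // Expected to fail with OffsetMetadataTooLarge for the oversized partition (step 1 above).
      try consumer.commitSync(offsets)
      catch { case e: Exception => println(s"commit failed: $e") }

      // Step 2 above: with the bug, the oversized partition never becomes stable,
      // so this call keeps retrying and eventually throws a TimeoutException.
      val committed = consumer.committed(offsets.keySet(), Duration.ofSeconds(30))
      println(committed)
    } finally consumer.close()
  }
}
{code}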
I don't know that code very well, but [https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L492-L496] seems fishy:

{code:java}
if (isTxnOffsetCommit) {
  addProducerGroup(producerId, group.groupId)
  group.prepareTxnOffsetCommit(producerId, offsetMetadata)
} else {
  group.prepareOffsetCommit(offsetMetadata)
}
{code}

as it's using _offsetMetadata_ and not _filteredOffsetMetadata_, while the code that later removes those pending commits uses the filtered offset metadata, around [https://github.com/apache/kafka/blob/3.7/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L397-L422]:

{code:java}
val responseError = group.inLock {
  if (status.error == Errors.NONE) {
    if (!group.is(Dead)) {
      filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
        if (isTxnOffsetCommit)
          group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
        else
          group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
      }
    }

    // Record the number of offsets committed to the log
    offsetCommitsSensor.record(records.size)

    Errors.NONE
  } else {
    if (!group.is(Dead)) {
      if (!group.hasPendingOffsetCommitsFromProducer(producerId))
        removeProducerGroup(producerId, group.groupId)

      filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
        if (isTxnOffsetCommit)
          group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
        else
          group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
      }
    }
{code}

so the problem might be related to the data structure with pending commits not being cleaned up properly.
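To make the suspected root cause concrete, here is a small, self-contained toy model of the pending-commit bookkeeping. The names and structure are hypothetical (this is not Kafka's actual implementation), and it assumes the default offset.metadata.max.bytes of 4096: if the prepare step registers the unfiltered map while the append callback (success or failure) only iterates the filtered map, the oversized partitions stay pending forever, which matches the "unstable offsets" symptom above. Presumably a fix along the lines suggested in the report would pass _filteredOffsetMetadata_ into prepareOffsetCommit / prepareTxnOffsetCommit as well, so the pending set only ever contains entries the callback will see.

{code:scala}
import scala.collection.mutable

// Hypothetical, simplified model of the group coordinator's pendingOffsetCommits
// bookkeeping; names are illustrative, not Kafka's actual classes.
object PendingCommitModel {
  final case class OffsetAndMetadata(offset: Long, metadata: String)

  class GroupModel {
    // partition -> pending offset write awaiting the log-append callback
    val pendingOffsetCommits = mutable.Map.empty[Int, OffsetAndMetadata]

    def prepareOffsetCommit(offsets: Map[Int, OffsetAndMetadata]): Unit =
      pendingOffsetCommits ++= offsets

    def completeOrFailPendingWrite(partition: Int): Unit =
      pendingOffsetCommits -= partition

    def hasPendingCommit(partition: Int): Boolean =
      pendingOffsetCommits.contains(partition)
  }

  def main(args: Array[String]): Unit = {
    val maxMetadataBytes = 4096 // assumed broker default of offset.metadata.max.bytes
    val offsetMetadata = Map(
      0 -> OffsetAndMetadata(1L, "x" * 6000), // over the limit, never appended to the log
      1 -> OffsetAndMetadata(1L, "ok")        // within the limit, appended normally
    )
    val filteredOffsetMetadata =
      offsetMetadata.filter { case (_, om) => om.metadata.length <= maxMetadataBytes }

    val group = new GroupModel
    group.prepareOffsetCommit(offsetMetadata) // the suspicious call: unfiltered map
    // The append callback only iterates the filtered map, so only those
    // entries are ever removed from the pending set.
    filteredOffsetMetadata.keys.foreach(group.completeOrFailPendingWrite)

    // Partition 0 stays pending forever, so an offset fetch that requires stable
    // offsets (isolation.level=read_committed) keeps seeing it as unstable.
    println(s"partition 0 still pending: ${group.hasPendingCommit(0)}") // prints: true
  }
}
{code}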