Hangleton commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1165606409
########## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ########## @@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState expiredOffsets } - def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => - (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) + def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => + (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata) Review Comment: Hi Justine (@jolshan), I pushed a commit with the change to resolve the topic id in the group metadata manager when the offsets of all topic-partitions known by the coordinator are requested. I don't really like the code I wrote since it introduces one level of indirection on the valid path and one more failure path when a topic id cannot be resolved. And the implementation proposed is not very elegant. But it follows up on the discussion above. When the new schemas for metadata and offset records are used, the topic id will be taken from the record tag. However for all existing records, there will still be a need for reconciliation. Perhaps this reconciliation should be done when the offsets are loaded by the coordinator once the metadata data structure become topic-id-aware. I will add a few more unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org