Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1165606409


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case 
(topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { 
case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), 
commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Hi Justine (@jolshan), I pushed a commit with the change to resolve the 
topic id in the group metadata manager when the offsets of all topic-partitions 
known by the coordinator are requested. I don't really like the code I wrote 
since it introduces one level of indirection on the valid path and one more 
failure path when a topic id cannot be resolved. And the implementation 
proposed is not very elegant. But it follows up on the discussion above. 
   
   When the new schemas for metadata and offset records are used, the topic id 
will be taken from the record tag. However for all existing records, there will 
still be a need for reconciliation. Perhaps this reconciliation should be done 
when the offsets are loaded by the coordinator once the metadata data structure 
become topic-id-aware.
   
   I will add a few more unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to