Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1165606409


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Hi Justine (@jolshan), I pushed a commit that resolves the topic id in the
   group metadata manager when the offsets of all topic-partitions known to the
   coordinator are requested. Apologies for the delay.
   
   I don't really like the code I wrote: it introduces one level of
   indirection on the valid path and one more failure path when a topic id
   cannot be resolved, and the proposed implementation is not very elegant.
   It does, however, follow up on the discussion above.
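
   To make the extra indirection and failure path concrete, here is a minimal
   sketch of that kind of resolution step. It is not the code in the commit:
   the case classes are simplified stand-ins for Kafka's `TopicPartition`,
   `TopicIdPartition` and `OffsetAndMetadata`, `java.util.UUID` stands in for
   Kafka's `Uuid`, and `resolveAll` and `topicIds` are hypothetical names.

```scala
import java.util.UUID

// Hypothetical stand-ins for the Kafka classes of the same names.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: UUID, topicPartition: TopicPartition)
final case class OffsetAndMetadata(offset: Long, metadata: String = "")

object TopicIdResolution {
  // Resolves the topic id of every cached offset.
  // `topicIds` is an assumed name-to-id lookup (for example, one backed by a
  // metadata cache). Returns the resolved offsets together with the
  // partitions whose topic id could not be found (the extra failure path).
  def resolveAll(
    offsets: Map[TopicPartition, OffsetAndMetadata],
    topicIds: String => Option[UUID]
  ): (Map[TopicIdPartition, OffsetAndMetadata], Set[TopicPartition]) = {
    val results: Seq[Either[TopicPartition, (TopicIdPartition, OffsetAndMetadata)]] =
      offsets.toSeq.map { case (tp, offsetAndMetadata) =>
        topicIds(tp.topic) match {
          case Some(id) => Right(TopicIdPartition(id, tp) -> offsetAndMetadata)
          case None     => Left(tp)
        }
      }
    val resolved = results.collect { case Right(entry) => entry }.toMap
    val unresolved = results.collect { case Left(tp) => tp }.toSet
    (resolved, unresolved)
  }
}
```

   Returning the unresolved partitions separately leaves it to the caller to
   decide whether to skip them or report an error for them.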
   
   When the new schemas for metadata and offset records are used, the topic id
   will be taken from the record tag. However, for all existing records,
   reconciliation will still be needed. Perhaps this reconciliation should be
   done when the coordinator loads the offsets, once the metadata data
   structure becomes topic-id-aware.
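
   A rough sketch of what load-time reconciliation could look like, under the
   assumption that a record written with a new schema carries its topic id and
   an older record does not. `LoadedOffset`, `reconcile` and `lookup` are
   hypothetical names for illustration only, and `java.util.UUID` stands in
   for Kafka's `Uuid`.

```scala
import java.util.UUID

// Hypothetical, simplified view of an offset record read while loading.
// `topicId` is empty for records written before topic ids were recorded.
final case class LoadedOffset(
  topic: String,
  partition: Int,
  offset: Long,
  topicId: Option[UUID]
)

object OffsetLoadReconciliation {
  // Keeps the id carried by the record when there is one; otherwise tries to
  // fill it in from `lookup`, an assumed name-to-id resolver. The id stays
  // empty when the topic is unknown to the resolver.
  def reconcile(record: LoadedOffset, lookup: String => Option[UUID]): LoadedOffset =
    record.topicId match {
      case Some(_) => record
      case None    => record.copy(topicId = lookup(record.topic))
    }
}
```

   Doing this once at load time would keep the lookup off the path that serves
   offset requests.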
   
   I will add a few more unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

