Hangleton commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1171099569
##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
Hi David, thanks for taking the time to lay out these thoughts. I agree with you that using the zero id does not feel right. It amounts to misusing `TopicIdPartition` to carry an invalid reference to a resource in the cluster. In the worst case, an invalid `TopicIdPartition` could end up being passed to code elsewhere that treats every `TopicIdPartition` as valid and resolved. So, there is a clear code smell here.
In any case, the caller of `getOffsets` would need to know that some of the returned `TopicIdPartition`s are invalid (that is, they reference the zero id). This defeats the purpose of returning a map of offsets keyed by topic-id-partition. So, I agree with you that topic-id-partitions should not be exposed in the value returned by `getOffsets`, even though the method signature becomes asymmetric as a result.
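
To make the asymmetry concrete, here is a minimal sketch of the shape being discussed: the request is expressed as topic-id-partitions, while the result is keyed by plain topic-partitions, so no placeholder zero id is ever built. The types below are simplified stand-ins defined only for this illustration (they are not the Kafka classes), and `OffsetLookupSketch`, `committed` and `requested` are hypothetical names, not part of the PR.

```scala
import java.util.UUID

// Simplified stand-ins for illustration only; these are NOT the Kafka classes.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: UUID, topicPartition: TopicPartition)
final case class PartitionData(offset: Long, error: String)

object OffsetLookupSketch {
  // Assumed sentinel for "no committed offset" in this sketch.
  val InvalidOffset: Long = -1L

  // Hypothetical lookup: the input carries topic ids, the output does not.
  // `committed` stands in for the group's committed offsets.
  def getOffsets(
    committed: Map[TopicPartition, Long],
    requested: Option[Seq[TopicIdPartition]]
  ): Map[TopicPartition, PartitionData] = {
    def resolve(tp: TopicPartition): PartitionData =
      committed.get(tp) match {
        case Some(offset) => PartitionData(offset, "NONE")
        case None         => PartitionData(InvalidOffset, "NONE")
      }

    // With no explicit request, fall back to every committed partition.
    // No topic id has to be resolved (or faked) on this path.
    val partitions: Iterable[TopicPartition] = requested match {
      case Some(topicIdPartitions) => topicIdPartitions.map(_.topicPartition)
      case None                    => committed.keys
    }

    partitions.map(tp => tp -> resolve(tp)).toMap
  }
}
```

With this shape, a caller that needs topic ids in the response would resolve them itself at the point where it can also decide how to report an unknown topic.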