AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1993364145

##########
core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala:
##########
@@ -261,7 +273,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
       val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)

Review Comment:
   I think I expect this epoch to be 3 still. Because the first member has successfully received the assignment, I would have thought that a single heartbeat of the second member would also be able to assign the partition.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
         return Set.of();
     }
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the corresponding
+     * topic metadata are returned for the argument topic id. If allowed map is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the metadata.

Review Comment:
   I do not think you should throw `UnknownTopicIdException` here. The partition assignor will only throw `PartitionAssignorException` so this is likely a bit inconvenient. I would return `null` for a missing topic ID, an empty set if the topic ID is known but there are no assignable partitions, and the set of partition indices for the assignable partitions.

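The return contract suggested in the comment above (`null` for an unknown topic ID, an empty set for a known topic with nothing assignable, otherwise the assignable partition indices) could be sketched roughly as follows. This is a standalone illustration under stated assumptions, not the PR's code: the class `AssignablePartitionsSketch` is hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and the two maps are simplified stand-ins for the topic metadata and the allowed-partition map.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for the describer; not the real SubscribedTopicDescriberImpl.
class AssignablePartitionsSketch {
    // Assumption: topic ID -> partition count, standing in for the topic metadata.
    private final Map<UUID, Integer> partitionCounts;
    // Assumption: topic ID -> allowed partitions; null means "no restriction".
    private final Map<UUID, Set<Integer>> allowedPartitions;

    AssignablePartitionsSketch(Map<UUID, Integer> partitionCounts,
                               Map<UUID, Set<Integer>> allowedPartitions) {
        this.partitionCounts = partitionCounts;
        this.allowedPartitions = allowedPartitions;
    }

    /**
     * Returns null if the topic ID is unknown, an empty set if the topic is
     * known but has no assignable partitions, and otherwise the set of
     * assignable partition indices. No exception is thrown for a missing topic.
     */
    Set<Integer> assignablePartitions(UUID topicId) {
        Integer numPartitions = partitionCounts.get(topicId);
        if (numPartitions == null) {
            return null; // unknown topic ID
        }
        if (allowedPartitions == null) {
            // No restriction: every partition of the topic is assignable.
            return IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toUnmodifiableSet());
        }
        // Known topic: return its allowed partitions, or an empty set if none are listed.
        return Set.copyOf(allowedPartitions.getOrDefault(topicId, Set.of()));
    }
}
```

With this shape, a caller such as an assignor would check for `null` before iterating and decide for itself how to report an unknown topic, rather than catching an exception type it does not otherwise throw.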
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
         return Set.of();
     }
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the corresponding
+     * topic metadata are returned for the argument topic id. If allowed map is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the metadata.
+     */
+    @Override
+    public Set<Integer> assignablePartitions(Uuid topicId) throws UnknownTopicIdException {
+        TopicMetadata topic = this.topicMetadata.get(topicId);
+        if (topic == null) {
+            throw new UnknownTopicIdException(topicId.toString());
+        }
+
+        if (topicPartitionAllowedMap == null) {
+            return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toSet());

Review Comment:
   There is also `toUnmodifiableSet` which is probably better.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -812,6 +814,44 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentTombstoneRecord(
         );
     }
+    /**
+     * Creates a ShareGroupStatePartitionMetadata record.
+     *
+     * @param groupId The share group id.
+     * @param initializedTopics Topics which have been initialized.
+     * @param deletingTopics Topics which are being deleted.
+     * @return The record.
+     */
+    public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
+        String groupId,
+        Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
+        Map<Uuid, String> deletingTopics
+    ) {
+        List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
+            .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                .setTopicId(entry.getKey())
+                .setTopicName(entry.getValue().getKey())
+                .setPartitions(entry.getValue().getValue().stream().toList()))
+            .toList();
+
+        List<ShareGroupStatePartitionMetadataValue.TopicInfo> deletingTopicsInfo = deletingTopics.entrySet().stream()
+            .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicInfo()
+                .setTopicId(entry.getKey())
+                .setTopicName(entry.getValue()))
+            .toList();
+
+        return CoordinatorRecord.record(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new ShareGroupStatePartitionMetadataValue()
+                    .setInitializedTopics(initializedTopicPartitionInfo)
+                    .setDeletingTopics(deletingTopicsInfo),
+                (short) 0
+            )
+        );
+    }
+

Review Comment:
   I think you're going to need the tombstone record too since deleting a group will leave this new record hanging without it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org