AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1993364145


##########
core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala:
##########
@@ -261,7 +273,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       val topicPartitionsAssignedToMember2 = 
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)

Review Comment:
   I think I expect this epoch to be 3 still. Because the first member has 
successfully received the assignment, I would have thought that a single 
heartbeat of the second member would also be able to assign the partition.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int 
partition) {
         return Set.of();
     }
 
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the 
corresponding
+     * topic metadata are returned for the argument topic id. If allowed map 
is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty 
otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the 
metadata.

Review Comment:
   I do not think you should throw `UnknownTopicIdException` here. The 
partition assignor will only throw `PartitionAssignorException` so this is 
likely a bit inconvenient. I would return `null` for a missing topic ID, an 
empty set if the topic ID is known but there are no assignable partitions, and 
the set of partition indices for the assignable partitions.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int 
partition) {
         return Set.of();
     }
 
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the 
corresponding
+     * topic metadata are returned for the argument topic id. If allowed map 
is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty 
otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the 
metadata.
+     */
+    @Override
+    public Set<Integer> assignablePartitions(Uuid topicId) throws 
UnknownTopicIdException {
+        TopicMetadata topic = this.topicMetadata.get(topicId);
+        if (topic == null) {
+            throw new UnknownTopicIdException(topicId.toString());
+        }
+
+        if (topicPartitionAllowedMap == null) {
+            return IntStream.range(0, 
topic.numPartitions()).boxed().collect(Collectors.toSet());

Review Comment:
   There is also `toUnmodifiableSet` which is probably better.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -812,6 +814,44 @@ public static CoordinatorRecord 
newShareGroupCurrentAssignmentTombstoneRecord(
         );
     }
 
+    /**
+     * Creates a ShareGroupStatePartitionMetadata record.
+     *
+     * @param groupId   The share group id.
+     * @param initializedTopics  Topics which have been initialized.
+     * @param deletingTopics  Topics which are being deleted.
+     * @return The record.
+     */
+    public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
+        String groupId,
+        Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
+        Map<Uuid, String> deletingTopics
+    ) {
+        List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> 
initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
+            .map(entry -> new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                .setTopicId(entry.getKey())
+                .setTopicName(entry.getValue().getKey())
+                .setPartitions(entry.getValue().getValue().stream().toList()))
+            .toList();
+
+        List<ShareGroupStatePartitionMetadataValue.TopicInfo> 
deletingTopicsInfo = deletingTopics.entrySet().stream()
+            .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicInfo()
+                .setTopicId(entry.getKey())
+                .setTopicName(entry.getValue()))
+            .toList();
+
+        return CoordinatorRecord.record(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ApiMessageAndVersion(
+                new ShareGroupStatePartitionMetadataValue()
+                    .setInitializedTopics(initializedTopicPartitionInfo)
+                    .setDeletingTopics(deletingTopicsInfo),
+                (short) 0
+            )
+        );
+    }
+

Review Comment:
   I think you're going to need the tombstone record too since deleting a group 
will leave this new record hanging without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to