AndrewJSchofield commented on code in PR #19838:
URL: https://github.com/apache/kafka/pull/19838#discussion_r2111347423

##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java:
##########
@@ -40,6 +40,18 @@ public interface GroupSpec {
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
 
+    /**
+     * For share groups, a partition cannot be directly assigned to a
+     * share consumer. It must first be initialized in the share coordinator
+     * and on successful persistance, it can be assigned. For non-share groups,
+     * this will always return true.
+     *
+     * @param topicId The topic uuid.

Review Comment:
   I would copy the `@param` text exactly from `SubscribedTopicDescriber`. So that would be:

   ```
    * @param topicId Uuid corresponding to the partition's topic.
    * @param partition Partition Id within topic.
   ```

   If you could also copy this into the comment block for `isPartitionAssigned`, it would be even better.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -373,13 +373,8 @@ private List<TopicIdPartition> computeTargetPartitions(
             );
         }
 
-        // Since we are returning a list here, we can keep it sorted
-        // to add determinism while testing and iterating.
-        targetPartitions.addAll(subscribedTopicDescriber.assignablePartitions(topicId).stream()
-            .sorted()
-            .map(partition -> new TopicIdPartition(topicId, partition))
-            .toList()
-        );
+            IntStream.range(0, numPartitions).filter(partition -> groupSpec.isPartitionAssignable(topicId, partition))

Review Comment:
   Given that this method returns a list of `TopicIdPartition` which is filtered by assignable partitions, shouldn't it be possible to remove the rest of the checks for assignable partitions in the other methods?

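To illustrate the point about filtering once in `computeTargetPartitions`, here is a minimal sketch. It is not the `SimpleAssignor` code: `TopicPartitionSketch`, `AssignableCheck` and `TargetPartitionsSketch` are hypothetical stand-ins, and a `String` replaces Kafka's `Uuid` so that the example is self-contained.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for TopicIdPartition; a String replaces Kafka's Uuid.
record TopicPartitionSketch(String topicId, int partition) { }

// Hypothetical stand-in for the single GroupSpec method discussed here.
interface AssignableCheck {
    boolean isPartitionAssignable(String topicId, int partition);
}

final class TargetPartitionsSketch {
    private TargetPartitionsSketch() { }

    // Walks partition ids 0..numPartitions-1 in ascending order and keeps
    // only those the check accepts, so the result is already sorted and
    // contains assignable partitions only.
    static List<TopicPartitionSketch> computeTargetPartitions(
        String topicId,
        int numPartitions,
        AssignableCheck groupSpec
    ) {
        return IntStream.range(0, numPartitions)
            .filter(partition -> groupSpec.isPartitionAssignable(topicId, partition))
            .mapToObj(partition -> new TopicPartitionSketch(topicId, partition))
            .collect(Collectors.toList());
    }
}
```

With five partitions and a check that accepts only even ids, the list holds partitions 0, 2 and 4 in that order. Since every element has already passed the check, code that only consumes this list would not need to repeat it, assuming the set of assignable partitions does not change during a single assignment computation.
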
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java:
##########
@@ -46,14 +48,31 @@ public class GroupSpecImpl implements GroupSpec {
      */
     private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment;
 
+    /**
+     * In case of share groups, this map will be queried to decide
+     * which partition is assignable. For non-share groups,
+     * this optional should be empty.
+     */
+    private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;

Review Comment:
   Shouldn't this be added to `equals`, `toString` and `hashCode` also?

##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java:
##########
@@ -40,6 +40,18 @@ public interface GroupSpec {
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
 
+    /**
+     * For share groups, a partition cannot be directly assigned to a

Review Comment:
   I would make this briefer. How about "For share groups, a partition can only be assigned once its initialization is complete. For other group types, this initialization is not required and all partitions returned by the SubscribedTopicDescriber are always assignable."

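For the two `GroupSpec` points above (the assignability semantics and including the new field in `equals`, `hashCode` and `toString`), a minimal sketch follows. `GroupSpecSketch` is a hypothetical, simplified class and not the actual `GroupSpecImpl`: it keeps only the optional map, uses `String` in place of `Uuid`, and assumes that a topic missing from the map has no assignable partitions.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in; not the actual GroupSpecImpl.
final class GroupSpecSketch {
    // Empty for non-share groups. For share groups: topic id -> initialized partitions.
    private final Optional<Map<String, Set<Integer>>> topicPartitionAllowedMap;

    GroupSpecSketch(Optional<Map<String, Set<Integer>>> topicPartitionAllowedMap) {
        this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
    }

    // Non-share groups (empty optional): always true.
    // Share groups: true only if the partition is listed for the topic.
    // Treating an unknown topic as "not assignable" is an assumption of this sketch.
    boolean isPartitionAssignable(String topicId, int partition) {
        return topicPartitionAllowedMap
            .map(allowed -> allowed.getOrDefault(topicId, Set.of()).contains(partition))
            .orElse(true);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupSpecSketch)) return false;
        GroupSpecSketch that = (GroupSpecSketch) o;
        // The new field takes part in equality.
        return topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals.
        return Objects.hash(topicPartitionAllowedMap);
    }

    @Override
    public String toString() {
        return "GroupSpecSketch(topicPartitionAllowedMap=" + topicPartitionAllowedMap + ")";
    }
}
```

If the field were left out of `equals` and `hashCode`, two specs differing only in which partitions are initialized would compare as equal, which could hide differences in tests that compare specs.
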