AndrewJSchofield commented on code in PR #19838:
URL: https://github.com/apache/kafka/pull/19838#discussion_r2111347423


##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java:
##########
@@ -40,6 +40,18 @@ public interface GroupSpec {
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
 
+    /**
+     * For share groups, a partition cannot be directly assigned to a
+     * share consumer. It must first be initialized in the share coordinator
+     * and on successful persistence, it can be assigned. For non-share groups,
+     * this will always return true.
+     *
+     * @param topicId The topic uuid.

Review Comment:
   I would copy the `@param` text exactly from `SubscribedTopicDescriber`. So 
that would be:
   
   ```
        * @param topicId       Uuid corresponding to the partition's topic.
        * @param partition     Partition Id within topic.
   ```
   
   If you could also copy this into the comment block for 
`isPartitionAssigned`, that would be even better.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -373,13 +373,8 @@ private List<TopicIdPartition> computeTargetPartitions(
                 );
             }
 
-            // Since we are returning a list here, we can keep it sorted
-            // to add determinism while testing and iterating.
-            targetPartitions.addAll(subscribedTopicDescriber.assignablePartitions(topicId).stream()
-                .sorted()
-                .map(partition -> new TopicIdPartition(topicId, partition))
-                .toList()
-            );
+            IntStream.range(0, numPartitions).filter(partition -> groupSpec.isPartitionAssignable(topicId, partition))

Review Comment:
   Given that this method returns a list of `TopicIdPartition` which is 
filtered by assignable partitions, shouldn't it be possible to remove the rest 
of the checks for assignable partitions in the other methods?
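
To illustrate the point, here is a minimal sketch, not the actual `SimpleAssignor` code. It assumes `java.util.UUID` in place of Kafka's `Uuid`, and the `AssignableCheck` interface, the local `TopicIdPartition` record and the `roundRobin` helper are all hypothetical stand-ins. The idea is that once the target list is filtered a single time, downstream steps can consume it without repeating the assignability check.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;

// Hypothetical sketch: UUID stands in for Kafka's Uuid, and none of these
// types are the real group-coordinator classes.
final class TargetPartitionsSketch {
    record TopicIdPartition(UUID topicId, int partitionId) {}

    // Stand-in for the single GroupSpec method under discussion.
    interface AssignableCheck {
        boolean isPartitionAssignable(UUID topicId, int partitionId);
    }

    // Filter once, here. IntStream.range yields partitions in ascending
    // order, so no explicit sort is needed for a deterministic list.
    static List<TopicIdPartition> computeTargetPartitions(
        UUID topicId, int numPartitions, AssignableCheck groupSpec
    ) {
        return IntStream.range(0, numPartitions)
            .filter(partition -> groupSpec.isPartitionAssignable(topicId, partition))
            .mapToObj(partition -> new TopicIdPartition(topicId, partition))
            .toList();
    }

    // Hypothetical downstream step: it trusts the pre-filtered list and
    // performs no further assignability checks.
    static Map<String, List<TopicIdPartition>> roundRobin(
        List<String> members, List<TopicIdPartition> targetPartitions
    ) {
        Map<String, List<TopicIdPartition>> assignment = new LinkedHashMap<>();
        members.forEach(member -> assignment.put(member, new ArrayList<>()));
        if (members.isEmpty()) {
            return assignment;
        }
        for (int i = 0; i < targetPartitions.size(); i++) {
            String member = members.get(i % members.size());
            assignment.get(member).add(targetPartitions.get(i));
        }
        return assignment;
    }
}
```

Whether the remaining checks can really go depends on whether every other method only ever sees partitions that came through this list.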



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java:
##########
@@ -46,14 +48,31 @@ public class GroupSpecImpl implements GroupSpec {
      */
     private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment;
 
+    /**
+     * In case of share groups, this map will be queried to decide
+     * which partition is assignable. For non-share groups,
+     * this optional should be empty.
+     */
+    private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;

Review Comment:
   Shouldn't this also be added to `equals`, `hashCode` and `toString`?
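
A minimal sketch of what that could look like, assuming `java.util.UUID` in place of Kafka's `Uuid` and a hypothetical class `GroupSpecSketch` that carries only the two fields visible in this hunk (the real `GroupSpecImpl` has more):

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical, reduced stand-in for GroupSpecImpl; only two fields shown.
final class GroupSpecSketch {
    private final Map<UUID, Map<Integer, String>> invertedMemberAssignment;
    private final Optional<Map<UUID, Set<Integer>>> topicPartitionAllowedMap;

    GroupSpecSketch(
        Map<UUID, Map<Integer, String>> invertedMemberAssignment,
        Optional<Map<UUID, Set<Integer>>> topicPartitionAllowedMap
    ) {
        this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment);
        this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        GroupSpecSketch that = (GroupSpecSketch) o;
        // The new field takes part in equality alongside the existing one.
        return invertedMemberAssignment.equals(that.invertedMemberAssignment)
            && topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap);
    }

    @Override
    public int hashCode() {
        // Must cover the same fields as equals to keep the contract.
        return Objects.hash(invertedMemberAssignment, topicPartitionAllowedMap);
    }

    @Override
    public String toString() {
        return "GroupSpecSketch(invertedMemberAssignment=" + invertedMemberAssignment
            + ", topicPartitionAllowedMap=" + topicPartitionAllowedMap + ")";
    }
}
```

Without the field in `equals` and `hashCode`, two specs that differ only in which partitions are assignable would compare as equal.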



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java:
##########
@@ -40,6 +40,18 @@ public interface GroupSpec {
      */
     boolean isPartitionAssigned(Uuid topicId, int partitionId);
 
+    /**
+     * For share groups, a partition cannot be directly assigned to a

Review Comment:
   I would make this briefer. How about "For share groups, a partition can only 
be assigned once its initialization is complete. For other group types, this 
initialization is not required and all partitions returned by the 
SubscribedTopicDescriber are always assignable."
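
For reference, a minimal sketch of the semantics this wording describes, with the suggested Javadoc attached. It assumes `java.util.UUID` in place of Kafka's `Uuid` and a hypothetical static helper that takes the optional map as an argument. Treating a topic that is missing from the map as not assignable is also an assumption, not something the PR confirms.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper, not the real GroupSpecImpl method.
final class AssignableSketch {
    /**
     * For share groups, a partition can only be assigned once its
     * initialization is complete. For other group types, this
     * initialization is not required and all partitions are assignable.
     *
     * @param allowed       Initialized partitions per topic; empty for non-share groups.
     * @param topicId       Uuid corresponding to the partition's topic.
     * @param partitionId   Partition Id within topic.
     * @return Whether the partition may be assigned.
     */
    static boolean isPartitionAssignable(
        Optional<Map<UUID, Set<Integer>>> allowed, UUID topicId, int partitionId
    ) {
        return allowed
            // Share group: the partition must be listed for its topic.
            // Assumption: a topic absent from the map has no assignable partitions.
            .map(byTopic -> byTopic.getOrDefault(topicId, Set.of()).contains(partitionId))
            // Non-share group: no map is present, so everything is assignable.
            .orElse(true);
    }
}
```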


