AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1995963682


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
         return Set.of();
     }
 
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the corresponding
+     * topic metadata are returned for the argument topic id. If allowed map is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the metadata.

Review Comment:
   This method does not throw this exception :)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java:
##########
@@ -287,6 +287,11 @@ protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment
      */
     private Map<String, String> staticMembers = new HashMap<>();
 
+    /**
+     * Topic partition allow map.
+     */
+    private Map<Uuid, Set<Integer>> topicPartitionAllowedMap = new HashMap<>();

Review Comment:
   I think I would call it `topicAssignablePartitionsMap`. "Allow" sounds 
like there's an authorization aspect to it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
         return Set.of();
     }
 
+    /**
+     * Returns a set of assignable partitions from the topic metadata.
+     * If the allowed partition map is null, all the partitions in the corresponding
+     * topic metadata are returned for the argument topic id. If allowed map is empty,
+     * empty set is returned.
+     *
+     * @param topicId The uuid of the topic
+     * @return Set of integers if assignable partitions available, empty otherwise.
+     * @throws UnknownTopicIdException if the topicId is not found in the metadata.

Review Comment:
   Actually, you've used `Set.of()` in both cases, which is fine. Just an 
observation. I can imagine that returning a `null` might be distasteful to some 
people and I won't push it if you prefer it this way.
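
   For reference, the null-versus-empty semantics described in the Javadoc above 
can be sketched as follows. This is only an illustration under stated 
assumptions, not the code in the PR: the class `AssignablePartitionsSketch`, 
the method signature, and the use of plain `String` topic ids and a 
partition-count map in place of the real topic metadata are all hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch; names and types are stand-ins, not Kafka classes.
final class AssignablePartitionsSketch {

    /**
     * @param partitionCounts stand-in for topic metadata: topic id -> number of partitions
     * @param assignable      topic id -> assignable partitions; null means "no restriction"
     * @param topicId         the topic to look up
     */
    static Set<Integer> assignablePartitions(
        Map<String, Integer> partitionCounts,
        Map<String, Set<Integer>> assignable,
        String topicId
    ) {
        Integer numPartitions = partitionCounts.get(topicId);
        if (numPartitions == null) {
            // Unknown topic: return an empty set rather than throwing or returning null.
            return Set.of();
        }
        if (assignable == null) {
            // No restriction map at all: every partition of the topic is assignable.
            return IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toUnmodifiableSet());
        }
        // Restriction map present: a missing entry (including an empty map) yields an empty set.
        return assignable.getOrDefault(topicId, Set.of());
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = Map.of("topic-a", 3);
        System.out.println(assignablePartitions(metadata, null, "topic-a"));
        System.out.println(assignablePartitions(metadata, Map.of(), "topic-a"));
        System.out.println(assignablePartitions(metadata, Map.of("topic-a", Set.of(1)), "topic-a"));
    }
}
```

   Returning `Set.of()` in both the "unknown topic" and "nothing assignable" 
cases keeps callers free of null checks, at the cost of not being able to tell 
the two cases apart.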



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -443,7 +454,19 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
             topicPartitionFor(request.groupId()),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.shareGroupHeartbeat(context, request)
-        ).exceptionally(exception -> handleOperationException(
+        ).thenCompose(result -> {
+            // This ensures that the previous group write has completed successfully
+            // before we start the persister initialize phase.
+            if (result.getValue().isPresent()) {

Review Comment:
   So, the obvious question here is, what happens if the previous group write 
has NOT completed successfully and the result is not yet present?
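
   As background for this question, the general `CompletableFuture` behaviour 
can be sketched in isolation. The example below is not the coordinator code: 
`ThenComposeSketch`, `initialize`, and `chain` are hypothetical names, and the 
`Optional<String>` result merely stands in for the optional initialize request. 
It shows that the function passed to `thenCompose` is only invoked once the 
upstream future completes normally, that an upstream failure bypasses it and 
reaches `exceptionally`, and that the empty-`Optional` branch still has to 
return a future of its own.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of thenCompose semantics; not taken from the PR.
final class ThenComposeSketch {

    // Stand-in for a second asynchronous phase, such as a persister initialize call.
    static CompletableFuture<String> initialize(String request) {
        return CompletableFuture.completedFuture("initialized:" + request);
    }

    static CompletableFuture<String> chain(CompletableFuture<Optional<String>> write) {
        return write
            .thenCompose(maybeRequest -> {
                // Runs only after the write future has completed normally.
                if (maybeRequest.isPresent()) {
                    return initialize(maybeRequest.get());
                }
                // Nothing to initialize: complete immediately with a placeholder.
                return CompletableFuture.completedFuture("nothing-to-initialize");
            })
            // A failed write skips the thenCompose function and lands here.
            .exceptionally(throwable -> "write-failed");
    }

    public static void main(String[] args) {
        System.out.println(chain(CompletableFuture.completedFuture(Optional.of("req"))).join());
        System.out.println(chain(CompletableFuture.completedFuture(Optional.empty())).join());
        System.out.println(chain(CompletableFuture.failedFuture(new IllegalStateException("boom"))).join());
    }
}
```

   While the upstream future is still pending, the chained future simply stays 
incomplete; whether that matches the intent of the comment in the diff is for 
the PR author to confirm.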



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2323,10 +2346,115 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
         // 2. The member's assignment has been updated.
         boolean isFullRequest = subscribedTopicNames != null;
         if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
-            response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+            ShareGroupHeartbeatResponseData.Assignment assignment = createShareGroupResponseAssignment(updatedMember);
+            response.setAssignment(assignment);
         }
 
-        return new CoordinatorResult<>(records, response);
+        return new CoordinatorResult<>(
+            records,
+            Map.entry(
+                response,
+                maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata)
+            )
+        );
+    }
+
+    private boolean initializedAssignmentPending(ShareGroup group) {
+        if (!shareGroupPartitionMetadata.containsKey(group.groupId())) {
+            // No initialized share partitions for the group

Review Comment:
   nit: Comment could easily be on a single longer line. General point for this 
method.



