AndrewJSchofield commented on code in PR #19026: URL: https://github.com/apache/kafka/pull/19026#discussion_r1995963682
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) { return Set.of(); } + /** + * Returns a set of assignable partitions from the topic metadata. + * If the allowed partition map is null, all the partitions in the corresponding + * topic metadata are returned for the argument topic id. If allowed map is empty, + * empty set is returned. + * + * @param topicId The uuid of the topic + * @return Set of integers if assignable partitions available, empty otherwise. + * @throws UnknownTopicIdException if the topicId is not found in the metadata. Review Comment: This method does not throw this exception :) ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java: ########## @@ -287,6 +287,11 @@ protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment */ private Map<String, String> staticMembers = new HashMap<>(); + /** + * Topic partition allow map. + */ + private Map<Uuid, Set<Integer>> topicPartitionAllowedMap = new HashMap<>(); Review Comment: I would call it the `topicAssignablePartitionsMap` I think. "Allow" sounds like there's an authorization aspect to it, I think. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) { return Set.of(); } + /** + * Returns a set of assignable partitions from the topic metadata. + * If the allowed partition map is null, all the partitions in the corresponding + * topic metadata are returned for the argument topic id. If allowed map is empty, + * empty set is returned. + * + * @param topicId The uuid of the topic + * @return Set of integers if assignable partitions available, empty otherwise. + * @throws UnknownTopicIdException if the topicId is not found in the metadata. Review Comment: Actually, you've used `Set.of()` in both cases, which is fine. Just an observation. I can imagine that returning a `null` might be distasteful to some people and I won't push it if you prefer it this way. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -443,7 +454,19 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( topicPartitionFor(request.groupId()), Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.shareGroupHeartbeat(context, request) - ).exceptionally(exception -> handleOperationException( + ).thenCompose(result -> { + // This ensures that the previous group write has completed successfully + // before we start the persister initialize phase. + if (result.getValue().isPresent()) { Review Comment: So, the obvious question here is, what happens if the previous group write has NOT completed successfully and the result is not yet present? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2323,10 +2346,115 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh // 2. The member's assignment has been updated. boolean isFullRequest = subscribedTopicNames != null; if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { - response.setAssignment(createShareGroupResponseAssignment(updatedMember)); + ShareGroupHeartbeatResponseData.Assignment assignment = createShareGroupResponseAssignment(updatedMember); + response.setAssignment(assignment); } - return new CoordinatorResult<>(records, response); + return new CoordinatorResult<>( + records, + Map.entry( + response, + maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata) + ) + ); + } + + private boolean initializedAssignmentPending(ShareGroup group) { + if (!shareGroupPartitionMetadata.containsKey(group.groupId())) { + // No initialized share partitions for the group Review Comment: nit: Comment could easily be on a single longer line. General point for this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org