dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613991045


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4419,124 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
      * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = group(request.groupId());

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4419,124 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
      * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = group(request.groupId());
+
+        if (group.type() == CLASSIC) {
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+        } else {
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+        }
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the records to append.
+     */
+    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException {
+        String groupId = group.groupId();
+        List<MemberResponse> memberResponses = new ArrayList<>();
+        Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>();
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        for (MemberIdentity memberIdentity : request.members()) {
+            String memberId = memberIdentity.memberId();
+            String instanceId = memberIdentity.groupInstanceId();
+            String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+            ConsumerGroupMember member;
+            try {
+                if (instanceId == null) {
+                    member = group.getOrMaybeCreateMember(memberId, false);
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Dynamic Member {} has left group " +
+                            "through explicit `LeaveGroup` request; client 
reason: {}",
+                        groupId, memberId, reason);
+                } else {
+                    member = group.staticMember(instanceId);
+                    throwIfStaticMemberIsUnknown(member, instanceId);
+                    // The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+                    // in which case we expect the MemberId to be undefined.
+                    if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+                        throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                    }
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    memberId = member.memberId();
+                    log.info("[Group {}] Static Member {} with instance id {} 
has left group " +
+                            "through explicit `LeaveGroup` request; client 
reason: {}",
+                        groupId, memberId, instanceId, reason);
+                }
+
+                removeMember(records, groupId, memberId);
+                cancelTimers(groupId, memberId);
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(memberId)
+                        .setGroupInstanceId(instanceId)
+                );
+                validLeaveGroupMembers.add(member);
+            } catch (KafkaException e) {
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(memberId)
+                        .setGroupInstanceId(instanceId)
+                        .setErrorCode(Errors.forException(e).code())
+                );
+            }
+        }
+
+        if (!records.isEmpty()) {
+            // Maybe update the subscription metadata.
+            Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
+                group.computeSubscribedTopicNames(validLeaveGroupMembers),
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    group.groupId(), subscriptionMetadata);
+                
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
+            }
+
+            // Bump the group epoch.
+            records.add(newGroupEpochRecord(groupId, group.groupEpoch() + 1));
+        }
+
+        return new CoordinatorResult<>(records, new 
LeaveGroupResponseData().setMembers(memberResponses));
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ClassicGroup.
+     *
+     * @param group          The ClassicGroup.
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
      * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
      *         no longer has any members.
      */
-    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToClassicGroup(
+        ClassicGroup group,
         RequestContext context,
         LeaveGroupRequestData request
     ) throws UnknownMemberIdException, GroupIdNotFoundException {

Review Comment:
   Should we remove GroupIdNotFoundException here too?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4419,124 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
      * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {

Review Comment:
   I suppose that we could remove GroupIdNotFoundException here too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3984,9 +3985,9 @@ public CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSync(
         SyncGroupRequestData request,
         CompletableFuture<SyncGroupResponseData> responseFuture
     ) throws UnknownMemberIdException, GroupIdNotFoundException {
-        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+        Group group = group(request.groupId());

Review Comment:
   We need to catch GroupIdNotFoundException and re-thrown 
UnknownMemberIdException here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3984,9 +3985,9 @@ public CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSync(
         SyncGroupRequestData request,
         CompletableFuture<SyncGroupResponseData> responseFuture
     ) throws UnknownMemberIdException, GroupIdNotFoundException {

Review Comment:
   Should we remove GroupIdNotFoundException as I don't think that we expect it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4248,13 +4249,7 @@ public CoordinatorResult<HeartbeatResponseData, 
CoordinatorRecord> classicGroupH
         RequestContext context,
         HeartbeatRequestData request
     ) {
-        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
-
-        if (group == null) {
-            throw new UnknownMemberIdException(
-                String.format("Group %s not found.", request.groupId())
-            );
-        }
+        Group group = group(request.groupId());

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to