dajac commented on code in PR #16057: URL: https://github.com/apache/kafka/pull/16057#discussion_r1613991045
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4424,14 +4419,124 @@ private ConsumerGroupMember validateConsumerGroupMember( * @param context The request context. * @param request The actual LeaveGroup request. * + * @return The LeaveGroup response and the records to append. + */ + public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + Group group = group(request.groupId()); Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4424,14 +4419,124 @@ private ConsumerGroupMember validateConsumerGroupMember( * @param context The request context. * @param request The actual LeaveGroup request. * + * @return The LeaveGroup response and the records to append. + */ + public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + Group group = group(request.groupId()); + + if (group.type() == CLASSIC) { + return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); + } else { + return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request); + } + } + + /** + * Handle a classic LeaveGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the records to append. 
+ */ + private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToConsumerGroup( + ConsumerGroup group, + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException { + String groupId = group.groupId(); + List<MemberResponse> memberResponses = new ArrayList<>(); + Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>(); + List<CoordinatorRecord> records = new ArrayList<>(); + + for (MemberIdentity memberIdentity : request.members()) { + String memberId = memberIdentity.memberId(); + String instanceId = memberIdentity.groupInstanceId(); + String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided"; + + ConsumerGroupMember member; + try { + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, false); + throwIfMemberDoesNotUseClassicProtocol(member); + + log.info("[Group {}] Dynamic Member {} has left group " + + "through explicit `LeaveGroup` request; client reason: {}", + groupId, memberId, reason); + } else { + member = group.staticMember(instanceId); + throwIfStaticMemberIsUnknown(member, instanceId); + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. 
+ if (!UNKNOWN_MEMBER_ID.equals(memberId)) { + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + throwIfMemberDoesNotUseClassicProtocol(member); + + memberId = member.memberId(); + log.info("[Group {}] Static Member {} with instance id {} has left group " + + "through explicit `LeaveGroup` request; client reason: {}", + groupId, memberId, instanceId, reason); + } + + removeMember(records, groupId, memberId); + cancelTimers(groupId, memberId); + memberResponses.add( + new MemberResponse() + .setMemberId(memberId) + .setGroupInstanceId(instanceId) + ); + validLeaveGroupMembers.add(member); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(memberId) + .setGroupInstanceId(instanceId) + .setErrorCode(Errors.forException(e).code()) + ); + } + } + + if (!records.isEmpty()) { + // Maybe update the subscription metadata. + Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata( + group.computeSubscribedTopicNames(validLeaveGroupMembers), + metadataImage.topics(), + metadataImage.cluster() + ); + + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + group.groupId(), subscriptionMetadata); + records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); + } + + // Bump the group epoch. + records.add(newGroupEpochRecord(groupId, group.groupEpoch() + 1)); + } + + return new CoordinatorResult<>(records, new LeaveGroupResponseData().setMembers(memberResponses)); + } + + /** + * Handle a classic LeaveGroupRequest to a ClassicGroup. + * + * @param group The ClassicGroup. + * @param context The request context. + * @param request The actual LeaveGroup request. + * * @return The LeaveGroup response and the GroupMetadata record to append if the group * no longer has any members. 
*/ - public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave( + private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup( + ClassicGroup group, RequestContext context, LeaveGroupRequestData request ) throws UnknownMemberIdException, GroupIdNotFoundException { Review Comment: Should we remove GroupIdNotFoundException here too? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4424,14 +4419,124 @@ private ConsumerGroupMember validateConsumerGroupMember( * @param context The request context. * @param request The actual LeaveGroup request. * + * @return The LeaveGroup response and the records to append. + */ + public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { Review Comment: I suppose that we could remove GroupIdNotFoundException here too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3984,9 +3985,9 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync( SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture ) throws UnknownMemberIdException, GroupIdNotFoundException { - Group group = groups.get(request.groupId(), Long.MAX_VALUE); + Group group = group(request.groupId()); Review Comment: We need to catch GroupIdNotFoundException and re-throw it as UnknownMemberIdException here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3984,9 +3985,9 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync( SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture ) throws UnknownMemberIdException, GroupIdNotFoundException { Review Comment: Should we remove GroupIdNotFoundException as I don't think that we expect it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4248,13 +4249,7 @@ public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupH RequestContext context, HeartbeatRequestData request ) { - Group group = groups.get(request.groupId(), Long.MAX_VALUE); - - if (group == null) { - throw new UnknownMemberIdException( - String.format("Group %s not found.", request.groupId()) - ); - } + Group group = group(request.groupId()); Review Comment: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org