jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613845801


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12801,6 +12801,365 @@ public void 
testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() {
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String instanceId2 = "instance-id-2";
+        String instanceId3 = "instance-id-3";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol1 = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
0))
+                ))))
+        );
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol2 = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
1))
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocol1)
+            )
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(instanceId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(9)
+            .setPreviousMemberEpoch(8)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocol2)
+            )
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1)))
+            .build();
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setInstanceId(instanceId3)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build();
+
+        // Consumer group with three members.
+        // Dynamic member 1 uses the classic protocol;
+        // static member 2 uses the classic protocol;

Review Comment:
   nit: can we end each of these comment lines with "." instead of ";"?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4425,128 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
      * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+        if (group == null) {
+            throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+        }
+
+        if (group.type() == CLASSIC) {
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+        } else {
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+        }
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the records to append.
+     */
+    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException {
+        String groupId = group.groupId();
+        List<MemberResponse> memberResponses = new ArrayList<>();
+        Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>();
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        for (MemberIdentity memberIdentity: request.members()) {

Review Comment:
   nit: add a space before the colon: `for (MemberIdentity memberIdentity : request.members()) {`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -1001,6 +1001,27 @@ public Map<String, Integer> computeSubscribedTopicNames(
         return subscribedTopicNames;
     }
 
+    /**
+     * Updates the subscription count with a list of members removed.
+     *
+     * @param removedMembers        The list of removed members.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> computeSubscribedTopicNames(
+        List<ConsumerGroupMember> removedMembers
+    ) {
+        Map<String, Integer> subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+        if (removedMembers != null) {
+            removedMembers.forEach(removedMember ->
+                removedMember.subscribedTopicNames().forEach(topicName ->
+                    subscribedTopicNames.compute(topicName, 
ConsumerGroup::decValue)
+                )
+            );
+        }
+        return subscribedTopicNames;

Review Comment:
   Can we reuse `maybeUpdateSubscribedTopicNames` here? Something like:
   
   ```
           if (removedMembers != null) {
               removedMembers.forEach(removedMember ->
                   maybeUpdateSubscribedTopicNames(
                       subscribedTopicNames,
                       removedMember,
                       null
                   )
               );
           }
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4425,128 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
      * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+        if (group == null) {
+            throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+        }
+
+        if (group.type() == CLASSIC) {
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+        } else {
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+        }
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the records to append.
+     */
+    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException {
+        String groupId = group.groupId();
+        List<MemberResponse> memberResponses = new ArrayList<>();
+        Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>();
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        for (MemberIdentity memberIdentity: request.members()) {
+            String memberId = memberIdentity.memberId();
+            String instanceId = memberIdentity.groupInstanceId();
+            String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+            ConsumerGroupMember member;
+            try {
+                if (instanceId == null) {
+                    member = group.getOrMaybeCreateMember(memberId, false);
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Dynamic Member {} has left group " +
+                            "through explicit `LeaveGroup` request; client 
reason: {}",
+                        groupId, memberId, reason);
+                } else {
+                    member = group.staticMember(instanceId);
+                    throwIfStaticMemberIsUnknown(member, instanceId);
+                    // The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+                    // in which case we expect the MemberId to be undefined.
+                    if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+                        throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                    }
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    memberId = member.memberId();
+                    log.info("[Group {}] Static Member {} with instance id {} 
has left group " +
+                            "through explicit `LeaveGroup` request; client 
reason: {}",
+                        groupId, memberId, instanceId, reason);
+                }
+
+                removeMember(records, groupId, memberId);
+                cancelTimers(groupId, memberId);
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(memberId)
+                        .setGroupInstanceId(instanceId)
+                );
+                validLeaveGroupMembers.add(member);
+            } catch (KafkaException e) {
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(memberId)
+                        .setGroupInstanceId(instanceId)
+                        .setErrorCode(Errors.forException(e).code())
+                );
+            }
+        }
+
+        if (!records.isEmpty()) {
+            // Maybe update the subscription metadata.
+            Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
+                group.computeSubscribedTopicNames(new 
ArrayList<>(validLeaveGroupMembers)),

Review Comment:
   Do we need to copy `validLeaveGroupMembers` into a new `ArrayList` here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12801,6 +12801,365 @@ public void 
testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() {
         );
     }
 
+    @Test
+    public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String instanceId2 = "instance-id-2";
+        String instanceId3 = "instance-id-3";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol1 = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
0))
+                ))))
+        );
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol2 = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
1))
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocol1)
+            )
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(instanceId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(9)
+            .setPreviousMemberEpoch(8)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocol2)
+            )
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1)))
+            .build();
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setInstanceId(instanceId3)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build();
+
+        // Consumer group with three members.
+        // Dynamic member 1 uses the classic protocol;
+        // static member 2 uses the classic protocol;
+        // static member 3 uses the consumer protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .addTopic(barTopicId, barTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withMember(member3)
+                .withAssignment(memberId1, 
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+                .withAssignment(memberId2, 
mkAssignment(mkTopicAssignment(fooTopicId, 1)))
+                .withAssignment(memberId3, 
mkAssignment(mkTopicAssignment(barTopicId, 0)))
+                .withAssignmentEpoch(10))
+            .build();
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+        
context.replay(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
2, mkMapOfPartitionRacks(2)));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
1, mkMapOfPartitionRacks(1)));
+            }
+        }));
+
+        // Member 1 joins to schedule the sync timeout and the heartbeat 
timeout.
+        context.sendClassicGroupJoin(
+            new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId1)
+                .withRebalanceTimeoutMs(member1.rebalanceTimeoutMs())
+                
.withSessionTimeoutMs(member1.classicMemberMetadata().get().sessionTimeoutMs())
+                
.withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
0))))
+                .build()
+        ).appendFuture.complete(null);
+        context.assertSyncTimeout(groupId, memberId1, 
member1.rebalanceTimeoutMs());
+        context.assertSessionTimeout(groupId, memberId1, 
member1.classicMemberMetadata().get().sessionTimeoutMs());
+
+        // Member 2 heartbeats to schedule the join timeout and the heartbeat 
timeout.
+        context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setGenerationId(9)
+        );
+        context.assertJoinTimeout(groupId, memberId2, 
member2.rebalanceTimeoutMs());
+        context.assertSessionTimeout(groupId, memberId2, 
member2.classicMemberMetadata().get().sessionTimeoutMs());
+
+        // Member 1 and member 2 leave the group.
+        CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
leaveResult = context.sendClassicGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Arrays.asList(
+                    // Valid member id.
+                    new MemberIdentity()
+                        .setMemberId(memberId1),
+                    new MemberIdentity()
+                        .setGroupInstanceId(instanceId2),
+                    // Member that doesn't use the classic protocol.
+                    new MemberIdentity()
+                        .setMemberId(memberId3)
+                        .setGroupInstanceId(instanceId3),
+                    // Unknown member ids.

Review Comment:
   nit: use the singular here: "Unknown member id."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to