squah-confluent commented on code in PR #18224:
URL: https://github.com/apache/kafka/pull/18224#discussion_r1909869186


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3001,17 +3010,52 @@ private <T> CoordinatorResult<T, CoordinatorRecord> 
consumerGroupFenceMember(
         ConsumerGroupMember member,
         T response
     ) {
+        return consumerGroupFenceMembers(group, Set.of(member), response);
+    }
+
+    /**
+     * Fences members from a consumer group and maybe downgrade the consumer 
group to a classic group.
+     *
+     * @param group     The group.
+     * @param members   The members.
+     * @param response  The response of the CoordinatorResult.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> 
consumerGroupFenceMembers(
+        ConsumerGroup group,
+        Set<ConsumerGroupMember> members,
+        T response
+    ) {
+        if (members.isEmpty()) {
+            // No members to fence. Don't bump the group epoch.
+            return new CoordinatorResult<>(Collections.emptyList(), response);
+        }
+
+        Set<String> memberIds = new HashSet<String>();
+        for (ConsumerGroupMember member : members) {
+            memberIds.add(member.memberId());
+        }

Review Comment:
   Also gave this a try. For consistency I applied the same change to 
`validateOnlineDowngradeWithReplacedMemberId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to