squah-confluent commented on code in PR #18224: URL: https://github.com/apache/kafka/pull/18224#discussion_r1909869186
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3001,17 +3010,52 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(
         ConsumerGroupMember member,
         T response
     ) {
+        return consumerGroupFenceMembers(group, Set.of(member), response);
+    }
+
+    /**
+     * Fences members from a consumer group and maybe downgrade the consumer group to a classic group.
+     *
+     * @param group     The group.
+     * @param members   The members.
+     * @param response  The response of the CoordinatorResult.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(
+        ConsumerGroup group,
+        Set<ConsumerGroupMember> members,
+        T response
+    ) {
+        if (members.isEmpty()) {
+            // No members to fence. Don't bump the group epoch.
+            return new CoordinatorResult<>(Collections.emptyList(), response);
+        }
+
+        Set<String> memberIds = new HashSet<String>();
+        for (ConsumerGroupMember member : members) {
+            memberIds.add(member.memberId());
+        }

Review Comment:
   Also gave this a try. For consistency I applied the same change to `validateOnlineDowngradeWithReplacedMemberId`.
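For readers without the full file at hand, here is a minimal, self-contained sketch of the shape of the hunk above: a single-member method that delegates to a set-based one, which returns early with no records when the set is empty. `FenceMembersSketch`, `Member`, `Result`, and the string "records" are hypothetical stand-ins for illustration only, not the real `ConsumerGroupMember`, `CoordinatorResult`, or `CoordinatorRecord` types, and the records produced on the non-empty path are an assumption about what follows the hunk.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class FenceMembersSketch {
    // Hypothetical stand-in for ConsumerGroupMember.
    public static final class Member {
        public final String memberId;
        public Member(String memberId) { this.memberId = memberId; }
    }

    // Hypothetical stand-in for CoordinatorResult: records to write plus a response.
    public static final class Result<T> {
        public final List<String> records;
        public final T response;
        public Result(List<String> records, T response) {
            this.records = records;
            this.response = response;
        }
    }

    // Single-member entry point delegates to the set-based version.
    public static <T> Result<T> fenceMember(Member member, T response) {
        return fenceMembers(Set.of(member), response);
    }

    public static <T> Result<T> fenceMembers(Set<Member> members, T response) {
        if (members.isEmpty()) {
            // Nothing to fence: return no records, so no epoch bump is written.
            return new Result<>(List.of(), response);
        }

        // Collect the ids of the members being fenced, as in the hunk.
        Set<String> memberIds = new HashSet<>();
        for (Member member : members) {
            memberIds.add(member.memberId);
        }

        // Assumption: placeholder strings in place of the real removal and epoch records.
        List<String> records = new ArrayList<>();
        for (String id : memberIds) {
            records.add("tombstone:" + id);
        }
        records.add("bump-group-epoch");
        return new Result<>(records, response);
    }
}
```

Calling `fenceMembers(Set.of(), response)` in this sketch yields an empty record list, which mirrors the "Don't bump the group epoch" early return in the diff.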