dongnuo123 commented on code in PR #17008:
URL: https://github.com/apache/kafka/pull/17008#discussion_r1734841038
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1046,43 +1046,58 @@ ShareGroup shareGroup(
     }
 
     /**
-     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     * Validates whether the group id is eligible for an online downgrade.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId The fenced member id.
+     * @param consumerGroup The group to downgrade.
      * @return A boolean indicating whether it's valid to online downgrade the consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) {
+        if (!consumerGroup.allMembersUseClassic()) {
             return false;
-        } else if (consumerGroup.numMembers() <= 1) {
+        } else if (consumerGroup.isEmpty()) {
             log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
                 consumerGroup.groupId());
             return false;
         } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
             log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
                 consumerGroup.groupId());
             return false;
-        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
             return false;
         }
         return true;
     }
 
+    /**
+     * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade.
+     *
+     * @param groupId The group id.
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation(
+        String groupId
+    ) {
+        try {
+            ConsumerGroup consumerGroup = consumerGroup(groupId);
+            if (validateOnlineDowngrade(consumerGroup)) {
+                return convertToClassicGroup(consumerGroup);

Review Comment:
   I agree that we are scheduling duplicate rebalances. We originally always triggered a rebalance in the conversion for two reasons:
   1. to make the members report their session timeout (this is no longer needed, because the session timeout is now stored in ClassicProtocolMetadata);
   2. to make sure a rebalancing group keeps rebalancing.

   In the new approach, the only case where the converted group is stable is after a static member replacement in a stable group. In that case, no rebalance is needed. In all other cases, we currently trigger two rebalances.

   > When a member leaves, the group epoch is bumped when we fence it. This triggers a rebalance. Then, when the group is converted, we trigger another one.

   This is correct. I'm wondering whether we can, or need to, combine them into one. The overhead of triggering two rebalances is that some members could rejoin twice, once for each rebalance. Given that the timeout is 0 for the downgrade, there probably won't be many such members, though I'm not sure.

   Even if we do combine the two rebalances into one, the one in `convertToClassicGroup` is the one that has to stay, because it would be hard to carry over the members that have rejoined during the conversion.
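
   To make the two cases above concrete, here is a minimal, self-contained sketch of the accounting. It is not the `GroupMetadataManager` code: the class `DowngradeRebalanceSketch` and both of its methods are hypothetical names introduced only for illustration, and the model assumes that fencing a member contributes one rebalance (through the epoch bump) and that the conversion contributes one more unless the group is stable.

```java
// Hypothetical illustration only; none of these names exist in Kafka.
final class DowngradeRebalanceSketch {

    private DowngradeRebalanceSketch() { }

    /**
     * Assumed rule: the conversion only needs to trigger a rebalance
     * when the group is not stable at the time it is converted.
     */
    static boolean conversionNeedsRebalance(boolean groupIsStable) {
        return !groupIsStable;
    }

    /**
     * Counts the rebalances the members would observe for one downgrade.
     *
     * @param epochBumpedByFencing  true if a member was fenced and the group
     *                              epoch was bumped, which triggers a rebalance.
     * @param conversionRebalances  true if the conversion itself triggers
     *                              a rebalance of the classic group.
     */
    static int rebalancesTriggered(boolean epochBumpedByFencing, boolean conversionRebalances) {
        int count = 0;
        if (epochBumpedByFencing) {
            count++; // rebalance caused by the epoch bump when fencing the member
        }
        if (conversionRebalances) {
            count++; // rebalance triggered while converting to a classic group
        }
        return count;
    }
}
```

   Under these assumptions, a member leaving a group gives `rebalancesTriggered(true, conversionNeedsRebalance(false))`, which counts both rebalances, while a static member replacement in a stable group gives `rebalancesTriggered(false, conversionNeedsRebalance(true))`, which counts none.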