dongnuo123 commented on code in PR #17008:
URL: https://github.com/apache/kafka/pull/17008#discussion_r1734841038


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1046,43 +1046,58 @@ ShareGroup shareGroup(
     }
 
     /**
-     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     * Validates whether the group id is eligible for an online downgrade.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId      The fenced member id.
+     * @param consumerGroup The group to downgrade.
      * @return A boolean indicating whether it's valid to online downgrade the consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup) {
+        if (!consumerGroup.allMembersUseClassic()) {
             return false;
-        } else if (consumerGroup.numMembers() <= 1) {
+        } else if (consumerGroup.isEmpty()) {
             log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
                 consumerGroup.groupId());
             return false;
         } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
             log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
                 consumerGroup.groupId());
             return false;
-        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
             return false;
         }
         return true;
     }
 
+    /**
+     * Maybe downgrade the consumer group to a classic group if it's valid for online downgrade.
+     *
+     * @param groupId   The group id.
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupDowngradeOperation(
+        String groupId
+    ) {
+        try {
+            ConsumerGroup consumerGroup = consumerGroup(groupId);
+            if (validateOnlineDowngrade(consumerGroup)) {
+                return convertToClassicGroup(consumerGroup);

Review Comment:
   I agree that we are scheduling duplicate rebalances. We always triggered a 
rebalance in the conversion for two reasons: 1) to make the members report 
their session timeout (this is no longer needed, since we now store the session 
timeout in ClassicProtocolMetadata), and 2) to make sure a rebalancing group 
keeps rebalancing.
   
   In the new approach, the only case where the converted group is stable is 
after a static member replacement in a stable group. In that case, no rebalance 
is needed.
   
   In other cases, we currently do double rebalances:
   > When a member leaves, the group epoch is bumped when we fence it. This 
triggers a rebalance. Then, when the group is converted, we trigger another one
   
   This is correct. I'm wondering whether we can, or need to, combine them into 
one. The overhead of triggering two rebalances is that some members could 
rejoin twice, once for each rebalance. Given that the timeout is 0 for the 
downgrade, there probably won't be many such members (though I'm not sure). 
Even if we do need to combine the two rebalances into one, the one in 
`convertToClassicGroup` is necessary, because it would be hard to carry over 
the members that have rejoined during the conversion.
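
   To make the stable vs. rebalancing distinction concrete, here is a rough, 
self-contained sketch. It is not the actual `GroupMetadataManager` code: 
`DowngradeRebalanceSketch`, `Trigger`, `stableBeforeConversion` and 
`rebalanceOnConversion` are hypothetical names, and the model only encodes the 
two cases discussed above.

```java
public class DowngradeRebalanceSketch {
    // Hypothetical: what led to the downgrade being attempted.
    public enum Trigger { MEMBER_LEFT, STATIC_MEMBER_REPLACED }

    // Whether the consumer group is still stable right before the conversion.
    // Assumption taken from the discussion: fencing a leaving member bumps the
    // group epoch (which starts a rebalance), while replacing a static member
    // in a stable group leaves the group stable.
    public static boolean stableBeforeConversion(boolean wasStable, Trigger trigger) {
        return wasStable && trigger == Trigger.STATIC_MEMBER_REPLACED;
    }

    // The conversion only needs to trigger a rebalance when the group is not
    // stable, so that a rebalancing group keeps rebalancing after conversion.
    public static boolean rebalanceOnConversion(boolean wasStable, Trigger trigger) {
        return !stableBeforeConversion(wasStable, trigger);
    }

    public static void main(String[] args) {
        for (Trigger trigger : Trigger.values()) {
            for (boolean wasStable : new boolean[] {true, false}) {
                System.out.println("wasStable=" + wasStable
                    + " trigger=" + trigger
                    + " rebalanceOnConversion=" + rebalanceOnConversion(wasStable, trigger));
            }
        }
    }
}
```

   With this shape, the static member replacement in a stable group is the only 
path that skips the rebalance. Every other path still relies on the rebalance 
triggered in the conversion.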



