Copilot commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2474176821


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (isInitialRebalance) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }

Review Comment:
   `isInitialRebalance` is computed on line 1988, before the `bumpGroupEpoch`
block (lines 1989-1999), but it depends on `group.isEmpty()`, which is already
false at that point because the member is added to the group earlier in the
method. As a result, `isInitialRebalance` is always false for the first member
join, and the timer scheduling on line 2003 may never execute as intended.
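
The ordering problem described above can be illustrated with a minimal, self-contained sketch. It assumes the comment's premise that the member is added before the check runs; the diff itself does not show that part of the method. `Group`, `checkAfterAdd`, and `checkBeforeAdd` are hypothetical stand-ins for illustration, not Kafka classes or methods.

```java
import java.util.HashSet;
import java.util.Set;

public class InitialRebalanceSketch {
    // Hypothetical stand-in for a streams group; not the Kafka class.
    static final class Group {
        private final Set<String> members = new HashSet<>();
        private final int epoch = 0;

        boolean isEmpty() { return members.isEmpty(); }
        int groupEpoch() { return epoch; }
        void addMember(String memberId) { members.add(memberId); }
    }

    // Ordering the review comment warns about: the member is added first,
    // so isEmpty() is already false when the flag is computed.
    static boolean checkAfterAdd(Group group, String memberId) {
        group.addMember(memberId);
        return group.isEmpty() && group.groupEpoch() == 0;
    }

    // One possible fix: capture the flag before the group is mutated.
    static boolean checkBeforeAdd(Group group, String memberId) {
        boolean wasInitial = group.isEmpty() && group.groupEpoch() == 0;
        group.addMember(memberId);
        return wasInitial;
    }

    public static void main(String[] args) {
        System.out.println(checkAfterAdd(new Group(), "m1"));  // prints false
        System.out.println(checkBeforeAdd(new Group(), "m1")); // prints true
    }
}
```

If the real method does add the member earlier, capturing the flag before that mutation (or checking for a single-member group instead) would be one way to keep the initial-rebalance path reachable.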



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (isInitialRebalance) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce joins.
+        int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+        if (isInitialRebalance && initialDelayMs > 0) {

Review Comment:
   `isInitialRebalance` is computed on line 1988, before the `bumpGroupEpoch`
block (lines 1989-1999), but it depends on `group.isEmpty()`, which is already
false at that point because the member is added to the group earlier in the
method. As a result, `isInitialRebalance` is always false for the first member
join, and the timer scheduling on line 2003 may never execute as intended.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (isInitialRebalance) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce joins.
+        int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+        if (isInitialRebalance && initialDelayMs > 0) {
+            timer.scheduleIfAbsent(
+                    streamsInitialRebalanceKey(groupId),
+                    initialDelayMs,
+                    TimeUnit.MILLISECONDS,
+                    false,
+                    () -> fireStreamsInitialRebalance(groupId)
+            );
+        }

Review Comment:
   The timer is only scheduled when `bumpGroupEpoch` is true (due to placement 
after line 1999), but `isInitialRebalance` is checked independently. If the 
first member join doesn't trigger a group epoch bump (which could happen if no 
metadata changes occur), the initial rebalance timer may not be scheduled. 
Consider scheduling the timer before the `bumpGroupEpoch` condition or ensuring 
it's always scheduled for the first member.
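
The suggestion above, scheduling the initial-delay timer independently of the epoch bump, could look roughly like the following sketch. `FakeTimer`, `onFirstHeartbeat`, and the `"initial-rebalance-"` key prefix are hypothetical and only loosely modeled on the `timer.scheduleIfAbsent(...)` call in the diff; this is not the coordinator's actual timer API.

```java
import java.util.HashMap;
import java.util.Map;

public class InitialDelaySketch {
    // Hypothetical minimal timer: keeps at most one pending delay per key.
    static final class FakeTimer {
        final Map<String, Long> pending = new HashMap<>();

        void scheduleIfAbsent(String key, long delayMs) {
            // An already scheduled timer for the same key is left untouched.
            pending.putIfAbsent(key, delayMs);
        }
    }

    // The timer decision depends only on wasInitial and the configured delay,
    // so it is taken whether or not the group epoch is bumped.
    static int onFirstHeartbeat(FakeTimer timer, String groupId, int groupEpoch,
                                boolean wasInitial, boolean bumpGroupEpoch,
                                long initialDelayMs) {
        if (wasInitial && initialDelayMs > 0) {
            timer.scheduleIfAbsent("initial-rebalance-" + groupId, initialDelayMs);
        }
        if (bumpGroupEpoch) {
            // Mirrors the diff: the initial rebalance bumps by 2, otherwise by 1.
            groupEpoch += wasInitial ? 2 : 1;
        }
        return groupEpoch;
    }

    public static void main(String[] args) {
        FakeTimer timer = new FakeTimer();
        int epoch = onFirstHeartbeat(timer, "g", 0, true, false, 3000L);
        System.out.println(epoch);                                     // prints 0
        System.out.println(timer.pending.get("initial-rebalance-g"));  // prints 3000
    }
}
```

Keeping the scheduling branch separate from the `bumpGroupEpoch` branch makes it easy to verify in a unit test that a first join without an epoch bump still arms the timer.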



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
