Copilot commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2474176821
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
+ boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
if (bumpGroupEpoch) {
- groupEpoch += 1;
+ if (isInitialRebalance) {
+ groupEpoch += 2;
+ } else {
+ groupEpoch += 1;
+ }
Review Comment:
`isInitialRebalance` is computed on line 1988, before the `bumpGroupEpoch`
block (lines 1989-1999), and it depends on `group.isEmpty()`. Because the
member is added to the group earlier in the method, `group.isEmpty()` is
already false at this point, so `isInitialRebalance` will always be false for
the first member join. As a result, the timer scheduling on line 2003 may
never execute as intended.
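
The ordering concern can be illustrated with a minimal, self-contained sketch. It assumes, as the comment does, that the member is added before the check runs. `ToyGroup`, `checkAfterAdd`, and `checkBeforeAdd` are hypothetical names used only for illustration; none of this is the actual `GroupMetadataManager` code.

```java
import java.util.HashSet;
import java.util.Set;

public class InitialRebalanceSketch {
    /** Hypothetical stand-in for a streams group; not the real Kafka class. */
    static final class ToyGroup {
        private final Set<String> members = new HashSet<>();
        private final int groupEpoch = 0;

        boolean isEmpty() { return members.isEmpty(); }
        int groupEpoch() { return groupEpoch; }
        void addMember(String memberId) { members.add(memberId); }
    }

    /** Evaluates the condition after the member was added (the order the comment describes). */
    static boolean checkAfterAdd(ToyGroup group, String memberId) {
        group.addMember(memberId);
        return group.isEmpty() && group.groupEpoch() == 0;
    }

    /** Captures the condition first, then adds the member. */
    static boolean checkBeforeAdd(ToyGroup group, String memberId) {
        boolean wasInitial = group.isEmpty() && group.groupEpoch() == 0;
        group.addMember(memberId);
        return wasInitial;
    }

    public static void main(String[] args) {
        System.out.println(checkAfterAdd(new ToyGroup(), "m1"));
        System.out.println(checkBeforeAdd(new ToyGroup(), "m1"));
    }
}
```

One possible fix, under that assumption, is to capture the "was empty" state before the member is added and carry it down to the point where the epoch is bumped.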
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
+ boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
if (bumpGroupEpoch) {
- groupEpoch += 1;
+ if (isInitialRebalance) {
+ groupEpoch += 2;
+ } else {
+ groupEpoch += 1;
+ }
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
+ // Schedule initial rebalance delay for new streams groups to coalesce joins.
+ int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+ if (isInitialRebalance && initialDelayMs > 0) {
Review Comment:
`isInitialRebalance` is computed on line 1988, before the `bumpGroupEpoch`
block (lines 1989-1999), and it depends on `group.isEmpty()`. Because the
member is added to the group earlier in the method, `group.isEmpty()` is
already false at this point, so `isInitialRebalance` will always be false for
the first member join. As a result, the timer scheduling on line 2003 may
never execute as intended.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
+ boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0);
if (bumpGroupEpoch) {
- groupEpoch += 1;
+ if (isInitialRebalance) {
+ groupEpoch += 2;
+ } else {
+ groupEpoch += 1;
+ }
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
+ // Schedule initial rebalance delay for new streams groups to coalesce joins.
+ int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+ if (isInitialRebalance && initialDelayMs > 0) {
+ timer.scheduleIfAbsent(
+ streamsInitialRebalanceKey(groupId),
+ initialDelayMs,
+ TimeUnit.MILLISECONDS,
+ false,
+ () -> fireStreamsInitialRebalance(groupId)
+ );
+ }
Review Comment:
The timer is only scheduled when `bumpGroupEpoch` is true (due to placement
after line 1999), but `isInitialRebalance` is checked independently. If the
first member join doesn't trigger a group epoch bump (which could happen if no
metadata changes occur), the initial rebalance timer may not be scheduled.
Consider scheduling the timer before the `bumpGroupEpoch` condition or ensuring
it's always scheduled for the first member.
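
A rough sketch of the suggestion, deciding the timer independently of the epoch bump, might look like the following. `ToyTimer`, `onFirstHeartbeat`, and the key format are hypothetical and only mimic the shape of the diff; the real coordinator timer API is not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InitialDelayTimerSketch {
    /** Hypothetical minimal timer: remembers at most one pending delay per key. */
    static final class ToyTimer {
        private final Map<String, Long> pending = new LinkedHashMap<>();

        /** Registers the delay only if no entry exists for the key yet. */
        void scheduleIfAbsent(String key, long delayMs) {
            pending.putIfAbsent(key, delayMs);
        }

        boolean isScheduled(String key) { return pending.containsKey(key); }
        long delayMs(String key) { return pending.get(key); }
    }

    /**
     * Schedules the initial delay based only on wasInitial and the configured delay,
     * then applies the epoch bump separately. Returns the resulting group epoch.
     */
    static int onFirstHeartbeat(ToyTimer timer, String groupId, int groupEpoch,
                                boolean wasInitial, boolean bumpGroupEpoch,
                                long initialDelayMs) {
        if (wasInitial && initialDelayMs > 0) {
            timer.scheduleIfAbsent("initial-rebalance-" + groupId, initialDelayMs);
        }
        if (bumpGroupEpoch) {
            // Mirrors the diff: the initial rebalance bumps the epoch by 2, otherwise by 1.
            groupEpoch += wasInitial ? 2 : 1;
        }
        return groupEpoch;
    }
}
```

In this arrangement the timer is registered for the first member even when no epoch bump happens, and a repeated call with the same key leaves the original delay in place.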