vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402215662
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +1000,45 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
- // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
- // delta between the existing and the new target assignment is persisted to the partition.
+ // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
- if (groupEpoch > targetAssignmentEpoch) {
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
try {
- TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
- new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+ // A new static member is replacing an older one with the same subscriptions.
+ // We just need to remove the older member and add the newer one. The new member can
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {
Review Comment:
Actually, `groupEpoch == targetAssignmentEpoch` is not needed. I was just
trying to ensure that the group epoch and the target assignment epoch are the
same, which is what happens when a static member is replaced. So the check is
redundant. I will remove it.
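
A minimal sketch of why the comparison adds nothing, assuming the invariant described above holds (a static member replacement leaves the group epoch equal to the target assignment epoch). The class `EpochCheckSketch` and its methods are hypothetical and are not part of `GroupMetadataManager`; only the variable names are borrowed from the diff.

```java
// Hypothetical sketch, not Kafka code: names mirror the variables in the diff.
final class EpochCheckSketch {

    // Condition as written in the diff under review.
    static boolean original(boolean staticMemberReplaced, int groupEpoch, int targetAssignmentEpoch) {
        return staticMemberReplaced && groupEpoch == targetAssignmentEpoch;
    }

    // Condition after dropping the epoch comparison.
    static boolean simplified(boolean staticMemberReplaced) {
        return staticMemberReplaced;
    }

    // ASSUMPTION taken from the review comment: whenever a static member is
    // replaced, the group epoch equals the target assignment epoch.
    static boolean invariantHolds(boolean staticMemberReplaced, int groupEpoch, int targetAssignmentEpoch) {
        return !staticMemberReplaced || groupEpoch == targetAssignmentEpoch;
    }

    // Checks every state up to maxEpoch that satisfies the assumed invariant
    // and reports whether both conditions always agree on those states.
    static boolean agreeUnderInvariant(int maxEpoch) {
        for (boolean replaced : new boolean[] {false, true}) {
            for (int groupEpoch = 0; groupEpoch <= maxEpoch; groupEpoch++) {
                for (int targetEpoch = 0; targetEpoch <= groupEpoch; targetEpoch++) {
                    if (!invariantHolds(replaced, groupEpoch, targetEpoch)) {
                        continue; // state ruled out by the assumption
                    }
                    if (original(replaced, groupEpoch, targetEpoch) != simplified(replaced)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```

If the invariant did not hold (for example, a replacement arriving together with a subscription change that bumps the group epoch), the two conditions would differ, so the simplification is only as safe as that assumption.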