vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383612935
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -898,31 +987,44 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
- // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
- // delta between the existing and the new target assignment is persisted to the partition.
+ // 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
- if (groupEpoch > targetAssignmentEpoch) {
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
String preferredServerAssignor =
group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
try {
- TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
- new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+ // A new static member is replacing an older one with the same subscriptions.
+ // We just need to remove the older member and add the newer one. The new member can
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {
+ targetAssignment = group.targetAssignment(existingStaticMember.memberId());
+ assignmentResult = assignmentResultBuilder
+ .removeMember(existingStaticMember.memberId())
+ .addOrUpdateMember(memberId, updatedMember)
+ .build();
+ records.addAll(assignmentResult.records());
+ } else {
+ assignmentResult = assignmentResultBuilder
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(group.targetAssignment())
.addOrUpdateMember(memberId, updatedMember)
Review Comment:
I think I got it now. I added some state tracking to the
`TargetAssignmentBuilder` so that it doesn't compute assignments for a
replacing static member.
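
To make the idea concrete, here is a rough, self-contained sketch of what such state tracking could look like. It is not the code in this PR: `SketchBuilder`, `withStaticMemberReplaced`, the `assignorRuns` counter and the toy round-robin fallback are all hypothetical stand-ins, and the real `TargetAssignmentBuilder` has a different constructor and produces records rather than a plain map. The only point is that the builder remembers which member was removed and which was added, and skips the assignor when one static member simply replaces another.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StaticMemberReplacementSketch {

    /** Hypothetical, heavily simplified stand-in for TargetAssignmentBuilder. */
    static final class SketchBuilder {
        private final Map<String, List<Integer>> currentTarget;
        private final List<Integer> allPartitions;
        // State tracking: remember which member ids the caller touched.
        private String removedMemberId;
        private String addedMemberId;
        private boolean staticMemberReplaced;
        int assignorRuns = 0; // exposed only so the demo can observe the skip

        SketchBuilder(Map<String, List<Integer>> currentTarget, List<Integer> allPartitions) {
            this.currentTarget = new TreeMap<>(currentTarget);
            this.allPartitions = new ArrayList<>(allPartitions);
        }

        SketchBuilder removeMember(String memberId) {
            this.removedMemberId = memberId;
            return this;
        }

        SketchBuilder addOrUpdateMember(String memberId) {
            this.addedMemberId = memberId;
            return this;
        }

        // Hypothetical flag; the PR may signal the replacement differently.
        SketchBuilder withStaticMemberReplaced(boolean replaced) {
            this.staticMemberReplaced = replaced;
            return this;
        }

        Map<String, List<Integer>> build() {
            Map<String, List<Integer>> result = new TreeMap<>(currentTarget);
            if (staticMemberReplaced && removedMemberId != null && addedMemberId != null) {
                // Fast path: the new member inherits the old member's partitions,
                // so no assignment is computed.
                List<Integer> inherited = result.remove(removedMemberId);
                result.put(addedMemberId,
                    inherited == null ? new ArrayList<>() : new ArrayList<>(inherited));
                return result;
            }
            // Slow path: recompute everything with a toy round-robin "assignor".
            if (removedMemberId != null) result.remove(removedMemberId);
            if (addedMemberId != null) result.putIfAbsent(addedMemberId, new ArrayList<>());
            assignorRuns++;
            List<String> members = new ArrayList<>(result.keySet());
            for (String member : members) result.put(member, new ArrayList<>());
            for (int i = 0; !members.isEmpty() && i < allPartitions.size(); i++) {
                result.get(members.get(i % members.size())).add(allPartitions.get(i));
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> target = new HashMap<>();
        target.put("member-old", List.of(0, 1));
        target.put("member-other", List.of(2, 3));
        SketchBuilder builder = new SketchBuilder(target, List.of(0, 1, 2, 3))
            .removeMember("member-old")
            .addOrUpdateMember("member-new")
            .withStaticMemberReplaced(true);
        System.out.println(builder.build() + " assignorRuns=" + builder.assignorRuns);
    }
}
```

In this sketch the replacing member ends up with partitions 0 and 1 and `assignorRuns` stays at 0. Without the flag, the same calls fall through to the recompute path.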