vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363301832
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+    private void replaceStaticMemberInConsumerGroup(
+        String groupId,
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember existingStaticMember,
+        ConsumerGroupMember updatedMember
+    ) {
+        // Write tombstones for the departed static member. Follow the same
+        // order as the one followed when fencing a member.
+        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        // Cancel all the timers of the departed static member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+        // Write a record corresponding to the new member.
+        records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+        TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+        records.addAll(assignmentResult.records());
+        records.add(newCurrentAssignmentRecord(groupId, updatedMember));
Review Comment:
The main reason for extracting this out was that, while using the main logic,
there was always a group epoch bump even when a new static member replaced an
older one. When I debugged it further, it seems to be because of this logic
[here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L885-L889).
More specifically, the issue at hand is that `subscriptionMetadata` has
partition rack info while the currently stored metadata doesn't. This is with
regard to the test `testStaticMemberGetsBackAssignmentUponRejoin`. I wasn't
totally sure whether this is an issue with the test itself, but since it led to
a group epoch bump, I thought we shouldn't do it.
Actually, thinking about it now, maybe it makes sense to have a group epoch
bump in this case as well. It might go against the goal of avoiding a rebalance
during a static member rejoin, but the trigger here is a change in subscription
metadata, not the static member re-join itself. The latter case seemed harder
to replicate via tests, though, because the group epoch was always bumped due
to the issue mentioned above. Please let me know your thoughts.
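To make the rack-info point concrete, here is a minimal, self-contained sketch of why the stored and freshly computed subscription metadata compare unequal. The `TopicMetadata` record below is a hypothetical simplified stand-in for the coordinator's own class (its real equality is richer); the point is only that when equality includes per-partition racks, stored metadata computed without rack info never matches fresh metadata that has it, so the epoch-bump condition fires even though the static member's subscription is unchanged.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SubscriptionMetadataSketch {
    // Hypothetical stand-in for the coordinator's topic metadata; record
    // equality includes the partitionRacks map.
    record TopicMetadata(String name, int numPartitions,
                         Map<Integer, List<String>> partitionRacks) {}

    public static void main(String[] args) {
        // Metadata as previously stored, computed before rack info was available.
        Map<String, TopicMetadata> stored = Map.of(
            "foo", new TopicMetadata("foo", 3, Map.of()));

        // Freshly computed metadata now carries partition racks.
        Map<String, TopicMetadata> fresh = Map.of(
            "foo", new TopicMetadata("foo", 3, Map.of(
                0, List.of("rack-a"),
                1, List.of("rack-b"),
                2, List.of("rack-a"))));

        // The coordinator bumps the group epoch when the two maps differ;
        // here they differ solely because of the rack info.
        boolean bumpEpoch = !Objects.equals(stored, fresh);
        System.out.println("bumpEpoch = " + bumpEpoch); // prints "bumpEpoch = true"
    }
}
```

If this mismatch is expected on first computation after racks become known, then a bump on static-member replacement is arguably correct behavior rather than a test artifact.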
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]