dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1362102003
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,11 @@ public ConsumerGroupMember getOrMaybeCreateMember(
return member;
}
+ public ConsumerGroupMember getStaticMember(String instanceId) {
Review Comment:
nit: We don't prefix getters with `get`. Let's add javadoc as well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -312,6 +337,9 @@ public void removeMember(String memberId) {
maybeUpdateServerAssignors(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
maybeUpdateGroupState();
+ if (oldMember.instanceId() != null) {
+ staticMembers.remove(oldMember.instanceId());
+ }
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ private void replaceStaticMemberInConsumerGroup(
+ String groupId,
+ List<Record> records,
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ ConsumerGroupMember existingStaticMember,
+ ConsumerGroupMember updatedMember
+ ) {
+ // Write tombstones for the departed static member. Follow the same
order as the one followed when fencing
+ // a member
+ records.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newTargetAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newMemberSubscriptionTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
Review Comment:
We have similar code somewhere else. Could we add a method for this and
reuse it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
*/
Review Comment:
Let's add unit tests for the new or changed methods to the corresponding
file.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ private void replaceStaticMemberInConsumerGroup(
+ String groupId,
+ List<Record> records,
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ ConsumerGroupMember existingStaticMember,
+ ConsumerGroupMember updatedMember
+ ) {
+ // Write tombstones for the departed static member. Follow the same
order as the one followed when fencing
+ // a member
+ records.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newTargetAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newMemberSubscriptionTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ // Cancel all the timers of the departed static member.
+ cancelConsumerGroupSessionTimeout(group.groupId(),
existingStaticMember.memberId());
+ cancelConsumerGroupRevocationTimeout(group.groupId(),
existingStaticMember.memberId());
+ // Write a record corresponding to the new member
+ records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+ records.addAll(assignmentResult.records());
+ records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+ }
+
+ private TargetAssignmentBuilder.TargetAssignmentResult
computeTargetAssignment(
+ ConsumerGroup group,
+ int groupEpoch,
+ ConsumerGroupMember member,
+ ConsumerGroupMember updatedMember) {
+ String preferredServerAssignor = group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+
+ String groupId = group.groupId();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ String memberId = member.memberId();
+ try {
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
Review Comment:
I think that the old member will be in `members` so the computed target
assignment is incorrect. We need to remove it with `removeMember` and we also
need to set the target assignment of the new member from the old one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -299,6 +321,9 @@ public void updateMember(ConsumerGroupMember newMember) {
maybeUpdateServerAssignors(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
maybeUpdateGroupState();
+ if (newMember.instanceId() != null) {
+ staticMembers.put(newMember.instanceId(), newMember.memberId());
+ }
Review Comment:
nit: Would it make sense to have a method like the others? I would also do
this before calling `maybeUpdateGroupState`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -259,6 +265,17 @@ public void setTargetAssignmentEpoch(int
targetAssignmentEpoch) {
maybeUpdateGroupState();
}
+ /**
+ * Get member id of a static member that matches the given group
+ * instance id.
+ *
+ * @param groupInstanceId the group instance id.
+ * @return the static member if it exists.
Review Comment:
nit: We tend to use a single line for getters. eg. `@return The member id
corresponding to the given instance id or null if it does not exist`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ private void replaceStaticMemberInConsumerGroup(
+ String groupId,
+ List<Record> records,
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ ConsumerGroupMember existingStaticMember,
+ ConsumerGroupMember updatedMember
+ ) {
+ // Write tombstones for the departed static member. Follow the same
order as the one followed when fencing
+ // a member
+ records.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newTargetAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newMemberSubscriptionTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ // Cancel all the timers of the departed static member.
+ cancelConsumerGroupSessionTimeout(group.groupId(),
existingStaticMember.memberId());
+ cancelConsumerGroupRevocationTimeout(group.groupId(),
existingStaticMember.memberId());
Review Comment:
Same for this one. It would be great to have a method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ private void replaceStaticMemberInConsumerGroup(
+ String groupId,
+ List<Record> records,
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ ConsumerGroupMember existingStaticMember,
+ ConsumerGroupMember updatedMember
+ ) {
+ // Write tombstones for the departed static member. Follow the same
order as the one followed when fencing
+ // a member
+ records.add(newCurrentAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newTargetAssignmentTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ records.add(newMemberSubscriptionTombstoneRecord(group.groupId(),
existingStaticMember.memberId()));
+ // Cancel all the timers of the departed static member.
+ cancelConsumerGroupSessionTimeout(group.groupId(),
existingStaticMember.memberId());
+ cancelConsumerGroupRevocationTimeout(group.groupId(),
existingStaticMember.memberId());
+ // Write a record corresponding to the new member
+ records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+ records.addAll(assignmentResult.records());
+ records.add(newCurrentAssignmentRecord(groupId, updatedMember));
Review Comment:
I am not sold on this. Is it too difficult to reuse the main logic? There
are a few issues with this approach. For instance, the member's assignment is
not reconciled like we do in the main logic. Another one is that the
subscription metadata must be updated as well if the subscriptions have changed.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions
) {
if (receivedMemberEpoch > member.memberEpoch()) {
+ // If a static member rejoins, it's previous epoch would be -2. In
such a
+ // case, we don't need to fence the member.
+ if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH &&
receivedMemberEpoch == 0) {
+ return;
+ }
Review Comment:
I don't fully get this one. Could you please elaborate?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]