lucasbru commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2827683614
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +851,46 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks per-partition assignment epochs.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Check if the partition is in the assigned partitions.
+ // If not found in assigned, check partitions pending revocation.
+ Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
+ }
+
+ // If the partition is not assigned to this member, reject.
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(
+ String.format("Partition %s-%d is not assigned to member %s.",
Review Comment:
Above, we allow commits for unassigned partitions if the "strict epoch
match" passes, that is, if the client-side member epoch equals the server-side
member epoch. So "not assigned" alone is probably not a sufficient reason in
this error message. The message should also state that the client-side member
epoch does not match the server-side member epoch.
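
A minimal sketch of what such a message could look like. Everything here is illustrative: `CommitRejectionMessageSketch`, `StaleEpochSketchException` (a stand-in for `StaleMemberEpochException`), and the `validate` signature are hypothetical, and the control flow is only inferred from the comment above and the Javadoc in the diff, not taken from the PR.

```java
final class CommitRejectionMessageSketch {

    /** Hypothetical stand-in for Kafka's StaleMemberEpochException. */
    static final class StaleEpochSketchException extends RuntimeException {
        StaleEpochSketchException(String message) {
            super(message);
        }
    }

    /**
     * Assumed flow: a strict epoch match accepts the commit; otherwise the
     * per-partition assignment epoch decides.
     *
     * @param assignmentEpoch null when the partition is neither assigned to
     *                        the member nor pending revocation.
     */
    static void validate(
        String memberId,
        int serverMemberEpoch,
        int receivedMemberEpoch,
        String topicName,
        int partitionId,
        Integer assignmentEpoch
    ) {
        // Strict epoch match: accepted even if the partition is unassigned.
        if (receivedMemberEpoch == serverMemberEpoch) {
            return;
        }

        // Unassigned partition AND epoch mismatch: name both reasons.
        if (assignmentEpoch == null) {
            throw new StaleEpochSketchException(String.format(
                "Partition %s-%d is not assigned to member %s, and the received member epoch %d "
                    + "does not match the current member epoch %d.",
                topicName, partitionId, memberId, receivedMemberEpoch, serverMemberEpoch));
        }

        // Assigned, but the client-side epoch predates the assignment.
        if (receivedMemberEpoch < assignmentEpoch) {
            throw new StaleEpochSketchException(String.format(
                "The received member epoch %d is older than the assignment epoch %d of partition %s-%d.",
                receivedMemberEpoch, assignmentEpoch, topicName, partitionId));
        }
    }
}
```

With both epochs in the message, a client log shows directly why a commit for an unassigned partition was rejected in one case but accepted in another.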
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4138,9 +4138,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
ConsumerGroupMember member
) {
// We will write a member epoch of -2 for this departing static member.
+ // Assignment epochs are reset to 0 so when the static member rejoins, partitions
+ // are considered assigned from epoch 0 to the new member ID.
ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member)
Review Comment:
It seems we also need to update the dynamic member leave code. It appears to
call `setAssignedPartitions` there.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -66,6 +69,8 @@ public static class Builder {
private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null;
+ private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of();
Review Comment:
Why are we adding `assignedPartitionsWithEpochs` in addition to
`assignedPartitions`? The two will get out of sync. I think we should keep only
`assignedPartitionsWithEpochs` and replace all uses of `assignedPartitions`
with it.
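
One way to keep a single source of truth, sketched under assumptions: callers that only need the partition sets could derive them from the epoch map on demand instead of reading a second field. `AssignedPartitionsViewSketch` and `partitionsOnly` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid` so the snippet is self-contained.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

final class AssignedPartitionsViewSketch {

    /**
     * Derives topicId -> partition ids from topicId -> (partition id -> assignment epoch).
     * The result is an immutable snapshot, so it cannot drift from the epoch map
     * it was computed from.
     */
    static Map<UUID, Set<Integer>> partitionsOnly(
        Map<UUID, Map<Integer, Integer>> assignedPartitionsWithEpochs
    ) {
        return assignedPartitionsWithEpochs.entrySet().stream()
            .collect(Collectors.toUnmodifiableMap(
                Map.Entry::getKey,
                // Keep only the partition ids; the epochs are dropped.
                entry -> Set.copyOf(entry.getValue().keySet())));
    }
}
```

Whether a derived view like this is cheap enough for hot paths is a separate question; the point is only that the partition sets need not be stored twice.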