Copilot commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2827514018
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -289,8 +289,8 @@ public static CoordinatorRecord newConsumerGroupCurrentAssignmentRecord(
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setState(member.state().value())
- .setAssignedPartitions(toTopicPartitions(member.assignedPartitions()))
- .setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation())),
+ .setAssignedPartitions(toTopicPartitions(member.assignedPartitionsWithEpochs()))
+ .setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocationWithEpochs())),
Review Comment:
newConsumerGroupCurrentAssignmentRecord now exclusively serializes
member.assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs().
If those maps are empty while the Set-based assignment fields are populated
(possible via existing Builder setters), this will write records with empty
assignments and effectively lose the current assignment on disk. Consider
adding a defensive fallback (e.g., derive epoch maps from
assignedPartitions()/partitionsPendingRevocation() with a sensible default
epoch) or validate/throw if the member is internally inconsistent before
serializing.
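A minimal sketch of the fallback option, not the PR's code. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid`, and the class name `EpochMapFallback`, the method `withEpochsOrFallback`, and the caller-supplied default epoch are all hypothetical. Which default epoch is "sensible" is left open here, as it is in the comment above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper; UUID stands in for org.apache.kafka.common.Uuid.
final class EpochMapFallback {

    // Returns the epoch map unchanged when it is populated (or when there is
    // nothing to derive from). Otherwise derives one from the Set-based
    // assignment, tagging every partition with the given default epoch.
    static Map<UUID, Map<Integer, Integer>> withEpochsOrFallback(
            Map<UUID, Map<Integer, Integer>> withEpochs,
            Map<UUID, Set<Integer>> partitions,
            int defaultEpoch) {
        if (!withEpochs.isEmpty() || partitions.isEmpty()) {
            return withEpochs;
        }
        Map<UUID, Map<Integer, Integer>> derived = new HashMap<>();
        partitions.forEach((topicId, partitionIds) -> {
            Map<Integer, Integer> epochs = new HashMap<>();
            for (Integer partitionId : partitionIds) {
                epochs.put(partitionId, defaultEpoch);
            }
            derived.put(topicId, epochs);
        });
        return derived;
    }
}
```

The record helper could call something like this before `toTopicPartitions(...)`, so that a member built through the Set-based setters would still serialize its assignment.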
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the modern group protocol");
}
+ // For member using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
+ // Case 1: Strict epoch match
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+ // Case 2:Client epoch > broker epoch, which is an invalid request
Review Comment:
Minor comment formatting: add a space after the colon in "Case 2:Client".
```suggestion
// Case 2: Client epoch > broker epoch, which is an invalid request
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -28,6 +28,9 @@
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import java.util.ArrayList;
+import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
Review Comment:
Import order looks inconsistent with the rest of the module:
java.util.stream.Collectors is currently placed between java.util.ArrayList and
java.util.Collections. Other files keep java.util.* imports together and place
java.util.stream.* afterwards (e.g., Utils.java:36-49). Please reorder these
imports to match the established style (this may also be enforced by
Checkstyle).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the modern group protocol");
}
+ // For member using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
+ // Case 1: Strict epoch match
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+ // Case 2:Client epoch > broker epoch, which is an invalid request
+ if (memberEpoch > member.memberEpoch()) {
+ throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than "
+ + "the expected member epoch %d.", memberEpoch, member.memberEpoch()));
+ }
+ return createAssignmentEpochValidator(member, memberEpoch);
Review Comment:
The new relaxed offset-commit validation path (returning a per-partition
validator when receivedMemberEpoch < broker member epoch) doesn’t appear to be
covered by unit tests. It would be useful to add ConsumerGroupTest cases that
(1) accept commits when assignmentEpoch <= receivedEpoch < brokerEpoch for
assigned/pending-revocation partitions, and (2) reject commits when the
partition isn’t assigned or when receivedEpoch < assignmentEpoch, to prevent
regressions in the new behavior.
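The cases listed above can be sketched as a small standalone check. This illustrates the rule as described in this comment and is not the PR's `createAssignmentEpochValidator`. It assumes `java.util.UUID` in place of Kafka's `Uuid` and returns a boolean instead of throwing. `AssignmentEpochCheck` and `accepts` are hypothetical names. It also assumes the caller has already handled the `receivedEpoch >= brokerEpoch` cases.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the per-partition validation described above.
final class AssignmentEpochCheck {

    // Accepts a commit for (topicId, partition) only if the partition is
    // assigned or pending revocation AND receivedEpoch >= its assignment epoch.
    static boolean accepts(
            Map<UUID, Map<Integer, Integer>> assigned,
            Map<UUID, Map<Integer, Integer>> pendingRevocation,
            UUID topicId,
            int partition,
            int receivedEpoch) {
        Integer assignmentEpoch = lookup(assigned, topicId, partition);
        if (assignmentEpoch == null) {
            assignmentEpoch = lookup(pendingRevocation, topicId, partition);
        }
        if (assignmentEpoch == null) {
            return false; // partition is not owned by this member
        }
        return receivedEpoch >= assignmentEpoch;
    }

    // Returns the assignment epoch, or null if the partition is absent.
    private static Integer lookup(
            Map<UUID, Map<Integer, Integer>> epochs, UUID topicId, int partition) {
        Map<Integer, Integer> byPartition = epochs.get(topicId);
        return byPartition == null ? null : byPartition.get(partition);
    }
}
```

A ConsumerGroupTest covering the new path would presumably exercise the same four situations against the real validator: assigned and new enough, pending revocation and new enough, owned but too old, and not owned at all.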
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -205,6 +212,66 @@ public Builder setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.Classic
return this;
}
+ public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs) {
+ this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
+ this.assignedPartitions = assignedPartitionsWithEpochs.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new HashSet<>(e.getValue().keySet())
+ ));
+ return this;
+ }
+
+ public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+ this.partitionsPendingRevocationWithEpochs = partitionsPendingRevocationWithEpochs;
+ this.partitionsPendingRevocation = partitionsPendingRevocationWithEpochs.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new HashSet<>(e.getValue().keySet())
+ ));
+ return this;
+ }
Review Comment:
setAssignedPartitionsWithEpochs / setPartitionsPendingRevocationWithEpochs
keep the Set-based fields in sync, but the Builder still also has the older
setAssignedPartitions(...) and setPartitionsPendingRevocation(...) setters.
Given GroupCoordinatorRecordHelpers now serializes
assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(), any
code path that uses the older setters (without also setting the epoch maps) can
drop epoch information and even persist empty assignments. Consider enforcing
invariants in build() (e.g., derive epoch maps when missing), or
deprecating/removing the Set-based setters to prevent constructing an
internally inconsistent ConsumerGroupMember.
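One way the build()-time invariant could look is sketched below, on a stripped-down stand-in rather than the real `ConsumerGroupMember.Builder`. `MemberSketch`, its fields, and `partitionSets` are hypothetical, `java.util.UUID` replaces Kafka's `Uuid`, and only the assigned-partition pair is modeled. This variant rejects inconsistent state. Deriving the missing epoch map instead would be an equally plausible choice.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, reduced model of a member with both assignment views.
final class MemberSketch {
    final Map<UUID, Set<Integer>> assignedPartitions;
    final Map<UUID, Map<Integer, Integer>> assignedPartitionsWithEpochs;

    private MemberSketch(Builder b) {
        this.assignedPartitions = b.assignedPartitions;
        this.assignedPartitionsWithEpochs = b.assignedPartitionsWithEpochs;
    }

    // Projects an epoch map down to the plain partition sets.
    static Map<UUID, Set<Integer>> partitionSets(Map<UUID, Map<Integer, Integer>> withEpochs) {
        Map<UUID, Set<Integer>> sets = new HashMap<>();
        withEpochs.forEach((topicId, epochs) -> sets.put(topicId, new HashSet<>(epochs.keySet())));
        return sets;
    }

    static final class Builder {
        private Map<UUID, Set<Integer>> assignedPartitions = Map.of();
        private Map<UUID, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of();

        // Older Set-based setter: does not touch the epoch map.
        Builder setAssignedPartitions(Map<UUID, Set<Integer>> partitions) {
            this.assignedPartitions = partitions;
            return this;
        }

        // Newer setter: keeps both views in sync.
        Builder setAssignedPartitionsWithEpochs(Map<UUID, Map<Integer, Integer>> withEpochs) {
            this.assignedPartitionsWithEpochs = withEpochs;
            this.assignedPartitions = partitionSets(withEpochs);
            return this;
        }

        // Fails fast if the two views disagree, so an inconsistent member
        // can never reach serialization.
        MemberSketch build() {
            if (!partitionSets(assignedPartitionsWithEpochs).equals(assignedPartitions)) {
                throw new IllegalStateException(
                    "assignedPartitions and assignedPartitionsWithEpochs are inconsistent");
            }
            return new MemberSketch(this);
        }
    }
}
```

With a check like this, a caller that uses only `setAssignedPartitions(...)` fails at `build()` instead of silently persisting an empty assignment.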
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +672,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the modern group protocol");
}
+ // For member using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
Review Comment:
Grammar in this comment is off: consider changing "For member using the
classic protocol" to "For members using the classic protocol" (or "For a member
using...") for clarity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.