lucasbru commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2853141793
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
* @param memberEpoch The epoch of the member to use.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
int memberEpoch,
- Map<Uuid, Set<Integer>> memberAssignedPartitions
+ Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs;
+
if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
- newAssignedPartitions = Map.of();
- newPartitionsPendingRevocation = memberAssignedPartitions;
+ newAssignedPartitionsWithEpochs = Map.of();
+ // Move all assigned to pending revocation with their epochs
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.assignedPartitions());
} else {
- newAssignedPartitions = memberAssignedPartitions;
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
- for (Map.Entry<Uuid, Set<Integer>> entry :
memberAssignedPartitions.entrySet()) {
+ newAssignedPartitionsWithEpochs = new
HashMap<>(member.assignedPartitions());
Review Comment:
@lucliu1108 It seems this is breaking the lazy copy optimization that Sean
has implemented here. This will copy every time, instead of just when needed.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -222,8 +256,12 @@ public Builder
updateWith(ConsumerGroupCurrentMemberAssignmentValue record) {
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
-
setAssignedPartitions(Utils.assignmentFromTopicPartitions(record.assignedPartitions()));
-
setPartitionsPendingRevocation(Utils.assignmentFromTopicPartitions(record.partitionsPendingRevocation()));
+ setAssignedPartitions(
+
Utils.assignmentFromTopicPartitions(record.assignedPartitions(),
record.memberEpoch())
Review Comment:
Can it be that memberEpoch is something invalid here, like -2 for a static
member? It seems, for legacy static member we would get -2 here.
That goes back to the KIP discussion I believe. I proposed to just omit the
assignment epoch for static members, while David preferred setting them to 0.
Here, we now have a mix of both situations --
legacy "static member" records get -2 as assignment epoch
new "static member" records get 0 as assignment epoch
That's a bit unfortunate. Here, we should at least make sure that default >=
0.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -395,7 +487,13 @@ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(topicPartitionsFromMap(assignedPartitions,
image)))
+ .setTopicPartitions(topicPartitionsFromMap(
Review Comment:
Seems we should create a new version of topicPartitionsFromMap which handles
assignment epochs natively. We definitely should avoid these kinds of
intermediate collections.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
* @param memberEpoch The epoch of the member to use.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
int memberEpoch,
- Map<Uuid, Set<Integer>> memberAssignedPartitions
+ Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs;
+
if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
- newAssignedPartitions = Map.of();
- newPartitionsPendingRevocation = memberAssignedPartitions;
+ newAssignedPartitionsWithEpochs = Map.of();
+ // Move all assigned to pending revocation with their epochs
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.assignedPartitions());
} else {
- newAssignedPartitions = memberAssignedPartitions;
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
- for (Map.Entry<Uuid, Set<Integer>> entry :
memberAssignedPartitions.entrySet()) {
+ newAssignedPartitionsWithEpochs = new
HashMap<>(member.assignedPartitions());
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.partitionsPendingRevocation());
+ for (Map.Entry<Uuid, Map<Integer, Integer>> entry :
memberAssignedPartitionsWithEpochs.entrySet()) {
if (!subscribedTopicIds.contains(entry.getKey())) {
- if (newAssignedPartitions == memberAssignedPartitions) {
- newAssignedPartitions = new
HashMap<>(memberAssignedPartitions);
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
Review Comment:
@squah-confluent Why another copy? We already copy it above.
Seems like Lucy is fixing a minor duplication here, just making sure we are
not breaking anything.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]