squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2853281635
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
* @param memberEpoch The epoch of the member to use.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
int memberEpoch,
- Map<Uuid, Set<Integer>> memberAssignedPartitions
+ Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs;
+
if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
- newAssignedPartitions = Map.of();
- newPartitionsPendingRevocation = memberAssignedPartitions;
+ newAssignedPartitionsWithEpochs = Map.of();
+ // Move all assigned to pending revocation with their epochs
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.assignedPartitions());
} else {
- newAssignedPartitions = memberAssignedPartitions;
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
- for (Map.Entry<Uuid, Set<Integer>> entry :
memberAssignedPartitions.entrySet()) {
+ newAssignedPartitionsWithEpochs = new
HashMap<>(member.assignedPartitions());
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.partitionsPendingRevocation());
+ for (Map.Entry<Uuid, Map<Integer, Integer>> entry :
memberAssignedPartitionsWithEpochs.entrySet()) {
if (!subscribedTopicIds.contains(entry.getKey())) {
- if (newAssignedPartitions == memberAssignedPartitions) {
- newAssignedPartitions = new
HashMap<>(memberAssignedPartitions);
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
Review Comment:
I can't remember. It looks like an oversight to me. Let's fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]