dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1017718371

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -1229,4 +1247,23 @@ private boolean in(Set<ConsumerPair> pairs) {
             return false;
         }
     }
+
+    // An inner class to store ownedPartitions and generation data
+    private static class OwnedPartitionsWithGeneration {

Review Comment:
   This is basically `MemberData` but with a different name. Could we reuse `MemberData` instead?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java:
##########
@@ -138,13 +146,18 @@ public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
 
+        public Optional<Integer> generationId() {
+            return generationId;
+        }
+
         @Override
         public String toString() {
             return "Subscription(" +
                 "topics=" + topics +
                 (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
                 ", ownedPartitions=" + ownedPartitions +
                 ", groupInstanceId=" + (groupInstanceId.map(String::toString).orElse("null")) +
+                ", generationId=" + (generationId.orElse(-1)) +

Review Comment:
   nit: Are the parentheses needed here?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -162,6 +162,23 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         return isAllSubscriptionsEqual;
     }
 
+    private OwnedPartitionsWithGeneration findOwnedPartitionsWithGeneration(Subscription subscription) {

Review Comment:
   nit: Starting with `find` is a bit weird here. Following up on my previous comment, how about `memberDataFromSubscription`?

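
For illustration, here is a minimal sketch of how a `memberDataFromSubscription` helper that returns `MemberData` directly might look. Everything in it is a simplified stand-in rather than the real Kafka code: `MemberDataSketch`, the reduced `MemberData` and `Subscription` classes, the use of plain strings for partitions, and the `decodedUserData` field are all hypothetical. The rule that the helper falls back to the user data when the subscription carries no generation id in its own field is an assumption, based on the name of the test added in this PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the Kafka types named in the review; not the real classes.
final class MemberDataSketch {

    // Simplified stand-in for MemberData: owned partitions plus an optional generation.
    static final class MemberData {
        final List<String> partitions;
        final Optional<Integer> generation;

        MemberData(List<String> partitions, Optional<Integer> generation) {
            this.partitions = partitions;
            this.generation = generation;
        }
    }

    // Simplified stand-in for Subscription. decodedUserData represents whatever
    // decoding the userData buffer would yield (an assumption for this sketch).
    static final class Subscription {
        final List<String> ownedPartitions;
        final Optional<Integer> generationId;
        final MemberData decodedUserData;

        Subscription(List<String> ownedPartitions,
                     Optional<Integer> generationId,
                     MemberData decodedUserData) {
            this.ownedPartitions = ownedPartitions;
            this.generationId = generationId;
            this.decodedUserData = decodedUserData;
        }
    }

    // Assumed rule: prefer the subscription's own fields when a generation id
    // is present, otherwise honor what was encoded in the user data.
    static MemberData memberDataFromSubscription(Subscription subscription) {
        if (subscription.generationId.isPresent()) {
            return new MemberData(subscription.ownedPartitions, subscription.generationId);
        }
        return subscription.decodedUserData;
    }

    public static void main(String[] args) {
        MemberData fromUserData =
            new MemberData(Collections.singletonList("topic-0"), Optional.of(1));

        // Generation id present in the field: the field values are used.
        Subscription withField =
            new Subscription(Collections.singletonList("topic-1"), Optional.of(2), fromUserData);

        // No generation id in the field: the decoded user data is used.
        Subscription withoutField =
            new Subscription(Collections.emptyList(), Optional.empty(), fromUserData);

        System.out.println(memberDataFromSubscription(withField).generation);
        System.out.println(memberDataFromSubscription(withoutField).generation);
    }
}
```

Returning the existing `MemberData` type means callers need no second holder class, which is the point of the first comment above.
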

##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -78,6 +80,26 @@ public void testDecodeGeneration() {
         assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
     }
 
+    @Test
+    public void testCooperativeStickyAssignorHonorSubscriptionUserdataIfNoGenerationIdInField() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        int higherGenerationId = 2;
+        int lowerGenerationId = 1;
+
+        assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions(tp1)), new ConsumerGroupMetadata(groupId, higherGenerationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic)));

Review Comment:
   I think that your goal is to basically ensure that `findOwnedPartitionsWithGeneration` works as expected given all the possible inputs. Should we just add a unit test for this method for both assignors with all the possible inputs? The intent would be clearer in my opinion.