dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1017718371


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -1229,4 +1247,23 @@ private boolean in(Set<ConsumerPair> pairs) {
             return false;
         }
     }
+
+    // An inner class to store ownedPartitions and generation data
+    private static class OwnedPartitionsWithGeneration {

Review Comment:
   This is basically `MemberData` but with a different name. Could we reuse 
`MemberData` instead?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java:
##########
@@ -138,13 +146,18 @@ public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
 
+        public Optional<Integer> generationId() {
+            return generationId;
+        }
+
         @Override
         public String toString() {
             return "Subscription(" +
                 "topics=" + topics +
                 (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
                 ", ownedPartitions=" + ownedPartitions +
                 ", groupInstanceId=" + (groupInstanceId.map(String::toString).orElse("null")) +
+                ", generationId=" + (generationId.orElse(-1)) +

Review Comment:
   nit: Are the parentheses needed here?
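   For illustration, a minimal standalone sketch of why the parentheses look redundant: a method call binds more tightly than string concatenation, so both spellings should build the same string. The class and method names below are hypothetical and are not part of the PR.

```java
import java.util.Optional;

// Hypothetical demo class; only the concatenation expression mirrors the diff.
public class GenerationIdToString {
    // Variant from the diff: the Optional call is wrapped in parentheses.
    static String withParens(Optional<Integer> generationId) {
        return ", generationId=" + (generationId.orElse(-1)) + ")";
    }

    // Same expression without the extra parentheses.
    static String withoutParens(Optional<Integer> generationId) {
        return ", generationId=" + generationId.orElse(-1) + ")";
    }

    public static void main(String[] args) {
        // Compare both variants for a present and an absent generation id.
        System.out.println(withParens(Optional.of(5)).equals(withoutParens(Optional.of(5))));
        System.out.println(withParens(Optional.empty()).equals(withoutParens(Optional.empty())));
    }
}
```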



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -162,6 +162,23 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         return isAllSubscriptionsEqual;
     }
 
+    private OwnedPartitionsWithGeneration findOwnedPartitionsWithGeneration(Subscription subscription) {

Review Comment:
   nit: Starting with `find` is a bit odd here. Following up on my previous 
comment, how about `memberDataFromSubscription`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java:
##########
@@ -78,6 +80,26 @@ public void testDecodeGeneration() {
         assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
     }
 
+    @Test
+    public void testCooperativeStickyAssignorHonorSubscriptionUserdataIfNoGenerationIdInField() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+        int higherGenerationId = 2;
+        int lowerGenerationId = 1;
+
+        assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(partitions(tp1)), new ConsumerGroupMetadata(groupId, higherGenerationId, consumer1, Optional.empty()));
+        ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic)));

Review Comment:
   I think that your goal is to basically ensure that 
`findOwnedPartitionsWithGeneration` works as expected given all the possible 
inputs. Should we just add a unit test for this method for both assignors with 
all the possible inputs? The intent would be clearer in my opinion.
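   A rough sketch of what such an input-by-input check could look like, written without JUnit so it stands alone. Everything here is a hypothetical stand-in: `MemberData` and `SubscriptionStub` only imitate the real classes, partitions are plain strings, and the selection rule (use the subscription fields only when owned partitions and a generation id are both present, otherwise fall back to the decoded user data) is an assumption inferred from the test name in the diff, not confirmed against the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class MemberDataFromSubscriptionSketch {
    // Hypothetical stand-in for the assignor's MemberData holder.
    static final class MemberData {
        final List<String> partitions;
        final Optional<Integer> generation;

        MemberData(List<String> partitions, Optional<Integer> generation) {
            this.partitions = partitions;
            this.generation = generation;
        }
    }

    // Hypothetical stand-in for Subscription: the fields carried by the
    // subscription itself, plus the already-decoded user data.
    static final class SubscriptionStub {
        final List<String> ownedPartitions;
        final Optional<Integer> generationId;
        final MemberData decodedUserData;

        SubscriptionStub(List<String> ownedPartitions, Optional<Integer> generationId,
                         MemberData decodedUserData) {
            this.ownedPartitions = ownedPartitions;
            this.generationId = generationId;
            this.decodedUserData = decodedUserData;
        }
    }

    // Assumed rule: prefer the subscription fields only when both are set,
    // otherwise fall back to whatever the user data carried.
    static MemberData memberDataFromSubscription(SubscriptionStub s) {
        if (!s.ownedPartitions.isEmpty() && s.generationId.isPresent()) {
            return new MemberData(s.ownedPartitions, s.generationId);
        }
        return s.decodedUserData;
    }

    // Reports which source was used for one input combination.
    static String source(List<String> owned, Optional<Integer> generationId) {
        MemberData fromUserData =
            new MemberData(Collections.singletonList("t-1"), Optional.of(1));
        MemberData result = memberDataFromSubscription(
            new SubscriptionStub(owned, generationId, fromUserData));
        return result == fromUserData ? "userData" : "fields";
    }

    public static void main(String[] args) {
        List<String> owned = Arrays.asList("t-0");
        List<String> none = Collections.emptyList();
        // One line per input combination, so the intent of each case is explicit.
        System.out.println("owned + generation    -> " + source(owned, Optional.of(2)));
        System.out.println("owned, no generation  -> " + source(owned, Optional.empty()));
        System.out.println("no owned + generation -> " + source(none, Optional.of(2)));
        System.out.println("neither               -> " + source(none, Optional.empty()));
    }
}
```

   In the real test class each combination would presumably become its own assertion (or a parameterized case) run against both assignors.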



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.