ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501322300



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
       Can you add a test that verifies that it goes back and forth between the
two expected values when you call `partitionAssignor.subscriptionUserData`
multiple times (let's say 3)? Also maybe add a verification that the
`uniqueField` has a length of just 1.
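
A minimal sketch of the requested check, assuming the flip logic can be isolated from the assignor. `UniqueFieldFlipper` and `nextUniqueField()` are hypothetical names, not Kafka APIs; in the actual test the three values would instead come from decoding the buffers returned by `partitionAssignor.subscriptionUserData`.

```java
import java.util.Arrays;

// Hypothetical stand-in for the assignor's uniqueField handling; the real
// StreamsPartitionAssignor needs a full Streams configuration to construct.
final class UniqueFieldFlipper {
    // Allocated as a single byte regardless of the subscription version.
    private final byte[] uniqueField = new byte[1];

    // Toggles the byte between 0 and 1 and returns a copy of the new value,
    // standing in for what each subscriptionUserData call would embed.
    byte[] nextUniqueField() {
        uniqueField[0] = (byte) (uniqueField[0] == 0 ? 1 : 0);
        return Arrays.copyOf(uniqueField, uniqueField.length);
    }
}

final class UniqueFieldFlipperCheck {
    public static void main(final String[] args) {
        final UniqueFieldFlipper flipper = new UniqueFieldFlipper();
        final byte[] first = flipper.nextUniqueField();
        final byte[] second = flipper.nextUniqueField();
        final byte[] third = flipper.nextUniqueField();

        // Consecutive calls must differ (the diff's stated goal is to make the
        // user data different each time a thread rejoins).
        check(!Arrays.equals(first, second), "1st and 2nd calls should differ");
        check(!Arrays.equals(second, third), "2nd and 3rd calls should differ");
        // Calls 1 and 3 must match: the value alternates between two states.
        check(Arrays.equals(first, third), "1st and 3rd calls should match");
        // The field is exactly one byte long.
        check(first.length == 1, "uniqueField should have length 1");
        System.out.println("ok");
    }

    private static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

Checking that the 1st and 3rd results match, and not only that consecutive results differ, is what distinguishes a two-value toggle from something like an incrementing counter.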

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Map from task id to its overall lag
+        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
 
         handleRebalanceStart(topics);
+        if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
       Instead of hardcoding `8` all over, let's just define a constant for this,
similar to the `MIN_VERSION_OFFSET_SUM_SUBSCRIPTION` in SubscriptionInfo.
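
One way to follow this suggestion, sketched under the assumption that the constant sits alongside the other version constants. `MIN_VERSION_UNIQUE_FIELD` and `supportsUniqueField` are hypothetical names; only the value 8 comes from the diff.

```java
// Hypothetical holder for the new constant, modeled on SubscriptionInfo's
// MIN_VERSION_OFFSET_SUM_SUBSCRIPTION; the value 8 is taken from the diff.
final class SubscriptionVersions {
    static final int MIN_VERSION_UNIQUE_FIELD = 8;

    private SubscriptionVersions() { }

    // Single place to ask whether a metadata version carries the unique field,
    // instead of repeating "usedSubscriptionMetadataVersion >= 8" at call sites.
    static boolean supportsUniqueField(final int usedSubscriptionMetadataVersion) {
        return usedSubscriptionMetadataVersion >= MIN_VERSION_UNIQUE_FIELD;
    }

    public static void main(final String[] args) {
        System.out.println(supportsUniqueField(7)); // false
        System.out.println(supportsUniqueField(8)); // true
    }
}
```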

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Map from task id to its overall lag
+        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
 
         handleRebalanceStart(topics);
+        if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
       If you want, I think it's also fine to just always flip the byte and not 
even check against the used subscription version. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -286,23 +296,23 @@ public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
         final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
 
         final SubscriptionInfo info =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         final SubscriptionInfo expectedInfo =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion7() {

Review comment:
       Can you add a test like this for the new version 8?
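
A hedged sketch of what a version-8 round trip checks, mirroring the `assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()))` pattern in the hunk above. `ToySubscription` is a hypothetical, stripped-down stand-in that keeps only the two version numbers and a one-byte unique field; the real `SubscriptionInfo` carries more fields and has its own wire format.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical stand-in for SubscriptionInfo, not its real encoding.
final class ToySubscription {
    static final int MIN_VERSION_UNIQUE_FIELD = 8; // hypothetical constant name

    final int version;
    final int latestSupportedVersion;
    final byte uniqueField;

    ToySubscription(final int version, final int latestSupportedVersion, final byte uniqueField) {
        this.version = version;
        this.latestSupportedVersion = latestSupportedVersion;
        // Versions below 8 have no slot for the field, so it is normalized to 0.
        this.uniqueField = version >= MIN_VERSION_UNIQUE_FIELD ? uniqueField : 0;
    }

    ByteBuffer encode() {
        final boolean withField = version >= MIN_VERSION_UNIQUE_FIELD;
        final ByteBuffer buf = ByteBuffer.allocate(2 * Integer.BYTES + (withField ? 1 : 0));
        buf.putInt(version).putInt(latestSupportedVersion);
        if (withField) {
            buf.put(uniqueField);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    static ToySubscription decode(final ByteBuffer buf) {
        final int version = buf.getInt();
        final int latest = buf.getInt();
        final byte field = version >= MIN_VERSION_UNIQUE_FIELD ? buf.get() : 0;
        return new ToySubscription(version, latest, field);
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof ToySubscription)) {
            return false;
        }
        final ToySubscription other = (ToySubscription) o;
        return version == other.version
            && latestSupportedVersion == other.latestSupportedVersion
            && uniqueField == other.uniqueField;
    }

    @Override
    public int hashCode() {
        return Objects.hash(version, latestSupportedVersion, uniqueField);
    }
}

final class Version8RoundTrip {
    public static void main(final String[] args) {
        final ToySubscription info = new ToySubscription(8, 8, (byte) 1);
        final ToySubscription decoded = ToySubscription.decode(info.encode());
        System.out.println(info.equals(decoded)); // true
    }
}
```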

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         taskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
+        uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
       Also, now that I think about it, the `usedSubscriptionMetadataVersion`
will only ever be >= 8 at this point. It might be set to something lower
later on, but it has to be at least 8 when the assignor is first being
created/configured.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -313,7 +323,8 @@ public void shouldReturnTaskOffsetSumsMapForDecodedSubscription() {
             new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
                                  LATEST_SUPPORTED_VERSION, UUID_1,
                                  "localhost:80",
-                                 TASK_OFFSET_SUMS)
+                                 TASK_OFFSET_SUMS,
+                                 IGNORED_UNIQUE_FIELD)
                 .encode());
         assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
     }

Review comment:
       I think we should also add a test to make sure that if you pass a
`uniqueField` to the SubscriptionInfo but the `usedVersion` is less than 8,
the field is not actually encoded.
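
A minimal sketch of the property such a test would pin down, using a hypothetical `VersionGatedEncoder` in place of the real `SubscriptionInfo.encode()`: below version 8 the encoded bytes should be identical whatever `uniqueField` is passed, while at version 8 they should differ.

```java
import java.nio.ByteBuffer;

// Hypothetical, trimmed-down encoder: writes the used version, then the unique
// field only when the version is at least 8. Not Kafka's real wire format.
final class VersionGatedEncoder {
    static final int MIN_VERSION_UNIQUE_FIELD = 8; // hypothetical constant name

    static ByteBuffer encode(final int usedVersion, final byte uniqueField) {
        final boolean writeField = usedVersion >= MIN_VERSION_UNIQUE_FIELD;
        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + (writeField ? 1 : 0));
        buf.putInt(usedVersion);
        if (writeField) {
            buf.put(uniqueField);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    public static void main(final String[] args) {
        System.out.println(encode(7, (byte) 1).remaining()); // 4: field dropped
        System.out.println(encode(8, (byte) 1).remaining()); // 5: version plus one byte
        // Below version 8, the field value must not affect the encoded bytes.
        System.out.println(encode(7, (byte) 1).equals(encode(7, (byte) 0))); // true
        System.out.println(encode(8, (byte) 1).equals(encode(8, (byte) 0))); // false
    }
}
```

Comparing two encodings that differ only in `uniqueField` is a stricter check than inspecting length alone, since it also fails if the field leaks into the version-7 bytes in some other position.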

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         taskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
+        uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
       Sounds good. I think it's also fine to just initialize it to `new byte[1]`
regardless of whether the version is high enough to actually need it.
It's just a single byte, and simpler is better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

