ableegoldman commented on a change in pull request #9383: URL: https://github.com/apache/kafka/pull/9383#discussion_r501322300
##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
-        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
Can you add a test that verifies that it goes back and forth between the two expected values when you call `partitionAssignor.subscriptionUserData` multiple times (let's say 3) -- also maybe add a verification that the `uniqueField` has a length of just 1

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Map from task id to its overall lag
+        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
         handleRebalanceStart(topics);
+        if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
Instead of a hardcoded `8` all over, let's just define a constant for this, similar to `MIN_VERSION_OFFSET_SUM_SUBSCRIPTION` in SubscriptionInfo

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Map from task id to its overall lag
+        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
         handleRebalanceStart(topics);
+        if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
If you want, I think it's also fine to just always flip the byte and not even check against the used subscription version.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -286,23 +296,23 @@ public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
         final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
         final SubscriptionInfo info =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         final SubscriptionInfo expectedInfo =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
     @Test
     public void shouldEncodeAndDecodeVersion7() {

Review comment:
Can you add a test like this for the new version 8?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         taskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
+        uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
Also, now that I think about it, the `usedSubscriptionMetadataVersion` will only ever be >= 8 at this point. It might be set to something lower at some point later on, but it has to be at least 8 when the assignor is just being created/configured.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -313,7 +323,8 @@ public void shouldReturnTaskOffsetSumsMapForDecodedSubscription() {
         new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80",
-            TASK_OFFSET_SUMS)
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD)
             .encode());
         assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
     }

Review comment:
I think we should also add a test to make sure that if you pass a `uniqueField` to the SubscriptionInfo but the `usedVersion` is less than 8, it does not actually encode this field.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         taskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
+        uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
Sounds good. I think it's also fine to just initialize it to `new byte[1]` regardless of whether the version is high enough to actually need it or not. It's just a single byte, and simpler is better.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
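
For illustration only, the suggestions in the review comments above (a named constant instead of the hardcoded `8`, a one-byte `uniqueField` that is always flipped, and no encoding of the field below version 8) could be sketched roughly as follows. This is not the code from the PR: the class `UniqueFieldSketch`, the constant name `MIN_VERSION_UNIQUE_FIELD`, the method names, and the toy wire format (a 4-byte version followed by the optional field) are all assumptions made for this example.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the assignor/SubscriptionInfo logic discussed in the review.
final class UniqueFieldSketch {
    // Assumed constant name; the review only asks that the literal 8 be replaced by a constant.
    static final int MIN_VERSION_UNIQUE_FIELD = 8;

    // Always a single byte, whatever the subscription version is ("simpler is better").
    private final byte[] uniqueField = new byte[1];

    // Flips the byte between 0 and 1 on every call and returns a copy of the field.
    byte[] nextUniqueField() {
        uniqueField[0] ^= 1;
        return uniqueField.clone();
    }

    // Toy encoding (an assumption): version as an int, then the unique field
    // only when the used version is high enough to carry it.
    static ByteBuffer encode(final int usedVersion, final byte[] field) {
        final boolean include = usedVersion >= MIN_VERSION_UNIQUE_FIELD;
        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + (include ? field.length : 0));
        buf.putInt(usedVersion);
        if (include) {
            buf.put(field);
        }
        buf.flip(); // prepare the buffer for reading
        return buf;
    }

    public static void main(final String[] args) {
        final UniqueFieldSketch sketch = new UniqueFieldSketch();
        for (int i = 0; i < 3; i++) {
            final byte[] field = sketch.nextUniqueField();
            System.out.println("call " + (i + 1) + ": uniqueField[0] = " + field[0]
                + ", encoded v8 size = " + encode(8, field).remaining()
                + ", encoded v7 size = " + encode(7, field).remaining());
        }
    }
}
```

Calling `nextUniqueField()` three times and comparing the results mirrors the requested "back and forth" test, and comparing the encoded sizes for versions 7 and 8 mirrors the requested check that the field is not encoded for older versions.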