dajac commented on a change in pull request #9506:
URL: https://github.com/apache/kafka/pull/9506#discussion_r512458078



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
##########
@@ -46,6 +46,30 @@
     private final TopicPartition tp2 = new TopicPartition("bar", 2);
     private final Optional<String> groupInstanceId = 
Optional.of("instance.id");
 
+    @Test
+    public void serializeDeserializeSubscriptionAllVersions() {
+        List<TopicPartition> ownedPartitions = Arrays.asList(
+            new TopicPartition("foo", 0),
+            new TopicPartition("bar", 0));
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"),
+            ByteBuffer.wrap("hello".getBytes()), ownedPartitions);
+
+        for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription, version);
+            Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
+
+            assertEquals(subscription.topics(), parsedSubscription.topics());
+            assertEquals(subscription.userData(), 
parsedSubscription.userData());
+            assertFalse(parsedSubscription.groupInstanceId().isPresent());
+
+            if (version >= 1) {
+                assertEquals(toSet(subscription.ownedPartitions()), 
toSet(parsedSubscription.ownedPartitions()));
+            } else {
+                assertTrue(parsedSubscription.ownedPartitions().isEmpty());

Review comment:
       Good point.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
##########
@@ -137,6 +161,19 @@ public void deserializeFutureSubscriptionVersion() {
         assertEquals(groupInstanceId, subscription.groupInstanceId());
     }
 
+    @Test
+    public void serializeDeserializeAssignmentAllVersions() {
+        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
+        Assignment assignment = new Assignment(partitions, 
ByteBuffer.wrap("hello".getBytes()));
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer buffer = 
ConsumerProtocol.serializeAssignment(assignment, version);
+            Assignment parsedAssignment = 
ConsumerProtocol.deserializeAssignment(buffer);
+            assertEquals(toSet(partitions), 
toSet(parsedAssignment.partitions()));
+            assertEquals(parsedAssignment.userData(), 
parsedAssignment.userData());

Review comment:
       Good catch!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to