philipnee commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432836418


##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1067,25 +1067,102 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
         assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
     }
 
+    @Test
+    public void testTopicMetadataOnUpdatePartitionLeadership() {
+        String topic = "input-topic";
+        Uuid topicId = Uuid.randomUuid();
+
+        Time time = new MockTime();
+
+        metadata = new Metadata(
+            refreshBackoffMs,
+            refreshBackoffMaxMs,
+            metadataExpireMs,
+            new LogContext(),
+            new ClusterResourceListeners());
+        Node node1 = new Node(1, "localhost", 9091);
+        Node node2 = new Node(2, "localhost", 9091);
+
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        MetadataResponse.PartitionMetadata partition0 = new 
MetadataResponse.PartitionMetadata(
+            Errors.NONE,
+            tp0,
+            Optional.of(1),
+            Optional.of(1),
+            Arrays.asList(1, 2),
+            Arrays.asList(1, 2),
+            Collections.emptyList()
+        );
+        TopicPartition tp1 = new TopicPartition(topic, 1);
+        MetadataResponse.PartitionMetadata partition1 =
+            new MetadataResponse.PartitionMetadata(
+            Errors.NONE,
+            tp1,
+            Optional.of(1),
+            Optional.of(1),
+            Arrays.asList(1, 2),
+            Arrays.asList(1, 2),
+            Collections.emptyList()
+        );
+        MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(
+            Errors.NONE,
+            topic,
+            topicId,
+            false,
+            Arrays.asList(partition0, partition1),
+            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+        );
+
+        // Initialize metadata with two partitions
+        MetadataResponse response = RequestTestUtils.metadataResponse(
+            Arrays.asList(node1, node2),
+            "clusterId",
+            node1.id(),
+            Collections.singletonList(topicMetadata));
+        metadata.updateWithCurrentRequestVersion(
+            response,
+            false,
+            time.milliseconds());
+        assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+        assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+        assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+        // "input-topic" partition 1 leader changes from node 1 to node 2
+        metadata.updatePartitionLeadership(
+            Collections.singletonMap(
+                tp1,
+                new Metadata.LeaderIdAndEpoch(
+                    Optional.of(2),
+                    Optional.of(3)
+            )),
+            Arrays.asList(node1)
+        );
+        assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+        assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+        assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+    }
+
     @Test
     public void testUpdatePartitionLeadership() {
         Time time = new MockTime();
 
-        // Setup metadata with initial set of 2 partitions, 1 each across 
topics, with 5 nodes.
-        // Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+        // Initialize metadata
         int numNodes = 5;
         metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
         ClusterResourceListener mockListener = 
Mockito.mock(ClusterResourceListener.class);
         metadata.addClusterUpdateListener(mockListener);
-
+        // topic1 has 2 partitions: tp11, tp12
+        // topic2 has 1 partition: tp21
         String topic1 = "topic1";
-        TopicPartition partition1 = new TopicPartition(topic1, 0);
-        PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
partition1, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), 
Arrays.asList(1, 2), Arrays.asList(3));
+        TopicPartition tp11 = new TopicPartition(topic1, 0);
+        PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 
2), Arrays.asList(3));
         Uuid topic1Id = Uuid.randomUuid();
+        TopicPartition tp12 = new TopicPartition(topic1, 1);
+        PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, 
tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 
3), Arrays.asList(1));

Review Comment:
   yap - i was trying to be consistent with the previous variable names such as 
`PartitionMetadata part1Metadata`. I'll follow up with the changes in a 
separated refactor PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to