jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1430775731
##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1068,143 +1067,328 @@ protected boolean retainTopic(String topic, boolean
isInternal, long nowMs) {
}
@Test
- public void testUpdatePartitionLeadership() {
+ public void testTopicMetadataOnUpdatePartitionLeadership() {
+ String topic = "input-topic";
+ Uuid topicId = Uuid.randomUuid();
+
+ Time time = new MockTime();
+
+ metadata = new Metadata(
+ refreshBackoffMs,
+ refreshBackoffMaxMs,
+ metadataExpireMs,
+ new LogContext(),
+ new ClusterResourceListeners());
+ Node node1 = new Node(1, "localhost", 9091);
+ Node node2 = new Node(2, "localhost", 9091);
+
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ MetadataResponse.PartitionMetadata partition0 = new
MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp0,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ TopicPartition tp1 = new TopicPartition(topic, 1);
+ MetadataResponse.PartitionMetadata partition1 =
+ new MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp1,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ MetadataResponse.TopicMetadata topicMetadata = new
MetadataResponse.TopicMetadata(
+ Errors.NONE,
+ topic,
+ topicId,
+ false,
+ Arrays.asList(partition0, partition1),
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+ );
+
+ // Initialize metadata with two partitions
+ MetadataResponse response = RequestTestUtils.metadataResponse(
+ Arrays.asList(node1, node2),
+ "clusterId",
+ node1.id(),
+ Collections.singletonList(topicMetadata));
+ metadata.updateWithCurrentRequestVersion(
+ response,
+ false,
+ time.milliseconds());
+ assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+ assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+ assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+ // "input-topic" partition 1 leader changes from node 1 to node 2
+ metadata.updatePartitionLeadership(
+ Collections.singletonMap(
+ tp1,
+ new Metadata.LeaderIdAndEpoch(
+ Optional.of(2),
+ Optional.of(3)
+ )),
+ Arrays.asList(node1)
+ );
+ assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+ assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+ assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+ }
+
+ @Test
+ public void testValidPartitionLeaderUpdate() {
Time time = new MockTime();
- // Setup metadata with initial set of 2 partitions, 1 each across
topics, with 5 nodes.
- // Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+ String clusterId = "kakfa-cluster";
+ String unauthroizedTopics = "unauthorized-topic";
+ String invalidTopics = "invalid-topic";
+ String topic1 = "topic1";
+ // Initialize metadata
int numNodes = 5;
- metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs,
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
+ metadata = new Metadata(
+ refreshBackoffMs,
+ refreshBackoffMaxMs,
+ metadataExpireMs,
+ new LogContext(),
+ new ClusterResourceListeners());
ClusterResourceListener mockListener =
Mockito.mock(ClusterResourceListener.class);
metadata.addClusterUpdateListener(mockListener);
+ Map<String, TestTopicMetadata> topicToMetadata =
updateMetadataAndVerify(
+ metadata,
+ numNodes,
+ time,
+ clusterId,
+ singleton(unauthroizedTopics), singleton(invalidTopics),
+ mockListener);
+
+ // Remove and re-add the first updated topicPartition in topic1
+ List<PartitionMetadata> topic1Partitions =
topicToMetadata.get(topic1).partitionMetadata;
+ PartitionMetadata tp11 = topic1Partitions.remove(0);
+ Integer part1NewLeaderId = topic1Partitions.get(0).leaderId.get() + 1;
+ Integer part1NewLeaderEpoch =
topic1Partitions.get(0).leaderEpoch.get() + 1;
+ Map<TopicPartition, Metadata.LeaderIdAndEpoch> updates = new
HashMap<>();
+ updates.put(tp11.topicPartition, new
Metadata.LeaderIdAndEpoch(Optional.of(part1NewLeaderId),
+ Optional.of(part1NewLeaderEpoch)));
+ PartitionMetadata updatedPart1Metadata = new PartitionMetadata(
+ tp11.error,
+ tp11.topicPartition,
+ Optional.of(part1NewLeaderId), Optional.of(part1NewLeaderEpoch),
+ tp11.replicaIds, tp11.inSyncReplicaIds, tp11.offlineReplicaIds);
+ topicToMetadata.get(topic1).addPartitionMetadata(updatedPart1Metadata);
+
+ Node newNode = new Node(999, "testhost", 99999, "testrack");
+ List<Node> nodes = new ArrayList<>(metadata.fetch().nodes());
+ nodes.add(newNode);
+ int index = nodes.stream().filter(node -> node.id() ==
0).findFirst().map(nodes::indexOf).orElse(-1);
+ Node existingNode = nodes.get(index);
+ Node updatedNode = new Node(existingNode.id(), "newhost",
existingNode.port(), "newrack");
+ nodes.remove(index);
+ nodes.add(updatedNode);
+
+ Set<TopicPartition> updatedMetadata =
metadata.updatePartitionLeadership(updates, nodes);
+ assertEquals(1, updatedMetadata.size());
+ assertEquals(tp11.topicPartition, updatedMetadata.toArray()[0]);
+ // Validate metadata is changed for partition1, hosts are updated,
everything else remains unchanged.
+ validateForUpdatePartitionLeadership(
+ metadata,
+ topicToMetadata,
+ metadata.fetch().nodes(),
+ clusterId,
+ singleton(unauthroizedTopics), singleton(invalidTopics),
+ metadata.fetch().controller());
+ // Since cluster metadata is unchanged, listener shouldn't be called.
+ verify(mockListener).onUpdate(any());
+ Mockito.reset(mockListener);
+ }
+ @Test
+ public void testInvalidPartitionLeaderUpdate() {
+ Time time = new MockTime();
+
+ String clusterId = "kakfa-cluster";
+ String unauthroizedTopics = "unauthorized-topic";
Review Comment:
nit: unauthorized and kafka
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]