jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1430774988
##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1068,143 +1067,328 @@ protected boolean retainTopic(String topic, boolean
isInternal, long nowMs) {
}
@Test
- public void testUpdatePartitionLeadership() {
+ public void testTopicMetadataOnUpdatePartitionLeadership() {
+ String topic = "input-topic";
+ Uuid topicId = Uuid.randomUuid();
+
+ Time time = new MockTime();
+
+ metadata = new Metadata(
+ refreshBackoffMs,
+ refreshBackoffMaxMs,
+ metadataExpireMs,
+ new LogContext(),
+ new ClusterResourceListeners());
+ Node node1 = new Node(1, "localhost", 9091);
+ Node node2 = new Node(2, "localhost", 9091);
+
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ MetadataResponse.PartitionMetadata partition0 = new
MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp0,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ TopicPartition tp1 = new TopicPartition(topic, 1);
+ MetadataResponse.PartitionMetadata partition1 =
+ new MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp1,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ MetadataResponse.TopicMetadata topicMetadata = new
MetadataResponse.TopicMetadata(
+ Errors.NONE,
+ topic,
+ topicId,
+ false,
+ Arrays.asList(partition0, partition1),
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+ );
+
+ // Initialize metadata with two partitions
+ MetadataResponse response = RequestTestUtils.metadataResponse(
+ Arrays.asList(node1, node2),
+ "clusterId",
+ node1.id(),
+ Collections.singletonList(topicMetadata));
+ metadata.updateWithCurrentRequestVersion(
+ response,
+ false,
+ time.milliseconds());
+ assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+ assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+ assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+ // "input-topic" partition 1 leader changes from node 1 to node 2
+ metadata.updatePartitionLeadership(
Review Comment:
Is this the KIP-951-specific logic?
##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1068,143 +1067,328 @@ protected boolean retainTopic(String topic, boolean
isInternal, long nowMs) {
}
@Test
- public void testUpdatePartitionLeadership() {
+ public void testTopicMetadataOnUpdatePartitionLeadership() {
+ String topic = "input-topic";
+ Uuid topicId = Uuid.randomUuid();
+
+ Time time = new MockTime();
+
+ metadata = new Metadata(
+ refreshBackoffMs,
+ refreshBackoffMaxMs,
+ metadataExpireMs,
+ new LogContext(),
+ new ClusterResourceListeners());
+ Node node1 = new Node(1, "localhost", 9091);
+ Node node2 = new Node(2, "localhost", 9091);
+
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ MetadataResponse.PartitionMetadata partition0 = new
MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp0,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ TopicPartition tp1 = new TopicPartition(topic, 1);
+ MetadataResponse.PartitionMetadata partition1 =
+ new MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ tp1,
+ Optional.of(1),
+ Optional.of(1),
+ Arrays.asList(1, 2),
+ Arrays.asList(1, 2),
+ Collections.emptyList()
+ );
+ MetadataResponse.TopicMetadata topicMetadata = new
MetadataResponse.TopicMetadata(
+ Errors.NONE,
+ topic,
+ topicId,
+ false,
+ Arrays.asList(partition0, partition1),
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+ );
+
+ // Initialize metadata with two partitions
+ MetadataResponse response = RequestTestUtils.metadataResponse(
+ Arrays.asList(node1, node2),
+ "clusterId",
+ node1.id(),
+ Collections.singletonList(topicMetadata));
+ metadata.updateWithCurrentRequestVersion(
+ response,
+ false,
+ time.milliseconds());
+ assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+ assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+ assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+ // "input-topic" partition 1 leader changes from node 1 to node 2
+ metadata.updatePartitionLeadership(
+ Collections.singletonMap(
+ tp1,
+ new Metadata.LeaderIdAndEpoch(
+ Optional.of(2),
+ Optional.of(3)
+ )),
+ Arrays.asList(node1)
+ );
+ assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+ assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+ assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+ }
+
+ @Test
+ public void testValidPartitionLeaderUpdate() {
Time time = new MockTime();
- // Setup metadata with initial set of 2 partitions, 1 each across
topics, with 5 nodes.
- // Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+ String clusterId = "kakfa-cluster";
Review Comment:
nit: typo — "kakfa" should be "kafka"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]