Kvicii commented on code in PR #11687:
URL: https://github.com/apache/kafka/pull/11687#discussion_r883647368


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1221,59 +1221,58 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     adminZkClient.createTopic(tp.topic, 1, 1)
     waitForPartitionState(tp, firstControllerEpoch, 0, 
LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    val id = 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic)
+    val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)
     assertTrue(topicIdAfterCreate.isEmpty)
-    assertEquals(topicIdAfterCreate, id,
+    assertEquals(topicIdAfterCreate, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected no topic ID, but one existed")
 
     // Upgrade to IBP 2.8
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1)
     waitForPartitionState(tp, firstControllerEpoch, 0, 
LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterUpgrade = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterUpgrade, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
     assertEquals(topicIdAfterUpgrade, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID but it can not be found")
-    assertEquals(tp.topic(), 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller 
context")
 
     // Downgrade back to 2.7
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
     waitForPartitionState(tp, firstControllerEpoch, 0, 
LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterDowngrade = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterDowngrade, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
     assertTrue(topicIdAfterDowngrade.isDefined)
     assertEquals(topicIdAfterUpgrade, topicIdAfterDowngrade,
       "expected same topic ID but it can not be found after downgrade")
     assertEquals(topicIdAfterDowngrade, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID in controller context but it is no longer found 
after downgrade")
-    assertEquals(tp.topic(), 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller 
context")
 
     // Reassign partitions
-    
servers(0).kafkaController.eventManager.put(ApiPartitionReassignment(reassignment,
 _ => ()))
+    
servers.head.kafkaController.eventManager.put(ApiPartitionReassignment(reassignment,
 _ => ()))
     waitForPartitionState(tp, 3, 0, 1,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterReassignment = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val topicIdAfterReassignment = 
zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)
     assertTrue(topicIdAfterReassignment.isDefined)
     assertEquals(topicIdAfterUpgrade, topicIdAfterReassignment,
       "expected same topic ID but it can not be found after reassignment")
     assertEquals(topicIdAfterUpgrade, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID in controller context but is no longer found 
after reassignment")
-    assertEquals(tp.topic(), 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, 
servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller 
context")
 
     // Upgrade back to 2.8
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1)
     waitForPartitionState(tp, 3, 0, 1,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterReUpgrade = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterReUpgrade, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)

Review Comment:
   only `ControllerIntegrationTest.scala` file ? I will modify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to