rajinisivaram commented on a change in pull request #9814:
URL: https://github.com/apache/kafka/pull/9814#discussion_r555710338
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -884,6 +884,37 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertNotEquals(topicId1, topicId2)
}
+ @Test
+ def testTopicIdsAreNotAdded(): Unit = {
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ TestUtils.waitUntilControllerElected(zkClient)
+ val controller = getController().kafkaController
+ val tp1 = new TopicPartition("t1", 0)
+ val assignment1 = Map(tp1.partition -> Seq(0))
+
+ // Before adding the topic, an attempt to get the ID should result in None.
+ assertEquals(None, controller.controllerContext.topicIds.get("t1"))
+
+ TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers)
+
+ // Test that the first topic has its ID added correctly
Review comment:
The code comment should say that no topic ID is added (or just remove the comment)?
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -884,6 +884,37 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertNotEquals(topicId1, topicId2)
}
+ @Test
+ def testTopicIdsAreNotAdded(): Unit = {
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ TestUtils.waitUntilControllerElected(zkClient)
+ val controller = getController().kafkaController
+ val tp1 = new TopicPartition("t1", 0)
+ val assignment1 = Map(tp1.partition -> Seq(0))
+
+ // Before adding the topic, an attempt to get the ID should result in None.
+ assertEquals(None, controller.controllerContext.topicIds.get("t1"))
+
+ TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers)
+
+ // Test that the first topic has its ID added correctly
+ waitForPartitionState(tp1, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ assertEquals(None, controller.controllerContext.topicIds.get("t1"))
+
+ val tp2 = new TopicPartition("t2", 0)
+ val assignment2 = Map(tp2.partition -> Seq(0))
+ TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers)
+
+ // Test that the second topic has its ID added correctly
+ waitForPartitionState(tp2, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ assertEquals(controller.controllerContext.topicIds.get("t2"), None)
Review comment:
Make None the first argument, so the expected value comes first as in the other assertEquals calls in this test?
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -884,6 +884,37 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertNotEquals(topicId1, topicId2)
}
+ @Test
+ def testTopicIdsAreNotAdded(): Unit = {
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ TestUtils.waitUntilControllerElected(zkClient)
+ val controller = getController().kafkaController
+ val tp1 = new TopicPartition("t1", 0)
+ val assignment1 = Map(tp1.partition -> Seq(0))
+
+ // Before adding the topic, an attempt to get the ID should result in None.
+ assertEquals(None, controller.controllerContext.topicIds.get("t1"))
+
+ TestUtils.createTopic(zkClient, tp1.topic(), assignment1, servers)
+
+ // Test that the first topic has its ID added correctly
+ waitForPartitionState(tp1, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ assertEquals(None, controller.controllerContext.topicIds.get("t1"))
+
+ val tp2 = new TopicPartition("t2", 0)
+ val assignment2 = Map(tp2.partition -> Seq(0))
+ TestUtils.createTopic(zkClient, tp2.topic(), assignment2, servers)
+
+ // Test that the second topic has its ID added correctly
Review comment:
As with the comment for the first topic, update this comment to say that no topic ID is added, or remove it?
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -911,6 +942,32 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
"topic ID for topic should have been removed from controller context
after deletion")
}
+ @Test
+ def testTopicIdMigrationAndHandlingWithOlderVersion(): Unit = {
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> ReplicaAssignment(Seq(0), List(),
List()))
+ val adminZkClient = new AdminZkClient(zkClient)
+
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ adminZkClient.createTopic(tp.topic, 1, 1)
+ waitForPartitionState(tp, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ val topicIdAfterCreate =
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+ assertTrue(topicIdAfterCreate.isEmpty)
Review comment:
Use assertEquals(None, topicIdAfterCreate) instead of assertTrue(topicIdAfterCreate.isEmpty)?
##########
File path:
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -950,6 +1027,33 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
assertEquals(topicId, controller2.controllerContext.topicIds.get("t").get)
}
+ @Test
+ def testTopicIdCreatedOnUpgrade(): Unit = {
+ servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val controller = getController().kafkaController
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId))
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+ waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch,
+ "failed to get expected partition state upon topic creation")
+ val emptyTopicId = controller.controllerContext.topicIds.get("t")
+ assertEquals(None, emptyTopicId)
+
+ servers(controllerId).shutdown()
+ servers(controllerId).awaitShutdown()
+ servers = makeServers(1)
+ TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
+ val controller2 = getController().kafkaController
+ assertNotEquals(emptyTopicId,
controller2.controllerContext.topicIds.get("t"))
+ val topicId = controller2.controllerContext.topicIds.get("t").get
+ assertEquals("t", controller2.controllerContext.topicNames(topicId))
Review comment:
Should we also check the topic id in ZooKeeper?
##########
File path: tests/kafkatest/version.py
##########
@@ -185,4 +189,10 @@ def get_version(node=None):
# 2.7.x versions
V_2_7_0 = KafkaVersion("2.7.0")
-LATEST_2_7 = V_2_7_0
+V_2_7_1 = KafkaVersion("2.7.1")
+V_2_7_2 = KafkaVersion("2.7.2")
+LATEST_2_7 = V_2_7_2
Review comment:
We are currently on 2.7.0, so I don't think we need 2.7.1 and 2.7.2.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]