[ https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440195#comment-16440195 ]
ASF GitHub Bot commented on KAFKA-6650: --------------------------------------- junrao closed pull request #4825: KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info URL: https://github.com/apache/kafka/pull/4825 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index a2d04e65ae6..5fafcc4fe3f 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig, controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false, (_, _) => ()) } - val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)) - val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition)) + val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica => + controllerContext.partitionLeadershipInfo.contains(replica.topicPartition) + } + val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition)) updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) => if (!topicDeletionManager.isPartitionToBeDeleted(partition)) { val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId) @@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig, logSuccessfulTransition(replicaId, partition, 
replicaState(replica), OfflineReplica) replicaState.put(replica, OfflineReplica) } + + replicasWithoutLeadershipInfo.foreach { replica => + logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), OfflineReplica) + replicaState.put(replica, OfflineReplica) + } case ReplicaDeletionStarted => validReplicas.foreach { replica => logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 9b58fc7cc4b..a65128ad98a 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @return true if path gets deleted successfully, false if root path doesn't exist * @throws KeeperException if there is an error while deleting the znodes */ - private[zk] def deleteRecursive(path: String): Boolean = { + def deleteRecursive(path: String): Boolean = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) getChildrenResponse.resultCode match { case Code.OK => diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index ef455d4457e..4c033c421bb 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -17,7 +17,7 @@ package kafka.admin import kafka.log.Log -import kafka.zk.ZooKeeperTestHarness +import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness} import kafka.utils.TestUtils import kafka.server.{KafkaConfig, KafkaServer} import org.junit.Assert._ @@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { brokerConfigs.head.setProperty("log.segment.bytes","100") brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577") - 
servers = createTestTopicAndCluster(topic,brokerConfigs) + servers = createTestTopicAndCluster(topic, brokerConfigs, expectedReplicaAssignment) // for simplicity, we are validating cleaner offsets on a single broker val server = servers.head @@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness { TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) } - private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = { + private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = { val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false) brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString)) - createTestTopicAndCluster(topic, brokerConfigs) + createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment) } - private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = { + private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = { val topicPartition = new TopicPartition(topic, 0) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) // create the topic - adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment) + adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment) // wait until replica log is created on every broker TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined), "Replicas for topic test not created") @@ -408,4 +408,35 @@ class DeleteTopicTest extends ZooKeeperTestHarness { val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) assertTrue("Leader should exist for topic test", 
leaderIdOpt.isDefined) } + + @Test + def testDeletingPartiallyDeletedTopic() { + /** + * A previous controller could have deleted some partitions of a topic from ZK, but not all partitions, and then crashed. + * In that case, the new controller should be able to handle the partially deleted topic, and finish the deletion. + */ + + val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)) + val topic = "test" + servers = createTestTopicAndCluster(topic, true, replicaAssignment) + + /** + * shutdown all brokers in order to create a partially deleted topic on ZK + */ + servers.foreach(_.shutdown()) + + /** + * delete the partition znode at /brokers/topics/test/partition/0 + * to simulate the case that a previous controller crashed right after deleting the partition znode + */ + zkClient.deleteRecursive(TopicPartitionZNode.path(new TopicPartition(topic, 0))) + adminZkClient.deleteTopic(topic) + + /** + * start up all brokers and verify that topic deletion eventually finishes. + */ + servers.foreach(_.startup()) + TestUtils.waitUntilTrue(() => servers.exists(_.kafkaController.isActive), "No controller is elected") + TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers) + } } diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala index 6a961a53157..14d2df2e8f8 100644 --- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala @@ -119,7 +119,7 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.replay(mockControllerBrokerRequestBatch) replicaStateMachine.handleStateChanges(replicas, OfflineReplica) EasyMock.verify(mockControllerBrokerRequestBatch) - assertEquals(NewReplica, replicaState(replica)) + assertEquals(OfflineReplica, replicaState(replica)) } @Test ---------------------------------------------------------------- This is an automated message 
from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The controller should be able to handle a partially deleted topic > ----------------------------------------------------------------- > > Key: KAFKA-6650 > URL: https://issues.apache.org/jira/browse/KAFKA-6650 > Project: Kafka > Issue Type: Bug > Reporter: Lucas Wang > Assignee: Lucas Wang > Priority: Minor > > A previous controller could have deleted some partitions of a topic from ZK, > but not all partitions, and then died. > In that case, the new controller should be able to handle the partially > deleted topic, and finish the deletion. > In the current code base, if there is no leadership info for a replica's > partition, the transition to OfflineReplica state for the replica will fail. > Afterwards, the transition to ReplicaDeletionStarted will fail as well since > the only valid previous state for ReplicaDeletionStarted is OfflineReplica. > As a result, the topic deletion will never finish. -- This message was sent by Atlassian JIRA (v7.6.3#76005)