[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412336#comment-16412336 ]

ASF GitHub Bot commented on KAFKA-6612:
---------------------------------------

becketqin closed pull request #4666: KAFKA-6612: Added logic to prevent 
increasing partition counts during topic deletion
URL: https://github.com/apache/kafka/pull/4666
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a8707ad887d..c24c5762709 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1308,14 +1308,31 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   case class PartitionModifications(topic: String) extends ControllerEvent {
     override def state: ControllerState = ControllerState.TopicChange
 
+    def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = {
+      info("Restoring the partition replica assignment for topic %s".format(topic))
+
+      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
+      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter(p =>
+        existingPartitions.contains(p._1.partition.toString))
+
+      zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+    }
+
     override def process(): Unit = {
       if (!isActive) return
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
         !controllerContext.partitionReplicaAssignment.contains(p._1))
       if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
-          "since it is currently being deleted")
+        if (partitionsToBeAdded.nonEmpty) {
+          warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
+            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+          restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        } else {
+          // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
+          info("Ignoring partition change during topic deletion as no new partitions are added")
+        }
+        }
       else {
         if (partitionsToBeAdded.nonEmpty) {
           info(s"New partitions to be added $partitionsToBeAdded")
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 897cc598123..ef455d4457e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
@@ -145,6 +146,86 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
+  private def getController() : (KafkaServer, Int) = {
+    val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist"))
+    val controller = servers.find(s => s.config.brokerId == controllerId).get
+    (controller, controllerId)
+  }
+
+  private def ensureControllerExists() = {
+    TestUtils.waitUntilTrue(() => {
+      try {
+        getController()
+        true
+      } catch {
+        case _: Throwable  => false
+      }
+    }, "Controller should eventually exist")
+  }
+
+  private def getAllReplicasFromAssignment(topic : String, assignment : Map[Int, Seq[Int]]) : Set[PartitionAndReplica] = {
+    assignment.flatMap { case (partition, replicas) =>
+      replicas.map {r => new PartitionAndReplica(new TopicPartition(topic, partition), r)}
+    }.toSet
+  }
+
+  @Test
+  def testIncreasePartitionCountDuringDeleteTopic() {
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+    val topic = "test"
+    val topicPartition = new TopicPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+    // create brokers
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    this.servers = allServers
+    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    // wait until replica log is created on every broker
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
+      "Replicas for topic test not created.")
+    // shutdown a broker to make sure the following topic deletion will be suspended
+    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // start topic deletion
+    adminZkClient.deleteTopic(topic)
+
+    // make sure deletion of all of the topic's replicas have been tried
+    ensureControllerExists()
+    val (controller, controllerId) = getController()
+    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
+    TestUtils.waitUntilTrue(() => {
+      val replicasInDeletionSuccessful = controller.kafkaController.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+      val offlineReplicas = controller.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
+    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+
+    // increase the partition count for topic
+    val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
+    TopicCommand.alterTopic(zkClient, topicCommandOptions)
+
+    // trigger a controller switch now
+    val previousControllerId = controllerId
+
+    controller.shutdown()
+
+    ensureControllerExists()
+    // wait until a new controller to show up
+    TestUtils.waitUntilTrue(() => {
+      val (newController, newControllerId) = getController()
+      newControllerId != previousControllerId
+    }, "The new controller should not have the failed controller id")
+
+    // bring back the failed brokers
+    follower.startup()
+    controller.startup()
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+  }
+
+
   @Test
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"


 

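The following is a toy sketch, not part of the patch above; the topic name, partition
counts, and replica lists are invented for illustration. It only shows the idea behind
restorePartitionReplicaAssignment: keep the assignments whose partition znodes already
exist and drop the partitions that were added while the topic was being deleted.

object RestoreAssignmentSketch {
  def main(args: Array[String]): Unit = {
    // Assignment read back from /brokers/topics/test after the bad --alter: two partitions.
    val newAssignment: Map[Int, Seq[Int]] = Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3))
    // Partition znodes that actually exist under /brokers/topics/test/partitions
    // (partition 1 was never created because the topic is queued for deletion).
    val existingPartitions = Set("0")
    // Keep only the assignments for partitions that already exist.
    val restored = newAssignment.filter { case (partition, _) =>
      existingPartitions.contains(partition.toString)
    }
    println(restored) // Map(0 -> List(0, 1, 2))
  }
}
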
> Added logic to prevent increasing partition counts during topic deletion
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-6612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6612
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic never to be deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased, the new partition and its replica assignment 
> will be created in ZooKeeper as data of the path /brokers/topics/<topic>.
> Upon detecting the change, the controller sees that the topic is being 
> deleted and therefore ignores the partition change, so the zk path 
> /brokers/topics/<topic>/partitions/<partition id> will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However, the transition to the OfflineReplica state will NOT succeed since 
> there is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.
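
To make the last point concrete, here is a small self-contained Scala sketch. It is a
simplified model, not Kafka's actual ReplicaStateMachine; the state names mirror the ones
mentioned above, and the hasLeader flag is an invented stand-in for the leader check.

object ReplicaDeletionPathSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState

  // Valid predecessor states for the transitions that matter for deletion.
  private val validPrevious: Map[ReplicaState, Set[ReplicaState]] = Map(
    OfflineReplica -> Set(NewReplica, OnlineReplica),
    ReplicaDeletionStarted -> Set(OfflineReplica),
    ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted)
  )

  // Moving to OfflineReplica needs a live leader; without one the replica is stuck
  // and can never reach ReplicaDeletionSuccessful.
  def canTransition(from: ReplicaState, to: ReplicaState, hasLeader: Boolean): Boolean =
    validPrevious.get(to).exists(_.contains(from)) && (to != OfflineReplica || hasLeader)

  def main(args: Array[String]): Unit = {
    // Partition added while the topic was being deleted: it never got a leader.
    println(canTransition(OnlineReplica, OfflineReplica, hasLeader = false)) // false
    // Normal deletion: a leader exists, so the deletion path can start.
    println(canTransition(OnlineReplica, OfflineReplica, hasLeader = true))  // true
  }
}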


