[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587845#comment-16587845
 ] 

ASF GitHub Bot commented on KAFKA-6753:
---------------------------------------

junrao closed pull request #5388: KAFKA-6753: Updating the OfflinePartitions 
count only when necessary
URL: https://github.com/apache/kafka/pull/5388
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 096b2b4e98b..ecf6fbf33f1 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -389,7 +389,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController, stateChangeLogge
 
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
     givenPartitions.foreach(partition => 
updateMetadataRequestPartitionInfo(partition,
-      beingDeleted = 
controller.topicDeletionManager.partitionsToBeDeleted.contains(partition)))
+      beingDeleted = 
controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 645080f7641..aaf73fe1fa0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -75,7 +75,8 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
   val topicDeletionManager = new TopicDeletionManager(this, eventManager, 
zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, 
stateChangeLogger)
   val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, 
controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new 
ControllerBrokerRequestBatch(this, stateChangeLogger))
-  val partitionStateMachine = new PartitionStateMachine(config, 
stateChangeLogger, controllerContext, topicDeletionManager, zkClient, 
mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine = new PartitionStateMachine(config, 
stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new 
ControllerBrokerRequestBatch(this, stateChangeLogger))
+  partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
 
   private val controllerChangeHandler = new ControllerChangeHandler(this, 
eventManager)
   private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -1052,7 +1053,9 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
       debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
 
       val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter 
{ partition =>
-        controllerContext.partitionReplicaAssignment(partition).size > 1 && 
controllerContext.partitionLeadershipInfo.contains(partition)
+        controllerContext.partitionReplicaAssignment(partition).size > 1 &&
+          controllerContext.partitionLeadershipInfo.contains(partition) &&
+          !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
       }
       val (partitionsLedByBroker, partitionsFollowedByBroker) = 
partitionsToActOn.partition { partition =>
         
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
@@ -1076,7 +1079,9 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
         trace(s"All leaders = 
${controllerContext.partitionLeadershipInfo.mkString(",")}")
         controllerContext.partitionLeadershipInfo.filter {
           case (topicPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && 
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
+            
!topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+              leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
+              
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
         }.keys
       }
       replicatedPartitionsBrokerLeads().toSet
@@ -1155,10 +1160,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
       if (!isActive) {
         0
       } else {
-        controllerContext.partitionLeadershipInfo.count { case (tp, 
leadershipInfo) =>
-          
!controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader)
 &&
-            !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
-        }
+        partitionStateMachine.offlinePartitionCount
       }
 
     preferredReplicaImbalanceCount =
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index db4c7161f35..1b43419476d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -44,14 +44,17 @@ import scala.collection.mutable
 class PartitionStateMachine(config: KafkaConfig,
                             stateChangeLogger: StateChangeLogger,
                             controllerContext: ControllerContext,
-                            topicDeletionManager: TopicDeletionManager,
                             zkClient: KafkaZkClient,
                             partitionState: mutable.Map[TopicPartition, 
PartitionState],
                             controllerBrokerRequestBatch: 
ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
 
+  private var topicDeletionManager: TopicDeletionManager = _
+
   this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
 
+  var offlinePartitionCount = 0
+
   /**
    * Invoked on successful controller election.
    */
@@ -68,9 +71,14 @@ class PartitionStateMachine(config: KafkaConfig,
    */
   def shutdown() {
     partitionState.clear()
+    offlinePartitionCount = 0
     info("Stopped partition state machine")
   }
 
+  def setTopicDeletionManager(topicDeletionManager: TopicDeletionManager) {
+    this.topicDeletionManager = topicDeletionManager
+  }
+
   /**
    * Invoked on startup of the partition's state machine to set the initial 
state for all existing partitions in
    * zookeeper
@@ -83,11 +91,11 @@ class PartitionStateMachine(config: KafkaConfig,
           // else, check if the leader for partition is alive. If yes, it is 
in Online state, else it is in Offline state
           if 
(controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader,
 topicPartition))
           // leader is alive
-            partitionState.put(topicPartition, OnlinePartition)
+            changeStateTo(topicPartition, NonExistentPartition, 
OnlinePartition)
           else
-            partitionState.put(topicPartition, OfflinePartition)
+            changeStateTo(topicPartition, NonExistentPartition, 
OfflinePartition)
         case None =>
-          partitionState.put(topicPartition, NewPartition)
+          changeStateTo(topicPartition, NonExistentPartition, NewPartition)
       }
     }
   }
@@ -125,6 +133,21 @@ class PartitionStateMachine(config: KafkaConfig,
     partitionState.filter { case (_, s) => s == state }.keySet.toSet
   }
 
+  private def changeStateTo(partition: TopicPartition, currentState: 
PartitionState, targetState: PartitionState): Unit = {
+    partitionState.put(partition, targetState)
+    updateControllerMetrics(partition, currentState, targetState)
+  }
+
+  private def updateControllerMetrics(partition: TopicPartition, currentState: 
PartitionState, targetState: PartitionState) : Unit = {
+    if (!topicDeletionManager.isTopicWithDeletionStarted(partition.topic)) {
+      if (currentState != OfflinePartition && targetState == OfflinePartition) 
{
+        offlinePartitionCount = offlinePartitionCount + 1
+      } else if (currentState == OfflinePartition && targetState != 
OfflinePartition) {
+        offlinePartitionCount = offlinePartitionCount - 1
+      }
+    }
+  }
+
   /**
    * This API exercises the partition's state machine. It ensures that every 
state transition happens from a legal
    * previous state to the target state. Valid state transitions are:
@@ -158,7 +181,7 @@ class PartitionStateMachine(config: KafkaConfig,
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from 
${partitionState(partition)} to $targetState with " +
             s"assigned replicas 
${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
-          partitionState.put(partition, NewPartition)
+          changeStateTo(partition, partitionState(partition), NewPartition)
         }
       case OnlinePartition =>
         val uninitializedPartitions = validPartitions.filter(partition => 
partitionState(partition) == NewPartition)
@@ -168,7 +191,7 @@ class PartitionStateMachine(config: KafkaConfig,
           successfulInitializations.foreach { partition =>
             stateChangeLog.trace(s"Changed partition $partition from 
${partitionState(partition)} to $targetState with state " +
               
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
-            partitionState.put(partition, OnlinePartition)
+            changeStateTo(partition, partitionState(partition), 
OnlinePartition)
           }
         }
         if (partitionsToElectLeader.nonEmpty) {
@@ -176,18 +199,18 @@ class PartitionStateMachine(config: KafkaConfig,
           successfulElections.foreach { partition =>
             stateChangeLog.trace(s"Changed partition $partition from 
${partitionState(partition)} to $targetState with state " +
               
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
-            partitionState.put(partition, OnlinePartition)
+            changeStateTo(partition, partitionState(partition), 
OnlinePartition)
           }
         }
       case OfflinePartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from 
${partitionState(partition)} to $targetState")
-          partitionState.put(partition, OfflinePartition)
+          changeStateTo(partition, partitionState(partition), OfflinePartition)
         }
       case NonExistentPartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from 
${partitionState(partition)} to $targetState")
-          partitionState.put(partition, NonExistentPartition)
+          changeStateTo(partition, partitionState(partition), 
NonExistentPartition)
         }
     }
   }
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c9f0640b139..1ab8a43dfde 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -207,7 +207,7 @@ class ReplicaStateMachine(config: KafkaConfig,
         }
         val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, 
replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, 
leaderIsrAndControllerEpoch) =>
-          if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+          if 
(!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
             val recipients = 
controllerContext.partitionReplicaAssignment(partition).filterNot(_ == 
replicaId)
             
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
               partition,
@@ -301,7 +301,7 @@ class ReplicaStateMachine(config: KafkaConfig,
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, 
failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = 
partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
-      if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+      if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
         val exception = new StateChangeFailedException(s"Failed to change 
state of replica $replicaId for partition $partition since the leader and isr 
path in zookeeper is empty")
         Option(partition -> exception)
       } else None
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 6e145516cce..8d93ef2fa2d 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -62,13 +62,32 @@ class TopicDeletionManager(controller: KafkaController,
   val controllerContext = controller.controllerContext
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
   val topicsToBeDeleted = mutable.Set.empty[String]
-  val partitionsToBeDeleted = mutable.Set.empty[TopicPartition]
+  /** The following topicsWithDeletionStarted variable is used to properly 
update the offlinePartitionCount metric.
+    * When a topic is going through deletion, we don't want to keep track of 
its partition state
+    * changes in the offlinePartitionCount metric, see the 
PartitionStateMachine#updateControllerMetrics
+    * for detailed logic. This goal means if some partitions of a topic are 
already
+    * in OfflinePartition state when deletion starts, we need to change the 
corresponding partition
+    * states to NonExistentPartition first before starting the deletion.
+    *
+    * However we can NOT change partition states to NonExistentPartition at 
the time of enqueuing topics
+    * for deletion. The reason is that when a topic is enqueued for deletion, 
it may be ineligible for
+    * deletion due to ongoing partition reassignments. Hence there might be a 
delay between enqueuing
+    * a topic for deletion and the actual start of deletion. In this delayed 
interval, partitions may still
+    * transition to or out of the OfflinePartition state.
+    *
+    * Hence we decide to change partition states to NonExistentPartition only 
when the actual deletion has started.
+    * For topics whose deletion has actually started, we keep track of them 
in the following topicsWithDeletionStarted
+    * variable. And once a topic is in the topicsWithDeletionStarted set, we 
are sure there will no longer
+    * be partition reassignments to any of its partitions, and only then it's 
safe to move its partitions to
+    * NonExistentPartition state. Once a topic is in the 
topicsWithDeletionStarted set, we will stop monitoring
+    * its partition state changes in the offlinePartitionCount metric
+    */
+  val topicsWithDeletionStarted = mutable.Set.empty[String]
   val topicsIneligibleForDeletion = mutable.Set.empty[String]
 
   def init(initialTopicsToBeDeleted: Set[String], 
initialTopicsIneligibleForDeletion: Set[String]): Unit = {
     if (isDeleteTopicEnabled) {
       topicsToBeDeleted ++= initialTopicsToBeDeleted
-      partitionsToBeDeleted ++= 
topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
       topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & 
topicsToBeDeleted
     } else {
       // if delete topic is disabled clean the topic entries under 
/admin/delete_topics
@@ -89,7 +108,7 @@ class TopicDeletionManager(controller: KafkaController,
   def reset() {
     if (isDeleteTopicEnabled) {
       topicsToBeDeleted.clear()
-      partitionsToBeDeleted.clear()
+      topicsWithDeletionStarted.clear()
       topicsIneligibleForDeletion.clear()
     }
   }
@@ -103,7 +122,6 @@ class TopicDeletionManager(controller: KafkaController,
   def enqueueTopicsForDeletion(topics: Set[String]) {
     if (isDeleteTopicEnabled) {
       topicsToBeDeleted ++= topics
-      partitionsToBeDeleted ++= 
topics.flatMap(controllerContext.partitionsForTopic)
       resumeDeletions()
     }
   }
@@ -173,9 +191,9 @@ class TopicDeletionManager(controller: KafkaController,
       false
   }
 
-  def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
+  def isTopicWithDeletionStarted(topic: String) = {
     if (isDeleteTopicEnabled) {
-      partitionsToBeDeleted.contains(topicAndPartition)
+      topicsWithDeletionStarted.contains(topic)
     } else
       false
   }
@@ -231,12 +249,8 @@ class TopicDeletionManager(controller: KafkaController,
     val replicasForDeletedTopic = 
controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
     // controller will remove this replica from the state machine as well as 
its partition assignment cache
     
controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq,
 NonExistentReplica)
-    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
-    // move respective partition to OfflinePartition and NonExistentPartition 
state
-    
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
 OfflinePartition)
-    
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
 NonExistentPartition)
     topicsToBeDeleted -= topic
-    partitionsToBeDeleted.retain(_.topic != topic)
+    topicsWithDeletionStarted -= topic
     zkClient.deleteTopicZNode(topic)
     zkClient.deleteTopicConfigs(Seq(topic))
     zkClient.deleteTopicDeletions(Seq(topic))
@@ -254,6 +268,16 @@ class TopicDeletionManager(controller: KafkaController,
     info(s"Topic deletion callback for ${topics.mkString(",")}")
     // send update metadata so that brokers stop serving data for topics to be 
deleted
     val partitions = topics.flatMap(controllerContext.partitionsForTopic)
+    val unseenTopicsForDeletion = topics -- topicsWithDeletionStarted
+    if (unseenTopicsForDeletion.nonEmpty) {
+      val unseenPartitionsForDeletion = 
unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
+      
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq,
 OfflinePartition)
+      
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq,
 NonExistentPartition)
+      // adding of unseenTopicsForDeletion to topicsWithDeletionStarted must be done 
after the partition state changes
+      // to make sure the offlinePartitionCount metric is properly updated
+      topicsWithDeletionStarted ++= unseenTopicsForDeletion
+    }
+
     
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
 partitions)
     topics.foreach { topic =>
       onPartitionDeletion(controllerContext.partitionsForTopic(topic))
@@ -283,9 +307,9 @@ class TopicDeletionManager(controller: KafkaController,
       val successfullyDeletedReplicas = 
controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- 
successfullyDeletedReplicas
       // move dead replicas directly to failed state
-      
controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, 
ReplicaDeletionIneligible)
+      
controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, 
ReplicaDeletionIneligible, new Callbacks())
       // send stop replica to all followers that are not in the OfflineReplica 
state so they stop sending fetch requests to the leader
-      
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
 OfflineReplica)
+      
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
 OfflineReplica, new Callbacks())
       debug(s"Deletion started for replicas 
${replicasForDeletionRetry.mkString(",")}")
       
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
 ReplicaDeletionStarted,
         new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, 
replicaId) =>
diff --git 
a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala 
b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 52f459970d1..6a587f3bd34 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -55,8 +55,9 @@ class PartitionStateMachineTest extends JUnitSuite {
     mockControllerBrokerRequestBatch = 
EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = 
EasyMock.createMock(classOf[TopicDeletionManager])
     partitionState = mutable.Map.empty[TopicPartition, PartitionState]
-    partitionStateMachine = new PartitionStateMachine(config, new 
StateChangeLogger(brokerId, true, None), controllerContext, 
mockTopicDeletionManager,
+    partitionStateMachine = new PartitionStateMachine(config, new 
StateChangeLogger(brokerId, true, None), controllerContext,
       mockZkClient, partitionState, mockControllerBrokerRequestBatch)
+    partitionStateMachine.setTopicDeletionManager(mockTopicDeletionManager)
   }
 
   @Test
@@ -312,4 +313,147 @@ class PartitionStateMachineTest extends JUnitSuite {
     assertEquals(OfflinePartition, partitionState(partition))
   }
 
+  private def prepareMockToElectLeaderForPartitions(partitions: 
Seq[TopicPartition]): Unit = {
+    val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
+    def prepareMockToGetTopicPartitionsStatesRaw(): Unit = {
+      val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+      val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+      val getDataResponses = partitions.map {p => GetDataResponse(Code.OK, 
null, Some(p),
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, 
ResponseMetadata(0, 0))}
+      EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
+        .andReturn(getDataResponses)
+    }
+    prepareMockToGetTopicPartitionsStatesRaw()
+
+    def prepareMockToGetLogConfigs(): Unit = {
+      val topicsForPartitionsWithNoLiveInSyncReplicas = Seq()
+      
EasyMock.expect(mockZkClient.getLogConfigs(topicsForPartitionsWithNoLiveInSyncReplicas,
 config.originals()))
+        .andReturn(Map.empty, Map.empty)
+    }
+    prepareMockToGetLogConfigs()
+
+    def prepareMockToUpdateLeaderAndIsr(): Unit = {
+      val updatedLeaderAndIsr = partitions.map { partition =>
+        partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
+      }.toMap
+      EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, 
controllerEpoch))
+        .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty, 
Map.empty))
+    }
+    prepareMockToUpdateLeaderAndIsr()
+  }
+
+  /**
+    * This method tests that changing partitions' state to OfflinePartition 
increments the offlinePartitionCount,
+    * and that changing their state back to OnlinePartition decrements the 
offlinePartitionCount
+    */
+  @Test
+  def testUpdatingOfflinePartitionsCount(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, 
"host", 0))
+
+    val partitionIds = Seq(0, 1, 2, 3)
+    val topic = "test"
+    val partitions = partitionIds.map(new TopicPartition("test", _))
+
+    partitions.foreach { partition =>
+      controllerContext.updatePartitionReplicaAssignment(partition, 
Seq(brokerId))
+    }
+
+    
EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(false)
+    EasyMock.expectLastCall().anyTimes()
+    prepareMockToElectLeaderForPartitions(partitions)
+    EasyMock.replay(mockZkClient, mockTopicDeletionManager)
+
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(s"There should be ${partitions.size} offline partition(s)", 
partitions.size, partitionStateMachine.offlinePartitionCount)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, 
Some(OfflinePartitionLeaderElectionStrategy))
+    assertEquals(s"There should be no offline partition(s)", 0, 
partitionStateMachine.offlinePartitionCount)
+  }
+
+  /**
+    * This method tests that if a topic is being deleted, changing partitions' 
state to OfflinePartition makes no change
+    * to the offlinePartitionCount
+    */
+  @Test
+  def testNoOfflinePartitionsChangeForTopicsBeingDeleted() = {
+    val partitionIds = Seq(0, 1, 2, 3)
+    val topic = "test"
+    val partitions = partitionIds.map(new TopicPartition("test", _))
+
+    
EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(true)
+    EasyMock.expectLastCall().anyTimes()
+    EasyMock.replay(mockTopicDeletionManager)
+
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(s"There should be no offline partition(s)", 0, 
partitionStateMachine.offlinePartitionCount)
+  }
+
+  /**
+    * This method tests if some partitions are already in OfflinePartition 
state,
+    * then deleting their topic will decrement the offlinePartitionCount.
+    * For example, if partitions test-0, test-1, test-2, test-3 are in 
OfflinePartition state,
+    * and the offlinePartitionCount is 4, trying to delete the topic "test" 
means these
+    * partitions no longer qualify as offline-partitions, and the 
offlinePartitionCount
+    * should be decremented to 0.
+    */
+  @Test
+  def testUpdatingOfflinePartitionsCountDuringTopicDeletion() = {
+    val partitionIds = Seq(0, 1, 2, 3)
+    val topic = "test"
+    val partitions = partitionIds.map(new TopicPartition("test", _))
+    partitions.foreach { partition =>
+      controllerContext.updatePartitionReplicaAssignment(partition, 
Seq(brokerId))
+    }
+
+    val props = TestUtils.createBrokerConfig(brokerId, "zkConnect")
+    props.put(KafkaConfig.DeleteTopicEnableProp, "true")
+
+    val customConfig = KafkaConfig.fromProps(props)
+
+    def createMockReplicaStateMachine() = {
+      val replicaStateMachine: ReplicaStateMachine = 
EasyMock.createMock(classOf[ReplicaStateMachine])
+      
EasyMock.expect(replicaStateMachine.areAllReplicasForTopicDeleted(topic)).andReturn(false).anyTimes()
+      
EasyMock.expect(replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)).andReturn(false).anyTimes()
+      EasyMock.expect(replicaStateMachine.isAnyReplicaInState(topic, 
ReplicaDeletionIneligible)).andReturn(false).anyTimes()
+      EasyMock.expect(replicaStateMachine.replicasInState(topic, 
ReplicaDeletionIneligible)).andReturn(Set.empty).anyTimes()
+      EasyMock.expect(replicaStateMachine.replicasInState(topic, 
ReplicaDeletionStarted)).andReturn(Set.empty).anyTimes()
+      EasyMock.expect(replicaStateMachine.replicasInState(topic, 
ReplicaDeletionSuccessful)).andReturn(Set.empty).anyTimes()
+      
EasyMock.expect(replicaStateMachine.handleStateChanges(EasyMock.anyObject[Seq[PartitionAndReplica]],
+        EasyMock.anyObject[ReplicaState], EasyMock.anyObject[Callbacks]))
+
+      EasyMock.expectLastCall().anyTimes()
+      replicaStateMachine
+    }
+    val replicaStateMachine = createMockReplicaStateMachine()
+    partitionStateMachine = new PartitionStateMachine(customConfig, new 
StateChangeLogger(brokerId, true, None), controllerContext,
+      mockZkClient, partitionState, mockControllerBrokerRequestBatch)
+
+    def createMockController() = {
+      val mockController = EasyMock.createMock(classOf[KafkaController])
+      
EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
+      EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
+      
EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
+      
EasyMock.expect(mockController.replicaStateMachine).andReturn(replicaStateMachine).anyTimes()
+      EasyMock.expect(mockController.sendUpdateMetadataRequest(Seq.empty, 
partitions.toSet))
+      EasyMock.expectLastCall().anyTimes()
+      mockController
+    }
+
+    val mockController = createMockController()
+    val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+    EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
+
+    val topicDeletionManager = new TopicDeletionManager(mockController, 
mockEventManager, mockZkClient)
+    partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
+
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(s"There should be ${partitions.size} offline partition(s)", 
partitions.size, mockController.partitionStateMachine.offlinePartitionCount)
+
+    topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
+    assertEquals(s"There should be no offline partition(s)", 0, 
partitionStateMachine.offlinePartitionCount)
+  }
+
 }
diff --git 
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala 
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 14d2df2e8f8..c573c9f041e 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -185,7 +185,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, 
ResponseMetadata(0, 0))))
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
adjustedLeaderAndIsr), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
-    
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
+    
EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false)
     
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = 
false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
diff --git 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 2fcc724f513..c1d310f93e8 100644
--- 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -34,32 +34,32 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
   private val replicationFactor = 3
   private val partitionNum = 3
   private val createDeleteIterations = 3
-  
+
   private val overridingProps = new Properties
   overridingProps.put(KafkaConfig.DeleteTopicEnableProp, "true")
   overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
-  // speed up the test for UnderReplicatedPartitions 
+  // speed up the test for UnderReplicatedPartitions
   // which relies on the ISR expiry thread to execute concurrently with topic 
creation
-  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000") 
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000")
 
   private val testedMetrics = 
List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions")
   private val topics = List.tabulate(topicNum) (n => topicName + n)
 
   @volatile private var running = true
-  
+
   override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, 
zkConnect)
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @Before
   override def setUp {
-    // Do some Metrics Registry cleanup by removing the metrics that this test 
checks. 
+    // Do some Metrics Registry cleanup by removing the metrics that this test 
checks.
     // This is a test workaround to the issue that prior harness runs may have 
left a populated registry.
     // see https://issues.apache.org/jira/browse/KAFKA-4605
     for (m <- testedMetrics) {
         val metricName = 
Metrics.defaultRegistry.allMetrics.asScala.keys.find(_.getName.endsWith(m))
         metricName.foreach(Metrics.defaultRegistry.removeMetric)
     }
-    
+
     super.setUp
   }
 
@@ -70,7 +70,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
   def testMetricsDuringTopicCreateDelete() {
 
     // For UnderReplicatedPartitions, because of 
https://issues.apache.org/jira/browse/KAFKA-4605
-    // we can't access the metrics value of each server. So instead we 
directly invoke the method 
+    // we can't access the metrics value of each server. So instead we 
directly invoke the method
     // replicaManager.underReplicatedPartitionCount() that defines the metrics 
value.
     @volatile var underReplicatedPartitionCount = 0
 
@@ -116,7 +116,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
     // if the thread checking the gauge is still run, stop it
     running = false;
     thread.join
-    
+
     assert(offlinePartitionsCount==0, "OfflinePartitionCount not 0: "+ 
offlinePartitionsCount)
     assert(preferredReplicaImbalanceCount==0, "PreferredReplicaImbalanceCount 
not 0: " + preferredReplicaImbalanceCount)
     assert(underReplicatedPartitionCount==0, "UnderReplicatedPartitionCount 
not 0: " + underReplicatedPartitionCount)
@@ -129,7 +129,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
            .getOrElse { fail( "Unable to find metric " + metricName ) }
            ._2.asInstanceOf[Gauge[Int]]
   }
-  
+
   private def createDeleteTopics() {
     for (l <- 1 to createDeleteIterations if running) {
       // Create topics
@@ -140,17 +140,16 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
           case e: Exception => e.printStackTrace
         }
       }
-      Thread.sleep(500)
 
       // Delete topics
       for (t <- topics if running) {
           try {
-              adminZkClient.deleteTopic(t)
+            adminZkClient.deleteTopic(t)
+            TestUtils.verifyTopicDeletion(zkClient, t, partitionNum, servers)
           } catch {
           case e: Exception => e.printStackTrace
           }
       }
-      Thread.sleep(500)
     }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Speed up event processing on the controller 
> --------------------------------------------
>
>                 Key: KAFKA-6753
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6753
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>         Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling run we saw that updating metrics took nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.
> The patch that will be submitted tries to fix bullet 2 above, i.e. reducing 
> the number of invocations to update metrics. Instead of updating the metrics 
> after processing any event, we only periodically check if the metrics need 
> to be updated, i.e. once every second. 
> * If after the previous invocation to update metrics, there are other types 
> of events that changed the controller’s state, then one second later the 
> metrics will be updated. 
> * If after the previous invocation, there have been no other types of events, 
> then the call to update metrics can be bypassed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to