[
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587845#comment-16587845
]
ASF GitHub Bot commented on KAFKA-6753:
---------------------------------------
junrao closed pull request #5388: KAFKA-6753: Updating the OfflinePartitions
count only when necessary
URL: https://github.com/apache/kafka/pull/5388
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 096b2b4e98b..ecf6fbf33f1 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -389,7 +389,7 @@ class ControllerBrokerRequestBatch(controller:
KafkaController, stateChangeLogge
updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
givenPartitions.foreach(partition =>
updateMetadataRequestPartitionInfo(partition,
- beingDeleted =
controller.topicDeletionManager.partitionsToBeDeleted.contains(partition)))
+ beingDeleted =
controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
}
def sendRequestsToBrokers(controllerEpoch: Int) {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 645080f7641..aaf73fe1fa0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -75,7 +75,8 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
val topicDeletionManager = new TopicDeletionManager(this, eventManager,
zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this,
stateChangeLogger)
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger,
controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new
ControllerBrokerRequestBatch(this, stateChangeLogger))
- val partitionStateMachine = new PartitionStateMachine(config,
stateChangeLogger, controllerContext, topicDeletionManager, zkClient,
mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+ val partitionStateMachine = new PartitionStateMachine(config,
stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new
ControllerBrokerRequestBatch(this, stateChangeLogger))
+ partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
private val controllerChangeHandler = new ControllerChangeHandler(this,
eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -1052,7 +1053,9 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter
{ partition =>
- controllerContext.partitionReplicaAssignment(partition).size > 1 &&
controllerContext.partitionLeadershipInfo.contains(partition)
+ controllerContext.partitionReplicaAssignment(partition).size > 1 &&
+ controllerContext.partitionLeadershipInfo.contains(partition) &&
+ !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
}
val (partitionsLedByBroker, partitionsFollowedByBroker) =
partitionsToActOn.partition { partition =>
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
@@ -1076,7 +1079,9 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
trace(s"All leaders =
${controllerContext.partitionLeadershipInfo.mkString(",")}")
controllerContext.partitionLeadershipInfo.filter {
case (topicPartition, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
+
!topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+ leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
+
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
}.keys
}
replicatedPartitionsBrokerLeads().toSet
@@ -1155,10 +1160,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
if (!isActive) {
0
} else {
- controllerContext.partitionLeadershipInfo.count { case (tp,
leadershipInfo) =>
-
!controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader)
&&
- !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
- }
+ partitionStateMachine.offlinePartitionCount
}
preferredReplicaImbalanceCount =
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index db4c7161f35..1b43419476d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -44,14 +44,17 @@ import scala.collection.mutable
class PartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
- topicDeletionManager: TopicDeletionManager,
zkClient: KafkaZkClient,
partitionState: mutable.Map[TopicPartition,
PartitionState],
controllerBrokerRequestBatch:
ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
+ private var topicDeletionManager: TopicDeletionManager = _
+
this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
+ var offlinePartitionCount = 0
+
/**
* Invoked on successful controller election.
*/
@@ -68,9 +71,14 @@ class PartitionStateMachine(config: KafkaConfig,
*/
def shutdown() {
partitionState.clear()
+ offlinePartitionCount = 0
info("Stopped partition state machine")
}
+ def setTopicDeletionManager(topicDeletionManager: TopicDeletionManager) {
+ this.topicDeletionManager = topicDeletionManager
+ }
+
/**
* Invoked on startup of the partition's state machine to set the initial
state for all existing partitions in
* zookeeper
@@ -83,11 +91,11 @@ class PartitionStateMachine(config: KafkaConfig,
// else, check if the leader for partition is alive. If yes, it is
in Online state, else it is in Offline state
if
(controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader,
topicPartition))
// leader is alive
- partitionState.put(topicPartition, OnlinePartition)
+ changeStateTo(topicPartition, NonExistentPartition,
OnlinePartition)
else
- partitionState.put(topicPartition, OfflinePartition)
+ changeStateTo(topicPartition, NonExistentPartition,
OfflinePartition)
case None =>
- partitionState.put(topicPartition, NewPartition)
+ changeStateTo(topicPartition, NonExistentPartition, NewPartition)
}
}
}
@@ -125,6 +133,21 @@ class PartitionStateMachine(config: KafkaConfig,
partitionState.filter { case (_, s) => s == state }.keySet.toSet
}
+ private def changeStateTo(partition: TopicPartition, currentState:
PartitionState, targetState: PartitionState): Unit = {
+ partitionState.put(partition, targetState)
+ updateControllerMetrics(partition, currentState, targetState)
+ }
+
+ private def updateControllerMetrics(partition: TopicPartition, currentState:
PartitionState, targetState: PartitionState) : Unit = {
+ if (!topicDeletionManager.isTopicWithDeletionStarted(partition.topic)) {
+ if (currentState != OfflinePartition && targetState == OfflinePartition)
{
+ offlinePartitionCount = offlinePartitionCount + 1
+ } else if (currentState == OfflinePartition && targetState !=
OfflinePartition) {
+ offlinePartitionCount = offlinePartitionCount - 1
+ }
+ }
+ }
+
/**
* This API exercises the partition's state machine. It ensures that every
state transition happens from a legal
* previous state to the target state. Valid state transitions are:
@@ -158,7 +181,7 @@ class PartitionStateMachine(config: KafkaConfig,
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from
${partitionState(partition)} to $targetState with " +
s"assigned replicas
${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
- partitionState.put(partition, NewPartition)
+ changeStateTo(partition, partitionState(partition), NewPartition)
}
case OnlinePartition =>
val uninitializedPartitions = validPartitions.filter(partition =>
partitionState(partition) == NewPartition)
@@ -168,7 +191,7 @@ class PartitionStateMachine(config: KafkaConfig,
successfulInitializations.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from
${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
- partitionState.put(partition, OnlinePartition)
+ changeStateTo(partition, partitionState(partition),
OnlinePartition)
}
}
if (partitionsToElectLeader.nonEmpty) {
@@ -176,18 +199,18 @@ class PartitionStateMachine(config: KafkaConfig,
successfulElections.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from
${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
- partitionState.put(partition, OnlinePartition)
+ changeStateTo(partition, partitionState(partition),
OnlinePartition)
}
}
case OfflinePartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from
${partitionState(partition)} to $targetState")
- partitionState.put(partition, OfflinePartition)
+ changeStateTo(partition, partitionState(partition), OfflinePartition)
}
case NonExistentPartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from
${partitionState(partition)} to $targetState")
- partitionState.put(partition, NonExistentPartition)
+ changeStateTo(partition, partitionState(partition),
NonExistentPartition)
}
}
}
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c9f0640b139..1ab8a43dfde 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -207,7 +207,7 @@ class ReplicaStateMachine(config: KafkaConfig,
}
val updatedLeaderIsrAndControllerEpochs =
removeReplicasFromIsr(replicaId,
replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.foreach { case (partition,
leaderIsrAndControllerEpoch) =>
- if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+ if
(!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients =
controllerContext.partitionReplicaAssignment(partition).filterNot(_ ==
replicaId)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
partition,
@@ -301,7 +301,7 @@ class ReplicaStateMachine(config: KafkaConfig,
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry,
failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk =
partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
- if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+ if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
val exception = new StateChangeFailedException(s"Failed to change
state of replica $replicaId for partition $partition since the leader and isr
path in zookeeper is empty")
Option(partition -> exception)
} else None
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 6e145516cce..8d93ef2fa2d 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -62,13 +62,32 @@ class TopicDeletionManager(controller: KafkaController,
val controllerContext = controller.controllerContext
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
val topicsToBeDeleted = mutable.Set.empty[String]
- val partitionsToBeDeleted = mutable.Set.empty[TopicPartition]
+ /** The following topicsWithDeletionStarted variable is used to properly
update the offlinePartitionCount metric.
+ * When a topic is going through deletion, we don't want to keep track of
its partition state
+ * changes in the offlinePartitionCount metric, see the
PartitionStateMachine#updateControllerMetrics
+ * for detailed logic. This goal means if some partitions of a topic are
already
+ * in OfflinePartition state when deletion starts, we need to change the
corresponding partition
+ * states to NonExistentPartition first before starting the deletion.
+ *
+ * However we can NOT change partition states to NonExistentPartition at
the time of enqueuing topics
+ * for deletion. The reason is that when a topic is enqueued for deletion,
it may be ineligible for
+ * deletion due to ongoing partition reassignments. Hence there might be a
delay between enqueuing
+ * a topic for deletion and the actual start of deletion. In this delayed
interval, partitions may still
+ * transition to or out of the OfflinePartition state.
+ *
+ * Hence we decide to change partition states to NonExistentPartition only
when the actual deletion have started.
+ * For topics whose deletion have actually started, we keep track of them
in the following topicsWithDeletionStarted
+ * variable. And once a topic is in the topicsWithDeletionStarted set, we
are sure there will no longer
+ * be partition reassignments to any of its partitions, and only then it's
safe to move its partitions to
+ * NonExistentPartition state. Once a topic is in the
topicsWithDeletionStarted set, we will stop monitoring
+ * its partition state changes in the offlinePartitionCount metric
+ */
+ val topicsWithDeletionStarted = mutable.Set.empty[String]
val topicsIneligibleForDeletion = mutable.Set.empty[String]
def init(initialTopicsToBeDeleted: Set[String],
initialTopicsIneligibleForDeletion: Set[String]): Unit = {
if (isDeleteTopicEnabled) {
topicsToBeDeleted ++= initialTopicsToBeDeleted
- partitionsToBeDeleted ++=
topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion &
topicsToBeDeleted
} else {
// if delete topic is disabled clean the topic entries under
/admin/delete_topics
@@ -89,7 +108,7 @@ class TopicDeletionManager(controller: KafkaController,
def reset() {
if (isDeleteTopicEnabled) {
topicsToBeDeleted.clear()
- partitionsToBeDeleted.clear()
+ topicsWithDeletionStarted.clear()
topicsIneligibleForDeletion.clear()
}
}
@@ -103,7 +122,6 @@ class TopicDeletionManager(controller: KafkaController,
def enqueueTopicsForDeletion(topics: Set[String]) {
if (isDeleteTopicEnabled) {
topicsToBeDeleted ++= topics
- partitionsToBeDeleted ++=
topics.flatMap(controllerContext.partitionsForTopic)
resumeDeletions()
}
}
@@ -173,9 +191,9 @@ class TopicDeletionManager(controller: KafkaController,
false
}
- def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
+ def isTopicWithDeletionStarted(topic: String) = {
if (isDeleteTopicEnabled) {
- partitionsToBeDeleted.contains(topicAndPartition)
+ topicsWithDeletionStarted.contains(topic)
} else
false
}
@@ -231,12 +249,8 @@ class TopicDeletionManager(controller: KafkaController,
val replicasForDeletedTopic =
controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as
its partition assignment cache
controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq,
NonExistentReplica)
- val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
- // move respective partition to OfflinePartition and NonExistentPartition
state
-
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
OfflinePartition)
-
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
NonExistentPartition)
topicsToBeDeleted -= topic
- partitionsToBeDeleted.retain(_.topic != topic)
+ topicsWithDeletionStarted -= topic
zkClient.deleteTopicZNode(topic)
zkClient.deleteTopicConfigs(Seq(topic))
zkClient.deleteTopicDeletions(Seq(topic))
@@ -254,6 +268,16 @@ class TopicDeletionManager(controller: KafkaController,
info(s"Topic deletion callback for ${topics.mkString(",")}")
// send update metadata so that brokers stop serving data for topics to be
deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
+ val unseenTopicsForDeletion = topics -- topicsWithDeletionStarted
+ if (unseenTopicsForDeletion.nonEmpty) {
+ val unseenPartitionsForDeletion =
unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
+
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq,
OfflinePartition)
+
controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq,
NonExistentPartition)
+ // adding of unseenTopicsForDeletion to topicsBeingDeleted must be done
after the partition state changes
+ // to make sure the offlinePartitionCount metric is properly updated
+ topicsWithDeletionStarted ++= unseenTopicsForDeletion
+ }
+
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
partitions)
topics.foreach { topic =>
onPartitionDeletion(controllerContext.partitionsForTopic(topic))
@@ -283,9 +307,9 @@ class TopicDeletionManager(controller: KafkaController,
val successfullyDeletedReplicas =
controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic --
successfullyDeletedReplicas
// move dead replicas directly to failed state
-
controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq,
ReplicaDeletionIneligible)
+
controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq,
ReplicaDeletionIneligible, new Callbacks())
// send stop replica to all followers that are not in the OfflineReplica
state so they stop sending fetch requests to the leader
-
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
OfflineReplica)
+
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
OfflineReplica, new Callbacks())
debug(s"Deletion started for replicas
${replicasForDeletionRetry.mkString(",")}")
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq,
ReplicaDeletionStarted,
new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj,
replicaId) =>
diff --git
a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 52f459970d1..6a587f3bd34 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -55,8 +55,9 @@ class PartitionStateMachineTest extends JUnitSuite {
mockControllerBrokerRequestBatch =
EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
mockTopicDeletionManager =
EasyMock.createMock(classOf[TopicDeletionManager])
partitionState = mutable.Map.empty[TopicPartition, PartitionState]
- partitionStateMachine = new PartitionStateMachine(config, new
StateChangeLogger(brokerId, true, None), controllerContext,
mockTopicDeletionManager,
+ partitionStateMachine = new PartitionStateMachine(config, new
StateChangeLogger(brokerId, true, None), controllerContext,
mockZkClient, partitionState, mockControllerBrokerRequestBatch)
+ partitionStateMachine.setTopicDeletionManager(mockTopicDeletionManager)
}
@Test
@@ -312,4 +313,147 @@ class PartitionStateMachineTest extends JUnitSuite {
assertEquals(OfflinePartition, partitionState(partition))
}
+ private def prepareMockToElectLeaderForPartitions(partitions:
Seq[TopicPartition]): Unit = {
+ val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
+ def prepareMockToGetTopicPartitionsStatesRaw(): Unit = {
+ val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+ val getDataResponses = partitions.map {p => GetDataResponse(Code.OK,
null, Some(p),
+ TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat,
ResponseMetadata(0, 0))}
+ EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
+ .andReturn(getDataResponses)
+ }
+ prepareMockToGetTopicPartitionsStatesRaw()
+
+ def prepareMockToGetLogConfigs(): Unit = {
+ val topicsForPartitionsWithNoLiveInSyncReplicas = Seq()
+
EasyMock.expect(mockZkClient.getLogConfigs(topicsForPartitionsWithNoLiveInSyncReplicas,
config.originals()))
+ .andReturn(Map.empty, Map.empty)
+ }
+ prepareMockToGetLogConfigs()
+
+ def prepareMockToUpdateLeaderAndIsr(): Unit = {
+ val updatedLeaderAndIsr = partitions.map { partition =>
+ partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
+ }.toMap
+ EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr,
controllerEpoch))
+ .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty,
Map.empty))
+ }
+ prepareMockToUpdateLeaderAndIsr()
+ }
+
+ /**
+ * This method tests changing partitions' state to OfflinePartition
increments the offlinePartitionCount,
+ * and changing their state back to OnlinePartition decrements the
offlinePartitionCount
+ */
+ @Test
+ def testUpdatingOfflinePartitionsCount(): Unit = {
+ controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId,
"host", 0))
+
+ val partitionIds = Seq(0, 1, 2, 3)
+ val topic = "test"
+ val partitions = partitionIds.map(new TopicPartition("test", _))
+
+ partitions.foreach { partition =>
+ controllerContext.updatePartitionReplicaAssignment(partition,
Seq(brokerId))
+ }
+
+
EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(false)
+ EasyMock.expectLastCall().anyTimes()
+ prepareMockToElectLeaderForPartitions(partitions)
+ EasyMock.replay(mockZkClient, mockTopicDeletionManager)
+
+ partitionStateMachine.handleStateChanges(partitions, NewPartition)
+ partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+ assertEquals(s"There should be ${partitions.size} offline partition(s)",
partitions.size, partitionStateMachine.offlinePartitionCount)
+
+ partitionStateMachine.handleStateChanges(partitions, OnlinePartition,
Some(OfflinePartitionLeaderElectionStrategy))
+ assertEquals(s"There should be no offline partition(s)", 0,
partitionStateMachine.offlinePartitionCount)
+ }
+
+ /**
+ * This method tests if a topic is being deleted, then changing partitions'
state to OfflinePartition makes no change
+ * to the offlinePartitionCount
+ */
+ @Test
+ def testNoOfflinePartitionsChangeForTopicsBeingDeleted() = {
+ val partitionIds = Seq(0, 1, 2, 3)
+ val topic = "test"
+ val partitions = partitionIds.map(new TopicPartition("test", _))
+
+
EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(true)
+ EasyMock.expectLastCall().anyTimes()
+ EasyMock.replay(mockTopicDeletionManager)
+
+ partitionStateMachine.handleStateChanges(partitions, NewPartition)
+ partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+ assertEquals(s"There should be no offline partition(s)", 0,
partitionStateMachine.offlinePartitionCount)
+ }
+
+ /**
+ * This method tests if some partitions are already in OfflinePartition
state,
+ * then deleting their topic will decrement the offlinePartitionCount.
+ * For example, if partitions test-0, test-1, test-2, test-3 are in
OfflinePartition state,
+ * and the offlinePartitionCount is 4, trying to delete the topic "test"
means these
+ * partitions no longer qualify as offline-partitions, and the
offlinePartitionCount
+ * should be decremented to 0.
+ */
+ @Test
+ def testUpdatingOfflinePartitionsCountDuringTopicDeletion() = {
+ val partitionIds = Seq(0, 1, 2, 3)
+ val topic = "test"
+ val partitions = partitionIds.map(new TopicPartition("test", _))
+ partitions.foreach { partition =>
+ controllerContext.updatePartitionReplicaAssignment(partition,
Seq(brokerId))
+ }
+
+ val props = TestUtils.createBrokerConfig(brokerId, "zkConnect")
+ props.put(KafkaConfig.DeleteTopicEnableProp, "true")
+
+ val customConfig = KafkaConfig.fromProps(props)
+
+ def createMockReplicaStateMachine() = {
+ val replicaStateMachine: ReplicaStateMachine =
EasyMock.createMock(classOf[ReplicaStateMachine])
+
EasyMock.expect(replicaStateMachine.areAllReplicasForTopicDeleted(topic)).andReturn(false).anyTimes()
+
EasyMock.expect(replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)).andReturn(false).anyTimes()
+ EasyMock.expect(replicaStateMachine.isAnyReplicaInState(topic,
ReplicaDeletionIneligible)).andReturn(false).anyTimes()
+ EasyMock.expect(replicaStateMachine.replicasInState(topic,
ReplicaDeletionIneligible)).andReturn(Set.empty).anyTimes()
+ EasyMock.expect(replicaStateMachine.replicasInState(topic,
ReplicaDeletionStarted)).andReturn(Set.empty).anyTimes()
+ EasyMock.expect(replicaStateMachine.replicasInState(topic,
ReplicaDeletionSuccessful)).andReturn(Set.empty).anyTimes()
+
EasyMock.expect(replicaStateMachine.handleStateChanges(EasyMock.anyObject[Seq[PartitionAndReplica]],
+ EasyMock.anyObject[ReplicaState], EasyMock.anyObject[Callbacks]))
+
+ EasyMock.expectLastCall().anyTimes()
+ replicaStateMachine
+ }
+ val replicaStateMachine = createMockReplicaStateMachine()
+ partitionStateMachine = new PartitionStateMachine(customConfig, new
StateChangeLogger(brokerId, true, None), controllerContext,
+ mockZkClient, partitionState, mockControllerBrokerRequestBatch)
+
+ def createMockController() = {
+ val mockController = EasyMock.createMock(classOf[KafkaController])
+
EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
+ EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
+
EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
+
EasyMock.expect(mockController.replicaStateMachine).andReturn(replicaStateMachine).anyTimes()
+ EasyMock.expect(mockController.sendUpdateMetadataRequest(Seq.empty,
partitions.toSet))
+ EasyMock.expectLastCall().anyTimes()
+ mockController
+ }
+
+ val mockController = createMockController()
+ val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+ EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
+
+ val topicDeletionManager = new TopicDeletionManager(mockController,
mockEventManager, mockZkClient)
+ partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
+
+ partitionStateMachine.handleStateChanges(partitions, NewPartition)
+ partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+ assertEquals(s"There should be ${partitions.size} offline partition(s)",
partitions.size, mockController.partitionStateMachine.offlinePartitionCount)
+
+ topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
+ assertEquals(s"There should be no offline partition(s)", 0,
partitionStateMachine.offlinePartitionCount)
+ }
+
}
diff --git
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 14d2df2e8f8..c573c9f041e 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -185,7 +185,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat,
ResponseMetadata(0, 0))))
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition ->
adjustedLeaderAndIsr), controllerEpoch))
.andReturn(UpdateLeaderAndIsrResult(Map(partition ->
updatedLeaderAndIsr), Seq.empty, Map.empty))
-
EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
+
EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false)
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew =
false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
diff --git
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 2fcc724f513..c1d310f93e8 100644
---
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -34,32 +34,32 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
private val replicationFactor = 3
private val partitionNum = 3
private val createDeleteIterations = 3
-
+
private val overridingProps = new Properties
overridingProps.put(KafkaConfig.DeleteTopicEnableProp, "true")
overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
- // speed up the test for UnderReplicatedPartitions
+ // speed up the test for UnderReplicatedPartitions
// which relies on the ISR expiry thread to execute concurrently with topic
creation
- overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000")
+ overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "2000")
private val testedMetrics =
List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions")
private val topics = List.tabulate(topicNum) (n => topicName + n)
@volatile private var running = true
-
+
override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum,
zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
@Before
override def setUp {
- // Do some Metrics Registry cleanup by removing the metrics that this test
checks.
+ // Do some Metrics Registry cleanup by removing the metrics that this test
checks.
// This is a test workaround to the issue that prior harness runs may have
left a populated registry.
// see https://issues.apache.org/jira/browse/KAFKA-4605
for (m <- testedMetrics) {
val metricName =
Metrics.defaultRegistry.allMetrics.asScala.keys.find(_.getName.endsWith(m))
metricName.foreach(Metrics.defaultRegistry.removeMetric)
}
-
+
super.setUp
}
@@ -70,7 +70,7 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
def testMetricsDuringTopicCreateDelete() {
// For UnderReplicatedPartitions, because of
https://issues.apache.org/jira/browse/KAFKA-4605
- // we can't access the metrics value of each server. So instead we
directly invoke the method
+ // we can't access the metrics value of each server. So instead we
directly invoke the method
// replicaManager.underReplicatedPartitionCount() that defines the metrics
value.
@volatile var underReplicatedPartitionCount = 0
@@ -116,7 +116,7 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
// if the thread checking the gauge is still run, stop it
running = false;
thread.join
-
+
assert(offlinePartitionsCount==0, "OfflinePartitionCount not 0: "+
offlinePartitionsCount)
assert(preferredReplicaImbalanceCount==0, "PreferredReplicaImbalanceCount
not 0: " + preferredReplicaImbalanceCount)
assert(underReplicatedPartitionCount==0, "UnderReplicatedPartitionCount
not 0: " + underReplicatedPartitionCount)
@@ -129,7 +129,7 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
.getOrElse { fail( "Unable to find metric " + metricName ) }
._2.asInstanceOf[Gauge[Int]]
}
-
+
private def createDeleteTopics() {
for (l <- 1 to createDeleteIterations if running) {
// Create topics
@@ -140,17 +140,16 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
case e: Exception => e.printStackTrace
}
}
- Thread.sleep(500)
// Delete topics
for (t <- topics if running) {
try {
- adminZkClient.deleteTopic(t)
+ adminZkClient.deleteTopic(t)
+ TestUtils.verifyTopicDeletion(zkClient, t, partitionNum, servers)
} catch {
case e: Exception => e.printStackTrace
}
}
- Thread.sleep(500)
}
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Speed up event processing on the controller
> --------------------------------------------
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
> Issue Type: Improvement
> Reporter: Lucas Wang
> Assignee: Lucas Wang
> Priority: Minor
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event.
> This can slow down event processing on the controller tremendously. In one
> profiling run we saw that updating metrics took nearly 100% of the CPU for
> the controller event processing thread. Specifically, the slowness can be
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to
> calculate the offline partitions count requires iterating through all the
> partitions in the cluster to check if the partition is offline; and
> calculating the preferred replica imbalance count requires iterating through
> all the partitions in the cluster to check if a partition has a leader other
> than the preferred leader. In a large cluster, the number of partitions can
> be quite large, all seen by the controller. Even if the time spent to check a
> single partition is small, the accumulation effect of so many partitions in
> the cluster can make the invocation to update metrics quite expensive. One
> might argue that maybe the logic for processing each single partition is not
> optimized; however, we checked the CPU percentage of leaf nodes in the
> profiling result and found that inside the loops over collection objects,
> e.g. the set of all partitions, no single function dominates the processing.
> Hence the large number of partitions in a cluster is the main contributor to
> the slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high
> number of events to be processed by the controller, one invocation after
> processing any event.
> The patch that will be submitted tries to fix bullet 2 above, i.e. reducing
> the number of invocations to update metrics. Instead of updating the metrics
> after processing any event, we only periodically check if the metrics need
> to be updated, i.e. once every second.
> * If after the previous invocation to update metrics, there are other types
> of events that changed the controller’s state, then one second later the
> metrics will be updated.
> * If after the previous invocation, there have been no other types of events,
> then the call to update metrics can be bypassed.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)