junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689974956
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig, } } - private[kafka] def calculateDeltaChanges(delta: TopicsDelta) - : (mutable.HashMap[TopicPartition, Boolean], - mutable.HashMap[TopicPartition, LocalLeaderInfo], - mutable.HashMap[TopicPartition, LocalLeaderInfo]) = { - val deleted = new mutable.HashMap[TopicPartition, Boolean]() - delta.deletedTopicIds().forEach { topicId => - val topicImage = delta.image().getTopic(topicId) - topicImage.partitions().keySet().forEach { partitionId => - deleted.put(new TopicPartition(topicImage.name(), partitionId), true) - } - } - val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() - val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() - delta.changedTopics().values().forEach { topicDelta => - topicDelta.newLocalLeaders(config.nodeId).forEach { e => - newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } - topicDelta.newLocalFollowers(config.nodeId).forEach { e => - newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } - } - (deleted, newLocalLeaders, newLocalFollowers) - } - /** * Apply a KRaft topic change delta. * * @param newImage The new metadata image. * @param delta The delta to apply. */ def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = { - // Before taking the lock, build some hash maps that we will need. - val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta) + // Before taking the lock, compute the local changes + val localChanges = delta.localChanges(config.nodeId) replicaStateChangeLock.synchronized { // Handle deleted partitions. We need to do this first because we might subsequently // create new partitions with the same names as the ones we are deleting here. 
- if (!deleted.isEmpty) { - stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).") - stopPartitions(deleted).foreach { case (topicPartition, e) => + if (!localChanges.deletes.isEmpty) { Review comment: Is it always correct to process the deletes first? For example, a topic could first be created and then deleted. If we process the deletes first, wouldn't we end up keeping the partition and the log when they should have been deleted? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org