junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689974956



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
    * @param newImage        The new metadata image.
    * @param delta           The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = 
calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
 
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might 
subsequently
       // create new partitions with the same names as the ones we are deleting 
here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {

Review comment:
       Is it always correct to process the deletes first? For example, a topic 
could be first created and then deleted. By processing the deletes first, we 
would be keeping the partition and the log when they should be deleted?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to