jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689978096
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig, } } - private[kafka] def calculateDeltaChanges(delta: TopicsDelta) - : (mutable.HashMap[TopicPartition, Boolean], - mutable.HashMap[TopicPartition, LocalLeaderInfo], - mutable.HashMap[TopicPartition, LocalLeaderInfo]) = { - val deleted = new mutable.HashMap[TopicPartition, Boolean]() - delta.deletedTopicIds().forEach { topicId => - val topicImage = delta.image().getTopic(topicId) - topicImage.partitions().keySet().forEach { partitionId => - deleted.put(new TopicPartition(topicImage.name(), partitionId), true) - } - } - val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() - val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]() - delta.changedTopics().values().forEach { topicDelta => - topicDelta.newLocalLeaders(config.nodeId).forEach { e => - newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } - topicDelta.newLocalFollowers(config.nodeId).forEach { e => - newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey), - LocalLeaderInfo(topicDelta.id(), e.getValue)) - } - } - (deleted, newLocalLeaders, newLocalFollowers) - } - /** * Apply a KRaft topic change delta. * * @param newImage The new metadata image. * @param delta The delta to apply. */ def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = { - // Before taking the lock, build some hash maps that we will need. - val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta) + // Before taking the lock, compute the local changes + val localChanges = delta.localChanges(config.nodeId) replicaStateChangeLock.synchronized { // Handle deleted partitions. We need to do this first because we might subsequently // create new partitions with the same names as the ones we are deleting here. - if (!deleted.isEmpty) { - stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).") - stopPartitions(deleted).foreach { case (topicPartition, e) => + if (!localChanges.deletes.isEmpty) { Review comment: If that happens then `TopicsDelta` would remove it from `changedTopics`: https://github.com/apache/kafka/pull/11216/files#diff-521beb14ca284c8e5ed56e92271216167da342c44b86992add15f33c39128ecbR93 Let me write a test that shows this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org