dajac commented on code in PR #12187: URL: https://github.com/apache/kafka/pull/12187#discussion_r880193539
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ##########
@@ -2134,20 +2141,22 @@ class ReplicaManager(val config: KafkaConfig,
             stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
               s"ID ${info.topicId} because the replica manager is shutting down.")
           } else {
-            val leader = info.partition.leader
-            if (newImage.cluster.broker(leader) == null) {
-              stateChangeLogger.trace(s"Unable to start fetching $tp with topic ID ${info.topicId} " +
-                s"from leader $leader because it is not alive.")
-
-              // Create the local replica even if the leader is unavailable. This is required
-              // to ensure that we include the partition's high watermark in the checkpoint
-              // file (see KAFKA-1647).
-              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
-            } else {
-              val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-              if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
-                partitionsToMakeFollower.put(tp, partition)
-              }
+            // We always update the follower state.
+            // - This ensure that a replica with no leader can step down;
+            // - This also ensures that the local replica is created even if the leader
+            //   is unavailable. This is required to ensure that we include the partition's
+            //   high watermark in the checkpoint file (see KAFKA-1647).
+            val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+            val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
+
+            if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
+                !info.partition.isr.contains(config.brokerId))) {
+              // During controlled shutdown, replica with no leaders and replica
+              // where this broker is not in the ISR are stopped.
+              partitionsToStop.put(tp, false)
+            } else if (isNewLeaderEpoch) {
+              // Otherwise, fetcher is restarted if the leader epoch has changed.
+              partitionsToStart.put(tp, partition)
             }
           }
           changedPartitions.add(partition)

Review Comment:
I don't have a strong opinion on this but removing unused logic seems appropriate. That will force us to think it through when we implement it. Do you mind if we do this separately? I would like to keep this PR focused on its primary goal.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org