chia7712 commented on code in PR #20335: URL: https://github.com/apache/kafka/pull/20335#discussion_r2266039255
########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java: ########## @@ -163,19 +165,21 @@ public void setup() throws IOException { for (int i = 0; i < partitionCount; i++) { TopicPartition tp = new TopicPartition("topic", i); - List<Integer> replicas = List.of(0, 1, 2); - PartitionState partitionState = new PartitionState() + int[] replicas = {0, 1, 2}; + PartitionRegistration partitionRegistration = new PartitionRegistration.Builder() .setLeader(0) Review Comment: could you please also remove extra tab? ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition, currentTimeMs, leaderEpochStartOffset, isNewLeader, - partitionState.isr.contains(replica.brokerId) + partitionRegistration.isr.contains(replica.brokerId) Review Comment: Perhaps we could reuse the `isr` set? ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -135,7 +134,7 @@ class ReplicaManagerTest { private val quotaAvailableThrottleTime = 0 // Constants defined for readability - private val zkVersion = 0 + private val partitionEpoch = 0 Review Comment: nice ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition, * replica manager that state is already correct and the become-follower steps can * be skipped. */ - def makeFollower(partitionState: JPartitionState, + def makeFollower(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.") + s"and partition registration $partitionRegistration since the follower is already at a newer partition epoch $partitionEpoch.") return false } - val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch + val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch // The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet // the under min isr condition during the makeFollower process and emits the wrong metric. - leaderReplicaIdOpt = Option(partitionState.leader) - leaderEpoch = partitionState.leaderEpoch + leaderReplicaIdOpt = Option(partitionRegistration.leader) + leaderEpoch = partitionRegistration.leaderEpoch leaderEpochStartOffsetOpt = None - partitionEpoch = partitionState.partitionEpoch + partitionEpoch = partitionRegistration.partitionEpoch updateAssignmentAndIsr( - replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq, + replicas = partitionRegistration.replicas, isLeader = false, isr = Set.empty, - addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), - removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt), - LeaderRecoveryState.of(partitionState.leaderRecoveryState) + addingReplicas = partitionRegistration.addingReplicas, + removingReplicas = partitionRegistration.removingReplicas, + partitionRegistration.leaderRecoveryState ) - createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, targetLogDirectoryId) val followerLog = localLogOrException if (isNewLeaderEpoch) { val leaderEpochEndOffset = followerLog.logEndOffset - stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " + - s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " + - s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " + + stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionRegistration.leaderEpoch} from " + + s"offset $leaderEpochEndOffset with partition epoch ${partitionRegistration.partitionEpoch} and " + + s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionRegistration.leader}. " + s"Previous leader $leaderReplicaIdOpt and previous leader epoch was $leaderEpoch.") } else { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") + s"and partition state $partitionRegistration since it is already a follower with leader epoch $leaderEpoch.") Review Comment: ditto ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition, * replica manager that state is already correct and the become-follower steps can * be skipped. */ - def makeFollower(partitionState: JPartitionState, + def makeFollower(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.") + s"and partition registration $partitionRegistration since the follower is already at a newer partition epoch $partitionEpoch.") Review Comment: Could you please add `isNew` to the log? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org