chia7712 commented on code in PR #20335:
URL: https://github.com/apache/kafka/pull/20335#discussion_r2266039255


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java:
##########
@@ -163,19 +165,21 @@ public void setup() throws IOException {
         for (int i = 0; i < partitionCount; i++) {
             TopicPartition tp = new TopicPartition("topic", i);
 
-            List<Integer> replicas = List.of(0, 1, 2);
-            PartitionState partitionState = new PartitionState()
+            int[] replicas = {0, 1, 2};
+            PartitionRegistration partitionRegistration = new 
PartitionRegistration.Builder()
                     .setLeader(0)

Review Comment:
   could you please also remove extra tab?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -800,23 +800,23 @@ class Partition(val topicPartition: TopicPartition,
             currentTimeMs,
             leaderEpochStartOffset,
             isNewLeader,
-            partitionState.isr.contains(replica.brokerId)
+            partitionRegistration.isr.contains(replica.brokerId)

Review Comment:
   Perhaps we could reuse the `isr` set?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -135,7 +134,7 @@ class ReplicaManagerTest {
   private val quotaAvailableThrottleTime = 0
 
   // Constants defined for readability
-  private val zkVersion = 0
+  private val partitionEpoch = 0

Review Comment:
   nice



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition,
    * replica manager that state is already correct and the become-follower 
steps can
    * be skipped.
    */
-  def makeFollower(partitionState: JPartitionState,
+  def makeFollower(partitionRegistration: PartitionRegistration,
+                   isNew: Boolean,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid],
                    targetLogDirectoryId: Option[Uuid] = None): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since the follower is already 
at a newer partition epoch $partitionEpoch.")
+          s"and partition registration $partitionRegistration since the 
follower is already at a newer partition epoch $partitionEpoch.")
         return false
       }
 
-      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
+      val isNewLeaderEpoch = partitionRegistration.leaderEpoch > leaderEpoch
       // The leader should be updated before updateAssignmentAndIsr where we 
clear the ISR. Or it is possible to meet
       // the under min isr condition during the makeFollower process and emits 
the wrong metric.
-      leaderReplicaIdOpt = Option(partitionState.leader)
-      leaderEpoch = partitionState.leaderEpoch
+      leaderReplicaIdOpt = Option(partitionRegistration.leader)
+      leaderEpoch = partitionRegistration.leaderEpoch
       leaderEpochStartOffsetOpt = None
-      partitionEpoch = partitionState.partitionEpoch
+      partitionEpoch = partitionRegistration.partitionEpoch
 
       updateAssignmentAndIsr(
-        replicas = partitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
+        replicas = partitionRegistration.replicas,
         isLeader = false,
         isr = Set.empty,
-        addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
-        removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt),
-        LeaderRecoveryState.of(partitionState.leaderRecoveryState)
+        addingReplicas = partitionRegistration.addingReplicas,
+        removingReplicas = partitionRegistration.removingReplicas,
+        partitionRegistration.leaderRecoveryState
       )
 
-      createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, 
topicId, targetLogDirectoryId)
+      createLogInAssignedDirectoryId(isNew, highWatermarkCheckpoints, topicId, 
targetLogDirectoryId)
 
       val followerLog = localLogOrException
       if (isNewLeaderEpoch) {
         val leaderEpochEndOffset = followerLog.logEndOffset
-        stateChangeLogger.info(s"Follower $topicPartition starts at leader 
epoch ${partitionState.leaderEpoch} from " +
-          s"offset $leaderEpochEndOffset with partition epoch 
${partitionState.partitionEpoch} and " +
-          s"high watermark ${followerLog.highWatermark}. Current leader is 
${partitionState.leader}. " +
+        stateChangeLogger.info(s"Follower $topicPartition starts at leader 
epoch ${partitionRegistration.leaderEpoch} from " +
+          s"offset $leaderEpochEndOffset with partition epoch 
${partitionRegistration.partitionEpoch} and " +
+          s"high watermark ${followerLog.highWatermark}. Current leader is 
${partitionRegistration.leader}. " +
           s"Previous leader $leaderReplicaIdOpt and previous leader epoch was 
$leaderEpoch.")
       } else {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since it is already a follower 
with leader epoch $leaderEpoch.")
+          s"and partition state $partitionRegistration since it is already a 
follower with leader epoch $leaderEpoch.")

Review Comment:
   ditto



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -837,46 +837,47 @@ class Partition(val topicPartition: TopicPartition,
    * replica manager that state is already correct and the become-follower 
steps can
    * be skipped.
    */
-  def makeFollower(partitionState: JPartitionState,
+  def makeFollower(partitionRegistration: PartitionRegistration,
+                   isNew: Boolean,
                    highWatermarkCheckpoints: OffsetCheckpoints,
                    topicId: Option[Uuid],
                    targetLogDirectoryId: Option[Uuid] = None): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since the follower is already 
at a newer partition epoch $partitionEpoch.")
+          s"and partition registration $partitionRegistration since the 
follower is already at a newer partition epoch $partitionEpoch.")

Review Comment:
   Could you please add `isNew` to the log?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to