TaiJuWu commented on code in PR #20335:
URL: https://github.com/apache/kafka/pull/20335#discussion_r2267161021


##########
core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala:
##########
@@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest {
 
   @ParameterizedTest
   @MethodSource(Array("parameters"))
-  def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: 
util.List[Integer],
-                                    adding: util.List[Integer], removing: 
util.List[Integer],
+  def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int],
+                                    adding: Array[Int], removing: Array[Int],
                                     original: util.List[Int], 
isUnderReplicated: Boolean): Unit = {
-    val leaderState = new PartitionState()
+    val partitionRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(6)
       .setIsr(isr)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(false)
-    if (!adding.isEmpty)
-      leaderState.setAddingReplicas(adding)
-    if (!removing.isEmpty)
-      leaderState.setRemovingReplicas(removing)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    if (adding.nonEmpty)
+      partitionRegistrationBuilder.setAddingReplicas(adding)
+//      leaderState.setAddingReplicas(adding)

Review Comment:
   This commented-out `leaderState.setAddingReplicas(adding)` line is a leftover from the old code and can be removed.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -731,31 +730,32 @@ class Partition(val topicPartition: TopicPartition,
    * from the time when this broker was the leader last time) and setting the 
new leader and ISR.
    * If the leader replica id does not change, return false to indicate the 
replica manager.
    */
-  def makeLeader(partitionState: JPartitionState,
+  def makeLeader(partitionRegistration: PartitionRegistration,
+                 isNew: Boolean,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid],
                  targetDirectoryId: Option[Uuid] = None): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       // Partition state changes are expected to have a partition epoch larger 
or equal
       // to the current partition epoch. The latter is allowed because the 
partition epoch
       // is also updated by the AlterPartition response so the new epoch might 
be known
-      // before a LeaderAndIsr request is received or before an update is 
received via
+      // before a partitionRegistration is received or before an update is 
received via
       // the metadata log.
-      if (partitionState.partitionEpoch < partitionEpoch) {
+      if (partitionRegistration.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
-          s"and partition state $partitionState since the leader is already at 
a newer partition epoch $partitionEpoch.")
+          s"and partition state $partitionRegistration since the leader is 
already at a newer partition epoch $partitionEpoch.")

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala:
##########
@@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest {
 
   @ParameterizedTest
   @MethodSource(Array("parameters"))
-  def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: 
util.List[Integer],
-                                    adding: util.List[Integer], removing: 
util.List[Integer],
+  def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int],
+                                    adding: Array[Int], removing: Array[Int],
                                     original: util.List[Int], 
isUnderReplicated: Boolean): Unit = {
-    val leaderState = new PartitionState()
+    val partitionRegistrationBuilder = new PartitionRegistration.Builder()
       .setLeader(brokerId)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
       .setLeaderEpoch(6)
       .setIsr(isr)
       .setPartitionEpoch(1)
       .setReplicas(replicas)
-      .setIsNew(false)
-    if (!adding.isEmpty)
-      leaderState.setAddingReplicas(adding)
-    if (!removing.isEmpty)
-      leaderState.setRemovingReplicas(removing)
+      .setDirectories(DirectoryId.unassignedArray(replicas.length))
+    if (adding.nonEmpty)
+      partitionRegistrationBuilder.setAddingReplicas(adding)
+//      leaderState.setAddingReplicas(adding)
+    if (removing.nonEmpty)
+      partitionRegistrationBuilder.setRemovingReplicas(removing)
+//      leaderState.setRemovingReplicas(removing)

Review Comment:
   This commented-out `leaderState.setRemovingReplicas(removing)` line is a leftover from the old code and can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to