TaiJuWu commented on code in PR #20335: URL: https://github.com/apache/kafka/pull/20335#discussion_r2267161021
########## core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala: ########## @@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], - adding: util.List[Integer], removing: util.List[Integer], + def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int], + adding: Array[Int], removing: Array[Int], original: util.List[Int], isUnderReplicated: Boolean): Unit = { - val leaderState = new PartitionState() + val partitionRegistrationBuilder = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(6) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - if (!adding.isEmpty) - leaderState.setAddingReplicas(adding) - if (!removing.isEmpty) - leaderState.setRemovingReplicas(removing) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + if (adding.nonEmpty) + partitionRegistrationBuilder.setAddingReplicas(adding) +// leaderState.setAddingReplicas(adding) Review Comment: This can be removed. ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -731,31 +730,32 @@ class Partition(val topicPartition: TopicPartition, * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. */ - def makeLeader(partitionState: JPartitionState, + def makeLeader(partitionRegistration: PartitionRegistration, + isNew: Boolean, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetDirectoryId: Option[Uuid] = None): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { // Partition state changes are expected to have a partition epoch larger or equal // to the current partition epoch. 
The latter is allowed because the partition epoch // is also updated by the AlterPartition response so the new epoch might be known - // before a LeaderAndIsr request is received or before an update is received via + // before a partitionRegistration is received or before an update is received via // the metadata log. - if (partitionState.partitionEpoch < partitionEpoch) { + if (partitionRegistration.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " + - s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.") + s"and partition state $partitionRegistration since the leader is already at a newer partition epoch $partitionEpoch.") Review Comment: ditto ########## core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala: ########## @@ -86,31 +86,35 @@ class AssignmentStateTest extends AbstractPartitionTest { @ParameterizedTest @MethodSource(Array("parameters")) - def testPartitionAssignmentStatus(isr: util.List[Integer], replicas: util.List[Integer], - adding: util.List[Integer], removing: util.List[Integer], + def testPartitionAssignmentStatus(isr: Array[Int], replicas: Array[Int], + adding: Array[Int], removing: Array[Int], original: util.List[Int], isUnderReplicated: Boolean): Unit = { - val leaderState = new PartitionState() + val partitionRegistrationBuilder = new PartitionRegistration.Builder() .setLeader(brokerId) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) .setLeaderEpoch(6) .setIsr(isr) .setPartitionEpoch(1) .setReplicas(replicas) - .setIsNew(false) - if (!adding.isEmpty) - leaderState.setAddingReplicas(adding) - if (!removing.isEmpty) - leaderState.setRemovingReplicas(removing) + .setDirectories(DirectoryId.unassignedArray(replicas.length)) + if (adding.nonEmpty) + partitionRegistrationBuilder.setAddingReplicas(adding) +// leaderState.setAddingReplicas(adding) + if (removing.nonEmpty) + 
partitionRegistrationBuilder.setRemovingReplicas(removing) +// leaderState.setRemovingReplicas(removing) Review Comment: This can be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org