junrao commented on code in PR #13489:
URL: https://github.com/apache/kafka/pull/13489#discussion_r1159033215
########## core/src/main/scala/kafka/cluster/Partition.scala: ##########
@@ -977,11 +982,17 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
-      // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR.
+      // In KRaft mode, only a replica which meets all of the following requirements is allowed to join the ISR.
+      // 1. It is not fenced.
+      // 2. It is not in controlled shutdown.
+      // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
+      //    request broker epoch is -1 which bypass the epoch verification.

Review Comment:
   bypass => bypasses

########## core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala: ##########
@@ -76,11 +78,56 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
     alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3), LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1, 2, 3), LeaderRecoveryState.RECOVERED, 10), 0)
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), any())
   }

+  @ParameterizedTest
+  @MethodSource(Array("provideMetadataVersions"))
+  def testBasicWithBrokerEpoch(metadataVersion: MetadataVersion): Unit = {
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 101, () => metadataVersion)
+    alterPartitionManager.start()
+    val isrWithBrokerEpoch = ListBuffer[BrokerState]()
+    for (ii <- 1 to 3) {
+      isrWithBrokerEpoch += new BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
+    }
+    alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList, 10), 0)
+
+    val message = new AlterPartitionRequestData()
+      .setBrokerId(brokerId)
+      .setBrokerEpoch(101)
+    val topicData = new AlterPartitionRequestData.TopicData()
+      .setTopicName(topic)
+      .setTopicId(topicId)
+
+    if (metadataVersion.isTopicIdsSupported()) {
+      val newIsrWithBrokerEpoch = new ListBuffer[BrokerState]()
+      newIsrWithBrokerEpoch.append(new BrokerState().setBrokerId(1).setBrokerEpoch(101))
+      newIsrWithBrokerEpoch.append(new BrokerState().setBrokerId(2).setBrokerEpoch(102))
+      newIsrWithBrokerEpoch.append(new BrokerState().setBrokerId(3).setBrokerEpoch(103))
+      topicData.partitions.add(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(0)
+        .setLeaderEpoch(1)
+        .setPartitionEpoch(10)
+        .setNewIsrWithEpochs(newIsrWithBrokerEpoch.toList.asJava))
+    } else {
+      topicData.partitions.add(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(0)
+        .setLeaderEpoch(1)
+        .setPartitionEpoch(10)
+        .setNewIsr(List(1, 2, 3).map(Integer.valueOf).asJava))
+    }
+
+    message.topics.add(topicData)
+
+    verify(brokerToController).start()
+    val captor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+    verify(brokerToController).sendRequest(captor.capture(), any())
+    assertEquals(message.toString, captor.getValue.asInstanceOf[AlterPartitionRequest.Builder].build().toString)

Review Comment:
   Instead of doing string comparison, could we make AlterPartitionRequest comparable?
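   A rough sketch of one way to avoid the string comparison (a sketch, not code from the PR): the auto-generated `AlterPartitionRequestData` message class already implements structural `equals`, so the test could assert on the request's `data` instead of making `AlterPartitionRequest` itself comparable. `message` and `captor` refer to the values built in the quoted test.

```scala
// Compare the generated data objects field by field instead of their
// string forms. AlterPartitionRequestData is an auto-generated message
// class with a structural equals()/hashCode().
val sentRequest = captor.getValue
  .asInstanceOf[AlterPartitionRequest.Builder]
  .build()
assertEquals(message, sentRequest.data)
```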
########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -3167,7 +3307,8 @@ class PartitionTest extends AbstractPartitionTest {
     replicaId: Int,
     lastCaughtUpTimeMs: Long,
     logEndOffset: Long,
-    logStartOffset: Long
+    logStartOffset: Long,
+    brokerEpoch: Long = -2

Review Comment:
   Could we define a constant for -2 and give it a meaningful name?

########## core/src/main/scala/kafka/cluster/Partition.scala: ##########
@@ -1691,6 +1710,33 @@ class Partition(val topicPartition: TopicPartition,
     updatedState
   }

+  private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
+    isr.map(brokerId => {

Review Comment:
   Could we do `isr.map { brokerId => ... }` instead? It's a bit simpler.

########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -1367,6 +1368,10 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
     val isrItem = alterPartitionManager.isrUpdates.head
     assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
+    isrItem.leaderAndIsr.isrWithBrokerEpoch.foreach(brokerState => {

Review Comment:
   Could we do `foreach { brokerState => ... }`?

########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -1744,9 +1873,13 @@ class PartitionTest extends AbstractPartitionTest {
     partition.maybeShrinkIsr()
     assertEquals(0, alterPartitionListener.shrinks.get)
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
-    assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
+    assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId, remoteBrokerId1))
+    val isrUpdate = alterPartitionManager.isrUpdates.head
+    isrUpdate.leaderAndIsr.isrWithBrokerEpoch.foreach(brokerState => {

Review Comment:
   Could we do `foreach { brokerState => ... }`?

########## core/src/main/scala/kafka/cluster/Partition.scala: ##########
@@ -1691,6 +1710,33 @@ class Partition(val topicPartition: TopicPartition,
     updatedState
   }

+  private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
+    isr.map(brokerId => {
+      val brokerState = new BrokerState().setBrokerId(brokerId)
+      if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
+        brokerState.setBrokerEpoch(-1)
+      } else if (brokerId == localBrokerId) {
+        brokerState.setBrokerEpoch(localBrokerEpochSupplier())
+      } else {
+        try {
+          brokerState.setBrokerEpoch(remoteReplicasMap.get(brokerId).stateSnapshot.brokerEpoch.get)
+        } catch {
+          case _: Throwable =>

Review Comment:
   Could we explicitly check the presence of brokerEpoch instead of depending on an exception?

########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -1367,6 +1368,10 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
     val isrItem = alterPartitionManager.isrUpdates.head
     assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
+    isrItem.leaderAndIsr.isrWithBrokerEpoch.foreach(brokerState => {
+      // In ZK mode, the broker epochs in the leaderAndIsr should be -1.

Review Comment:
   How do we know this is ZK mode only?
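   For the constant-for--2 suggestion, a minimal sketch; the constant's name and its owning object are illustrative placeholders, not names from the PR:

```scala
// Hypothetical naming; where this constant should live is up to the PR author.
object ReplicaState {
  // Sentinel broker epoch meaning "no epoch has been recorded for this replica yet".
  val UnknownBrokerEpoch: Long = -2L
}

// The test helper's parameter could then default to it:
//   brokerEpoch: Long = ReplicaState.UnknownBrokerEpoch
```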
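   Combining two of the suggestions above (brace-style lambdas, and an explicit presence check instead of catching a `Throwable`), `addBrokerEpochToIsr` could look roughly like the sketch below. It assumes, as the quoted try/catch implies, that `remoteReplicasMap.get` returns null for an unknown broker id and that `stateSnapshot.brokerEpoch` is an `Option[Long]`; the -1 fallback is also an assumption, since the catch branch is elided in the quoted diff.

```scala
private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
  isr.map { brokerId =>
    val brokerState = new BrokerState().setBrokerId(brokerId)
    if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
      // ZK mode: broker epochs are not tracked, so send -1 to bypass verification.
      brokerState.setBrokerEpoch(-1)
    } else if (brokerId == localBrokerId) {
      brokerState.setBrokerEpoch(localBrokerEpochSupplier())
    } else {
      // Explicit presence check instead of relying on a thrown exception.
      val storedEpoch = Option(remoteReplicasMap.get(brokerId))
        .flatMap(_.stateSnapshot.brokerEpoch)
      brokerState.setBrokerEpoch(storedEpoch.getOrElse(-1L))
    }
    brokerState
  }
}
```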
########## core/src/main/scala/kafka/cluster/Partition.scala: ##########
@@ -992,6 +1003,11 @@ class Partition(val topicPartition: TopicPartition,
     }
   }

+  private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
+    storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
+      (storedBrokerEpoch.get == -1 || (storedBrokerEpoch == cachedBrokerEpoch))

Review Comment:
   No need for the second nested brackets?

########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -1553,6 +1562,102 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0, alterPartitionManager.isrUpdates.size)
   }

+  @Test
+  def testIsrCanExpandedIfBrokerEpochsMatchWithKraftMetadataCache(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 10, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId1 = brokerId + 1
+    val remoteBrokerId2 = brokerId + 2
+    val replicas = List(brokerId, remoteBrokerId1, remoteBrokerId2)
+    val isr = Set(brokerId, remoteBrokerId2)
+
+    val metadataCache: MetadataCache = mock(classOf[KRaftMetadataCache])
+    addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
+
+    // Mark the remote broker 1 as eligible in the metadata cache.
+    when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId1)).thenReturn(false)
+    when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerShuttingDown(remoteBrokerId1)).thenReturn(false)
+
+    val partition = new Partition(
+      topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = MetadataVersion.latest,
+      localBrokerId = brokerId,
+      () => defaultBrokerEpoch(brokerId),
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterPartitionManager,
+    )
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(isr.toList.map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas.map(Int.box).asJava)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+
+    // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail.
+    val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) + 1
+    fetchFollower(partition,
+      replicaId = remoteBrokerId1,
+      fetchOffset = log.logEndOffset,
+      replicaEpoch = wrongReplicaEpoch
+    )
+
+    assertReplicaState(partition, remoteBrokerId1,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset,
+      brokerEpoch = wrongReplicaEpoch
+    )
+
+    // Expansion is not triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(isr, partition.partitionState.maximalIsr)
+    assertEquals(0, alterPartitionManager.isrUpdates.size)
+
+    // Fetch again, this time with correct default broker epoch.
+    fetchFollower(partition,
+      replicaId = remoteBrokerId1,
+      fetchOffset = log.logEndOffset
+    )
+
+    // Follower should still catch up to the log end offset.
+    assertReplicaState(partition, remoteBrokerId1,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = log.logEndOffset
+    )
+
+    // Expansion is triggered.
+    assertEquals(isr, partition.partitionState.isr)
+    assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
+    assertEquals(1, alterPartitionManager.isrUpdates.size)
+    val isrUpdate = alterPartitionManager.isrUpdates.head
+    isrUpdate.leaderAndIsr.isrWithBrokerEpoch.foreach(brokerState => {

Review Comment:
   Could we do `foreach { brokerState => ... }`?

########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##########
@@ -3255,4 +3402,9 @@ class PartitionTest extends AbstractPartitionTest {
     )
   }

+  private def addBrokerEpochToMockMetadataCache(kRaftMetadataCache: KRaftMetadataCache, brokers: List[Int]): Unit = {
+    brokers.foreach(broker => {

Review Comment:
   Could we do `foreach { broker => ... }`?
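   Circling back to the `isBrokerEpochIsrEligible` comment above: `==` binds tighter than `||` in Scala, so the inner parentheses can simply be dropped. Same logic as the quoted diff:

```scala
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long],
                                     cachedBrokerEpoch: Option[Long]): Boolean = {
  // -1 bypasses the epoch verification, as described in Partition.scala.
  storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
    (storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
}
```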