junrao commented on code in PR #13489: URL: https://github.com/apache/kafka/pull/13489#discussion_r1160114047
########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1691,6 +1710,30 @@ class Partition(val topicPartition: TopicPartition, updatedState } + private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = { + isr.map { brokerId => + val brokerState = new BrokerState().setBrokerId(brokerId) + if (!metadataCache.isInstanceOf[KRaftMetadataCache]) { + brokerState.setBrokerEpoch(-1) + } else if (brokerId == localBrokerId) { + brokerState.setBrokerEpoch(localBrokerEpochSupplier()) + } else if (!remoteReplicasMap.contains(brokerId) || !remoteReplicasMap.get(brokerId).stateSnapshot.brokerEpoch.isDefined) { Review Comment: remoteReplicasMap can change between the calls to `contains` and `get`. So, it's probably better to save the `get` result to a local val and then do the check on the val. Ditto for `brokerEpoch`. ########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ########## @@ -3229,11 +3374,13 @@ class PartitionTest extends AbstractPartitionTest { leaderEpoch: Option[Int] = None, lastFetchedEpoch: Option[Int] = None, fetchTimeMs: Long = time.milliseconds(), - topicId: Uuid = Uuid.ZERO_UUID + topicId: Uuid = Uuid.ZERO_UUID, + replicaEpoch: Long = -2 Review Comment: Should we change this to use `Option[Long]` too? ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -992,6 +1003,11 @@ class Partition(val topicPartition: TopicPartition, } } + private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = { + storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined && + (storedBrokerEpoch.get == -1 || (storedBrokerEpoch == cachedBrokerEpoch)) Review Comment: I just meant: could we change the code to the following? 
`(storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1691,6 +1710,30 @@ class Partition(val topicPartition: TopicPartition, updatedState } + private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = { + isr.map { brokerId => + val brokerState = new BrokerState().setBrokerId(brokerId) + if (!metadataCache.isInstanceOf[KRaftMetadataCache]) { + brokerState.setBrokerEpoch(-1) + } else if (brokerId == localBrokerId) { + brokerState.setBrokerEpoch(localBrokerEpochSupplier()) + } else if (!remoteReplicasMap.contains(brokerId) || !remoteReplicasMap.get(brokerId).stateSnapshot.brokerEpoch.isDefined) { + // There are two cases where the broker epoch can be missing: + // 1. During ISR expansion, we already held lock for the partition and did the broker epoch check, so the new + // ISR replica should have a valid broker epoch. Then, the missing broker epoch can only happens to the Review Comment: happens => happen ########## core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala: ########## @@ -76,11 +78,56 @@ class AlterPartitionManagerTest { val scheduler = new MockScheduler(time) val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion) alterPartitionManager.start() - alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3), LeaderRecoveryState.RECOVERED, 10), 0) + alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1, 2, 3), LeaderRecoveryState.RECOVERED, 10), 0) verify(brokerToController).start() verify(brokerToController).sendRequest(any(), any()) } + @ParameterizedTest + @MethodSource(Array("provideMetadataVersions")) + def testBasicWithBrokerEpoch(metadataVersion: MetadataVersion): Unit = { + val scheduler = new MockScheduler(time) + val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 
101, () => metadataVersion) + alterPartitionManager.start() + val isrWithBrokerEpoch = ListBuffer[BrokerState]() + for (ii <- 1 to 3) { + isrWithBrokerEpoch += new BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii) + } + alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList, 10), 0) + + val message = new AlterPartitionRequestData() Review Comment: message => expectedAlterPartitionData? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org