junrao commented on code in PR #13489:
URL: https://github.com/apache/kafka/pull/13489#discussion_r1160114047


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1691,6 +1710,30 @@ class Partition(val topicPartition: TopicPartition,
     updatedState
   }
 
+  private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
+    isr.map { brokerId =>
+      val brokerState = new BrokerState().setBrokerId(brokerId)
+      if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
+        brokerState.setBrokerEpoch(-1)
+      } else if (brokerId == localBrokerId) {
+        brokerState.setBrokerEpoch(localBrokerEpochSupplier())
+      } else if (!remoteReplicasMap.contains(brokerId) || 
!remoteReplicasMap.get(brokerId).stateSnapshot.brokerEpoch.isDefined) {

Review Comment:
   remoteReplicasMap can change between the call of `contains` and the call of 
`get`. So, it's probably better to save the `get` result to a local val and then 
do the check on that val. Ditto for `brokerEpoch`.



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3229,11 +3374,13 @@ class PartitionTest extends AbstractPartitionTest {
     leaderEpoch: Option[Int] = None,
     lastFetchedEpoch: Option[Int] = None,
     fetchTimeMs: Long = time.milliseconds(),
-    topicId: Uuid = Uuid.ZERO_UUID
+    topicId: Uuid = Uuid.ZERO_UUID,
+    replicaEpoch: Long = -2

Review Comment:
   Should we change this to use `Option[Long]` too?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -992,6 +1003,11 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], 
cachedBrokerEpoch: Option[Long]): Boolean = {
+    storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
+      (storedBrokerEpoch.get == -1 || (storedBrokerEpoch == cachedBrokerEpoch))

Review Comment:
   It just meant that could we change the code to the following?
   
   `(storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)`



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1691,6 +1710,30 @@ class Partition(val topicPartition: TopicPartition,
     updatedState
   }
 
+  private def addBrokerEpochToIsr(isr: List[Int]): List[BrokerState] = {
+    isr.map { brokerId =>
+      val brokerState = new BrokerState().setBrokerId(brokerId)
+      if (!metadataCache.isInstanceOf[KRaftMetadataCache]) {
+        brokerState.setBrokerEpoch(-1)
+      } else if (brokerId == localBrokerId) {
+        brokerState.setBrokerEpoch(localBrokerEpochSupplier())
+      } else if (!remoteReplicasMap.contains(brokerId) || 
!remoteReplicasMap.get(brokerId).stateSnapshot.brokerEpoch.isDefined) {
+        // There are two cases where the broker epoch can be missing:
+        // 1. During ISR expansion, we already held lock for the partition and 
did the broker epoch check, so the new
+        //   ISR replica should have a valid broker epoch. Then, the missing 
broker epoch can only happens to the

Review Comment:
   happens => happen



##########
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala:
##########
@@ -76,11 +78,56 @@ class AlterPartitionManagerTest {
     val scheduler = new MockScheduler(time)
     val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 2, () => metadataVersion)
     alterPartitionManager.start()
-    alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3), 
LeaderRecoveryState.RECOVERED, 10), 0)
+    alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, List(1, 2, 3), 
LeaderRecoveryState.RECOVERED, 10), 0)
     verify(brokerToController).start()
     verify(brokerToController).sendRequest(any(), any())
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("provideMetadataVersions"))
+  def testBasicWithBrokerEpoch(metadataVersion: MetadataVersion): Unit = {
+    val scheduler = new MockScheduler(time)
+    val alterPartitionManager = new 
DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () 
=> 101, () => metadataVersion)
+    alterPartitionManager.start()
+    val isrWithBrokerEpoch = ListBuffer[BrokerState]()
+    for (ii <- 1 to 3) {
+      isrWithBrokerEpoch += new 
BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
+    }
+    alterPartitionManager.submit(tp0, LeaderAndIsr(1, 1, 
LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList, 10), 0)
+
+    val message = new AlterPartitionRequestData()

Review Comment:
   message => expectedAlterPartitionData?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to