dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r713916798



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1424,6 +1428,21 @@ class ReplicaManager(val config: KafkaConfig,
           val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
           updateLeaderAndFollowerMetrics(followerTopicSet)
 
+          if (topicIdUpdateFollowerPartitions.nonEmpty)
+            updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
+
+          leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+            /*
+           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+           * we need to map this topic-partition to OfflinePartition instead.
+           */
+            if (localLog(topicPartition).isEmpty)
+              markPartitionOffline(topicPartition)
+          }

Review comment:
       I think that this code was removed in trunk. Did you bring it back by 
mistake?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
                     stateChangeLogger.info(s"Updating log for $topicPartition 
to assign topic ID " +
                       s"$topicId from LeaderAndIsr request from controller 
$controllerId with correlation " +
                       s"id $correlationId epoch $controllerEpoch")
+                    if (partitionState.leader != localBrokerId && 
metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
       nit: I think that I would prefer to have `metadataCache.hasAliveBroker(partitionState.leader)`
in `updateTopicIdForFollowers` after all. We could do this check when we build
the Map at L1778.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
 
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = new mutable.HashMap[String, Uuid]()

Review comment:
       We should add a small comment here which explains why it is actually OK 
to re-build the mapping from the request. It is not obvious at first.
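
    For example, something along these lines (just a sketch of the kind of comment
I have in mind; the exact wording depends on how the fetch request is built):
   
   ```
   // The fetch request is built by this thread from its own partition states, so
   // it only references topics that this thread is currently fetching. Rebuilding
   // the name <-> ID mappings from the request is therefore enough here and
   // avoids going back to the metadata cache.
   val topicIds = new mutable.HashMap[String, Uuid]()
   val topicNames = new mutable.HashMap[Uuid, String]()
   ```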

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition =>
+          partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1)
+        }.toMap
+        replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdateFollowerWithLeader, topicIds)
+      }
+    } catch {
+      case e: Throwable =>
+        stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
+          s"received from controller $controllerId epoch $controllerEpoch", e)

Review comment:
       Should we update this log to be specific about the topic ID update?
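
    Something along these lines maybe (wording is only a suggestion):
   
   ```
   stateChangeLogger.error(s"Error while updating topic IDs for followers with correlationId " +
     s"$correlationId received from controller $controllerId epoch $controllerEpoch", e)
   ```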

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition =>
+          partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1)

Review comment:
       nit: Should we simply ignore a partition without a leader instead of using
`-1`? This case should never happen, I think. We could do something like this:
   
   ```
   val partitionsToUpdateFollowerWithLeader = mutable.Map.empty[TopicPartition, Int]
   partitionStates.foreach { partition =>
     partition.leaderReplicaIdOpt.foreach { leader =>
       if (metadataCache.hasAliveBroker(leader)) {
         partitionsToUpdateFollowerWithLeader += partition.topicPartition -> leader
       }
     }
   }
   ```

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
 
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = new mutable.HashMap[String, Uuid]()
+    val topicNames = new mutable.HashMap[Uuid, String]()
+    request.data.topics.asScala.foreach { topic =>

Review comment:
       nit: You could use `forEach` instead of `asScala.foreach` here. It 
avoids the scala conversion.
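
    Roughly like this (a sketch only; the loop body stays as in the diff):
   
   ```
   // java.lang.Iterable#forEach accepts a Scala lambda directly and avoids asScala.
   request.data.topics.forEach { topic =>
     // ... same body as before
   }
   ```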

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
     assertEquals(0, fetcherManager.deadThreadCount)
     EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = 
EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> 
initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId
+
+    // Simulate calls to different fetchers due to different leaders
+    EasyMock.expect(fetcher.start())
+    EasyMock.expect(fetcher.start())
+
+    EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
+      .andReturn(Set(tp1))
+    EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
+      .andReturn(Set(tp2))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, 
leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.replay(fetcher)
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 
-> initialFetchState2))
+    verifyFetchState(fetcher.fetchState(tp1), None)
+    verifyFetchState(fetcher.fetchState(tp2), None)
+
+    fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
+    verifyFetchState(fetcher.fetchState(tp1), topicId)
+    verifyFetchState(fetcher.fetchState(tp2), topicId)
+
+    EasyMock.verify(fetcher)

Review comment:
       Should we try to update a topic-partition which does not exist, and to
target a fetcher thread which does not exist either?
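
    For instance (a sketch, assuming the update silently skips partitions that no
existing fetcher owns):
   
   ```
   // Hypothetical extra step at the end of the test: tp3 was never added to any
   // fetcher and broker 2 has no fetcher thread, so this should be a no-op and
   // EasyMock.verify(fetcher) should still pass without extra expectations.
   val tp3 = new TopicPartition("topic", 2)
   fetcherManager.maybeUpdateTopicIds(Map(tp3 -> 2), topicIds)
   ```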

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
     assertEquals(0, fetcherManager.deadThreadCount)
     EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = 
EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> 
initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId

Review comment:
       nit: I would move those two right before `maybeUpdateTopicIds` is called; you
could even pass them inline. For `topicIds`, would it make sense to use two
different topic IDs? You should be able to use a Map and pass it directly to
`maybeUpdateTopicIds` (a Map is a function as well, if I remember correctly).
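
    Something like this, for example (a sketch; it assumes `tp1` and `tp2` are
re-declared on two different topics so that each one can get its own ID):
   
   ```
   // A Map[String, Option[Uuid]] is also a String => Option[Uuid], so it can be
   // passed straight to maybeUpdateTopicIds.
   val topicIds = Map("topic1" -> Some(Uuid.randomUuid()), "topic2" -> Some(Uuid.randomUuid()))
   fetcherManager.maybeUpdateTopicIds(
     Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id),
     topicIds)
   ```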

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+

Review comment:
       nit: Empty line could be removed.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {

Review comment:
       nit: Could we split the arguments over multiple lines here? The line is 
quite long.
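
    e.g.:
   
   ```
   def assertFetcherHasTopicId[T <: AbstractFetcherThread](
     manager: AbstractFetcherManager[T],
     tp: TopicPartition,
     expectedTopicId: Option[Uuid]
   ): Unit = {
   ```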

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, 
leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], 
expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => 
topicIds.get(topicName))

Review comment:
       Should we also test a few negative cases?
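
    For example (a sketch; the first check assumes it runs before the topic ID is
added, and the second assumes `maybeUpdateTopicIds` ignores partitions it does
not track, per the `currentState != null` check in `AbstractFetcherThread`):
   
   ```
   // A lookup that returns no ID should leave the fetch state without a topic ID.
   fetcher.maybeUpdateTopicIds(Set(partition), _ => None)
   verifyFetchState(fetcher.fetchState(partition), None)
   
   // A partition the fetcher does not track should simply be ignored.
   val unknownPartition = new TopicPartition("topic2", 0)
   fetcher.maybeUpdateTopicIds(Set(unknownPartition), topicName => topicIds.get(topicName))
   assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
   ```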

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())

Review comment:
       nit: `leaderAndIsrResponse` -> `leaderAndIsrResponse1` to follow your naming convention?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)

Review comment:
       nit: Could we reuse `aliveBrokersIds` instead of `List(0,1)`?
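
    e.g. (assuming `aliveBrokersIds` stays a `Seq[Int]`):
   
   ```
   val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
   ```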

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), 
None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0)

Review comment:
       nit: Should we also define `val aliveBrokersIds = Seq(0, 1)` and use it 
here and in other places?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), 
None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
Seq(0,1), leaderAndIsr, version = 4)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      var partition = replicaManager.getPartitionOrException(new 
TopicPartition(topic, 0))

Review comment:
       nit: Could we reuse `topicPartition` here?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
       Yeah, we could add an update method as follows:
   
   ```
       public void update(TopicPartition topicPartition, S state) {
           map.put(topicPartition, state);
           updateSize();
       }
   ```

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), 
None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
Seq(0,1), leaderAndIsr, version = 4)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      var partition = replicaManager.getPartitionOrException(new 
TopicPartition(topic, 0))
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == 
partition.log.get.dir.getParentFile).size)
+
+      // Append a couple of messages.
+      for (i <- 1 to 40) {
+        val records = TestUtils.singletonRecords(s"message $i".getBytes)
+        appendRecords(replicaManager, new TopicPartition(topic, 0), 
records).onFire { response =>

Review comment:
       nit: Could we reuse `topicPartition` here?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: 
AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): 
Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, 
aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, 
Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), 
None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, 
Seq(0,1), leaderAndIsr, version = 4)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      var partition = replicaManager.getPartitionOrException(new 
TopicPartition(topic, 0))
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == 
partition.log.get.dir.getParentFile).size)
+
+      // Append a couple of messages.
+      for (i <- 1 to 40) {
+        val records = TestUtils.singletonRecords(s"message $i".getBytes)
+        appendRecords(replicaManager, new TopicPartition(topic, 0), 
records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      // find the live and different folder
+      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ 
== partition.log.get.dir.getParentFile).head
+      assertEquals(0, 
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+      replicaManager.alterReplicaLogDirs(Map(topicPartition -> 
newReplicaFolder.getAbsolutePath))
+
+      assertFetcherHasTopicId(replicaManager.replicaAlterLogDirsManager, 
partition.topicPartition, None)
+
+      // make sure the future log is created
+      replicaManager.futureLocalLogOrException(topicPartition)
+      assertEquals(1, 
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
+
+      // wait for the ReplicaAlterLogDirsThread to complete
+      TestUtils.waitUntilTrue(() => {
+        replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+        replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty
+      }, s"ReplicaAlterLogDirsThread should be gone")
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, Seq(0,1), 
leaderAndIsr)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => 
())
+      partition = replicaManager.getPartitionOrException(new 
TopicPartition(topic, 0))

Review comment:
       nit: Could we reuse `topicPartition` here?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): 
LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 
0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(0)
+            .setControllerEpoch(0)
+            .setLeader(1)
+            .setLeaderEpoch(epoch)
+            .setIsr(replicas)
+            .setZkVersion(0)
+            .setReplicas(replicas)
+            .setIsNew(true)).asJava,
+          topicIds,
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(0, Collections.emptyMap())
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      val fetchState = 
replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => 
fetcher.fetchState(tp))
+      assertTrue(fetchState.isDefined)
+      assertEquals(None, fetchState.get.topicId)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(0, topicIds.asJava)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      val fetchState2 = 
replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => 
fetcher.fetchState(tp))
+      assertTrue(fetchState2.isDefined)
+      assertEquals(Some(topicId), fetchState2.get.topicId)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {

Review comment:
       Yeah, splitting the test is a good idea. It is pretty hard to read at 
the moment.



