dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r713916798
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1424,6 +1428,21 @@ class ReplicaManager(val config: KafkaConfig,
       val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
       updateLeaderAndFollowerMetrics(followerTopicSet)
+      if (topicIdUpdateFollowerPartitions.nonEmpty)
+        updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
+
+      leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+        val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+        /*
+         * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+         * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+         * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+         * we need to map this topic-partition to OfflinePartition instead.
+         */
+        if (localLog(topicPartition).isEmpty)
+          markPartitionOffline(topicPartition)
      }

Review comment:
   I think that this code was removed in trunk. Did you bring it back by mistake?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
           stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
             s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
             s"id $correlationId epoch $controllerEpoch")
+          if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   nit: I think that I would prefer to have `metadataCache.hasAliveBroker(partitionState.leader)` in `updateTopicIdForFollowers` after all. We could do this check when we build the Map at L1778.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = new mutable.HashMap[String, Uuid]()

Review comment:
   We should add a small comment here which explains why it is actually OK to re-build the mapping from the request. It is not obvious at first.
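For illustration, the requested comment might read something like the sketch below. The stated rationale is an assumption about the PR's intent (the fetch request is built by this thread from its own state, so it already carries the topic IDs), not something confirmed in this thread:

```
    // The fetch request was built by this thread from its own partition states, so it already
    // carries the topic ID for every topic it fetches. Rebuilding the name <-> id mappings from
    // the request avoids depending on the metadata cache, which may not have received the topic
    // IDs yet.
    val topicIds = new mutable.HashMap[String, Uuid]()
    val topicNames = new mutable.HashMap[Uuid, String]()
```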
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
+
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition =>
+          partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1)
+        }.toMap
+        replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdateFollowerWithLeader, topicIds)
+      }
+    } catch {
+      case e: Throwable =>
+        stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
+          s"received from controller $controllerId epoch $controllerEpoch", e)

Review comment:
   Should we update this log to be specific about the topic ID update?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
+
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition =>
+          partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1)

Review comment:
   nit: Should we simply ignore a partition without leader instead of using `-1`? This case should never happen, I think. We could do something like this:
   ```
   val partitionsToUpdateFollowerWithLeader = mutable.Map.empty[TopicPartition, Int]
   partitionStates.foreach { partition =>
     partition.leaderReplicaIdOpt.foreach { leader =>
       if (metadataCache.hasAliveBroker(leader)) {
         partitionsToUpdateFollowerWithLeader += partition.topicPartition -> leader
       }
     }
   }
   ```

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String,
     var partitionData: Seq[(TopicPartition, FetchData)] = null
     val request = fetchRequest.build()
-    val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+    val topicIds = new mutable.HashMap[String, Uuid]()
+    val topicNames = new mutable.HashMap[Uuid, String]()
+    request.data.topics.asScala.foreach { topic =>

Review comment:
   nit: You could use `forEach` instead of `asScala.foreach` here. It avoids the Scala conversion.
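For the `forEach` nit, the rewritten loop would presumably look like the sketch below. The `topic`/`topicId` accessors on the request's topic entries are inferred from the quoted diff, not verified against the PR:

```
    // Iterate the underlying Java collection directly; no asScala wrapper is needed
    // for a simple side-effecting loop.
    request.data.topics.forEach { topic =>
      topicIds.put(topic.topic, topic.topicId)
      topicNames.put(topic.topicId, topic.topic)
    }
```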
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
     assertEquals(0, fetcherManager.deadThreadCount)
     EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId
+
+    // Simulate calls to different fetchers due to different leaders
+    EasyMock.expect(fetcher.start())
+    EasyMock.expect(fetcher.start())
+
+    EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
+      .andReturn(Set(tp1))
+    EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
+      .andReturn(Set(tp2))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.replay(fetcher)
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 -> initialFetchState2))
+    verifyFetchState(fetcher.fetchState(tp1), None)
+    verifyFetchState(fetcher.fetchState(tp2), None)
+
+    fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
+    verifyFetchState(fetcher.fetchState(tp1), topicId)
+    verifyFetchState(fetcher.fetchState(tp2), topicId)
+
+    EasyMock.verify(fetcher)

Review comment:
   Should we try to update a topic-partition which does not exist, and to target a fetcher thread which does not exist either?
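A sketch of what such a negative case could look like; `tp3` and its leader id are hypothetical, and it assumes `maybeUpdateTopicIds` silently ignores partitions that have no fetcher, which is the behaviour the reviewer appears to expect:

```
    // Hypothetical negative case: no fetcher thread was ever created for this partition,
    // so the update should be a no-op and must not create one.
    val tp3 = new TopicPartition("unknown-topic", 0)
    fetcherManager.maybeUpdateTopicIds(Map(tp3 -> 2), topicIds)
    assertTrue(fetcherManager.getFetcher(tp3).isEmpty)
```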
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
     assertEquals(0, fetcherManager.deadThreadCount)
     EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId

Review comment:
   nit: I would move those two before `maybeUpdateTopicIds` is called. You could even pass them inline as well. For `topicIds`, would it make sense to use two different topic IDs? You should be able to use a Map directly and pass it to `maybeUpdateTopicIds` (a Map is a function as well, if I remember correctly).

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+

Review comment:
   nit: Empty line could be removed.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = {

Review comment:
   nit: Could we split the arguments over multiple lines here? The line is quite long.
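Splitting the signature as suggested could look like the following (the body is taken from the quoted diff):

```
  def assertFetcherHasTopicId[T <: AbstractFetcherThread](
    manager: AbstractFetcherManager[T],
    tp: TopicPartition,
    expectedTopicId: Option[Uuid]
  ): Unit = {
    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
    assertTrue(fetchState.isDefined)
    assertEquals(expectedTopicId, fetchState.get.topicId)
  }
```

On the Map-as-function point in the first comment above: a `Map[String, Uuid]` is a `String => Uuid`, so the `String => Option[Uuid]` parameter would more naturally be satisfied by the map's `get` method (for example `topicIdsMap.get _`) or by a `Map[String, Option[Uuid]]`.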
##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
   Should we also test a few negative cases?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())

Review comment:
   nit: `leaderAndIsrResponse` -> `leaderAndIsrResponse1` to follow your naming convention?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)

Review comment:
   nit: Could we reuse `aliveBrokersIds` instead of `List(0,1)`?
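Applied to the quoted line, that reuse could simply be (assuming `LeaderAndIsr` accepts any `List[Int]` for the ISR, as the existing call suggests):

```
      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
```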
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3469,4 +3472,103 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = { + val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val topicPartition = new TopicPartition(topic, 0) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0) Review comment: nit: Should we also define `val aliveBrokersIds = Seq(0, 1)` and use it here and in other places? 
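For this second test, the same idea would mean defining the value locally and threading it through the request as well; a sketch based only on the quoted lines:

```
      val aliveBrokersIds = Seq(0, 1)
      val leaderAndIsr = new LeaderAndIsr(0, 0, aliveBrokersIds.toList, 0)
      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr, version = 4)
```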
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3469,4 +3472,103 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = { + val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val topicPartition = new TopicPartition(topic, 0) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, Seq(0,1), leaderAndIsr, version = 4) + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + var partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) Review comment: nit: Could we reuse `topicPartition` here? 
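That is, reusing the existing value instead of constructing the partition again:

```
      var partition = replicaManager.getPartitionOrException(topicPartition)
```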
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   Yeah, we could add an update method as follows:
   ```
   public void update(TopicPartition topicPartition, S state) {
       map.put(topicPartition, state);
       updateSize();
   }
   ```

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3469,4 +3472,103 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY
     )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, Seq(0,1), leaderAndIsr, version = 4)
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      var partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
+      assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size)
+
+      // Append a couple of messages.
+      for (i <- 1 to 40) {
+        val records = TestUtils.singletonRecords(s"message $i".getBytes)
+        appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response =>

Review comment:
   nit: Could we reuse topicPartition here?
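Putting the `update` suggestion from the AbstractFetcherThread comment above together with the quoted method, the body might end up looking like this sketch (it assumes the new `PartitionStates.update` is added exactly as proposed):

```
  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]): Unit = {
    partitionMapLock.lockInterruptibly()
    try {
      partitions.foreach { tp =>
        val currentState = partitionStates.stateValue(tp)
        if (currentState != null) {
          // `update` keeps the partition's position in the fetch order; only the topic ID
          // changes, so there is no reason to move it to the end of the queue.
          partitionStates.update(tp, currentState.updateTopicId(topicIds(tp.topic)))
        }
      }
    } finally partitionMapLock.unlock()
  }
```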
########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3469,4 +3472,103 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], tp: TopicPartition, expectedTopicId: Option[Uuid]): Unit = { + val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, List(0,1), 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testReplicaAlterLogDirsWithAndWithoutIds(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val topicPartition = new TopicPartition(topic, 0) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(0, 0, List(0,1), 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, Seq(0,1), leaderAndIsr, version = 4) + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + var partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) + assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size) + + // Append a couple of messages. 
+ for (i <- 1 to 40) { + val records = TestUtils.singletonRecords(s"message $i".getBytes) + appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => + assertEquals(Errors.NONE, response.error) + } + } + + // find the live and different folder + val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).head + assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) + replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath)) + + assertFetcherHasTopicId(replicaManager.replicaAlterLogDirsManager, partition.topicPartition, None) + + // make sure the future log is created + replicaManager.futureLocalLogOrException(topicPartition) + assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) + + // wait for the ReplicaAlterLogDirsThread to complete + TestUtils.waitUntilTrue(() => { + replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads() + replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty + }, s"ReplicaAlterLogDirsThread should be gone") + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, Seq(0,1), leaderAndIsr) + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) Review comment: nit: Could we reuse topicPartition here? ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -2810,4 +2811,132 @@ class ReplicaManagerTest { Replicas.NONE, Replicas.NONE, 2, 123, 456)))), replicaManager.calculateDeltaChanges(TEST_DELTA)) } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val replicas = aliveBrokersIds.toList.map(Int.box).asJava + + def leaderAndIsrRequest(epoch: Int, topicIds: util.Map[String, Uuid]): LeaderAndIsrRequest = + new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(epoch) + .setIsr(replicas) + .setZkVersion(0) + .setReplicas(replicas) + .setIsNew(true)).asJava, + topicIds, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + + val leaderAndIsrRequest1 = leaderAndIsrRequest(0, Collections.emptyMap()) + val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse.error) + + val fetchState = replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => fetcher.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(None, fetchState.get.topicId) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(0, topicIds.asJava) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + val fetchState2 = replicaManager.replicaFetcherManager.getFetcher(tp).flatMap(fetcher => fetcher.fetchState(tp)) + assertTrue(fetchState2.isDefined) + assertEquals(Some(topicId), fetchState2.get.topicId) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def 
testReplicaAlterLogDirsWithAndWithoutIds(): Unit = {

Review comment:
   Yeah, splitting the test is a good idea. It is pretty hard to read at the moment.